BlockingQueue详解--一文让你彻底明白阻塞队列
下文笔者讲述BlockingQueue的相关说明
检索元素
BlockingQueue简介
BlockingQueue是一个接口,它位于java.util.concurrent包下面
BlockingQueue用于处理并发生产者和消费者的问题
BlockingQueue是线程安全的类
无论何时只有一个线程对其进行take或put操作
BlockingQueue还提供超时机制
Queue队列类型
一、无限队列(unbounded queue):可无限增长 二、有限队列(bounded queue):定义最大容量
队列数据结构
队列实质是一种存储数据的结构 可使用链表或数组实现 队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列 队列主要操作分为:入队(EnQueue)与出队(Dequeue)
常见的4种阻塞队列
ArrayBlockingQueue:
由数组支持的有界队列
LinkedBlockingQueue:
由链接节点支持的可选有界队列
PriorityBlockingQueue:
由优先级堆支持的无界优先级队列
DelayQueue:
由优先级堆支持的、基于时间的调度队列
ArrayBlockingQueue
队列基于数组实现 容量大小在创建ArrayBlockingQueue对象时已定义好数据结构
队列创建
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();
应用场景
在线程池中有比较多的应用 生产者消费者场景 队列工作原理 基于ReentrantLock保证线程安全 根据Condition实现队列满时的阻塞
LinkedBlockingQueue
是一个基于链表的无界队列(理论上有界)
//创建队列 BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(); blockingQueue 的容量将设置为 Integer.MAX_VALUE 。 向无限队列添加元素的所有操作都将永远不会阻塞[注意这里不是说不会加锁保证线程安全]因此它可以增长到非常大的容量。 无限队列需注意,消费者应该像添加消息一样快速消费消息,否则内存会撑爆,出现OutOfMemory
DelayQueue
由优先级堆支持的、 基于时间的调度队列 内部基于无界队列PriorityQueue实现 而无界队列基于数组的扩容实现。 队列创建: BlockingQueue<String> blockingQueue = new DelayQueue(); 入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口 应用场景: 电影票 工作原理: 队列内部会根据时间优先级进行排序。延迟类线程池周期执行。
BlockingQueue API
BlockingQueue 接口的所有方法可以分为两大类 负责向队列添加元素的方法和检索这些元素的方法 在队列满/空的情况下 来自这两个组的每个方法的行为都不同。添加元素
| 方法 | 备注 |
| add() | 如果插入成功则返回 true 否则抛出 IllegalStateException 异常 |
| put() | 将指定的元素插入队列 当队列满了 那么会阻塞直到有空间插入 |
| offer() | 如果插入成功则返回 true 否则返回false |
| offer(E e, long timeout, TimeUnit unit) | 尝试将元素插入队列 当队列已满 则会阻塞直到有空间插入 |
| 方法 | 备注 |
| take() | 获取队列的头部元素并将其删除 当队列为空 则阻塞并等待元素变为可用 |
| poll(long timeout, TimeUnit unit) | 检索并删除队列的头部 如有必要 等待指定的等待时间以使元素可用 如果超时,则返回 null |
多线程生产者-消费者示例
生产者将生成一个 0 到 100 的随机数,并将该数字放在 BlockingQueue中 将创建 16 个线程用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。 从生产者向消费者发出信号的好方法生产者代码
@Slf4j
public class NumbersProducer implements Runnable {
private final int poisonPill;
private final int poisonPillPerProducer;
private BlockingQueue<Integer> numbersQueue;
public NumbersProducer(BlockingQueue<Integer> numbersQueue,
int poisonPill,
int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
}
@Override
public void run() {
try {
generateNumbers ();
} catch (InterruptedException e) {
Thread.currentThread ().interrupt ();
}
}
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put (ThreadLocalRandom.current ().nextInt (100));
}
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put (poisonPill);
}
}
}
消费者代码
@Slf4j
public class NumbersConsumer implements Runnable {
private final int poisonPill;
private BlockingQueue<Integer> queue;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
}
@Override
public void run() {
try {
while (true) {
Integer number = queue.take ();
if (number.equals (poisonPill)) {
return;
}
}
} catch (InterruptedException e) {
Thread.currentThread ().interrupt ();
}
}
}
例:创建4个生产者,N个消费者
public class Main {
public static void main(String[] args) {
int BOUND = 10;
int N_PRODUCERS = 16;
int N_CONSUMERS = Runtime.getRuntime ().availableProcessors ();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<> (BOUND);
for (int i = 1; i < N_PRODUCERS; i++) {
new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer)).start ();
}
for (int j = 0; j < N_CONSUMERS; j++) {
new Thread (new NumbersConsumer (queue, poisonPill)).start ();
}
new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer + mod)).start ();
}
}
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。


