更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》
阻塞队列 Blocking Queue
- 当队列空时,获取元素的线程会等待
- 当队列满时,存储元素的线程会等待
提供的方法:
-
插入元素:
- add(e):抛出异常
- offer(e):返回特殊值
- put(e):一直阻塞
- offer(e,time,unit):超时退出
-
移除元素:
- remove():抛出异常
- poll():返回特殊值
- take():一直阻塞
- poll(time,unit):超时退出
JDK 7 提供了 7 个阻塞队列
-
ArrayBlockingQueue :一个由数组结构组成的 有界 阻塞队列。
- 此队列按照先进先出(FIFO)的原则对元素进行排序。
- 默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
-
LinkedBlockingQueue :一个由链表结构组成的 有界 阻塞队列。
- 此队列按照先进先出(FIFO)的原则对元素进行排序。
-
PriorityBlockingQueue :一个支持优先级排序的 无界 阻塞队列。
- 默认情况下元素采取自然顺序排列,也可以通过比较器 comparator 来指定元素的排序规则。元素按照升序排列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
关于 通过 wait 和 notify 实现生产者消费者模式,可以参考 链接。
关于 通过 Lock 和 竞争条件 Condition 实现生产者消费者模式,可以参考 链接。
利用阻塞队列实现生产者消费者模式,代码如下:
public class BlockingQueue_Test {
private static final int MAX_CAPACITY = 10;
private static ArrayBlockingQueue<Object> goods = new ArrayBlockingQueue<Object>(MAX_CAPACITY);
public static void main(String[] args) {
(new ProducerThread()).start();
(new ConsumerThread()).start();
}
static class ProducerThread extends Thread {
public void run() {
while (true) {
// 每隔 1000 毫秒生产一个商品
try {
Thread.sleep(1000);
goods.put(new Object());
System.out.println("Produce goods, total: " + goods.size());
} catch (InterruptedException e) {
}
}
}
}
static class ConsumerThread extends Thread {
public void run() {
while (true) {
// 每隔 500 毫秒消费一个商品
try {
Thread.sleep(500);
goods.take();
System.out.println("Consume goods, total: " + goods.size());
} catch (InterruptedException e) {
}
}
}
}
}
阻塞队列的实现原理
以 ArrayBlockingQueue
为例,实际上使用了 ReentrantLock 和 Condition。
构造方法:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
插入元素,如果队列已满,则阻塞 notFull.await();:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
移除元素,如果队列已空,则阻塞 notEmpty.await();:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}