阻塞队列(BlockingQueue)

  BlockingQueue是java.until.concurrent包下的一个重要的数据结构,其继承于Queue接口,Queue继承于Collection。BlockingQueue提供了对数据的线程安全操作,其方法如下:

/**
  * Inserts the specified element into this queue if   it is possible to do so
  * immediately without violating capacity  restrictions, returning
  * {@code true} upon success and throwing an {@code  IllegalStateException}
  * if no space is currently available.
  *往队列中插入一个特定元素,如果空间允许,一旦成功就返回true,如果当前没有 
  可用空间,则抛出IllegalStateException异常
  ...
 */
boolean add(E e);

/**
 * Inserts the specified element into this queue if it is possible to do
 * so immediately without violating capacity restrictions.
 * When using a capacity-restricted queue, this method is generally
 * preferable to {@link #add}, which can fail to insert an element only
 * by throwing an exception.
 *往队列中插入一个元素,如果没有空间限制,在使用有容量限制的队列时,该方法比
 add()方法更可取,因为add方法失败时只是抛出异常,而offer会返回false
 ...
 */
boolean offer(E e);


/**
 * Inserts the specified element into this queue, waiting if necessary
 * for space to become available.
 *往队列中插入一个元素,与前两个方法不同的是,如果当前没有空间可用,则阻塞当  
 *前线程,直至有有空间
...
 */
void put(E e) throws InterruptedException;


/**
 * Inserts the specified element into this queue, waiting up to the
 * specified wait time if necessary for space to become available.
 往队列中插入元素,可以设置等待时间,如果无空间可用,则等待指定时间。若指定   
 时间后还没有空间可用,则返回false
 *
...
 */
boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;


/**
 * Retrieves and removes the head of this queue,   waiting if necessary
 * until an element becomes available.
 *获取并从队列中移除排在首位的元素,如果队列为空,则阻塞当前线程直到有新的元素加入
 * @return the head of this queue
 * @throws InterruptedException if interrupted while waiting
 */
E take() throws InterruptedException;


/**
 * Retrieves and removes the head of this queue, waiting up to the
 * specified wait time if necessary for an element to become available.
 *获取并移除队列中处在首位的数据,如果队列为空,则等待指定时间,等待指定时间后,若不为空,则返回对首数据,否则返回null
 * @param timeout how long to wait before giving up, in units of
 *        {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the
 *        {@code timeout} parameter
 * @return the head of this queue, or {@code null} if the
 *         specified waiting time elapses before an element is available
 * @throws InterruptedException if interrupted while waiting
 */
E poll(long timeout, TimeUnit unit)throws InterruptedException;

boolean remove(Object o);  //从队列中移除指定元素


/**
 * Returns the number of additional elements that this queue can ideally
 * (in the absence of memory or resource constraints) accept without
 * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
 * limit.
 *返回队列中元可容纳元素的最大数量,如果没有内部限制,则返回Integer.MAX_VALUE
 *
 * @return the remaining capacity
 */
int remainingCapacity();


public boolean contains(Object o);   
//判断队列中是否包含某特定元素


int drainTo(Collection<? super E> c);  
//移除队列中的所有元素并加入到指定集合中,返回传输的元素个数


int drainTo(Collection<? super E> c, int maxElements);   
//从这个队列中最多删除指定数量的可用元素,并将它们添加到给定的集合中

  BlockingQueue是一个接口,java并发包下提供了其多种实现:
1.ArrayBlockingQueue

ArrayBlockingQueue是一个基于数组的先进先出有界阻塞队列,其内部维护一个定长数组:

/** The queued items */  
final Object[] items;  //存放元素的数组

/** items index for next take, poll, peek or remove */
int takeIndex;   //队首元素的索引

/** items index for next put, offer, or add */
int putIndex;  //下一个加入的元素存放的索引

  由于ArrayBlockingQueue放入数据和获取数据使用的是同一个锁对象,因此两者无法真正并行运行,创建ArrayBlockingQueue对象时,可以控制对象的内部所是否采用公平锁,默认采用非公平锁.
2.LinkedBlockingQueue

LinkedBlockingQueue是一个基于单向链表的阻塞队列,其内部也维护一个数据缓冲队列(由链表组成)。只有缓冲队列区容量达到最大 缓存容量时,才会阻塞生产者队列,直到消费者线程从队列中消费一份数据。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部采用不同的锁来放入数据和获取数据,因此在高并发场合可以更高效地处理数据。
需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

3.DelayQueue

DelayQueue是一个无界的阻塞队列,队列中的元素只有当其指定的延迟时间到了,才能从队列中获取该元素。因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。其内部实现使用一个优先队列,调用offer方法时,将delay对象加入到优先队列中:

/**
 * Inserts the specified element into this delay queue.
 *
 * @param e the element to add
 * @return {@code true}
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

调用take方法从优先队列中将first移出,如果没有达到延迟时间,则进行await处理:

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

4.PriorityBlockingQueue

PriorityBlockingQueue是一个基于优先级的无界阻塞队列,里面的对象必须实现Comparable接口,队列通过这个接口的compare方法确定对象的优先级,内部控制线程同步的锁采用的是公平锁。
该队列offer方法如下:

/**
 * Inserts the specified element into this priority queue.
 * As the queue is unbounded, this method will never return {@code false}.
 * 由于是无界的,故该方法始终返回true,且不会阻塞
 * @param e the element to add
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws ClassCastException if the specified element cannot be compared
 *         with elements currently in the priority queue according to the
 *         priority queue's ordering
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;

该队列take方法如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

当队列为空时,会阻塞当前线程,知道有新的数据加入队列

5.SynchronousQueue

SynchronousQueue是一种无缓冲的等待队列,内部使用公平锁,其容量为0,不允许存放null元素。任何一个对SynchronousQueue的写需要等待一个对SynchronousQueue的读,类似于在CSP和Ada中使用的会合通道。与其说他是一个队列,不如说他是一个数据交换通道。

使用场景:
SynchronousQueue非常适合于切换设计,其中一个运行在一个线程中的对象必须与在另一个线程中运行的对象同步,以便传递一些信息、事件或任务

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,406评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,976评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,302评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,366评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,372评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,457评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,872评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,521评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,717评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,523评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,590评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,299评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,859评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,883评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,127评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,760评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,290评论 2 342

推荐阅读更多精彩内容

  • 该随笔内容完全引自http://wsmajunfeng.iteye.com/blog/1629354一. 前言在新...
    抓兔子的猫阅读 497评论 0 11
  • 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线...
    端木轩阅读 997评论 0 2
  • 此刻 我抱着你 你坐在腰凳上 这几天 你都不要自己躺下睡 要在我的后背 或者我的怀抱里入睡 即便昨天是母亲节 也没...
    漫漫无忧阅读 165评论 3 5
  • 昨天学习了叫兽的「七招让你的文案说人话」,今天就来应用下,尝试着修改腾讯儿童管家的部分文案,不足之处,还请多指教。...
    小万管家阅读 609评论 0 5
  • 带女儿到楼下小花园玩足球,一脚踢出去,小足球就被一群小孩给抢走了。 那几个孩子比我家女儿大些,风风火火地追着球跑来...
    爱分享的丁丁妈阅读 126评论 0 0