LinkedBlockingQueue 源码分析 (基于Java 8)

1. LinkedBlockingQueue定义

一个基于链表实现的有固定容量的 FIFO 的阻塞队列, 队列中存在最久的元素存在于 head.next 节点上(PS: head 节点永远存在, 且是一个 dummy 节点), 存储时间最短的节点存储在tail上; 通常情况下 LinkedBlockingQueue 的吞吐量要好于 ArrayBlockingQueue.
主要特点:

  1. 基于两个lock的 queue, putLock, takeLock; 并且两个锁都有相关联的 condition 用于相应的 await; 每次进行 put/offer 或 take/poll 之后会根据queue的容量进行判断是否需要进行对应的唤醒
  2. 队列中总是存在一个 dummy 节点, 每次 poll 节点时获取的是 head.next 节点中的值
2. LinkedBlockingQueue 基本属性

queue中的数据存储在 Node 对象中, 且 Node 具有以下的特点:

  1. head.item 永远是 null, head是一个 dummy 节点, 所以进行 poll 时获取的是 head.next 的值
  2. tail.next = null
/** Linked list node class */
/**
 * Linked 的数据节点, 这里有个特点, LinkedBlockingQueue 开始构建时会创建一个dummy节点(类似于 ConcurrentLinkedQueue)
 * 而整个队列里面的头节点都是 dummy 节点
 * @param <E>
 */
static class Node<E>{
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    /**
     * 在进行 poll 时 会将 node.next = node 来进行 help gc
     * next == null 指的是要么是队列的 tail 节点
     */
    Node<E> next;
    Node(E x){
        item = x;
    }
}

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list
 * Invariant: head.item == null
 * Head 节点的不变性 head.item == null <- 这是一个 dummy 节点(dummy 节点什么作用呢, 主要是方便代码, 不然的话需要处理一些 if 的判断, 加大代码的复杂度, 尤其是非阻塞的实现)
 */
transient Node<E> head;

/**
 * Tail of linked list
 * Invariant: last.next == null
 * Tail 节点的不变性 last.next == null <- 尾节点的 next 是 null
 */
private transient Node<E> last;

/** ReentrantLock Condition 的常见使用方式 */
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
3. LinkedBlockingQueue 构造函数

LinkedBlockingQueue的构造函数比较简单, 主要是初始化一下容量(默认 Integer.MAX_VALUE), 及 head, tail

/**
 * Creates a {@code KLinkedBlockingQueue} with the given (fixed) capacity
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity} is not greater
 *                                  than zero
 */
public KLinkedBlockingQueue(int capacity){
    if(capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity; // 指定 queue 的容量
    last = head = new Node<E>(null); // 默认的在 queue 里面 创建 一个 dummy 节点
}
4. 添加元素 put方法

put 方法是将元素添加到队列尾部, queue满时进行await, 添加成功后容量还未满, 则进行 signal

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary for space to become available
 *
 *  将元素加入到 queue 的尾部
 * @param e
 * @throws InterruptedException
 */
public void put(E e) throws InterruptedException{
    if(e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negativeto indicate failure unless set.
    // 有趣的 变量 c 下面会有对它的讲解
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLocK = this.putLock;
    final AtomicInteger count = this.count;  // 获取 queue 的数量 count (这是 count 只可能 减, 不能增)
    putLocK.lockInterruptibly(); // 获取 put 的lock
    try {
        /**
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards
         */
        /**
         * 若 queue 的容量满了 则进行 await,直到有人进行通知
         * 那何时进行通知呢?
         * 有两种情况进行通知,
         *      (1) 有线程进行 put/offer 成功后且 (c + 1) < capacity 时
         *      (2) 在线程进行 take/poll 成功 且 (c == capacity) (PS: 这里的 c 指的是 在进行 take/poll 之前的容量)
         */

        while(count.get() == capacity){     // 容量满了, 进行等待
            notFull.await();
        }
        enqueue(node);                        // 进行节点的入队操作
        c = count.getAndIncrement();          // 进行节点个数的增加1, 返回原来的值
        if(c + 1 < capacity){               // 说明 现在的 put 操作后 queue 还没满
            notFull.signal();               // 唤醒其他在睡的线程
        }

    }finally {
        putLock.unlock();                   // 释放锁
    }
    if(c == 0){                             // c == 0 说明 原来queue是空的, 所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
        signalNotEmpty();
    }
}

/**
 * Links node at end of queue
 * 节点 入队列 (PS: 这里因为有个 dummy 节点, 不需要判空 <- 现在有点感觉 dummy 节点的作用了吧)
 * @param node the node
 */
private void enqueue(Node<E> node){
    // assert putLock.isHeldByCurrentThread()
    // assert last.next == null
    last = last.next = node;
}

/**
 * Signals a waiting take. Called only from put/offer (which do not
 * otherwise ordinarily lock takeLock.)
 */
private void signalNotEmpty(){
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    }finally {
        takeLock.unlock();
    }
}

代码的注释中基本把操作思想都说了, 有几个注意的地方

  1. 当queue满时, 会调用 notFull.await() 进行等待, 而相应的唤醒的地方有两处, 一个是 "有线程进行 put/offer 成功后且 (c + 1) < capacity 时", 另一处是 "在线程进行 take/poll 成功 且 (c == capacity) (PS: 这里的 c 指的是 在进行 take/poll 之前的容量)"
  2. 代码中的 "signalNotEmpty" 这时在原来queue的数量 c (getAndIncrement的返回值是原来的值) ==0 时对此时在调用 take/poll 方法的线程进行唤醒
5. 添加元素offer 方法

offer与put都是添加元素到queue的尾部, 只不过 put 方法在队列满时会进行阻塞, 直到成功; 而 offer 操作在容量满时直接返回 false.

/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary up to the specified wait time for space to become available
 *
 *  支持中断和超时的 offer 节点
 *
 * @param e
 * @param timeout
 * @param unit
 * @return {@code true} if successful, or {@code false} if
 *          the specified waiting time elapses before space is available
 * @throws InterruptedException
 */
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException{
    if(e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;     // 获取 put lock
    final AtomicInteger count = this.count;         // 获取 queue 的容量
    putLock.lockInterruptibly();
    try {
        while(count.get() == capacity){             // queue的容量满了进行 带 timeout 的 await
            if(nanos <= 0){                           //  用光了 timeout 直接 return false
                return false;
            }
            nanos = notFull.awaitNanos(nanos);      // 直接 await (PS: 返回值 nanos <= 0 说明 等待是超时了, 正常 await 并且 被 signal nanos > 0; 具体详情会在 Condition 那一篇中详细说明)
        }
        enqueue(new Node<E>(e));                    // 节点若队列
        c = count.getAndIncrement();                // 获取入队列之前的容量
        if(c + 1 < capacity){                     // c + 1 < capacity 说明 现在的 offer 成功后 queue 还没满
            notFull.signal();                     // 唤醒其他正在 await 的线程
        }
    }finally {
        putLock.unlock();                           // 释放锁
    }
    if(c == 0){
        signalNotEmpty();                            // c == 0 说明 原来queue是空的, 所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
    }
    return true;
}

offer 整个操作和 put 差不多, 唯一变化的是多了一个 notFull.awaitNanos(nanos), 这个函数的返回值若是负数, 则说明等待超时, 则直接 return false (关于 Condition.awaitNanos 方法会在后续再说)

6. 获取queue头元素 take 方法

此方法是获取 queue 中呆着时间最长的节点的值(head.next)

/**
 * 取走 queue 中呆着时间最长的节点的 item (其实就是 head.next.item 的值)
 * @return
 * @throws InterruptedException
 */
public E take() throws InterruptedException{
    E x;
    int c = -1;
    final AtomicInteger count = this.count;          // 获取 queue 的容量
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();                      // 获取 lock
    try {
        while(count.get() == 0){                      // queue 为空, 进行 await
            notEmpty.await();
        }
        x = dequeue();                                 // 将 head.next.item 的值取出, head = head.next
        c = count.getAndDecrement();                   // queue 的容量计数减一
        if(c > 1){
            notEmpty.signal();                        // c > 1 说明 进行 take 后 queue 还有值
        }
    }finally {
        takeLock.unlock();                              // 释放 lock
    }
    if(c == capacity){                                // c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程
        signalNotFull();
    }
    return x;
}

/**
 * Removes a node from head of queue
 * 节点出队列 这里有个注意点 head 永远是 dummy 节点, dequeue 的值是 head.next.item 的值
 * 在 dequeue 后 将 原  head 的后继节点设置为 head(成为了 dummy 节点)
 * @return the node
 */
private E dequeue(){
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;       // 这里的 head 是一个 dummy 节点
    Node<E> first = h.next; // 获取真正的节点
    h.next = h;             // help GC
    head = first;           // 重行赋值 head
    E x = first.item;       // 获取 dequeue 的值
    first.item = null;      // 将 item 置 空
    return x;
}

/** Signal a waiting put. Called only from take/poll */
private void signalNotFull(){
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    }finally {
        putLock.unlock();
    }
}

操作过程: 将 head.next 的值取出, 将 head.next 设置为新的head; 操作的步骤比较少, 只有两处 condition 的唤醒需要注意一下:

  1. 当 take 结束时, 判断 queue 是否还有元素 (c > 1) 来进行 notEmpty.signal()
  2. 当 take 结束时, 判断原先的容量是否已经满 (c == capacity) 来决定是否需要调用 signalNotFull 进行唤醒此刻还在等待 put/offer 的线程
7. 获取queue头元素 poll 方法

poll 与 take 都是获取头节点的元素, 唯一的区别是 take在queue为空时进行await, poll 则直接返回

/**
 * 带 timeout 的poll 操作, 获取 head.next.item 的值
 * @param timeout
 * @param unit
 * @return
 * @throws InterruptedException
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException{
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);             //  计算超时时间
    final AtomicInteger count = this.count;       // 获取 queue 的容量
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();                   // 获取 lock
    try{
        while(count.get() == 0){                   // queue 为空, 进行 await
            if(nanos <= 0){                        // timeout 用光了, 直接 return null
                return null;
            }
            nanos = notEmpty.awaitNanos(nanos);   // 调用 condition 进行 await, 在 timeout之内进行 signal -> nanos> 0
        }
        x = dequeue();                             // 节点出queue
        c = count.getAndDecrement();               // 计算器减一
        if(c > 1){                                 // c > 1 说明 poll 后 容器内还有元素, 进行 换新 await 的线程
            notEmpty.signal();
        }
    }finally {
        takeLock.unlock();                         // 释放锁
    }
    if(c == capacity){                           // c == capacity 说明一开始 queue 是满的, 调用 signalNotFull 进行唤醒一下 put/offer 的线程
        signalNotFull();
    }
    return x;
}
8. 删除queue元素 remove 方法
/**
 * Removes a single instance of the specified element from this queue,
 * if it is present. More formally, removes an element {@code e} such
 * that {@code o.equals(e)}, if this queue contains one or more such
 * elements
 * Returns {@code true} if this queue contained the specified element
 * (or equivalently, if this queue changed as a result of the call)
 *
 * 删除 queue 中的节点
 *
 * @param o element to be removed from this queue, if present
 * @return {@code true} if this queue changed as a result of the call
 */
public boolean remove(Object o){
    if(o == null) return false;
    fullyLock();                                         // 获取所有锁

    try {
        for(Node<E> trail = head, p = trail.next;     // 进行变量的初始化 trail是 p 的前继节点
                p != null;
                trail = p, p = p.next){
            if(o.equals(p.item)){
                unlink(p, trail);                      // 调用 unlink 进行删除
                return true;
            }
        }
        return false;
    }finally {
        fullyUnlock();                                  // 释放所有锁
    }
}

/** Unlinks interior Node p with predecessor trail */
/**
 * 直接将这个方法看做是 将 节点 p 从 queue 中进行删除
 * @param p
 * @param trail
 */
void unlink(Node<E> p, Node<E> trail){
    // assert isFullLocked();
    // p.next is not changed, to allow iterators that are
    // traversing p to maintain their weak-consistency guarantee
    p.item = null;                      // 删除 p.item
    trail.next = p.next;                // 删除节点 p
    if(last == p){                      // 若节点p 是last, 则将p的前继节点trail置为 last
        last = trail;
    }
    if(count.getAndDecrement() == capacity){    // count.getAndDecrement() == capacity 说明 queue 在删除节点之前是满的, 所以唤醒一下在 put/offer 的线程
        notFull.signal();
    }
}

remove的代码比较少, 有两次需要注意:

  1. trail 是 节点 p 的前继节点
  2. 删除结束后会判断之前的容量是否是满的来决定 调用notFull.signal() 进行线程唤醒操作
9. 总结

LinkedBlockingQueue 是一个基于链表实现的阻塞queue, 它的性能好于 ArrayBlockingQueue, 但是差于 ConcurrentLinkeQueue; 并且它非常适于生产者消费者的环境中, 比如 Executors.newFixedThreadPool() 就是基于这个队列的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容