Java LinkedBlockingQueue

本章介绍JUC包中的LinkedBlockingQueue。
目录

  • LinkedBlockingQueue介绍
  • LinkedBlockingQueue原理和数据结构
  • LinkedBlockingQueue函数列表
  • LinkedBlockingQueue源码分析(JDK1.7.0_40版本)]
  • 创建
  • 添加
  • 取出
  • 遍历
  • LinkedBlockingQueue示例

LinkedBlockingQueue介绍

LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
此外,LinkedBlockingQueue还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。

LinkedBlockingQueue原理和数据结构

LinkedBlockingQueue的数据结构,如下图所示:

img

说明:
(1) LinkedBlockingQueue继承于AbstractQueue,它本质上是一个FIFO(先进先出)的队列。
(2) LinkedBlockingQueue实现了BlockingQueue接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
(3) LinkedBlockingQueue是通过单链表实现的。
3.a) head是链表的表头。取出数据时,都是从表头head处插入。
3.b) last是链表的表尾。新增数据时,都是从表尾last处插入。
3.c) count是链表的实际大小,即当前链表中包含的节点个数。
3.d) capacity是列表的容量,它是在创建链表时指定的。
3.e) putLock是插入锁,takeLock是取出锁;notEmpty是“非空条件”,notFull是“未满条件”。通过它们对链表进行并发控制。

LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁。对于插入操作,通过“插入锁putLock”进行同步;对于取出操作,通过“取出锁takeLock”进行同步。 此外,插入锁putLock和“非满条件notFull”相关联,取出锁takeLock和“非空条件notEmpty”相关联。通过notFull和notEmpty更细腻的控制锁。

-- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。

-- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。

关于ReentrantLock 和 Condition的内容,可以参考之前的章节。

LinkedBlockingQueue函数列表

// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。LinkedBlockingQueue()
// 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。
LinkedBlockingQueue(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingQueue。LinkedBlockingQueue(int capacity)
// 从队列彻底移除所有元素。
void clear()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在队列中的元素上按适当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
boolean offer(E e)
// 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
boolean offer(E e, long timeout, TimeUnit unit)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。void put(E e)
// 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
int remainingCapacity()
// 从此队列移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回队列中的元素个数。
int size()
// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
E take()
// 返回按适当顺序包含此队列中所有元素的数组。
Object[] toArray()
// 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

LinkedBlockingQueue源码分析(JDK1.7.0_40版本)

下面从LinkedBlockingQueue的创建,添加,删除,遍历这几个方面进行分析。

创建

下面以LinkedBlockingQueue(int capacity)来进行说明。

public LinkedBlockingQueue(int capacity) { 
  if (capacity <= 0) throw new IllegalArgumentException();
  this.capacity = capacity; last = head = new Node<E>(null);
}

说明:
(01) capacity是“链式阻塞队列”的容量。
(02) head和last是“链式阻塞队列”的首节点和尾节点。
它们在LinkedBlockingQueue中的声明如下:

// 容量
private final int capacity;
// 当前数量
private final AtomicInteger count = new AtomicInteger(0);
private transient Node<E> head; 
// 链表的表头
private transient Node<E> last; 
// 链表的表尾
// 用于控制“删除元素”的互斥锁takeLock 和 锁对应的“非空条件”notEmptyprivate 
final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// 用于控制“添加元素”的互斥锁putLock 和 锁对应的“非满条件”notFull
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

链表的节点定义如下:

static class Node<E> { 
  E item; 
  // 数据 
  Node<E> next; 
  // 下一个节点的指针 
  Node(E x) { 
    item = x; 
  }
}

添加

下面以offer(E e)为例,对LinkedBlockingQueue的添加方法进行说明。

public boolean offer(E e) { 
  if (e == null) throw new NullPointerException(); 
  // 如果“队列已满”,则返回false,表示插入失败。 
  final AtomicInteger count = this.count;
  if (count.get() == capacity) return false; 
  int c = -1; 
  // 新建“节点e” 
  Node<E> node = new Node(e); 
  final ReentrantLock putLock = this.putLock; 
  // 获取“插入锁putLock” 
  putLock.lock(); 
  try { 
    // 再次对“队列是不是满”的进行判断。 
    // 若“队列未满”,则插入节点。 
    if (count.get() < capacity) { 
      // 插入节点 enqueue(node);
      // 将“当前节点数量”+1,并返回“原始的数量” 
      c = count.getAndIncrement(); 
      // 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程。 
      if (c + 1 < capacity)  notFull.signal(); 
    } 
  } finally { 
      // 释放“插入锁putLock” 
      putLock.unlock();
  } 

  // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程 
  if (c == 0) 
    signalNotEmpty(); 
    return c >= 0;
  }

说明:offer()的作用很简单,就是将元素E添加到队列的末尾。

enqueue()的源码如下:

private void enqueue(Node<E> node) { 
  // assert 
  putLock.isHeldByCurrentThread(); 
  // assert 
  last.next == null; 
  last = last.next = node;
}

enqueue()的作用是将node添加到队列末尾,并设置node为新的尾节点!

signalNotEmpty()的源码如下:

private void signalNotEmpty() { 
  final ReentrantLock takeLock = this.takeLock; 
  takeLock.lock(); 
  try { 
    notEmpty.signal(); 
  } finally { 
    takeLock.unlock(); 
  }
}

signalNotEmpty()的作用是唤醒notEmpty上的等待线程。

取出

下面以take()为例,对LinkedBlockingQueue的取出方法进行说明。

public E take() throws InterruptedException { 
  E x; 
  int c = -1; 
  final AtomicInteger count = this.count; 
  final ReentrantLock takeLock = this.takeLock; 
  // 获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常 
  takeLock.lockInterruptibly(); 
  try { 
    // 若“队列为空”,则一直等待。 
    while (count.get() == 0) { 
      notEmpty.await(); 
    } 
    // 取出元素 
    x = dequeue(); 
    // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。 
    c = count.getAndDecrement();
    if (c > 1) notEmpty.signal();
  } finally { 
    // 释放“取出锁” 
    takeLock.unlock(); 
  } 
  // 如果在“取出元素之前”,队列是满的;则在取出元素之后,唤醒notFull上的等待线程。 
  if (c == capacity) 
    signalNotFull(); 
  return x;
}

说明:take()的作用是取出并返回队列的头。若队列为空,则一直等待。

dequeue()的源码如下:

private E dequeue() { 
  // assert 
  takeLock.isHeldByCurrentThread(); 
  // assert 
  head.item == null; 
  Node<E> h = head; 
  Node<E> first = h.next; 
  h.next = h; 
  // help GC 
  head = first; 
  E x = first.item; 
  first.item = null; 
  return x;
}

dequeue()的作用就是删除队列的头节点,并将表头指向“原头节点的下一个节点”。

signalNotFull()的源码如下:

private void signalNotFull() { 
  final ReentrantLock putLock = this.putLock; 
  putLock.lock(); 
  try { 
    notFull.signal(); 
  } finally { 
    putLock.unlock(); 
  }
}

signalNotFull()的作用就是唤醒notFull上的等待线程。

遍历

下面对LinkedBlockingQueue的遍历方法进行说明。

public Iterator<E> iterator() { return new Itr();}

iterator()实际上是返回一个Iter对象。

Itr类的定义如下:

private class Itr implements Iterator<E> { 
  // 当前节点 
  private Node<E> current; 
  // 上一次返回的节点 
  private Node<E> lastRet; 
  // 当前节点对应的值 
  private E currentElement; 
  Itr() { 
    // 同时获取“插入锁putLock” 和 “取出锁takeLock” 
    fullyLock(); 
    try { 
      // 设置“当前元素”为“队列表头的下一节点”,即为队列的第一个有效节点 
      current = head.next; 
      if (current != null) 
        currentElement = current.item; 
      } finally { 
        // 释放“插入锁putLock” 和 “取出锁takeLock” 
        fullyUnlock(); 
      }
    } 
    // 返回“下一个节点是否为null” 
    public boolean hasNext() { return current != null; } 

    private Node<E> nextNode(Node<E> p) { 
      for (;;) { 
        Node<E> s = p.next; 
        if (s == p) return head.next; 
        if (s == null || s.item != null) 
          return s; 
        p = s; 
      }
    } 

    // 返回下一个节点
    public E next() { 
      fullyLock(); 
      try { 
        if (current == null) throw new NoSuchElementException(); 
        E x = currentElement; 
        lastRet = current; 
        current = nextNode(current); 
        currentElement = (current == null) ? null : current.item; 
        return x; 
      } finally { 
        fullyUnlock(); 
      } 
    } 

    // 删除下一个节点 
    public void remove() { 
      if (lastRet == null) throw new IllegalStateException(); 
      fullyLock(); 
      try { 
        Node<E> node = lastRet; 
        lastRet = null; 
        for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { 
          if (p == node) { unlink(p, trail); break; }
         } 
      } finally { 
        fullyUnlock(); 
      } 
    }
}

LinkedBlockingQueue示例

import java.util.*;
import java.util.concurrent.*;
/** LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。 
* 下面是“多个线程同时操作并且遍历queue”的示例 
* (01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。 
* (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。 
*  @author skywang 
*/
public class LinkedBlockingQueueDemo1 { 
  // TODO: queue是LinkedList对象时,程序会出错。
  //private static Queue<String> queue = new LinkedList<String>();
  private static Queue<String> queue = new LinkedBlockingQueue<String>(); 
  public static void main(String[] args) { 
    // 同时启动两个线程对queue进行操作! 
    new MyThread("ta").start(); 
    new MyThread("tb").start(); 
  } 

  private static void printAll() { 
    String value; 
    Iterator iter = queue.iterator(); 
    while(iter.hasNext()) { 
      value = (String)iter.next(); 
      System.out.print(value+", "); 
    } 
    System.out.println(); 
  } 

  private static class MyThread extends Thread { 
    MyThread(String name) { 
      super(name); 
    } 

    @Override 
    public void run() { 
      int i = 0; 
      while (i++ < 6) { 
        // “线程名” + "-" + "序号" 
        String val = Thread.currentThread().getName()+i;
        queue.add(val); 
        // 通过“Iterator”遍历queue。 
        printAll(); 
      } 
    } 
}

(某一次)运行结果:


tb1, ta1, 
tb1, ta1, ta2, 
tb1, ta1, ta2, ta3, 
tb1, ta1, ta2, ta3, ta4, 
tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5, 
ta4, tb1, ta5, ta1, ta6, 
ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2, 
ta5, ta6, tb2, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,

结果说明:
示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingQueue进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingQueue中;接着,遍历并输出LinkedBlockingQueue中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。

当queue是LinkedBlockingQueue对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 201,784评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,745评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,702评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,229评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,245评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,376评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,798评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,471评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,655评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,485评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,535评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,235评论 3 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,793评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,863评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,096评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,654评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,233评论 2 341

推荐阅读更多精彩内容

  • 一、多线程 说明下线程的状态 java中的线程一共有 5 种状态。 NEW:这种情况指的是,通过 New 关键字创...
    Java旅行者阅读 4,653评论 0 44
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,780评论 1 19
  • 佳木斯的冬夜零下三十多度,賊冷贼冷。午夜一点多我和父亲下了火车,为省住宿费,就在售票处等天亮去桦南...
    胡长溪阅读 308评论 0 3
  • 做了一个梦 梦见下雨了 我总是梦见下雨 我走在路上没有带伞 刚好走到你家门前 想去你家借伞 门没关 我走进去 你还...
    c2cf7f6e850e阅读 224评论 0 0
  • 我想去每一个地方 记录下时间留下的史书 落花不是无情物 此处无声胜有声 幽僻处可有人行 悠然自得 你的眼中 是否 ...
    林夕曦阅读 139评论 0 2