Java并发集合_LinkedBlockingQueue原理分析

在上一章我们讲解了ArrayBlockingQueue,用数组形式实现的阻塞队列。

数组的长度在创建时就必须确定,如果数组长度小了,那么ArrayBlockingQueue队列很容易就被阻塞,如果数组长度大了,就容易浪费内存。

而队列这个数据结构天然适合用链表这个形式,而LinkedBlockingQueue就是使用链表方式实现的阻塞队列。

一. 链表实现

1.1 Node内部类

    /**
     * 链表的节点,同时也是通过它来实现一个单向链表
     */
    static class Node<E> {
        E item;

        // 指向链表的下一个节点
        Node<E> next;

        Node(E x) { item = x; }
    }

有一个变量e储存数据,有一个变量next指向下一个节点的引用。它可以实现最简单地单向列表。

1.2 怎样实现链表

     /**
     * 它的next指向队列头,这个节点不存储数据
     */
    transient Node<E> head;

    /**
     * 队列尾节点
     */
    private transient Node<E> last;

要实现链表,必须有两个变量,一个表示链表头head,一个表示链表尾last。head和last都会在LinkedBlockingQueue对象创建的时候被初始化。

last = head = new Node<E>(null);

注意这里用了一个小技巧,链表头head节点并没有存放数据,它指向的下一个节点,才真正存储了链表中第一个数据。而链表尾last的确储存了链表最后一个数据。

1.3 插入和删除节点

  /**
     * 向队列尾插入节点
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread(); // 当前线程肯定获取了putLock锁
        // 将原队列尾节点的next引用指向新节点node,然后再将node节点设置成队列尾节点last
        // 这样就实现了向队列尾插入节点
        last = last.next = node;
    }

在链表尾插入节点很简单,将原队列尾last的下一个节点next指向新节点node,再将新节点node赋值给队列尾last节点。这样就实现了插入一个新节点。

    // 移除队列头节点,并返回被删除的节点数据
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread(); // 当前线程肯定获取了takeLock锁
        // assert head.item == null;

        Node<E> h = head;
        // first节点中才存储了队列中第一个元素的数据
        Node<E> first = h.next;
        h.next = h; // help GC
        // 设置新的head值,相当于删除了first节点。因为head节点本身不储存数据
        head = first;
        // 队列头的数据
        E x = first.item;
        // 移除原先的数据
        first.item = null;
        return x;
    }

要注意head并不是链表头,它的next才是指向链表头,所以删除链表头也很简单,就是将head.next赋值给head,然后返回原先head.next节点的数据。

删除的时候,就要注意链表为空的情况。head.next的值使用enqueue方法添加的。当head.next==last的时候,表示已经删除到最后一个元素了,当head.next==null的时候,就不能删除了,因为链表已经为空了。这里没有做判断,是因为在调用dequeue方法的地方已经做过判断了。

二. 同步锁ReentrantLock和条件Condition

因为阻塞队列在队列为空和队列已满的情况下,都必须阻塞等待,那么就天然需要两个条件。而为了保证多线程并发安全,又需要一个同步锁。这个在ArrayBlockingQueue中已经说过了。
这里我们来说说LinkedBlockingQueue不一样的地方。

    /** 独占锁,用于处理插入队列操作的并发问题,即put与offer操作 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 队列不满的条件Condition,它是由putLock锁生成的 */
    private final Condition notFull = putLock.newCondition();

    /** 独占锁,用于处理删除队列头操作的并发问题,即take与poll操作 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 队列不为空的条件Condition, 它使用takeLock锁生成的 */
    private final Condition notEmpty = takeLock.newCondition();

2.1 putLock与takeLock

我们发现使用了两把锁:

  1. putLock 同步所有插入元素的操作,即put与offer系列方法的操作。
  2. takeLock 同步删除和获取元素的操作,即take与poll系列方法操作。

按照上面的说法,可能会出现同时插入元素和删除元素的操作,那么就不会出现问题么?
我们来具体分析一个下,对于队列来说操作分为三种:

  1. 在队列尾插入元素。即put与offer系列方法,它们会涉及两个变量,队列中元素个数count,和队列尾节点last。(不会涉及head节点)
  2. 移除队列头元素。即take与poll系列方法,它们会涉及两个变量,队列中元素个数count,和head节点。(不会涉及队列尾节点last)
  3. 查看队列头元素。即 element()与peek()方法,它们会涉及两个变量,队列中元素个数count,和head节点。(不会涉及队列尾节点last)

因此使用putLock锁来保持last变量的安全,使用takeLock锁来保持head变量的安全。
对于都涉及了队列中元素个数count变量,所以使用AtomicInteger来保证并发安全问题。

   /** 队列中元素的个数,这里使用AtomicInteger变量,保证并发安全问题 */
    private final AtomicInteger count = new AtomicInteger();

2.2 notFull与notEmpty

  1. notFull 是由putLock锁生成的,因为当插入元素时,我们要判断队列是不是已满。
  2. notEmpty 是由takeLock锁生成的,因为当删除元素时,我们要判断队列是不是为空。

2.3 控制流程

当插入元素时:

  1. 先使用putLock.lockInterruptibly()保证只有一个线程进行插入操作.
  2. 然后利用count变量,判断队列是否已满.
  3. 满了就调用notFull.await()方法,让当前线程等待。因为notFull是由putLock产生的,这里已经获取到锁了,所以可以调用await方法。
  4. 没满就调用 enqueue方法,向队列尾插入新元素。
  5. 调用count.getAndIncrement()方法,将队列中元素个数加一,并保证多线程并发安全。
  6. 调用signalNotEmpty方法,唤醒正在等待获取元素的线程。

当删除元素时:

  1. 先使用takeLock.lockInterruptibly()保证只有一个线程进行删除操作.
  2. 然后利用count变量,判断队列是否为空.
  3. 队列为空就调用notEmpty.await()方法,让当前线程等待。因为notEmpty是由takeLock产生的,这里已经获取到锁了,所以可以调用await方法。
  4. 没满就调用 dequeue方法,删除队列头元素。
  5. 调用count.getAndDecrement()方法,将队列中元素个数减一,并保证多线程并发安全。
  6. 调用signalNotFull方法,唤醒正在等待插入元素的线程。

还要注意一下,Condition的signal和await方法必须在获取锁的情况下调用。因此就有了signalNotEmpty和signalNotFull方法:

    /**
     * 唤醒在notEmpty条件下等待的线程,即移除队列头时,发现队列为空而被迫等待的线程。
     * 注意,因为要调用Condition的signal方法,必须获取对应的锁,所以这里调用了takeLock.lock()方法。
     * 当队列中插入元素(即put或offer操作),那么队列肯定不为空,就会调用这个方法。
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 唤醒在notFull条件下等待的线程,即队列尾添加元素时,发现队列已满而被迫等待的线程。
     * 注意,因为要调用Condition的signal方法,必须获取对应的锁,所以这里调用了putLock.lock()方法
     * 当队列中删除元素(即take或poll操作),那么队列肯定不满,就会调用这个方法。
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

三. 插入元素方法

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // 记录插入操作前元素的个数
        int c = -1;
        // 创建新节点node
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //表示队列已满,那么就要调用notFull.await方法,让当前线程等待
            while (count.get() == capacity) {
                notFull.await();
            }
            // 向队列尾插入新元素
            enqueue(node);
            // 将当前队列元素个数加1,并返回加1之前的元素个数。
            c = count.getAndIncrement();
            // c + 1 < capacity表示队列未满,就唤醒可能等待插入操作的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c == 0表示插入之前,队列是空的。队列从空到放入一个元素时,
        // 才唤醒等待删除的线程
        // 防止频繁获取takeLock锁,消耗性能
        if (c == 0)
            signalNotEmpty();
    }

以put方法为例,大体流程与我们前面介绍一样,这里有一个非常怪异的代码,当插入完元素时,如果发现队列未满,那么调用notFull.signal()唤醒等待插入的线程。

大家就很疑惑了,一般来说,这个方法应该放在删除元素(take系列的方法里),因为当我们删除一个元素,那么队列肯定是未满的,那么调用notFull.signal()方法,唤醒等待插入的线程。

这里这么做主要是因为调用signal方法,必须先获取对应的锁,而在take系列的方法里使用的锁是takeLock,那么想调用notFull.signal()方法,必须先获取putLock锁,这样的话会性能就会下降,所以用了另一种方式。

  1. 首先我们应该知道signal方法,当有线程在这个条件下等待时,才会唤醒其中一个线程,当没有线程等待时,这个方法相当于什么都没做。所以这个方法的意义是可能会唤醒等待的一个线程。
  2. 当队列未满时,我们都调用notFull.signal()尝试去唤醒一个等待插入线程。而且这里已经获取putLock锁了,所以不耗时。
  3. 但是有一个问题,当队列已满的时候,所有插入操作的线程,都会等待,就没有机会调用notFull.signal()方法,那么唤醒这些等待线程呢?
  4. 唤醒这些线程的启动条件,必须是由删除元素操作触发的,因为只有删除队列才会不满。因为在take方法中 if (c == capacity) signalNotFull();

四. 删除队列头元素

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //表示队列为空,那么就要调用notEmpty.await方法,让当前线程等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 删除队列头元素,并返回它
            x = dequeue();
            // 返回当前队列个数,然后将队列个数减一
            c = count.getAndDecrement();
            // c > 1表示队列不为空,就唤醒可能等待删除操作的线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        /**
         * c == capacity表示删除操作之前,队列是满的。只有从满队列中删除一个元素时,
         * 才唤醒等待插入的线程
         * 防止频繁获取putLock锁,消耗性能
         */
        if (c == capacity)
            signalNotFull();
        return x;
    }

为什么调用notEmpty.signal()方法原因,对比一下我们在插入元素方法中的解释。

五. 查看队列头元素

    // 查看队列头元素
    public E peek() {
        // 队列为空,返回null
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 获取队列头节点first
            Node<E> first = head.next;
            // first == null表示队列为空,返回null
            if (first == null)
                return null;
            else
                // 返回队列头元素
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

查看队列头元素,涉及到head节点,所以必须使用takeLock锁。

六. 其他重要方法

6.1 remove(Object o)方法

    // 从队列中删除指定元素o
    public boolean remove(Object o) {
        if (o == null) return false;
        // 因为不是删除列表头元素,所以就涉及到head和last两个变量,
        // putLock与takeLock都要加锁
        fullyLock();
        try {
            // 遍历整个队列,p表示当前节点,trail表示当前节点的前一个节点
            // 因为是单向链表,所以需要记录两个节点
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                // 如果找到了指定元素,那么删除节点p
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

从列表中删除指定元素,因为删除的元素不一定在列表头,所以可能会head和last两个变量,所以必须同时使用putLock与takeLock两把锁。因为是单向链表,需要一个辅助变量trail来记录前一个节点,这样才能删除当前节点p。

6.2 unlink(Node<E> p, Node<E> trail) 方法

    // 删除当前节点p,trail代表p的前一个节点
    void unlink(Node<E> p, Node<E> trail) {
        // 将当前节点的数据设置为null
        p.item = null;
        // 这样就在链表中删除了节点p
        trail.next = p.next;
        // 如果节点p是队列尾last,而它被删除了,那么就要将trail设置为last
        if (last == p)
            last = trail;
        // 将元素个数减一,如果原队列是满的,那么就调用notFull.signal()方法
        // 其实这个不用判断直接调用的,因为这里肯定获取了putLock锁
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

要在链表中删除一个节点p,只需要将p的前一个节点trail的next指向节点p的下一个节点next。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容