JUC PriorityBlockingQueue

一、 前言

PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现,研究过数组方式存放最小堆节点的都知道,直接遍历队列元素是无序的。

二、 PriorityBlockingQueue类图结构

image.png

如图PriorityBlockingQueue内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLockOffset是用来在扩容队列时候做cas的,目的是保证只有一个线程可以进行扩容。
由于这是一个优先级队列所以有个比较器comparator用来比较元素大小。lock独占锁对象用来控制同时只能有一个线程可以进行入队出队操作。notEmpty条件变量用来实现take方法阻塞模式。这里没有notFull 条件变量是因为这里的put操作是非阻塞的,为啥要设计为非阻塞的是因为这是无界队列。
最后PriorityQueue q用来搞序列化的。

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
 
    /**
     * 默认容量.
     */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
 
    /**
     * 最大容量.
     */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
    /**
     * 内部堆数组, 保存实际数据, 可以看成一颗二叉树:
     * 对于顶点queue[n], queue[2*n+1]表示左子结点, queue[2*(n+1)]表示右子结点.
     */
    private transient Object[] queue;
 
    /**
     * 队列中的元素个数.
     */
    private transient int size;
 
    /**
     * 比较器, 如果为null, 表示以元素自身的自然顺序进行比较(元素必须实现Comparable接口).
     */
    private transient Comparator<? super E> comparator;
 
    /**
     * 全局锁.
     */
    private final ReentrantLock lock;
 
    /**
     * 当队列为空时,出队线程在该条件队列上等待.
     */
    private final Condition notEmpty;
 
    // ...
}

如下构造函数,默认队列容量为11,默认比较器为null;


 private static final int DEFAULT_INITIAL_CAPACITY = 11;

 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

/**
 * 从已有集合构造队列.
 * 如果已经集合是SortedSet或者PriorityBlockingQueue, 则保持原来的元素顺序
 */
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true;     // true if not known to be in heap order
    boolean screen = true;      // true if must screen for nulls
 
    if (c instanceof SortedSet<?>) {                        // 如果是有序集合
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false;
    } else if (c instanceof PriorityBlockingQueue<?>) {     // 如果是优先级队列
        PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class)   // exact match
            heapify = false;
    }
 
    Object[] a = c.toArray();
    int n = a.length;
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {    // 校验是否存在null元素
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)    // 堆排序
        heapify();
}

重点是第四种构造器,可以看到,PriorityBlockingQueue内部也是利用了ReentrantLock来保证并发访问时的线程安全。
PriorityBlockingQueue如果不指定容量,默认容量为11,内部数组queue其实是一种二叉树,后续我们会详细介绍。

三、 offer操作

在队列插入一个元素,由于是无界队列,所以一直为成功返回true;

public boolean offer(E e) {

    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;

    //如果当前元素个数>=队列容量,则扩容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);

    try {
        Comparator<? super E> cmp = comparator;

        //默认比较器为null
        if (cmp == null)(2)
            siftUpComparable(n, e, array);
        else
            //自定义比较器(3)
            siftUpUsingComparator(n, e, array, cmp);

        //队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程
        size = n + 1;(9)
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

主流程比较简单,下面看看两个主要函数

private void tryGrow(Object[] array, int oldCap) {
    // 扩容和入队/出队可以同时进行, 所以先释放全局锁
    lock.unlock(); //must release and then re-acquire main lock
    Object[] newArray = null;

    //cas成功则扩容,allocationSpinLock置1表示正在扩容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }

    //第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5)
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用cas控制只有一个线程可以扩容成功。我的理解是为了性能,因为扩容时候是需要花时间的,如果这些操作时候还占用锁那么其他线程在这个时候是不能进行出队操作的,也不能进行入队操作,这大大降低了并发性。

所以在扩容前释放锁,这允许其他出队线程可以进行出队操作,但是由于释放了锁,所以也允许在扩容时候进行入队操作,这就会导致多个线程进行扩容会出现问题,所以这里使用了一个spinlock用cas控制只有一个线程可以进行扩容,失败的线程调用Thread.yield()让出cpu,目的意在让扩容线程扩容后优先调用lock.lock重新获取锁,但是这得不到一定的保证,有可能调用Thread.yield()的线程先获取了锁。

那copy元素数据到新数组为啥放到获取锁后面那?原因应该是因为可见性问题,因为queue并没有被volatile修饰。另外有可能在扩容时候进行了出队操作,如果直接拷贝可能看到的数组元素不是最新的。而通过调用Lock后,获取的数组则是最新的,并且在释放锁前 数组内容不会变化。

具体建堆算法:

/**
 * 将元素x插入到array[k]的位置.
 * 然后按照元素的自然顺序进行堆调整——"上浮",以维持"堆"有序.
 * 最终的结果是一个"小顶堆".
 */
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;

    //队列元素个数>0则判断插入位置,否者直接入队(7)
    while (k > 0) {
        // 相当于(k-1)除2, 就是求k结点的父结点索引parent
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        // 如果插入的结点值大于父结点, 则退出
        if (key.compareTo((T) e) >= 0)
            break;
        // 否则,交换父结点和当前结点的值
        array[k] = e;
        k = parent;
    }
    array[k] = key;(8)
}

siftUpComparable方法的作用其实就是堆的“上浮调整”,可以把堆可以想象成一棵完全二叉树,每次插入元素都链接到二叉树的最右下方,然后将插入的元素与其父结点比较,如果父结点大,则交换元素,直到没有父结点比插入的结点大为止。这样就保证了堆顶(二叉树的根结点)一定是最小的元素。(注:以上仅针对“小顶堆”)。

下面用图说话模拟下过程:
假设队列容量为2

  • 第一次offer(2)时候
image.png

执行(1)为false所以执行(2),由于k=n=size=0;所以执行(8)元素入队,然执行(9)size+1;
现在队列状态:

image.png
  • 第二次offer(4)时候
    执行(1)为false,所以执行(2)由于k=1,所以进入while循环,parent=0;e=2;key=4;key>e所以break;然后把4存到数据下标为1的地方,这时候队列状态为:
image.png
  • 第三次offer(6)时候
    执行(1)为true,所以调用tryGrow,由于2<64所以newCap=2 + (2+2)=6;然后创建新数组并拷贝,然后调用siftUpComparable;k=2>0进入循环 parent=0;e=2;key=6;key>e所以break;然后把6放入下标为2的地方,现在队列状态:
image.png
  • 第四次offer(1)时候
    执行(1)为false,所以执行(2)由于k=3,所以进入while循环,parent=0;e=2;key=1; key<e;所以把2复制到数组下标为3的地方,然后k=0退出循环;然后把2存放到下标为0地方,现在状态:
image.png

四、 poll操作

在队列头部获取并移除一个元素,如果队列为空,则返回null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

主要看dequeue

private E dequeue() {

    //队列为空,则返回null
    int n = size - 1;
    if (n < 0)
        return null;
    else {

        //获取队头元素(1)
        Object[] array = queue;
        E result = (E) array[0];

        //获取对尾元素,并值null(2)
        E x = (E) array[n];
        array[n] = null;

        Comparator<? super E> cmp = comparator;
        if (cmp == null)//cmp=null则调用这个,把对尾元素位置插入到0位置,并且调整堆为最小堆(3)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;(4)
        return result;
    }
}

    /**
 * 堆的"下沉"调整.
 * 删除array[k]对应的结点,并重新调整堆使其有序.
 *
 * @param k     待删除的位置
 * @param x     待比较的健
 * @param array 堆数组
 * @param n     堆的大小
 */
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];(5)
                int right = child + 1;(6)
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)(8)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;(9)
        }
    }

下面用图说话模拟下过程:

  • 第一次调用poll()
    首先执行(1) result=1;然后执行(2)x=2;这时候队列状态
image.png

然后执行(3)后状态为:


image.png

执行(4)后的结果:

image.png

下面重点说说siftDownComparable这个屌屌的建立最小堆的算法:
首先说下思想,其中k一开始为0,x为数组里面最后一个元素,由于第0个元素为树根,被出队时候要被搞掉,所以建堆要从它的左右孩子节点找一个最小的值来当树根,子树根被搞掉后,会找子树的左右孩子最小的元素来代替,直到树节点为止,还不明白,没关系,看图说话:
假如当前队列元素:

image.png

那么对于树为:

image.png

这时候如果调用了poll();那么result=2;x=11;现在树为:


image.png

然后看leftChildVal = 4;rightChildVal = 6; 4<6;所以c=4;也就是获取根节点的左右孩子值小的那一个; 然后看11>4也就是key>c;然后把c放入树根,现在树为:


image.png

然后看根的左边孩子4为根的子树我们要为这个字树找一个根节点
看leftChildVal = 8;rightChildVal = 10; 8<10;所以c=8;也就是获取根节点的左右孩子值小的那一个; 然后看11>8也就是key>c;然后把c放入树根,现在树为:

image.png

这时候k=3;half=3所以推出循环,执行(9)后结果为:


image.png

这时候队列为:


image.png

五、 put操作

内部调用的offer,由于是无界队列,所以不需要阻塞

public void put(E e) {
    offer(e); // never need to block
}

六、 take操作

获取队列头元素,如果队列为空则阻塞。


public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {

        //如果队列为空,则阻塞,把当前线程放入notEmpty的条件队列
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

这里是阻塞实现,阻塞后直到入队操作调用notEmpty.signal 才会返回。

七、 size操作

获取队列元个数,由于加了独占锁所以返回结果是精确的

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

八、总结

PriorityBlockingQueue始终保证出队的元素是优先级最高的元素而不是在队列里面停留时间最长的原始,并且可以定制优先级的规则,内部通过使用一个二叉树最小堆算法来维护内部数组,这个数组是可扩容的,当当前元素个数>=最大容量时候会通过算法扩容,当队列任务里面的任务由优先级时候本队列比较实用。

PriorityBlockingQueue类似于ArrayBlockingQueue内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队,另外前者只使用了一个notEmpty条件变量而没有notFull这是因为前者是无界队列,当put时候永远不会处于await所以也不需要被唤醒,并且take方法由于是阻塞方法,所以是可被中断的,其他方法对中断标志不理会。

值得注意的是为了避免在扩容操作时候其他线程不能进行出队操作,实现上使用了先释放锁,然后通过cas保证同时只有一个线程可以扩容成功。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容