基础概念
Queue:基本上,队列就是一个满足先进先出(FIFO)原则的数据结构。
一般来说适用于缓冲、并发访问的场景,之前Volley的分析中就大量用到了queue。
首先讲一下Queue接口,再针对常用的子类去分析具体实现。看一下Queue提供的方法:
可以看到Queue提供了三组方法,分别是添加、移除、查询操作,前一组方法在出现错误时抛出异常,后面一组会有一个返回值。这就完成了对queue的基本定义。
上图是Queue接口的继承关系图,下面我们看一下BlockingQueue接口提供的方法:
从代码中可以看到,BlockingQueue提供的方法都会抛出一个InterruptedException异常,完成了对阻塞队列的定义。
再针对Deque做一个介绍,Dequeue是Queue的子接口,是双向队列支持从两个端点检索和插入元素,因此Deque既可以支持FIFO形式也可以支持LIFO形式:
有了上面的这些概念之后,下面我会把这些集合分成四类进行分析。
没实现阻塞接口的Queue
内置的不阻塞队列:PriorityQueue 和 ConcurrentLinkedQueue。
PriorityQueue
先说一下PriorityQueue的特性,该队列是用数组实现,在不指定大小的情况下默认为11,数组大小可以动态增加,不是线程安全的,如果要保证线程安全需要使用PriorityBlockingQueue。不允许使用null元素,插入方法(offer()、poll()、remove() 、add()方法)时间复杂度为O(log(n)),remove(Object)和contains(Object)时间复杂度为O(n)。
检索方法(peek、element 和 size)时间复杂度为常量。
PriorityQueue实质上是一个最小堆(在没指定Comparator的情况下),对元素采用的是堆排序,头是按指定排序方式的最小元素,堆排序只能保证根是最大(最小),整个堆并不是有序的。也不满足先进先出的定义,具体我们可以在他插入元素或者移除元素的操作中看出来,下面分析一下插入的流程:
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
这个实际上就是堆排序的实现过程,queue[0]为根节点,queue[1]、queue[2]为左右节点,queue[3]、queue[4]为queue[1]左右节点,以此类推。关于堆排序处理的具体分析可以参考这篇博客:https://blog.csdn.net/qq_21492635/article/details/73105580
至于队列出栈的时候,输出的是有序的队列,具体可以查看poll方法,输出queue[0]后,siftDownComparable方法会对堆进行重新排序,具体规则是取队尾的元素与新的根节点元素进行比较然后依次的往下面的节点对比,如果原队尾节点小于或等于对比节点,则替换位置,否则继续循环左边的树。因为最小堆只保证子节点比父节点要小,但是两个子节点是没有必然关系的,而这个siftDownComparable方法能够保证输出时的有序。
上面提到了PriorityQueue是会自动扩容的,这里就讲一下PriorityQueue的扩容算法,重点都已经注释:
private void grow(int minCapacity) {
int oldCapacity = queue.length;
//如果原来的容器大小小于64,新的容器将会是2*oldCapacity + 2,
//否则,将会扩容50%,新的大小是1.5*oldCapacity
// Double size if small; else grow by 50%
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
//这里会校验一下传入的参数,如果需要扩容的大小比Integer.MAX_VALUE-8还要大,会直接返回 //Integer.MAX_VALUE否则还是返回Integer.MAX_VALUE-8
// overflow-conscious code
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
//这里会对数组进行复制 返回新的数组
queue = Arrays.copyOf(queue, newCapacity);
}
所以从上面扩容的操作来看,我们初始化容器的时候应该定义一个合适范围足够大的数,这样可以避免频繁的进行扩容操作,牺牲空间提高性能。
ConcurrentLinkedQueue ConcurrentLinkedQueue是一个基于链表的无界非阻塞队列,并且是线程安全的,它采用的是先进先出的规则,当我们增加一个元素时,它会添加到队列的末尾,当我们取一个元素时,它会返回一个队列头部的元素。
这里需要提到一下实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,阻塞队列就是通过使用加锁的阻塞算法实现的;另一种非阻塞的实现方式则可以使用循环CAS(比较并交换)的方式来实现。而现在讲的ConcurrentLinkedQueue就是使用CAS算法实现的,非阻塞线程安全队列。
首先从ConcurrentLinkedQueue的构造方法看起,它有两个构造方法,一个无参一个参数为集合类。前者会创建空的head、tail,后者会根据数据构造一个链表结构,具体存放数据的是Node,包含了一个数据域item和next指针:
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
而这其中具体的操作是由sun.misc.Unsafe完成的,这是SUN未开源的类,能提供相当强大的操作,能够直接操作内存,但是又不提供专业的文档,sun本身就不推荐使用这个类。一般应用级的代码都不应该使用它,框架级别的,用这个的很多,在这里我们只是简单介绍一下。
在讲ConcurrentLinkedQueue的插入、查询之前,还需要了解CAS算法,这个是java.util.concurrent包的基础。
CAS
CAS:Compare and Swap, 翻译成比较并交换。
java.util.concurrent包中借助CAS实现了区别于synchronouse同步锁的一种乐观锁(悲观锁:数据被事物处理持保守态度,在整个数据处理的过程中将数据出于锁定状态;乐观锁:在修改数据提交的时候对数据进行冲突检测,如果冲突则让用户处理,常见的是对数据添加版本号冲突重试这样的方式实现,总而言之在对于频繁读的情况下使用乐观锁,写则使用悲观锁https://blog.csdn.net/feeltouch/article/details/78330797)。
CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
有了这些概念后我们看一下插入方法:
public boolean offer(E e) {
final Node<E> newNode = newNode(Objects.requireNonNull(e));
//初始化t、p为队尾元素
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
//如果p为最后一个节点
// p is last node
//这里实际的操作是通过cas操作,将p的next更新为newNode
if (casNext(p, null, newNode)) {
//成功的CAS操作会让e成为队列中的元素
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
//每操作两次会更新队尾,具体原因会在下面分析
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
//CPS竞争失败 重试
}
//以下两段代码 在底下具体分析
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
//每两次操作 更新
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
关于插入操作的CAS操作部分比较好理解,就是队尾的next为空时更新,但是另外两个判断分支第一次看会有点难以理解。首先要明白一个概念,就是tail是延迟刷新的,我们模拟一下offer(1)的操作,只会走到CAS操作,这里tail是没有被更新的,也就是说就算添加了第一个元素,head和tail还是同一个对象。具体情况如图:
当插入第二个元素时,此时p还是head所以会走这个代码分支:
p = (p != t && t != (t = tail)) ? t : q;
如果在单线程情况下这里显然p==t,但是!=不是原子操作,它是可以被中断的。在执行“!=”时,会先取到左边t的值,再去执行t=tail,如果并发环境下,可能存在获取到了左边的t后,tail的值被更新,这样就会存在t!=t的情况存在。这种情况下,我们应该更新p为链表尾部,如果没有的话我们列表的尾部就指向p。这段代码也是一样:
p = (t != (t = tail)) ? t : head;
如果出现队尾被修改,则更新p,如果没有则返回head,从头部开始查找。至于何时p==q,这就是碰到了哨兵节点(next指向自己)的情况,主要表示要删除的节点或者空节点,当遇到哨兵节点时,无法通过next取得后续的节点,这个时候很可能直接返回head,让链表从头部开始遍历。也有可能返回t,这样就进行了一次“打赌”,使用新的tail作为链表尾部(即t),这样避免了重新查找tail的开销。
具体情况我们可以设想一下,实际上开始循环时,p和tail内存指向是同一个对象,并发环境下,在运行
Node<E> q = p.next;
这段代码之前,如果其他线程完成了offer操作,且tail已经被更新,那这个时候是能拿到p.next的,如果执行到
else if (p == q)
这个判断时,tail又被其他线程又完成了offer操作,这个时候tail可能会与q是同一个对象。
下面说下poll方法,poll方法的作用就是删除头结点,返回被删除的值。
poll的实现原理其实跟offer比较类似,具体细节参照offer理解
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
//如果头结点的item不为空 且 CAS操作成功 则返回item
if (item != null && casItem(p, item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
总结一下,ConcurrentLinkedQueue的线程安全是通过其插入、删除时采取CAS操作来保证的。不会出现同一个tail结点的next指针被多个同时插入的结点所抢夺的情况出现。
实现阻塞接口的Queue
由于实现了阻塞接口的队列核心在于ReentrantLock,所以我们现需要对ReentrantLock进行了解。
ReentrantLock
首先看一下ReentrantLock类有哪些东西:
ReentrantLock中定义了两种锁,NonfairSync、FairSync,一个是非公平锁,一个是公平锁。而两种锁的区别在于:
1、公平锁能保证:老的线程排队使用锁,新线程仍然排队使用锁。
2、非公平锁保证:老的线程排队使用锁,但是无法保证新线程抢占已经在排队的线程的锁。
具体实现如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
通过阅读代码可以明显发现其中不同,lock时,非公平锁会先去尝试直接CAS操作修改状态抢锁失败后在nonfairTryAcquire()方法中还会再次尝试一次。而公平锁会去优先通过hasQueuedPredecessors()方法判断队列中是否有等待的队列(是否有前驱节点),当没有前驱节点的时候才会通过CAS修改同步状态变量。至于另一个if分支,其实是实现一个重入锁机制,在锁状态为0时,重新尝试获取锁。如果已经被占用,那么做一次是否当前线程为占用锁的线程的判断,如果是一样的那么进行计数,当然在锁的relase过程中会进行递减,保证锁的正常释放。
AQS中的方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里的addWaiter()方法实际上就是构造一个node,把当前线程关联起来,添加到队尾。这里的重点在于acquireQueued():
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
这个方法实现了Node节点线程的自旋,主要是检查当前节点是不是head的next节点,如果是的话就尝试获取锁并返回。
其实需要里的是shouldParkAfterFailedAcquire()方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
现在简要的分析一下shouldParkAfterFailedAcquire流程,首先判断node节点的前驱节点pre当前状态,如果为SIGNAL,则代表已经park了(park是通过LockSupport类对线程进行阻塞)。如果状态值大于0则代表此节点的线程已经cancel,这个时候会进入循环,不断的把节点前移,丢弃掉cancel状态的node。如果node状态值=<0,则将该node状态值置为SIGNAL。
由于调用这个方法的是一个死循环,所以这个方法最终会返回true,如果在这个循环过程中一直没有获取锁成功,之后就会调用parkAndCheckInterrupt对线程进行阻塞,然后返回线程的阻塞状态。
这样设计就避免了在争夺锁时候不断的循环检查浪费资源。
下面看一下unlock方法:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease会获取当前的state减去1,这里我们上面也讲过了ReentrantLock是有重入机制的,所以当state - 1直到等于0的时候,才去释放锁。然后这个时候获取head,不为空且状态不为0时调用unparkSuccessor:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这里会把头节点的状态置为0(这个时候非公平锁在有新进程的情况下是有机会获取到锁的),然后获取next节点判断其合法性,如果s是正常则执行unpark操作,对应之前我们的park,使线程继续执行。如果next节点不合规,则从队列的尾部向前遍历,找到合规的节点置为s。
Condition
由于阻塞队列中Lock是和Condition一起使用完成读写过程的,所以一并讲一下Condition。
我们知道在Object类中提供了wait、notify方法用于线程的阻塞、唤醒。而Condition是更加灵活可定时,可轮询,可中断,公平队列,及非块结构的锁。具体的用法大家在源码中也可以看到,这里不再赘述。
至此ReentrantLock部分已经讲完,ReentrantLock具有公平锁和非公平锁两种模式,公平锁是按照FIFO的模式进行锁的竞争,而非公平锁是无序竞争,刚释放锁的时候新线程是能较快获取到锁的,但是队列中的线程只能等待。
但是通常情况下挂起的线程重新开始与它真正开始运行,二者之间会产生严重的延时。因此非公平锁就可以利用这段时间完成操作。这是非公平锁在某些时候比公平锁性能要好的原因之一。所以非公平锁的吞吐量是大于公平锁的,这也是为什么JDK将非公平锁作为默认的实现。
PriorityBlockingQueue
PriorityBlockingQueue这个跟我们上面所讲的PriorityQueue是比较类似的,之前在分析Volley的文章中也有提到过,所以这里不再做详解。
最大的不同之处在于有一个ReentrantLock,在对数据进行操作时会有加锁操作,完成后释放。所以理解PriorityBlockingQueue的关键在于理解ReentrantLock。其中使用Condition,明确的指定唤醒读线程。
ArrayBlockingQueue
这个逻辑比较简单,内部维护的是一个数组,在初始化队列的时候需要传入容器的size,这里也不赘述了。
Dequeue
LinkedList
待更新
LinkedBlockingDeque
待更新
参考:
https://www.cnblogs.com/gnivor/p/4841191.html
http://www.cnblogs.com/Joe-Go/p/9757394.html
http://www.cnblogs.com/zhimingyang/p/5702752.html
https://www.cnblogs.com/xrq730/p/4979021.html
https://www.cnblogs.com/yulinfeng/p/6899316.html