一.简介
- AbstractQueuedSynchronizer是并发类诸如ReentrantLock、CountDownLatch、Semphore的核心
- CAS算法是AbstractQueuedSynchronizer的核心
二.结构
//阻塞队列头
private transient volatile Node head;
//阻塞队列尾
private transient volatile Node tail;
//锁的参数,默认为0,不为零则已加锁
private volatile int state;
static final class Node {
//共享锁
static final Node SHARED = new Node();
//独占锁
static final Node EXCLUSIVE = null;
//Node的四种状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//Node的四种状态
volatile int waitStatus;
//Node的前后节点
volatile Node prev;
volatile Node next;
Node nextWaiter;
}
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
}
* 供子类实现方法
三.独占模式
- 1.试图获取锁(tryAcquire)若失败,则进入acquireQueued方法获取阻塞队列
- 2.获取阻塞队列首先将该线程加入该阻塞队列(addWaiter):阻塞队列head没有线程,是个空节点,新加入的线程排在队尾
- 3.队列构建好了,下一步就是在必要的时候从队列里面拿出一个Node了(acquireQueued)
- 独占模式取队列首先获得当前Node的pre节点,只有pre节点为head节点才获取再次尝试获取锁(tryAcquire),若此时获取锁失败或pre节点不为head节点时,让当前线程阻塞。
private final boolean parkAndCheckInterrupt() {
//阻塞该线程
LockSupport.park(this);
return Thread.interrupted();
}
- 释放锁流程:
- 如果tryRelease(arg)后state=0,则释放锁
1 public final boolean release(int arg) {
2 if (tryRelease(arg)) {
3 Node h = head;
4 if (h != null && h.waitStatus != 0)
5 unparkSuccessor(h);
6 return true;
7 }
8 return false;
9 }
四.共享模式
- 共享模式下的acquire和独占模式下的acquire大部分逻辑差不多,最大的差别在于tryAcquireShared成功之后,独占模式的acquire是直接将当前节点设置为head节点即可,共享模式会执行setHeadAndPropagate方法,独占锁某个节点被唤醒之后,它只需要将这个节点设置成head就完事了,而共享锁不一样,某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if( proagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
五.Condition通知/等待队列
- 1构建通知/等待队列(addConditionWaiter()):
先拿到队列(Condition构建出来的也是一个队列)中最后一个等待者lastWaiter,如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node;如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node;无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面。与阻塞对列相比,head不为null。new出来的Node的状态都是CONDITION。
- 2.释放Node的状态(fullyRelease),将state变为0。
- 3.判断Node是否在AbstractQueuedSynchronizer构建的队列中,不在则用阻塞该线程LockSupport.park(this)
- 4.要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么。
-
signal()实现原理
- 将Condition队列变为阻塞对列(AQS对列)
- 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false
- 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列,当前节点通过CAS机制将waitStatus置为SIGNAL
- 某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
参考:再谈AbstractQueuedSynchronizer2:共享模式与基于Condition的等待/通知机制实现