AQS(AbstractQueuedSynchronizer)
node
node是等待队列(双向队列)的节点分别有tail与head,变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。
- CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
- SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
- CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0:新结点入队时的默认状态。
注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。
独占
- acquire
如果获取资源直接返回,如果没有则加入等待队列尾部,并进行获取位置,并挂起waiting,直至获取到资源
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire
需要自行实现的方法,尝试获取锁 - addWaiter
把该线程加入等待队列tail,并标记为独占模式 - acquireQueued
使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; //默认是没有中断
for (;;) {//自旋
final Node p = node.predecessor();//获取当前节点的前一个节点
if (p == head && tryAcquire(arg)) {//如果当前节点是第二个节点,那么尝试去获取资源,返回成功标志
setHead(node);//设置当前节点为head
p.next = null; // help GC
failed = false;
return interrupted;
}
//检查状态是否已经排到与挂起,如果没有则进行挂起操作,此处会挂起waiting
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//没有挂起,并且当前状态是interrupted为true
}
} finally {
if (failed)//如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//等待前节点signal通知,不需要做处理,正在park也不需要再次park
return true;
if (ws > 0) {
//前节点没用了,那么就继续找正常节点,然后排在它后面
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//设置前一个节点为signal,释放资源的时候通知
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//park当前线程
return Thread.interrupted();//返回当前线程是否中断状态
}
- selfInterrupt
中断自己
- release
尝试释放资源,如果释放成功则唤醒后续节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; //其实就是当前节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//判断当前节点可用,修改当前节点状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//获取下一个节点
if (s == null || s.waitStatus > 0) {//如果取消了,比如timeout
s = null;
//从最后一个开始判断是否可用
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒线程
}
共享
- acquireShared
尝试获取资源,如果获取不到资源则添加到等待队列
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//添加到tail为共享模式
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- releaseShared