序言
AQS可以说是JAVA源码中必读源码之一。同时它也是JAVA大厂面试的高频知识点之一。认识并了解它,JAVA初中升高级工程师必备知识点之一。
AQS是AbstractQueuedSynchronizer的简称,它也是JUC包下众多非原生锁实现的核心。
一:AQS基础概况
AQS是基于CLH队列算法改进实现的锁机制。大体逻辑是AQS内部有一个链型队列,队列结点类是AQS的一个内部类Node,形成一个类似如下Sync Queue(记住这个名词)
可以看出,一个Node除了前后结点的索引外,还维护了一个Thread对象,一个int的waitStatus。
Thread对象就是处于竞争队列中的线程对象本身。
waitStatus表示当前竞争结点的状态,这里暂且忽略掉。
处于队首的,即Head所指向的结点,即为获取到锁的结点。释放锁即为出队,后续结点则成为队首,即获取到锁
Tips:这里帮大家理解一个事情,每一个ReentrantLock实例都有且只有一个AQS实例,一个AQS实例维护一个Sync Queue。所以说,当我们的业务代码中的多个线程对同一个ReentrantLock实例进行锁竞争操作时,其实际就是对同一个Sync Queue的队列进行入队、出队操作。
二:AQS调用入口----ReentrantLock
我们在用ReentrantLock时,代码通常如下:
ReentrantLock lock = new ReentrantLock();
Runnable runnable = new Runnable() {
public void run() {
lock.lock();
Sys.out(Thread.currentThread().name() + "抢到锁");
lock.unlock();
}
};
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t2.start();
t1.start();
可见线程t1,t2竞争lock这个锁。
lock.lock()
抢锁
lock.unlock()
释放锁
来看入口函数lock
public void lock() {
sync.lock();
}
sync
是ReentrantLock
内的一个继承了AQS
的抽象类
abstract static class Sync extends AbstractQueuedSynchronizer {
}
抽象类的具体实现是另两个内部类NonFairSync
FairSync
,分别代表非公平锁、公平锁。
我们系统来看下继承图
ReentrantLock中的sync实例,默认是在构造函数中初始化的,
public ReentrantLock() {
sync = new NonfairSync();
}
那我们就看默认的NonFairSync
的实现逻辑。
NonFairSync
当我们调用ReentrantLock.lock()
时,直接调用到的是NonFairSync.lock()
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState(0, 1)
通过CAS(CAS是啥,这里不讲,参考乐观锁。具体Google吧)方式,设置AQS的int state
字段为1。AQS内部就是通过这个state
是否为0来判断当前锁是否已经被线程获取到。
返回true
,则说明获取锁成功,设置当前锁的独占线程setExclusiveOwnerThreaThread.currentThread());
否则,acquire(1)
尝试获d(取锁。这个方法会自旋、阻塞,一直到获取锁成功为止。这里,传进去的参数1,就参考乐观锁的版本字段,同时,这里,它还记录了可重入锁重复获取到锁的次数。只有释放同样次数才能最终释放锁。
具体看AQS的acquire()
的逻辑
AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
要注意,这个方法是忽略线程中断的。
先看tryAcquire(arg)
,这个方法是AQS留给子实现类的口子,具体实现看NonFairSync
,它的实现里直接调用了Sync.nonfairTryAcquire()
,
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到方法内,先是尝试获取state = 0
时的初始锁,如果失败,判断当前锁是否是被当前线程获取,是的话,将acquire
时传入的参数累加到state
字段上。在这里,这个字段就是用来记录重复获取锁的次数。
获取失败则返回false
回到acquire
在获取失败,返回false
后,才会继续调用 &&
右边的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法。
先看addWaiter
private Node addWaiter(Node mode) {
1: Node node = new Node(Thread.currentThread(), mode);
2: // Try the fast path of enq; backup to full enq on failure
3: Node pred = tail;
4: if (pred != null) {
5: node.prev = pred;
6: if (compareAndSetTail(pred, node)) {
7: pred.next = node;
8: return node;
9: }
10: }
11: enq(node);
12: return node;
13: }
利用传进来的Node.EXCLUSIVE
表示的排斥锁参数以及当前线程实例初始化新Node
,第4-10
行代码是在Sync Queue
队列内有竞争线程时进入。为空时会走到enq(node)
,这里是在竞争为空时将竞争线程入队的操作。
然后返回当前竞争的线程node
此时Sync Queue
如图
我们假设node_1是已经获取到锁的结点,node_2即为我们当前操作的结点。
返回acquire
接着看
addWaiter
返回的node
直接作为参数给了acquireQueued
,这个方法就是主要的node
竞争锁方法。
final boolean acquireQueued(final Node node, int arg) {
// 获取锁成功失败标记
boolean failed = true;
try {
// 当前竞争线程的中断标记
boolean interrupted = false;
// 自旋竞争锁,竞争不到锁的话,线程又没有中断
// 则一直在这儿循环
for (;;) {
// 获取当前线程的前驱结点
final Node p = node.predecessor();
// 如果前驱结点是头结点,则尝试去tryAcquire
// 这个逻辑我们之前看过,当前线程未获取到锁的情况下
// 在AQS的state字段不为0时,则返回false
if (p == head && tryAcquire(arg)) {
// 进入到这里,说明要不就是当前线程在重复获取
// 要不就是前边的结点释放锁,state 归0,这里获取到
setHead(node);
p.next = null; // help GC
// 标识竞争锁成功
failed = false;
// 这个方法不响应线程中断,但是会返回线程在竞争锁过程中
// 中断标记返回
return interrupted;
}
// 若获取锁失败,则到这里。这里的逻辑主要在If判断
// 的两个方法中,用来将当前线程挂起的,具体逻辑
// 看下面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
park就是停下的意思。所以这个方法从名字上也比较好理解,就是
挂起线程并且检查线程的中断状态。这里要注意,LockSupport.part(this)
方法是会在线程中断时自动唤醒的
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这个方法传入了当前竞争结点及其前驱结点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱结点的等待状态。这里只需要记住,我们这里考虑的是
// 非共享锁、非公平锁的AQS。所以,只需要确保当前竞争
// 结点的前驱结点状态为SIGNAL就好。剩下的状态,
// 与我们此时研究的情况而言没有用
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果前驱结点的status为0,则将其改为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
从shouldParkAfterFailedAcquire
可以看出来,在确保前驱结点status为SIGNAL
时,就可以放心的去unsafe.park()
了。之所以要为SIGNAL
,是因为这个状态含义为:当前结点OVER时要唤醒后继结点。
所以不难推出,我们的结点现在就park
在那了。等他前驱结点释放锁,或者自己interrupt来唤醒,但因为这个方法是无视中断的,所以即使interrupt了,只是设置了一个标记位,但仍然在循环中。
这里假设前驱结点获取锁后释放,则当前结点在parkAndCheckInterrupt()
方法中被唤醒,而后再次循环for(;;)
,这次会在第一个if
中就进入,当前结点获取到锁,然后重置Head
指向的结点等,返回当前线程的中断标记。
返回acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
if
中的selfInterrupt()
方法只是去重新设置当前线程的中断标记位。这是因为获取线程中断状态的方法,在返回状态字段的同时,也会重置字段,所以需要标记后重新设置相应的值。
下面我们看下AQS释放锁的接口方法
ReentrantLock.unlock
public void unlock() {
sync.release(1);
}
追进去
ReentrantLock.release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
是在NonFairLock
中的实现的,如果是释放成功,则在Head
存在并且状态不为0(其实可以理解为值为SIGNAL
时)去唤醒Head
的后继结点。
NonFailLock.tryRelease
下面看下NonFairLock
的tryRelease
方法的实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可以看出,这个tryRelease
其实就是去判断下是不是当前线程拥有锁,是的话,判断下当前的释放锁是否完全释放,因为锁可以重复获取,完全释放的话,就设置state
为0,代表AQS的锁已经被释放了。