先推荐篇写AQS的不错的文章:《从ReentrantLock的实现看AQS的原理及应用》、《一文了解AQS(AbstractQueuedSynchronizer)》、《AQS及其组件的核心原理》
AQS 的核心作用是:定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题(线程阻塞等待唤醒的机制,无锁状态加锁,有锁状态将线程放在等待队列排队获取锁),能够极大地减少实现工作;这种有效等待,相比于其他死等待或休眠机制、一方面减少了CPU空耗,另一方面机制能力很强,可以满足非常多并发控制的场景。
//类关系图
//重要的属性
方法和属性值 | 含义 |
---|---|
waitStatus | 当前节点在队列中的状态 |
prev | 前驱指针 |
next | 后继指针 |
thread | 表示处于该节点的线程 |
nextWaiter | 指向下一个处于CONDITION状态的节点 |
predecessor | 返回前驱节点,没有的话抛出npe |
waitStatus有下面几个枚举值:
方法和属性值 | 含义 |
---|---|
CANCELLED | 为1,表示线程获取锁的请求已经取消了 |
SIGNAL | 为-1,表示线程已经准备好了,就等资源释放了 |
CONDITION | 为-2,表示节点在等待队列中,节点线程等待唤醒 |
PROPAGATE | 为-3,当前线程处在SHARED情况下,该字段才会使用 |
0 | 当一个Node被初始化的时候的默认值 |
//非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
//上锁的方法,final表明该方法不能被重写
final void lock() {
//调用aqs的cas方法来修改state,这个为什么是1,重入锁:每次会对state+1
//这儿是直接cas获取,没有判断可重入性,该方法只会适用于首次获取锁,如果获取失败,会走下面的acquire(1)
if (compareAndSetState(0, 1))
//记录那个线程获得了锁,用于后面重入比对线程
setExclusiveOwnerThread(Thread.currentThread());
else
//获取失败会再次尝试获取
acquire(1);
}
//上面的acquire(1)调用的是syn类acquire(int arg)方法,然后又会回调到子类的这个方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
AQS类的方法,该方法非常重要,是独占锁获取锁的顶层入库
public final void acquire(int arg) {
//如上面说的,先调用子类获取,若失败,就会添加到队列
//这里面涉及了四个方法tryAcquire;addWaiter;acquireQueued;selfInterrupt
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
1.tryAcquire,上面说了最终会调用子类的nonfairTryAcquire实现,代码如下(尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);)
//#java.util.concurrent.locks.ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取aqs类的state状态; 这儿有个知识点就是volatile关键字,只>有可见性和禁止重排序,没有原子性,这也是为啥不能保证线程安全的>>原因
int c = getState();
//再次判断state状态,虽然第一次获取失败了,到这儿可能已经被释放了,所以再次判断
if (c == 0) {
//这个if里面的类容和第一次获取锁一致
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//不等于0的时候,说明已经有线程线程持有资源了,进行重入性判断,就是比对线程对象是否相同
else if (current == getExclusiveOwnerThread()) {
//重入对象的计数+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//更新state
setState(nextc);
return true;
}
return false;
}
2.addWaiter方法:将该线程加入等待队列的尾部,并标记为独占模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//将尾结点赋给pred,目的是腾位置,后面方便挂最新的mode到CLH队尾
Node pred = tail;
//说明CLH中有其他node节点在排队,pre == null说明等待队列中没有元素
if (pred != null) {
//把新的node节点的prev指向之前的tail,这样就把新节点的前驱指针挂在之前的队尾了,相当于挂了一半上去了
//最新理解:为啥先挂当前节点的前驱,而不是直接node = tail.next;因为如果要开始就先挂后驱,那就需要把接下来的compareAndSetTail(pred, node)和 pred.next = node;设置为原子操作才行,不然在多线程环境下数据混乱
node.prev = pred;
// 另一半也需要挂上去,需要把当前的队尾更新(即更新tail指针),涉及并发;如果修改失败说明被别的线程修改了;
// 如果当前节点node设置为队尾失败,那就只挂了一半上去,实际上的队尾节点是并发的另一个线程
if (compareAndSetTail(pred, node)) {
// 非原子性操作,由于之前是先挂的前驱指针,所以这句话运行完,当前node就成功入队,且队尾tail为当前节点,这也是为啥后面唤醒node需要从尾部遍历的原因之一
pred.next = node;
return node;
}
}
// 什么时候到这儿,tail节点为空,或者cas设置tail失败
//最新理解:先说问题,为什么要有这个方法:因为挂另一半上去的时候,由于并发失败了,当前节点需要最终挂上去,就通过该方法自旋,不断CAS重试,直到挂上去(即成为尾部节点)
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋,直到入队尾成功
for (;;) {
Node t = tail;
// tail为空,说明当前队列里面没有元素
if (t == null) { // Must initialize
// 初始化的头结点,并不是当前线程节点,而是调用了无参构造函数的节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//该操作和上一个方法一致,加到队尾
node.prev = t;
// 更新tail指针,指向当前node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3.acquireQueued:使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false(总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断))
final boolean acquireQueued(final Node node, int arg) {
//是否拿到资源的标记;默认失败
boolean failed = true;
try {
//线程在等待过程中是否中断的标记
boolean interrupted = false;
// 自旋,什么时候自旋结束?“前置节点是头结点,且当前线程获取锁成功”
// 一直自旋会导致cpu资源浪费,所以会对当前节点的前置节点进行状态判断觉得是否要将线程挂起,是否挂起要看shouldParkAfterFailedAcquire方法
for (;;) {
// 获取前驱,该方法里面就是把prev给返回
final Node p = node.predecessor();
// 如果前驱p是head,即该结点在队列中第二位,由于CLH队列的头结点是虚节点,为空,没有实际的线程信息,所以当前node就是正儿八经的首位
//所以跳出循环的条件:前置节点是头结点,且当前线程获取锁成功
if (p == head && tryAcquire(arg)) {
// 获取到资源后,将head指向该结点;该方法里面将node赋值给head,并把该节点的线程和prev置空
// 相当于指针移动到当前node,本质就是将当前节点置为虚节点,把之前的虚节点引用断开)
// 为什么不是直接remove当前节点,而是把head指向当前节点,这个操作的目的是什么
// 1.因为当前节点的里面的waitStatus 后面还有用,如果移除,需要找个地方保存下来;2.如果移除当前节点,需要把head和当前节点的后继连起来,需要变更4个前驱后继的引用,但是直接把当前节点设置成头结点,只有三次。
setHead(node);
p.next = null; // help GC
//改变标志位,成功获取资源
failed = false;
return interrupted;
}
// 执行到这儿说明有两个可能,1:p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)2:p不为头结点;
// 这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。那怎么判断是否需要阻塞呢,就要看下面这个方法了
// parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
4.shouldParkAfterFailedAcquire:靠前驱节点判断当前线程是否应该被阻塞。白话文就是你前面个哥们都在排队等待被唤醒,你不阻塞,在哪儿瞎折腾啥。(ws == Node.SIGNAL(-1))将会被挂起阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的节点状态
int ws = pred.waitStatus;
// 说明前驱结点处于待唤醒状态,当前节点node肯定可以安全的阻塞,所以返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 通过枚举值我们知道waitStatus>0仅仅只有取消状态(该逻辑就是优化队列,剔除取消的节点)
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
// 循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//运行到这里,说明前驱节点处于0、CONDITION或者PROPAGATE状态下
//此时该节点需要被置为SIGNAL状态,等待被唤醒。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
5.取消node节点
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 设置该节点不关联任何线程
node.thread = null;
// Skip cancelled predecessors
// 跳过取消的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 执行到这一步,说明已经优化队列了,把取消的前驱节点清除出队
// 将未取消的前驱节点的后继节点拿到
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
//根据当前节点存在的位置,可能存在三种可能
//1.当前节点是尾节点。
//2. 当前节点是Head的后继节点。
//3. 当前节点不是Head的后继节点,也不是尾节点。
// If we are the tail, remove ourselves.
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
// 将前驱节点的next指向null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 如果前驱节点不是head节点(即node不在正儿八经的首位),1:判断前驱节点是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
//(pred != head)说明前驱不是head,当前要取消的node位于队列中
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 把前驱的preNext指向node的next,这样node就不在队列里面了
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
Q:某个线程获取锁失败的后续流程是什么呢?
A:看本文的流程图:尝试CAS再获取一次,失败后自旋到同步队列尾部,然后自旋去获取锁(涉及清除取消的节点,进入阻塞挂起)。Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
A:是CLH变体的FIFO双端队列。Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
A:前置节点为头结点。Q:那获取锁成功之后不是应该把当前节点直接remove移除掉,为什么是把 head 指向成当前节点,这个是什么操作呢?
A:当前节点获取锁成功(不再需要继续for 循环或阻塞等待锁),可以开始执行后续的业务逻辑了。这时候不是将当前节点从队列移除,而是设置为头结点,二方面原因:
因为当前节点的里面的waitStatus 后面还有用,如果移除,需要找个地方保存下来;
如果移除当前节点,需要把head和当前节点的后继连起来,需要变更4个前驱后继的引用,但是直接把当前节点设置成头结点,只有三次。
Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?
A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放。
Q:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?
A:AQS的Acquire会调用tryAcquire方法,tryAcquire由各个自定义同步器实现,通过tryAcquire完成加锁过程。
Q:shouldParkAfterFailedAcquire中取消节点是怎么生成的呢?什么时候会把一个节点的waitStatus设置为-1?
A:是在cancelAcquire中产生的;如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
Q:被挂起阻塞的线程是什么时候进行恢复的?
A:首先被唤醒调用的方法是unparkSuccessor(Node node) ;线程唤醒发生在取消请求时cancelAcquire(),或释放锁时对unparkSuccessor()的调用。
Q:唤醒节点的逻辑是从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点,为什么需要从后往前遍历
A:如果是从前往后找,由于极端情况下addWaiter入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点。
在enq方法插入新节点时,可能存在旧尾节点的next指针还未指向新节点的情况;
在shouldParkAfterFailedAcquire方法中,当移除CANCEL状态的节点时,也存在next指针还未指向后续节点的情况
Q:对ReentrantLock非公平性的理解
A:首次去获取锁及获取锁失败重试都是非公平的,都会和队列里面的竞争,当加入队列后,相对于整个FIFO队列的node节点来说,又是公平的,但在队列里面获取锁的时候又会同其他线程竞争,又体现了非公平性
Q:ReentrantLock与synchronized的区别
A: ReentrantLock具有以下优势
- ReentrantLock有公平锁
- ReentrantLock尝试加锁,获取锁带超时时间
- ReentrantLock获取锁响应中断
- 底层实现:synchronized 是 JVM层面的锁,是Java关键字,通过monitor对象来完成(monitorenter与monitorexit),ReentrantLock 是从jdk1.5以来(java.util.concurrent.locks.Lock)提供的API层面的锁
- 锁的对象:synchronzied 锁的是对象,锁是保存在对象头里面的,根据对象头数据来标识是否有线程获得锁/争抢锁;ReentrantLock 是根据volatile 变量 state 标识锁的获得/争抢。
- 实现机制:synchronized 的实现涉及到锁的升级,具体为无锁、偏向锁、自旋锁、向内核态申请重量级锁,ReentrantLock实现则是通过利用CAS(CompareAndSwap)自旋机制保证线程操作的原子性和volatile 保证数据可见性以实现锁的功能。
- 释放锁方式:synchronized 不需要用户去手动释放锁,synchronized 代码执行完后系统会自动释放锁;ReentrantLock 需要用户去手动释放锁,如果没有手动释放锁,就可能导致死锁现象。一般通过lock() 和unlock() 方法配合 try/finally 语句块来完成。
- 带条件的锁:synchronized不能绑定条件;ReentrantLock 可以绑定Condition 结合await()/singal() 方法实现线程的精准唤醒,而不是像synchronized通过Object类的wait()/notify()/notifyAll() 方法要么随机唤醒一个线程要么唤醒全部线程。
总结:
ReentrantLock 与 synchronized的区别:
- synchronized 属于非公平锁,ReentrantLock 支持公平锁、非公平锁;
- synchronized 不支持尝试加锁,ReentrantLock 支持尝试加锁,支持带超时时间的尝试加锁;
- synchronized 不支持响应中断的获取锁,ReentrantLock 提供了响应中断的加锁方法;
- synchronized 不支持带条件,ReentrantLock支持;
- 其他底层实现上、实现机制上、锁的对象上、释放锁的方式上也有区别。