一、ReentrantLock与AQS简介
在Java5.0之前,在协调对共享对象的访问时可以使用的机制只有synchronized和volatile。Java5.0增加了一种新的机制:ReentrantLock。ReentrantLock并不是一种替代内置加锁的方法,而是作为一种可选择的高级功能。ReentrantLock实现了Lock接口,提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁方法都是显式的。
我们基本不会直接使用AQS,AQS是一个构建锁和同步器的框架,许多同步器都可以通过AQS很容易高效的构造出来,基本能够满足绝大多数情况的需求。不仅ReentrantLock,Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask也是基于AQS构建的。AQS解决了实现同步器的大量细节,等待线程采用FIFO队列操作顺序;还负责管理同步器类中的状态 ,可以通过getState,setState以及compareAndSetState方法来操作。
二、ReentrantLock使用示例
public class ReentrantLockTest1 {
public static ReentrantLock lock = new ReentrantLock();
int count = 0;
public void run (){
lock.lock();//加锁
try {
for(int i = 0; i < 1000; i++){
count++;
}
} finally {
lock.unlock();//释放锁
}
}
}
以上代码通过lock来实现 count++的原子操作,lock.lock()用来获取锁,lock.unlock()用来释放锁。那么多线程下,如何保证同步操作?如何释放锁?如何判断没有竞争到锁的线程处于等待状态?什么时候唤醒等待线程?
三、ReentrantLock同步类实现(以下为核心代码摘录)
ReentrantLock是独占锁,有公平锁和非公平锁的策略实现。贴上ReentrantLock的源码来看下,具体看注释吧。
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync; //同步器提供所有的实现机制,属于内部类,供内部调用
/**
* 自定义一个同步器类,从它派生一个公平和非公平版本。其中state为持有锁的次数。
*/
abstract static class Sync extends AbstractQueuedSynchronizer {//这就是传说的AQS,下面会详细提到
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();//供外部类实现
/**
* 获取非公平锁,供tryAcquire调用
* 如果锁没有被其他线程持有,则获取锁并立即返回将锁同步状态state设置为1。
* 如果当前线程持有锁,则同步状态state +1且立即返回。
* 如果锁由另一个线程持有,则当前线程成为禁用线程调度目的和处于休眠状态,直到获得锁,此时锁持有计数设置为1
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//没有线程持有锁时
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);//设置为独占资源
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//当前线程持有锁时
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);//设置同步状态值state
return true;
}
return false;
}
/**
* Sync重写AQS tryRelease方法
* 如果当前线程是锁的持有者,则同步状态state -1,如果state=0,锁释放;
* 如果当前线程不是锁的持有者,抛出异常IllegalMonitorStateException
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//当前线程是否独占资源
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
......
}
//-----以下方法的实现都依赖继承于同步器Sync的实现!-----------
/**
* 非公平锁对象
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//如果锁没有被其他线程持有,则获取锁并立即返回将锁同步状态state设置为1。
final void lock() {
if (compareAndSetState(0, 1)) //state为0才设置为1,不可重入!
setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
else
acquire(1);
}
//非公平锁版本下重写AQS的tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 公平锁对象
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* 公平锁版本下重写AQS的tryAcquire
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* 创建ReentrantLock非公平或公平锁实例
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
/**
* 获取锁方法,供外部服务调用
*/
public void lock() {
sync.lock();
}
/**
* 尝试释放锁,供外部服务调用
*/
public void unlock() {
sync.release(1);
}
/**
* 是否有线程持有该锁,有返回true,无则返回false,供外部服务调用
*/
public boolean isLocked() {
return sync.isLocked();
}
......
}
ReentrantLock的lock、unlock、isLocked方法在实现时要依赖自定义的Sync内部类(Sync依赖于AQS),Sync类提供了基本的同步器机制的实现,比如加锁方法nonfairTryAcquire和解锁方法tryRelease,还派生出两个公平策略的类如FairSync和NonfairSync。在sync通过实现state的值来判断是否可以允许获取和释放,基本通过getState,setState以及compareAndSetState来操作的同步器的状态。至于线程的排队、等待、唤醒等,AQS都已经实现好了。
通过这小节我们知道同步器是基于AQS构建的,实现了AQS最基本的操作包括各种形式的获取操作和释放操作。根据同步器不同,AQS获取操作可以是一种独占操作(ReentrantLock),也可以是非独占操作(Semaphore、CountDownLatch)。如果同步器支持独占获取操作,那么需要实现一些保护方法:tryAcquire、tryRelease和isHeldExclusively等,而对于共享获取的同步器,则应该实现tryAcquireShared和tryReleaseShared等方法。那么AQS如何做进行线程排队、等待、唤醒呢?
四、AQS源码解析
AQS解决了实现同步器的大量细节,等待线程采用FIFO双向队列操作顺序。本节摘录AQS独占模式下的两个入口方法来展开:acquire(获取锁)和release(释放锁)
1.节点node
AQS内部维护了一个FIFO双向等待队列结构,head是队列头节点,tail是队列尾节点。双向链表结构里有两个指针,pre指针指向前任节点,next指针指向后继节点。当线程竞争失败时会创建一个包括此线程的Node节点加入等待队列尾部。等待队列如下图:
2.加锁
4.2.1 acquire方法
acquire是独占模式下获取锁的入口方法,acquire源码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire方法流程:
- 1、tryAcquire在第3节提到,如果没有线程持有锁或者是当前线程持有锁时,会获取资源直接返回;
- 2、如果锁被其他线程持有,会调用addWaiter方法构造Node节点加入等待队列尾部;
- 3、acquireQueued方法来以独占的不可中断的方式获取已经在队列中的线程,如果自旋过程被中断过,返回true,否则返回false;
- 4、如果自旋过程中被中断过,acquireQueued方法不会中止,会在最终获取线程后,再调用selfInterrput(),中断当前线程;
- 5、如果在获取队列中的线程的过程中出现异常且获取锁失败时,会取消线程获取操作(把该线程节点的waitStatus标记为CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点)。
下面详细挖下acquire方法里的几个方法:
4.2.1.1 addWaiter
//为当前线程以给定模式创建排队节点。Node.EXCLUSIVE为独占模式, Node.SHARED共享模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 直接入队尾,这种情况表明队列中已有等待的节点
Node pred = tail;
if (pred != null) {
node.prev = pred;//把当前节点前任节点指向队尾节点
if (compareAndSetTail(pred, node)) {//cas添加到同步队列
pred.next = node;//把旧队尾的后置节点指向当前节点,此时当前节点变成新的队尾节点
return node;
}
}
enq(node);//tail为null时,走enq方法初始化队列(刚开始)或者添加到队尾
return node;
}
4.2.1.2 enq
private Node enq(final Node node) {
for (;;) {//坚持不断的自旋,直到加入队列
Node t = tail;
if (t == null) { //刚开始队列为空,设置新节点为head节点,并把tail节点也指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常加入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4.2.1.3 acquireQueued
通过addWaiter设置好队列后,下一步就看线程如何获取锁了。acquireQueued方法就是以独占的不可中断的方式获取已经在队列中的线程。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标记是否成功获取锁
try {
boolean interrupted = false;//标记是否中断过
for (;;) {//自旋获得锁
final Node p = node.predecessor();//获取前任节点
if (p == head && tryAcquire(arg)) {//节点为head时才有资格去获取锁
setHead(node);//旧head节点已经拿到锁,把当前节点高为新head
p.next = null; // 方便原head节点GC(setHead中已经把node.prev=null,这标志着没有指针指向head了,而head也没有后继指针了,可以回收了)
failed = false;//成功获得锁
return interrupted;
}
//如果没有拿到锁,根据前任节点waitStatus状态判断是否需要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//执行挂起,判断当前线程是否有中断标志
interrupted = true;
}
} finally {//如果在获取队列中的线程的过程中出现异常且锁获取失败,会取消线程获取操作(把该线程节点的waitStatus标记为CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点)
if (failed)
cancelAcquire(node);
}
}
以上代码显示出,只有head节点有资格去获取锁,获取锁成功后会把当前节点设置为新head,同时让GC回收旧的head节点对象 。如果在获取队列中的线程的过程中出现异常且锁获取失败,会取消线程获取操作(把该线程节点的waitStatus标记为CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点)。那如果不是头节点或者别的线程仍然没有释放锁怎么办呢?还需看shouldParkAfterFailedAcquire方法和parkAndCheckIntrrupt方法。
4.2.1.4 shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前任节点状态
if (ws == Node.SIGNAL)//如果是SIGNAL状态,可以正常阻塞了,因为当前节点知道会被正常通知/唤醒
return true;
if (ws > 0) {//前任状态为CANCELLED时(只有CANCELLED状态>0)
do {//无限次循环,直到找到正常的前任节点状态后跳出循环
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前任节点为0 or PROPAGATE状态,设置前任为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整个流程中,如果前任结点的状态不是SIGNAL,那么必须保证前任为SIGNAL,以便自己被正常唤醒。
4.2.1.5 parkAndCheckInterrupt
LockSupport.part(this)将线程挂起到waittng状态,真正进入阻塞状态,它需要uppark()、interrupt()方法来中断它,以此实例FIFO队列阻塞操作
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//进入阻塞状态
return Thread.interrupted();//当前线程是否被中断,Thread.interrupted()会清除当前线程的中断标记位(如果当前线程已中断,第一次调用这个方法的返回值是true,第二次调用这个方法的返回值为false)
}
看完shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),再回答上面acquireQueued方法后来提出的问题(如果不是头节点或者别的线程仍然没有释放锁怎么办呢?):没有获得锁的线程会去查看下自己的前任节点是否是SIGNAL状态,在保证前任节点为SIGNAL状态下,自己真正进入到waiting状态,准备被唤醒。唤醒的方法可以是uppark()、interrupt(),唤醒后再次尝试获取锁,以此类推,直到获取锁为止。获取锁成功后会把当前节点设置为新head,同时让GC回收旧的head节点对象。在此过程中还会判断线程有没有被中断过,如果被中断过一次,会在acquire方法中自我中断(锁释放时会中断线程,中断的操作比较复杂,请另行参考线程中断的文章)。
3.释放锁
4.3.1 release方法
上面探索了如何加锁及队列阻塞的操作,下面再看看如何释放锁。如上第二节中锁释放调用unlock(),这个方法会进入AQS的release方法。AQS release是独占模式下释放锁的入口方法,release源码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {//如果state=0时,可以释放锁
Node h = head;
if (h != null && h.waitStatus != 0)//waitStatus为0情况,只有在新建节点时才会初始化为0,而新建的节点没有后继节点,不需要执行唤醒
unparkSuccessor(h);//唤醒等待队列里的下个线程
return true;
}
return false;
}
release方法流程:
- 1、tryRelease在第3节提到,如果当前线程是锁的持有者,则同步状态state -1,如果state=0,锁才会释放;如果当前线程不是锁的持有者,抛出异常IllegalMonitorStateException
- 2、unparkSuccessor方法用于唤醒等待队列中下一个线程。
- 3、为什么h.waitStatus !=0时才会执行唤醒unparkSuccessor方法呢?head节点的waitStatus=0会怎么样?waitStatus为0,只有在addWaiter方法中新建节点时才会初始化为0,其他节点已经在shouldParkAfterFailedAcquire方法里调整过了(参考4.2.1.4,那时head的waitStatus=SIGNAL)。而新建的节点没有后继节点,也就不需要唤醒了。
4.3.1.1 unparkSuccessor方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//ws小于0,将设置头节点watistatus为0
Node s = node.next;//唤醒对象为后继节点
if (s == null || s.waitStatus > 0) {//后继节点为空或取消场景
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//从队尾向前找waitstatus <= 0的节点
if (t.waitStatus <= 0)//注意此处循环没有return,会一直往前找,找到离head最近的节点
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒后继节点
}
unparkSuccessor首先会唤醒head的下一个节点,如果将被唤醒节点不为空,则直接通过unpart()方法来释放挂起的线程;如果head后继节点为空,则从后向前找,最后找到离head最近的节点。再结合acquireQueued()方法进入if (p == head && tryAcquire(arg))的判断来获取锁。由于拿不到锁的线程都会自旋,所以不用担心进不了这个判断if (p == head && tryAcquire(arg)),直到拿到锁结束。
到这里可能会有疑问,为什么后继节点要从队尾开始向前找呢,从前向后找不是更快吗?也就是说为什么循环中是t=tail & t=t.prev(反向),而不是t=head & t=t.next(正向)?返回到新建等待节点addWaiter方法看到,compareAndSetTail方法把新建的node添加到同步队列前已经设置了节点的prev,而前任节点的next指向新节点是在添加到队列之后。这就说明在unparkSuccessor方法来唤醒时,可能队尾节点还没来得及执行pred.next=node;这句话,t.next正向找后继节点就会漏掉这个新加的节点,用t.prev反向找更可靠。
if (pred != null) {
node.prev = pred;//把当前节点前任节点指向队尾节点
if (compareAndSetTail(pred, node)) {//cas添加到同步队列
pred.next = node;//把旧队尾的后置节点指向当前节点,此时当前节点变成新的队尾节点
return node;
}
}
五、ReentrantLock独占模式整体流程图
以上时整个独占模式下锁获取和释放的流程图,供自己参考。
总结
1、本文通过源码追踪的方式解析了自己对ReentrantLock和AQS的理解。ReentrantLock是AQS的实现。AQS设计节点waitStatus、队列状态status等状态属性,维护FIFO队列对线程节点进行阻塞以及独占锁策略的设计等诸多细节。
2、ReetrantLock通过自定义同步器并通过state的值来判断是否可以允许获取锁、释放锁或进入队列。
3、AQS队列中又通过waitStatus状态判断是否线程进入阻塞状态和唤醒线程。
4、AQS维护了一个同步队列,没有获取锁的线程会进行同步队列且会不停的自旋直到该线程的节点成为头部节点且获得了锁才停止自旋。
5、释放锁时会唤醒头节点的后继节点。
最后
欢迎关注公众号:前程有光,领取一线大厂Java面试题总结+各知识点学习思维导+一份300页pdf文档的Java核心知识点总结!