测试代码
public class ReentrantLockTest {
public static void main(String[] args) throws InterruptedException {
//默认为非公平锁,有参构造函数 new ReentrantLock (true)为公平锁
ReentrantLock lock = new ReentrantLock ();
for (int i = 0; i < 10; i++) {
new Thread(()->{
lock.lock ();
try {
Thread.sleep (1000);
} catch (InterruptedException e) {
e.printStackTrace ();
}
System.out.println ("ReentrantLock-"+Thread.currentThread ().getName ());
lock.unlock ();
}).start ();
}
Thread.sleep (11000);
System.out.println ("主线程结束");
}
}
运行的结果 每隔一秒打印一次
ReentrantLock-Thread-0
ReentrantLock-Thread-1
ReentrantLock-Thread-2
ReentrantLock-Thread-3
ReentrantLock-Thread-4
ReentrantLock-Thread-5
ReentrantLock-Thread-6
ReentrantLock-Thread-7
ReentrantLock-Thread-8
ReentrantLock-Thread-9
主线程结束
说到ReentrantLock,就要提及重要的概念AQS
AbstractQueuedSynchronizer类,其中成员变量有
CLH队列的头
private transient volatile Node head;
CLH队列的尾
private transient volatile Node tail;
哪个线程可以将state利用cas将0赋值为1证明可以获取锁,如果是锁的重入,那么state就不为0,
判断如果exclusiveOwnerThread为当前线程,如果是锁重入,state+1 依然可以获取锁,在释放
锁的时候,state-1直到为0
private volatile int state;
还有父类的
private transient Thread exclusiveOwnerThread;
CLH(同步)队列就是AbstractQueuedSynchronizer静态内部类Node组成的链
ReentrantLock lock = new ReentrantLock ();
创建lock,无参构造函数为非公平锁
有参构造函数为公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock.lock ();加锁
final void lock() {
//加锁,判断成员变量state是否为0,如果为0拿到锁,将state利用cas赋值为1
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState(0,1)
unsafe类利用state的偏移量将state赋值为1(cas比较并交换)后面会大量的用
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
赋值当前的独占线程(exclusiveOwnerThread成员变量)为当前线程,重入锁的时候会用到,当state不为0,但是exclusiveOwnerThread为当前线程判断为重入锁,后面会说到
setExclusiveOwnerThread(Thread.currentThread())方法
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
acquire(1);
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg) 最终调用非公平锁类NonfairSync的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取state状态
int c = getState();
//为0则可以获取锁
if (c == 0) {
//获取锁的逻辑
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
//返回true 因为acquire中tryAcquire(arg)是取反的,并且&&,则直接返回,不向下执行代码,也就是不进入CLH队列
return true;
}
}
//重入锁,没有拿到锁,但是exclusiveOwnerThread为当前线程则代表重入锁
else if (current == getExclusiveOwnerThread()) {
//每重入一次state加1,不需要cas因为同时只能一个线程进来
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//赋值state
setState(nextc);
//返回true,同上
return true;
}
return false;
}
acquire(int arg) 方法中的判断条件 !tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
如果tryAcquire(arg)返回false,因为取反!tryAcquire(arg)为true代表加锁失败,因为用&&连接,则继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,一个一个来看
addWaiter(Node.EXCLUSIVE) Node.EXCLUSIVE为null
这里用到了CLH队列,下面为CLH队列结构
private Node addWaiter(Node mode) {
//获取当前线程mode为null
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//tail初始为空
Node pred = tail;
//下面的方法和enq中一致,直接看enq方法(为了效率类似单例懒汉式的双重判断)
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
enq(node),下面的代码中 t为null 证明clh队列为空,那大家是不是有个疑问,既然队列为null,为什么在tryAcquire(arg)获取锁的时候失败呢,可能tryAcquire(arg),队列还不为空,但是执行完tryAcquire(arg)这个方法的时候,队列的线程都执行完了,就可能出现tail为null的情况,注意一下这里使用的for空循环,跳出条件是tail不为null
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果为null将head赋值为一个空的Node
if (t == null) { // Must initialize
//这里发现如果clh队列为空的话,该线程就是第一个线程,注意这里是new Node(),代表是一个空的node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//将新加入的线程排队到末尾并赋值给tail,
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
得到的结果,head的node是一个new node的,thread为null,这个空的node代表的就是已经拿到锁的对象,
这句话我在深入的说一下,t1 t2竞争锁,而t1拿到锁,t1就执行自己的逻辑,因为已经拿到锁就不不会执行enq(node);这段代码,那么t2走到enq(node);方法,发现clh队列是空队列,因为就两个线程,那么t2就会执行for循环,发现tail为null,创建一个空node,tail和head同时执行空node,再次for循环,把入参的node放到tail中,这个node的线程就是当前线程,prev并指向上一个空node,那这个空的node,可以理解为正在执行自己逻辑的t1了.
而当前线程排到队尾赋值为tail,waitStatus的值应为初始值0,为什么此时队列中除了tail节点的waitStatus为0,其余的节点的waitStatus为-1 下面会分析什么时候赋值为-1
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//p为当前node 的上一个node
final Node p = node.predecessor();
//如果p为head,尝试获取锁,因为解释enp方法的时候,说过如果上一个node为heap,就代表下一个执行的
//线程可能是该线程,因为可能非公平锁.
//&&连接符,如果上一个节点是heap,可以尝试的获取锁
if (p == head && tryAcquire(arg)) {
//老规矩,获取锁成功,证明当先线程正在执行自己的逻辑,将当前node设置为head,head的perv和thread设置为
//null
setHead(node);
//将原来head的next指向null,GCRoot不可达,下一次被GC回收
p.next = null; // help GC
failed = false;
//返回线程中断信号后面会分析
return interrupted;
}
//如果获取锁失败的话,就会走这段逻辑,还记得上面提到过只有tail的waitStatus为0,其余的node都为-1吗
//就是shouldParkAfterFailedAcquire(p, node)这个方法执行的
//而parkAndCheckInterrupt就是将当前线程挂起,等待被唤醒
//LockSupport.park这种阻塞线程的方式,在唤醒的时候可以指定唤醒哪个线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//在本次lock分析中failed一直为false,则cancelAcquire不会执行
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(p, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//默认waitStatus为0,当上一个node为Node.SIGNAL(-1),线程才能返回true
//向下执行parkAndCheckInterrupt()方法
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//每个新加入node都会执行这个方法,waitStatus赋值为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
直至之后队列的变化,除了tail的waitStatus为0其余都为-1,每新加一个node,都将上一个节点的waitStatus赋值为-1
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//利用LockSupport将当先线程阻塞等待被唤醒,队列中的所有线程都会阻塞到此,
//当上一个线程唤醒下一个线程的时候,会继续执行acquireQueued方法的for死循环,尝试获取锁,如果获取不到则依然会阻塞到这段代码,为什么会获取不到锁,是因为非公平锁,下面会分析到
LockSupport.park(this);
//返回当前线程是否中断,执行之后将中断信号赋值为0
return Thread.interrupted();
}
非公平锁的解释
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//被唤醒的线程依旧执行到这段代码
final Node p = node.predecessor();
//如果此时有新线程执行到这段代码并早于被唤醒的线程,那么唤醒的线程会被继续的阻塞
//非公平锁,并不是上一个线程执行结束,所有阻塞线程共同竞争,而是最新的线程和被唤醒的线程争夺锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//当上一个线程执行结束,唤醒下一个线程去执行,则返回到for循环尝试加锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
解锁
lock.unlock()->sync.release(1);
release(1)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//h.watiStatus的状态代表的下个node节点的状态,如果为-1则可以正常运行
if (h != null && h.waitStatus != 0)
//
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(arg)
protected final boolean tryRelease(int releases) {
//当前state减release 在非重入的时候state为1 则c=0
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//c如果等于0 设置exclusiveOwnerThread为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//重置state值,
setState(c);
//如果是可重入锁的话,返回false,当前线程继续执行,也不会唤醒下一个线程
//如果c==0,证明当前线程已经退出锁了,下一个线程就可以运行了,如果释放锁之后当先线程啥也没干
//证明当前线程已经结束了,如果是线程池可能重新放到池子里,也可能就结束了,等待被系统回收掉
//如果还有代码,就继续执行代码,是可以被多个线程同时执行的
return free;
}
UnparkSuccessor
private void unparkSuccessor(Node node) {
//如果为-1则待变下一个节点可以正常运行
int ws = node.waitStatus;
if (ws < 0)
//将head的waitStatus赋值为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//下一个节点,就是将要被唤醒的节点
Node s = node.next;
//如果头部节点的下一个节点waitStatus>0 (可能由于中断引起的),则从tail向前找,最靠前
//的waitStatus为<=0的node节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
关于为什么用for (Node t = tail; t != null && t != node; t = t.prev)这样一个for循环,可以参考这篇文章,主要是enp这个方法不是线程安全的,可能出现下一个节点为null的情况
https://blog.csdn.net/foxException/article/details/108917338
公平锁和非公平锁的区别主要在tryAcquire(arg)方法中判断hasQueuedPredecessors(),当前队列是否有节点,有节点返回true执行入队方法,没有节点就尝试加锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//判断当前队列是否有节点
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}