示例小demo
public class reentrantlockTest {
private ReentrantLock lock=new ReentrantLock();
public void test1(){
try {
System.out.println("test1开始等待获取锁");
lock.lock();
System.out.println("test1已经获取锁");
Thread.sleep(4000);
}catch (InterruptedException e) {
Thread.interrupted();
} finally {
lock.unlock();
}
}
public void test2(){
try {
System.out.println("test2开始等待获取锁");
lock.lock();
System.out.println("test2开始已经获取锁");
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
final reentrantlockTest test= new reentrantlockTest();
new Thread(new Runnable() {
@Override
public void run() {
test.test1();
}
},"t1").start();
new Thread(new Runnable() {
@Override
public void run() {
test.test2();
}
},"t2").start();
}
test1开始等待获取锁
test1已经获取锁
test2开始等待获取锁
test2开始已经获取锁
用起来很简单lock,unlock就可以了。当多个线程同时要获取这个锁时候到底发生了什么?
简单描述
在具体分析源码之前,先用语言简单描述一下.
有一位足疗店技师活特别好,大家去消费都想点他服务,但是他只是一个人,所有来的人得排队等他服务,开始服务就相当于加锁,服务结束相当于释放锁资源。
之后第一个人去叫第二个人进来服务,不断循环,直到都所有人都消费完成.
还有个概念
公平锁:大家都老老实实排队,先到的先被服务.
非公平锁:在第一个人叫第二个人的这段时间内,来消费就先进屋看看,还没人进来,就直接让技师服务(就是抢个时间差,直接插队,不过这样就节省了第一个人叫第二人的时间,效率高些).
那ReentrantLock和Synchronize锁有什么区别
先说相同点:两者都是可以非公平锁.在最新的jdk版本下效率差不多
不同点:
Synchronize:使用简单一些,非特定场景可以用
ReentrantLock:可以实现公平锁,可以在等资源时候可以中断.
那么AQS在里面起什么作用?
队列怎么排,怎么入队,出队,资源消息是怎么传递的.
开始源码分析
从lock开始分析
public void lock() {
sync.lock();
}
final void lock() {
state是一个表示锁资源状态的参数
1.上来就直接插队,用CAS算法看能不能获取锁资源
if (compareAndSetState(0, 1))
获取成功后,当前线程获得资源
setExclusiveOwnerThread(Thread.currentThread());
else
2.如果获取不成功,正常排队等待
acquire(1);
}
lock默认采用非公平锁,上来尝试获取锁,获取不到进入aqs队列排队
acquire()
public final void acquire(int arg) {
1.尝试获取锁资源,直接抛出异常,必须由子类定义什么才是获取锁
2.如果获取不到锁,创建一个新的队列节点,将节点放到队列里面
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
3.拿到锁之后,如果该线程是中断状态,直接中断线程
selfInterrupt();
}
入队前在尝试一次获取锁,如果获取不到,添加一个节点,把节点放到队列里排队,阻塞知道被前节点unpark通知,恢复执行后检查是否需要中断
tryAcquire()
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire(acquires);
================================
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
1.获取锁的状态
int c = getState();
2.如果没有线程持有锁
if (c == 0) {
3.尝试自己获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
4.如果是当前已经是该线程持有锁,那么将state计数器+1.这里就体现了 可重入的特性
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
自定义了锁的获取形式state>0,表示锁已经被占有了,只有cas(0到1)是才获取成功,这样保证了独占模式。如果当前是该线程占有这个锁,那么state+1,unlock时-1,重入性就是这么来的.
可以看到这里继承了aqs之后,自由的实现锁的获取方式,concurrent包里面不少类通过实现不同的锁的获取,来实现不同的特性.后续博文中会陆续介绍
addWaiter(Node.EXCLUSIVE)
这里添加了一个独占模式的节点
先看addWaiter(Node.EXCLUSIVE)
添加一个等待节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
1.如果存在尾部节点,将新节点链接在其后,并且将新节点设置为尾部节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
2.有可能队列为空,则直接入队
enq(node);
return node;
}
=====================
private Node enq(final Node node) {
for (;;) {
1.如果尾部为空
Node t = tail;
if (t == null) { // Must initialize
2.先初始化一个空节点,将其设置为 头,尾 然后for循环
if (compareAndSetHead(new Node()))
tail = head;
} else {
3.到这里肯定已经有尾部节点了,将我们的节点加在尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
4.最后将节点返回
return t;
}
}
}
}
这里就是初始化队列(实际是双向链表),头尾都是傀儡节点,将节点链接到链表尾部
现在有了一个节点,那么就开始入队了
acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
1.获取当前节点的前一个节点
final Node p = node.predecessor();
2.如果前节点是头节点,尝试一次获取锁
if (p == head && tryAcquire(arg)) {
3.如果获得了锁,把该节点设置为头节点,将其设置为傀儡节点
setHead(node);
4.原头节点后面的链表置为空,// help GC
p.next = null; // help GC
failed = false;
return interrupted;
}
5.如果获取锁失败,那么就需要挂起该线程,等待通知
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
6.如果失败,做失败处理
if (failed)
7.后面进行分析
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire()
=============
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
1.这里又多了一个状态,waitStatus表示节点的等待状态
2.先获取前节点状态
int ws = pred.waitStatus;
3.如果前节点处于通知状态,意味当前节点可以尝试去获取锁了
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
4.如果前节点是cancel状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
5.那就不断遍历找到不是取消状态的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
6.如果头节点是新创建的状态是0(PROPAGATE以后在讨论),(这里设置为0是因为Unlock时候处理的,具体细节看后面)那么将他设置为通知状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
===============
private final boolean
1.走到了这里说明,该线程是可以挂起的状态了
parkAndCheckInterrupt() {
2.这里用了park(park(),unpark(),这一对方法很有意思。wait,notify都应该知道,先wait,在sign才可以生效要不然就卡死了.这里park,unPark,作用是类似的,但是完全不需要顺序,可以先unpark,在Park.具体写个小demo就可以了解了)
所以就避免了这种情况,head已经执行完了,也unpark了,但是当前线程还没有执行到挂起的地方,造成卡死
最后线程阻塞到这里,等到前节点通知
LockSupport.park(this);
3.返回线程是否中断了(被自己或其他中断了)
只有当节点被唤起后才能设置中断状态
return Thread.interrupted();
}
上面入队之后进行阻塞,直到接到前节点发到的信号.
以上就是lock的实现流程,下面在看看unlock的操作
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
1.尝试释放锁资源
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(arg) 同tryAcquire一样需要子类自定义获取锁的方式
====================================
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
子类的具体实现
protected final boolean tryRelease(int releases) {
1.锁当前状态-1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
2.如果状态等于0了,那么这个锁就被成功的释放了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
3.如果减了一次还不为0,那么当前这个线程多了lock了这个锁,这里也是可重入导致的
setState(c);
return free;
}
==========================================
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
1.将头状态设置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
2.找到后继节点,并且没有取消,通知该节点恢复 这也是为什么必须要在finaly里unlock
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
以上是源码解析
最后留一些思考
1.ReentrantLock独占性,可重入性是怎么实现的
2.AQS在里面起什么作用?
3.公平性和非公平性是怎么实现的
4.park(),unPark()