引言
提起java的线程同步,大家总能想到sychronized关键字。sychronized是由JVM提供的重量级锁,使用方式简单,功能比较单一。
ReentrantLock是由java API提供的用来做线程同步的类,它的实现借助了队列同步器AQS(AbstractQueuedSynchronizer)。
AQS 是 Java 并发包中实现锁、同步的一个重要基础框架。
ReentrantLock提供了比sychronized更丰富的功能,包括可重入、公平锁、可中断等。
ReentrantLock使用
ReentrantLock的使用姿势如下
private ReentrantLock lock = new ReentrantLock();
public void run() {
lock.lock();
try {
//do bussiness
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
公平锁
实例化ReentrantLock时,可以通过构造器传参指定声明的是公平锁还是非公平锁。
Lock lock=new ReentrantLock(true);//公平锁
Lock lock=new ReentrantLock(false);//非公平锁
公平锁是指线程获取锁的顺序是按照加锁顺序来的,而非公平锁则可以抢锁,先申请的线程不一定先获得锁。
由于公平锁需要维护一个队列,记录当前已排队的线程,所以非公平锁的性能会高于公平锁。默认构造函数创建的就是非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
tryLock()
tryLock的功能非常实用,实际业务中的线程等待锁资源释放不是永无止境。通过tryLock形参可以设定等待时间,时间到后如果能够获取锁就返回true,否则返回false,而不是一直阻塞直到等待的锁资源被释放。
Lock lock = new ReentrantLock();
@Override
public void run() {
try {
lock.tryLock(100, TimeUnit.MILLISECONDS);
} finally {
lock.unlock();
}
}
ReentrantLock原理
查看源码,可以看到ReentrantLock类包含了3个内部类:
- Sync:其中抽象内部类Sync继承自AQS,实现最核心的功能。
- FairSync:继承自抽象类Sync,实现公平锁
- NonfairSync:继承自抽象类Sync,实现非公平锁
加锁解析
公平锁
ReentrantLock的lock()方法调用的是Sync的lock方法,lock()是抽象方法,具体实现由其子类FairSync的lock方法提供。
//ReentrantLock的lock方法
public void lock() {
sync.lock();
}
//FairSync的lock方法
final void lock() {
acquire(1);
}
acquire方法来自Sync类
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法的公平锁实现如下
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
state字段为0表示没有被任何线程持有,紧接着调用hasQueuedPredecessors方法判断队列中是否有其他线程正在排队
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
如果没有排队的线程,就调用CAS的compareAndSetState方法线程安全的将同步器的state字段设置为1。接下来调用setExclusiveOwnerThread方法把当前线程保存下来,以便下次该线程的重入。
state大于0意味着当前锁已经被线程持有,需要判断持有锁是否当前线程。
如果是当前线程,将state值加一重新写入state状态(线程可重入)。
如果不是当前线程,直接返回失败。
acquire方法中,tryAcquire方法返回false会继续调用addWaiter(Node.EXCLUSIVE), arg)方法将当前线程构建成Node对象写入到线程排队队列。
AQS中的线程队列是由Node对象为节点的双向链表。Node节点包含两种模式:排他模式与共享模式。ReentrantLock是排他模式,ReadWriteLock是共享模式。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果该队列已经有node(tail!=null),则将新节点的前驱节点置为tail,再通过CAS将tail指向当前节点,前驱节点的后继节点指向当前节点,然后返回当前节点。
如果队列为空或者CAS失败,则通过enq入队。
进队列的时候,要么是第一个入队并且设置head节点并且循环设置tail,要么是add tail,如果CAS不成功,则会无限循环,直到设置成功,即使高并发的场景,也最终能够保证设置成功,然后返回包装好的node节点。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就完成任务了。
如果不是头节点,或者获取锁失败,则会根据上一个节点的waitStatus状态来处理。shouldParkAfterFailedAcquire(p,node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt()。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
parkAndCheckInterrupt方法使用LockSupport的park方法挂起线程。
非公平锁
//ReentrantLock的lock方法
public void lock() {
sync.lock();
}
//NonfairSync的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
非公平锁直接调用compareAndSetState方法修改state,修改成功即成功获得锁,将当前线程写入排他锁独占线程。如果获取锁失败,则调用Sync的acquire方法,非公平锁的tryAcquire方式实现如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平锁的逻辑相对简单,不需要判断线程队列直接尝试获得锁。
释放锁解析
公平锁和非公平锁的释放流程都是一样的。
public void unlock() {
sync.release(1);
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
总结
由于公平锁维护了线程队列,加锁的流程比较繁琐导致性能开销大。实际使用中可优先考虑非公平锁,以获取更好的性能优势。