正好最近研究了下Redisson的源码,和大家分享一下
前言
首先我们先回顾一下 Java 中的 ReentrantLock 是如何实现的?
这里我先简单介绍一下ReentrantLock 实现的思路
锁标识:通过AQS的state变量作为锁标识,利用Java的CAS保证多线程竞争锁时的线程安全问题
队列:未竞争到锁的线程进入AQS的队列并挂起,等待解锁时被唤醒(或者超时)
如何设计分布式可重入锁
首先锁标识,这个在Redis中很容易实现,可以用lock name 作为key,当前线程生成一个uuid,作为value,加上Redis 单线程模型,实现线程安全的锁竞争
这种方式在之前的博客里也提到过,可以参考下 Redis分布式锁的正确实现方式
但是如何基于Redis 做一个队列,像Java那样可以挂起唤醒线程呢?这点我在看源码之前一直没有想到...
那么Redisson 是如何做的呢?
答案:利用Redis的发布订阅,加上Java的Semaphore(信号量,不了解Semaphore的小伙伴可以Google一下)
Redisson 分布式锁实现思路
锁标识:Hash 数据结构,key 为锁的名字,filed 当前竞争锁成功线程的"唯一标识",value 重入次数
队列:所有竞争锁失败的线程,会订阅当前锁的解锁事件,利用 Semaphore 实现线程的挂起和唤醒
源码分析
我们来看一下tryLock方法的源码
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 尝试获取锁,返回null 代表获取锁成功,当获取锁失败时返回当前锁的释放时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// 如果此时已经超过等待时间则获取锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅解锁事件
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 等待订阅成功,成功后唤醒当前线程
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
// 再次判断一下是否超时
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// 尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 等待解锁消息,此处利用Semaphore,锁未释放时,permits=0,线程处于挂起状态
// 当发布解锁消息时,当前的Semaphore对象的release() permits=1
// 所有的客户端都会有一个线程被唤醒,去尝试竞争锁
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
tryAcquire(leaseTime, unit, threadId); 这个方法我们下面会分析,现在我们只需要知道这个方法是用来获取锁就可以了
这个时候我们已经可以理清Redisson可重入锁的思路了
- 获取锁
- 如果获取锁失败,订阅解锁事件
- 之后是一个无限循环
while(true) {
// 尝试获取锁
// 判断是否超时
// 等待解锁消息释放信号量
//(此时每个Java客户端都可能会有多个线程被挂起,但是只有一个线程会被唤醒)
// 判断是否超时
}
利用信号量,合理控制线程对锁的竞争,合理利用系统资源,可以说做的灰常的奈斯了
需要注意:
!await(subscribeFuture, time, TimeUnit.MILLISECONDS)
,这里很多博客都解释错了,这里并不是等待发布解锁消息,只要订阅事件成功后,就会往下执行,真正等待解锁消息的是getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
这里你可能不信,为什么我说的就对啊,debug一下你就知道
tryLockInnerAsync
tryAcquire 内部依靠 tryLockInnerAsync 来实现获取锁的逻辑,我们来看下源码
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 是否存在锁
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 不存在则创建
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 设置过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
// 竞争锁成功 返回null
"return nil; " +
"end; " +
// 如果锁已经被当前线程获取
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 重入次数加1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 锁被其他线程获取,返回锁的过期时间
"return redis.call('pttl', KEYS[1]);",
// 下面三个参数分别为 KEYS[1], ARGV[1], ARGV[2]
// 即锁的name,锁释放时间,当前线程唯一标识
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
tryLockInnerAsync 中利用lua脚本 和 Redis 单线程的特点来实现锁的竞争
这里可以看到锁的结构,和我们上文所说的一样,Hash 数据结构,key 为锁的name,filed 当前竞争锁成功线程的"唯一标识",value 重入次数
unlockInnerAsync
接下来我们再来看解锁的核心代码
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 用锁的name和线程唯一标识去判断是否存在这样的键值对
// 解铃还须系铃人,不存在则无权解锁,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 解锁逻辑
// 冲入次数-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果大于0 代表当前线程重入锁多次无法解锁,更新锁的有效时间
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 解锁,删除key
"redis.call('del', KEYS[1]); " +
// 发布解锁消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// KEYS[1],KEYS[2]
// 锁的name,发布订阅的Channel
Arrays.<Object>asList(getName(), getChannelName()),
// ARGV[1] ~ ARGV[3]
// 解锁消息,释放时间,当前线程唯一标识
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
发布解锁消息后,会调用到LockPubSub 的 onMessage,释放信号量,唤醒等待锁的线程
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
public static final Long UNLOCK_MESSAGE = 0L;
public static final Long READ_UNLOCK_MESSAGE = 1L;
public LockPubSub(PublishSubscribeService service) {
super(service);
}
@Override
protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 释放信号量
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.getLatch().release(value.getLatch().getQueueLength());
}
}
}
参考
- 慢谈 Redis 实现分布式锁 以及 Redisson 源码解析
- https://www.programcreek.com/java-api-examples/?code=rollenholt-SourceReading/redisson/redisson-master/src/main/java/org/redisson/RedissonLock.java
欢迎点赞、转发。你的支持就是对我最大的帮助