带着问题去思考
分布式锁有哪些解决方案?方案的利弊各自体现在哪里?
基于redis来实现分布式锁实现原理,以及需要主要那些问题?
基于ZooKeeper 的分布式锁实现原理
背景概要
互联网从开始的单体应用随之发展成目前的分布式应用,例如市场上流行的分布式框架Dubbo、SpringCloud等等
单体应用的优势:维护、集成、部署简单,适合小团队独立维护,劣势随之产生的是可扩展性太差,代码腐化维护成本的增加、应用复杂度越高功能越多风险性就越大。
所以为了解决以上问题,引入了分布式应用,市场上很多大型网站以及应用都是分布式部署的。
分布式系统主要从三方面获得提升:
- 扩展性:集群扩展性、地理扩展性、管理扩展性
- 性能:短RT、低延迟,高吞吐和较低的计算资源占用率。
- 可用性:可用性=可用时间/(可用时间+不可用时间),可用性百分比越高,难度越高 。
分布式家族中包含分布式服务、分布式消息、分布式缓存、分布式调度、分布式数据库、分布式搜索、分布式锁、分布式事务、分布式计算等等
在分布式场景中数据一致性向来是很重要的话题,CAP理论中“任何一个分布式系统都无法同时满足一致性(Consistency
)、可用性(Availability
)和分区容错性(Partition tolerance
),最多只能同时满足两项。要么CP,要么AP。
https://www.yuque.com/docs/share/6e6afffe-c348-461e-90bc-711231fbd209?#
在很多场景下,我们是需要保证数据一致性,为了满足一致性问题,我们需要很多技术方案支持,比如分布式锁分布式事务等等。本次分享主要针对分布式锁来进行延伸探讨。
分布式锁的介绍
讲到分布式锁,首先要提到与之对应的线程锁
- 线程锁:当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。以保证共享资源安全性,线程锁只在同一个进程【同一个JVM】中才有效。
- 分布式锁:当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。
到底什么时候需要分布式锁呢?
总结来说,当有多个客户端需要访问并防止并操作同一个资源,并且还需要保持这个资源的一致性的时候,就需要分布式锁,实现让多个客户端互斥的对共享资源进行访问。
eg: 集群部署下秒杀场景、集群部署下批处理业务的执行等等
考虑因素
- 排他性:分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行
- 可重入性:同一线程多次获取锁避免死锁
- 阻塞锁考虑:考虑业务是否需要
- 高可用:获取释放锁性能佳
- 原子性:加锁解锁原子性操作,避免多次请求获得锁。
常见的分布式锁实现方案
想要实现分布式锁,我们就需要借助外部系统实现互斥的功能。常见的有以下几种方式:
- 基于数据库;
- 基于redis;
- 基于Zookeeper;
基于数据库
- Mysql:通过唯一索引、通过乐观锁version版本、通过悲观锁行锁 for update实现;
- MongoDB:findAndModify原子性命令,相较Mysql性能要好很多
容易理解,但解决问题的方案相对越来越复杂,并且数据库需要一定的开销,性能值得考虑
本次分享不涵盖基于数据库分布式锁实现,有兴趣可以下去自行通过上述方向去扩展。
基于Redis
在实现redis分布式锁之前,我们先mock一个场景来看看,当分布式应用场景下,我们不用锁或者用java中线程锁会出现什么问题呢?
@GetMapping("/kill")
public String kill() {
// 定义商品key值
String key = "goods";
// 获取商品数量
Object obj = redisTemplate.opsForValue().get(key);
Integer mount = Integer.valueOf(obj.toString());
// 如果商品被抢完,直接返回
if (mount < 0 || mount == 0) {
System.out.println("很遗憾,商品已被抢完");
return "很遗憾,商品已被抢完";
}
// 线程睡眠,目的在于放大错误
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 抢到商品后,将redis的商品数量减一
mount = --mount;
redisTemplate.opsForValue().set(key, mount.toString());
// 打印,以便观察
System.out.println(System.currentTimeMillis() + "-" + Thread.currentThread().getName() + ":抢到第" + (mount + 1) + "件商品【kill】");
return "恭喜,商品抢购成功";
}
原理
最核心的三个命令:setNx、Px(setNx、expire)、delete
setNx当返回1时代表获取锁成功,0则抢锁失败
redis原生实现
具体实现
接下来我们所有的实现redis分布式锁都是基于Spring切面定义来完成。
// 准备工作
@Bean(name="redisTemplate")
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(redisSerializer);
//value hashmap序列化
template.setHashValueSerializer(redisSerializer);
//key haspmap序列化
template.setHashKeySerializer(redisSerializer);
return template;
}
@Aspect
@Component
public class RedisLockProvider {
@Around("@annotation(lock)")
public Object execute(ProceedingJoinPoint point, RedisLock lock) throws IllegalAccessException, InstantiationException {
Object result = null;
String lockKey = lock.lockKey();
Class<? extends RedisLockStrategy> strategy = lock.strategy();
RedisLockStrategy redisLockStrategy = strategy.newInstance();
try {
int expireTime = lock.expireTime();
if(redisLockStrategy.lock(lockKey, expireTime)){
result = point.proceed();
}
} catch (Throwable throwable) {
redisLockStrategy.unLock(lockKey);
throwable.printStackTrace();
} finally {
redisLockStrategy.unLock(lockKey);
}
return result;
}
}
我们看了不加锁或者加java锁的情况,确实会出现数据异常的情况,那现在我们通过redis本身的一些特性来实现redis分布式锁。
public class RedisLockUtil0 implements RedisLockStrategy {
private String TEMP_VALUE = "OK";
private RedisTemplate<String, String> redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
public Boolean lock(String key, int expireTime) {
return redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS);
}
public void unLock(String key){
redisTemplate.delete(key);
}
}
但上述场景中会有一个问题,就是当我获取锁的时候,如果没抢锁成功会立刻返回到客户端通知结果,也许下一时间正好就能抢到锁,所以我们做了自旋的操作,并设置默认超时时间,这里也可以不设置超时时间:那就是阻塞锁了,看业务场景而制定
public class RedisLockUtil1 implements RedisLockStrategy {
private static final long DEFAULT_TIME_OUT = 2000; // 默认超时时间(毫秒) 默认2秒
private String TEMP_VALUE = "OK";
private RedisTemplate<String, String> redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("申请锁(" + key + ")成功");
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加锁失败, 锁键值:" + key, e);
}
return Boolean.FALSE;
}
public void unLock(String key){
log.info("释放锁,key:"+key);
redisTemplate.delete(key);
}
}
上述实现方式,看似逻辑ok,有同学能看出有什么漏洞吗???我们来测试看下。
让我们业务时间执行较短时测试,发现业务逻辑没太大问题;
让我们业务时间稍微变大,结果发现了什么?
当业务执行时间大于默认的自旋超时时间时,会触发删除锁操作,会出现误删的情况,A业务的锁还未执行完成,B锁获取异常或获取失败或者自旋时间已经超时,导致误删了A业务的锁,也就会导致分布式锁的定义没有任何意义了。所以我们在设置锁的时候,设置一个属于该锁的唯一ID,在删除锁的时候要判断是否属于自己的锁。以避免误删场景。
public class RedisLockUtil2 implements RedisLockStrategy {
/**
* 默认超时时间(毫秒) 默认2秒
*/
private static final long DEFAULT_TIME_OUT = 2000;
/**
* 避免误删key,在value中赋值UUID
*/
private String TEMP_VALUE = UUID.randomUUID().toString();
private RedisTemplate redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("加锁成功...");
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加锁失败, 锁键值:" + key, e);
}
return Boolean.FALSE;
}
public void unLock(String key){
if(TEMP_VALUE.equals(redisTemplate.opsForValue().get(key))){
log.info("释放锁,key:"+key);
redisTemplate.delete(key);
}
}
}
那现在我们模拟一个场景,假设我们业务的执行时间过长的情况下,但我们设置的redis过期时间很短,那会出现什么问题呢???我们来测试看下。
是不是导致A业务的还未执行完成,B业务却拿到了本应该属于A的锁,分布式锁的意思有荡然无存了,所以我们借鉴了redisson中WatchDog【看门狗】的机制来完善业务时间大于过期时间的问题。
redisson是用java来实现的分布式框架,稍后我们会介绍redisson是如何基于redis来实现分布式锁并解决相关问题的。
public class RedisLockUtil3 implements RedisLockStrategy {
private static final long DEFAULT_TIME_OUT = 2000; // 默认超时时间(毫秒) 默认2秒
private String TEMP_VALUE = UUID.randomUUID().toString();
private RedisTemplate redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
/**
* 初始化任务线程池
*/
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
/**
* 延长过期次数阈值
*/
private Integer maxRenewTimes = 10;
/**
* 延长过期时间次数
*/
private AtomicInteger renewTimes = new AtomicInteger(0);
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("申请锁(" + key + ")成功");
this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加锁失败, 锁键值:" + key, e);
}
return Boolean.FALSE;
}
/**
* expireTime/3 频率去重置过期时间
* @param key
* @param value
* @param expireTime
*/
private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
scheduledExecutor.schedule(()->{
// 延长过期时间失败直接返回
if(!this.renewExpiration(key, value, expireTime)){
return;
}
// 超过延长次数阈值直接返回
if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
return;
}
this.scheduleExpirationRenewal(key,value,expireTime);
}, expireTime / 3, TimeUnit.SECONDS);
}
/**
* 重置过期时间
* @param lockKey
* @param lockValue
* @param lockWatchdogTimeout
* @return
*/
private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
String value = (String) redisTemplate.opsForValue().get(lockKey);
if (Objects.isNull(value) || !value.equals(lockValue)) {
return false;
}
log.info("延长过期时间,key:"+lockKey);
return redisTemplate.expire(lockKey, lockWatchdogTimeout, TimeUnit.SECONDS);
}
public void unLock(String key){
if(TEMP_VALUE.equals(redisTemplate.opsForValue().get(key))){
log.info("释放锁,key:"+key);
redisTemplate.delete(key);
}
}
}
Lua脚本语言的引入
到此为止,我们自己实现的分布式方案看似已经ok,但其实还是有很大的问题,譬如在删除锁、给锁加长超时时间等操作,我们是先获取锁在删除或者延长超时时间,两者操作并不是原子性操作,如果在获取锁成功之后,redis宕机,那么也会出现业务紊乱,所以我们在redis操作要尽量保证院子性操作。
那么我们可以引入Lua脚本语言来支持,Lua脚本的优势,兴趣的同学可以下去学习下lua语言,大部分游戏开发都用的lua来实现;语法链接:https://www.runoob.com/lua/lua-tutorial.html
并且我们国人章亦春 OpenResty 也是基于Lua和Nginx来实现高性能服务端的。
Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL。脚本命令:https://www.runoob.com/redis/redis-scripting.html
脚本入参:key个数、KEYS[1]、ARGV[1]、ARGV[2]
eg:EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
eval "return redis.call('setNx', KEYS[1], ARGV[1])" 1 wuding WD
eval "return redis.call('get',KEYS[1])" 1 CCC;
接下来我们先测试下redis内嵌lua脚本。执行成功返回1,否则返回0
@GetMapping("/luaTest")
public String luaTest() {
String script_init = "if redis.call('setNx',KEYS[1],ARGV[1]) == 1 then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
Object initResult1 = redisTemplate.execute(new DefaultRedisScript<Long>(script_init, Long.class), Arrays.asList("AAA"), "aaa","1000");
System.out.println(initResult1);
Object initResult2 = redisTemplate.execute(new DefaultRedisScript<Long>(script_init, Long.class), Arrays.asList("AAA"), "aaa","1000");
System.out.println(initResult2);
String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
Object expireResult = redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList("AAA"),"aaa", "5000");
System.out.println(expireResult);
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
Object aaa = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("AAA"), "aaa");
System.out.println(aaa);
return aaa.toString();
}
完善嵌入lua脚本的redis分布式锁实现
public class RedisLockUtil4 implements RedisLockStrategy {
private static final long DEFAULT_TIME_OUT = 20000; // 默认超时时间(毫秒) 默认20秒
private String TEMP_VALUE = UUID.randomUUID().toString();
private RedisTemplate redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
private Integer maxRenewTimes = 10;
private AtomicInteger renewTimes = new AtomicInteger(0);
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("redis申请锁(" + key + ")成功");
this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加锁失败, 锁键值:" + key, e);
}
return Boolean.FALSE;
}
private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
scheduledExecutor.schedule(()->{
if(!this.renewExpiration(key, value, expireTime)){
return;
}
if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
return;
}
this.scheduleExpirationRenewal(key,value,expireTime);
}, expireTime / 3, TimeUnit.SECONDS);
}
private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
// 入参切记都是字符串,要不然就会类型转换失败,scheduledExecutor把异常捕获掉了看不到错误信息
Long expireResult = null;
try {
expireResult = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList(lockKey),lockValue, lockWatchdogTimeout.toString());
} catch (Exception e) {
log.error("执行lua脚本出错!e:"+ e.getCause().getMessage());
return Boolean.FALSE;
}
if(expireResult == 1L){
log.info("延长过期时间,key:"+lockKey);
}
return expireResult == 1L;
}
public void unLock(String key){
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(key), TEMP_VALUE);
if(result == 1L){
log.info("释放锁,key:"+key);
}
}
}
接下来我们再设想一个问题:业务A调用业务B,AB两者业务都调用了分布式锁,或者A业务来做个递归操作,那么大家猜想下会出现什么问题???我们来测试看下。
把锁的超时时间设大来进行测试,如果是阻塞锁会一直阻塞下去,非阻塞锁的话,超时时间获取子方法也没有执行,业务逻辑也就会有问题。这就是我们可重入性的问题。
可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,如果没有可重入锁的支持,在第二次尝试获得锁时将会进入死锁状态。
通俗理解就是:排队打水,一个人只能用一个桶来接水,如果你还有一个桶,只能再去排队,这就是非重入性,反之,只要到排到你,不管你拿几个桶你都可以来接满水。
public class RedisLockUtil5 implements RedisLockStrategy {
private static final long DEFAULT_TIME_OUT = 20000; // 默认超时时间(毫秒) 默认2秒
private String TEMP_VALUE = UUID.randomUUID().toString();
private RedisTemplate<String, String> redisTemplate = ApplicationContextUtil.getBean("redisTemplate");
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
private Integer maxRenewTimes = 10;
private AtomicInteger renewTimes = new AtomicInteger(0);
public Boolean lock(String key){
return lock(key, 10);
}
public Boolean lock(String key, int expireTime) {
long currentTimeMillis = System.currentTimeMillis();
try {
while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
LockInfo lockInfo = ThreadLocalUtil.get();
if(Objects.nonNull(lockInfo)
&& key.equals(lockInfo.getKey())){
lockInfo.getCount().incrementAndGet();
ThreadLocalUtil.put(lockInfo);
// 将threadLocal中的value赋值给当前TEMP_VALUE,保证可重入性,保证删除逻辑正常
TEMP_VALUE = lockInfo.getValue();
// TODO 这里应该重置redis过期时间
log.info("可重入加锁成功...");
return Boolean.TRUE;
}
if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
log.info("redis申请锁(" + key + ")成功");
ThreadLocalUtil.put(new LockInfo(key, TEMP_VALUE, new AtomicInteger(1)));
this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
return Boolean.TRUE;
}
Thread.sleep(100);
}
} catch (InterruptedException e) {
log.error("加锁失败, 锁键值:" + key, e);
}
return Boolean.FALSE;
}
private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
scheduledExecutor.schedule(()->{
if(!this.renewExpiration(key, value, expireTime)){
return;
}
if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
ThreadLocalUtil.clear();
return;
}
this.scheduleExpirationRenewal(key,value,expireTime);
}, expireTime / 3, TimeUnit.SECONDS);
}
private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
// 入参切记都是字符串,要不然就会类型转换失败,scheduledExecutor把异常捕获掉了看不到错误信息
Long expireResult = null;
try {
expireResult = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList(lockKey),lockValue, lockWatchdogTimeout.toString());
} catch (Exception e) {
log.error("执行lua脚本出错!e:"+ e.getCause().getMessage());
return Boolean.FALSE;
}
if(expireResult == 1L){
log.info("延长过期时间,key:"+lockKey);
}
return expireResult == 1L;
}
public void unLock(String key){
LockInfo lockInfo = ThreadLocalUtil.get();
if(Objects.nonNull(lockInfo)
&& key.equals(lockInfo.getKey())
&& TEMP_VALUE.equals(lockInfo.getValue())){
lockInfo.getCount().decrementAndGet();
ThreadLocalUtil.put(lockInfo);
log.info("释放threadLocal锁,key:"+key);
}
if(Objects.nonNull(lockInfo) && lockInfo.getCount().get() <= 0){
try {
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(key), TEMP_VALUE);
if(result == 1L){
log.info("释放redis锁,key:"+key);
}
} finally {
ThreadLocalUtil.clear();
}
}
}
/**
* 也可以将线程id存入redis中去做比较,有兴趣可以自行实现
*/
static class ThreadLocalUtil{
private static final ThreadLocal<LockInfo> THREAD_LOCAL = new ThreadLocal<>();
public static LockInfo get() {
return THREAD_LOCAL.get();
}
public static void put(LockInfo lockInfo) {
THREAD_LOCAL.set(lockInfo);
}
public static void clear() {
THREAD_LOCAL.remove();
}
}
@AllArgsConstructor
@Data
static class LockInfo{
private String key;
private String value;
private AtomicInteger count;
}
}
也可以将线程id存入redis中去做比较,有兴趣可以自行实现
并且避免线程id可能重复,可以把每个进程标识为唯一id作为前缀,有兴趣可以自行实现
最终形态的流程图
解决了哪些问题
到此为止我们redis分布式锁就最终实现完成了。我们回顾下我们解决了那些问题?
- 阻塞锁-回旋操作
- 误删锁的问题
- 业务执行时间大于分布式锁的过期时间如何处理
- redis命令非原子性问题
- 可重入锁的问题
Redisson实现
在介绍看门狗机制的时候我们有提到redisson框架,那么接下来我们看下redisson分布式锁框架具体是如何实现的。首先我们先针对上述mock业务通过redisson分布式锁来探究是否会出现上述问题
具体实现
redisson.yml配置:具体配置映射对象在org.redisson.config;
# 单节点配置
singleServerConfig:
# 连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
# 连接超时,单位:毫秒
connectTimeout: 10000
# 命令等待超时,单位:毫秒
timeout: 3000
# 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
# 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
# # 重新连接时间间隔,单位:毫秒
# reconnectionTimeout: 3000
# # 执行失败最大次数
# failedAttempts: 3
# 密码
password:
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
# 客户端名称
clientName: myRedis
# # 节点地址
address: redis://127.0.0.1:6379
# 发布和订阅连接的最小空闲连接数
subscriptionConnectionMinimumIdleSize: 1
# 发布和订阅连接池大小
subscriptionConnectionPoolSize: 50
# 最小空闲连接数
connectionMinimumIdleSize: 32
# 连接池大小
connectionPoolSize: 64
# 数据库编号
database: 0
# DNS监测时间间隔,单位:毫秒
dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
threads: 0
# Netty线程池数量,默认值: 当前处理核数量 * 2
nettyThreads: 0
# 编码
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode : "NIO"
// 准备工作
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
RedissonClient redisson = Redisson.create(
Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream()));
return redisson;
}
@Aspect
@Component
public class RedissonLockProvider {
@Autowired
private RedissonClient redissonClient;
@Around("@annotation(lock)")
public Object execute(ProceedingJoinPoint point, RedissonLock lock){
Object result = null;
String lockKey = lock.lockKey();
RLock rLock = redissonClient.getLock(lockKey);
rLock.lock();
try {
result = point.proceed();
} catch (Throwable throwable) {
rLock.unlock();
throwable.printStackTrace();
} finally {
rLock.unlock();
}
return result;
}
}
Redisson锁分类
- 可重入锁(Reentrant Lock)
- 公平锁(Fair Lock):也是继承了可重入锁的
- 联锁(MultiLock):将多个RLock对象关联为一个联锁,每个实例可以来自于不同Redisson实例。
- 红锁(RedLock):继承联锁。n个master节点完全独立,并且没有主从同步,此时如果有n / 2 + 1个节点成功拿到锁并且大多数节点加锁的总耗时,要小于锁设置的过期时间。so加锁成功。
- 读写锁(ReadWriteLock)、信号量(Semaphore)、可过期性信号量(PermitExpirableSemaphore)、闭锁(CountDownLatch)
节点挂掉的时候,存在丢失锁的风险的问题。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock。
源码探究
大家如果有兴趣了解上述锁的具体实现原理可自行研究,本次分享主要针对默认redisson实现锁方案可重入锁RLock来讲解,
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
// 异步的Executor执行器
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
// 默认超时时间为30S
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
// redis的订阅发布模式
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
// 获取锁成功返回null,获取锁失败 && 等待时间还早就频繁获取锁并监听锁是否被释放掉
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 尝试获取锁,没有获取到锁,返回剩余ttl过期时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// waitTime超时后,返回false获取锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅分布式锁,解决通知,发布订阅模式
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待锁释放,等待超时释放订阅信息
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 循环去调用获取锁方法tryAcquire
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
//1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;
//2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
// 取消解锁消息的订阅
unsubscribe(subscribeFuture, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
// tryLock()方法最终执行逻辑
if (leaseTime != -1) {
// 如果有设置锁的过期时间,则直接调用lua,不走看门狗逻辑
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// lock()方法最终执行逻辑
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining) {
// 执行看门狗逻辑
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
// 将线程id存入hash中的Field,用于解决可重入问题
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // key是否不存在
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 不存在的话把赋值Key,Filed为线程id,value为1存储到hash数据结构中
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 并且设置过期时间
"return nil; " + // 返回null
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果可以存在,并且field等于线程id
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 则吧value++1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重置过期时间
"return nil; " + // 返回null
"end; " +
"return redis.call('pttl', KEYS[1]);", // 返回过期时间
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
// watchDog看门狗逻辑
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 定时任务过期时间 internalLockLeaseTime/3 频率来延长过期时间为internalLockLeaseTime
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
// 解锁逻辑
// key+field不存在,直接返回null
// 存在的话,value--1,判断value是否大于0,大于0重置过期时间,否则删除key并且发布删除订阅事件
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
基于Zookeeper
zookeeper分布式锁原理
- 保持独占:多个客户端同时过来创建/lock/zk-001节点,那么有且仅有一个客户端能创建成功。换句话说,倘若把Zookeeper上的一个节点看做是一把锁,那么成功创建的客户端则能保持独占;
- 控制时序:有一种临时有序节点,每个来尝试获取锁的客户端,都会在Zookeeper的根目录下创建一个临时有序节点,Zookeeper的/lock节点维护一个序列,序号最小的节点获取锁成功。
- 监听机制:Watcher机制能原子性的监听Zookeeper上节点的增删改操作
基础理论
zk-znode
zk操作和维护的为一个个数据节点,称为 znode,采用类似文件系统的层级树状结构进行管理,如果 znode 节点包含数据则存储为字节数组(byte array)。同一个节点多个客户同时创建,只有一个客户端会成功,其它客户端创建时将失败。
zk的四种节点
持久性节点:节点创建后将会一直存在
临时节点:临时节点的生命周期和当前会话绑定,一旦当前会话断开临时节点也会删除,当然可以主动删除。
持久有序节点:节点创建一直存在,并且zk会自动为节点加上一个自增的后缀作为新的节点名称。
临时有序节点:保留临时节点的特性,并且zk会自动为节点加上一个自增的后缀作为新的节点名称。
zk-watcher
事件监听器是zookeeper中的一个很重要的特性。
None(-1), 客户端与服务端成功建立连接
NodeCreated(1),Watcher监听的对应数据节点被创建
NodeDeleted(2),Watcher监听的对应数据节点被删除
NodeDataChanged(3),Watcher监听的对应数据节点的数据内容发生变更
NodeChildrenChanged(4),Wather监听的对应数据节点的子节点列表发生变更
DataWatchRemoved(5),
ChildWatchRemoved(6),
_PersistentWatchRemoved _(7);
实现思路
首先方案:同一个节点只能创建一次,加锁时检查节点是否exist,不存在则创建节点,否则监听该节点的删除事件,当释放锁的时候再次竞争去创建节点。如此带来的就是当并发量很高的时候,释放锁会唤醒许多客户端都去竞争,竞争失败的客户端再去休眠,如此反复对系统资源造成了极大的浪费。
为了规避以上问题,我们可以使用有序子节点的形式来实现分布式锁,而且为了规避客户端获取锁后突然断线的风险,我们有必要使用临时有序节点。
多个客户端竞争锁资源,创建多个临时有序节点,检查所属节点是否是最小节点,如若是,则获取锁成功,如若不是,那就监听自己节点-1的删除事件,等待被唤醒。
这种方案在每次释放锁时只唤醒一个客户端,减少了线程唤醒的代价,提高了效率。
zk原生API实现
接下来我们通过zk原生实现API来实现分布式锁
public class ZkLockUtil implements Watcher {
public static final String NODE_PATH = "/lock-space-watcher";
private ZooKeeper zk = null;
public ZkLockUtil() throws IOException, KeeperException, InterruptedException {
zk = new ZooKeeper("127.0.0.1:2181", 300000, this);
}
protected CountDownLatch countDownLatch=new CountDownLatch(1);
private String lockPath;
public String createNode(String key){
try {
String node = NODE_PATH +"/"+ key;
//检测节点是否存在
Stat stat = zk.exists(node, false);
//父节点不存在,则创建父节点
if(Objects.isNull(stat)){
synchronized (NODE_PATH) {
//父节点是持久节点 一层层创建否则会报错
zk.create(node, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
lockPath = zk.create(node + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("节点创建成功,返回值【"+lockPath+"】");
return lockPath;
} catch (KeeperException e1) {
e1.printStackTrace();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
return null;
}
//校验当前节点是否为序号最小的节点
public boolean checkLockPath(String key, String lockPath){
String nodePath = NODE_PATH + "/" + key;
try {
//注册父节点监听事件,当父节点下面的子节点有变化,就会触发Watcher事件
List<String> nodeList = zk.getChildren(nodePath, false);
Collections.sort(nodeList);
int index = nodeList.indexOf( lockPath.substring(nodePath.length()+1));
switch (index){
case -1:{
System.out.println("本节点已不在了"+lockPath);
return false;
}
case 0:{
System.out.println("获取锁成功,子节点序号【"+lockPath+"】");
return true;
}
default:{
String waitPath = nodeList.get(index - 1);
zk.exists(nodePath+"/"+waitPath, this);
System.out.println(waitPath+"在"+nodeList.get(index)+"点前面,需要等待【"+nodeList.get(index)+"】");
return false;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
public boolean lock(String key, Integer waitTime){
//创建获取锁的节点(顺序临时节点)
String childPath = createNode(key);
boolean flag = true;
AtomicInteger atomicInteger = new AtomicInteger(0);
if(null != childPath){
lockPath = childPath;
try {
//轮询等待zk获取锁的通知
while(flag){
if(checkLockPath(key, childPath)){
//获取锁成功
return true;
}
if(null != waitTime && atomicInteger.get() > 0){
// 删除当前等待节点
return false;
}
//节点创建成功, 则等待zk通知
if(null != waitTime){
countDownLatch.await(waitTime, TimeUnit.SECONDS);
atomicInteger.incrementAndGet();
System.out.println("await等待被唤醒~"+waitTime);
}else{
countDownLatch.await();
System.out.println("await等待被唤醒~");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
System.out.println("节点没有创建成功,获取锁失败");
}
return false;
}
@Override
public void process(WatchedEvent event) {
//成功连接zk,状态判断
if(event.getState() == Event.KeeperState.SyncConnected){
//子节点有变化
if(event.getType() == Event.EventType.NodeDeleted){
System.out.println("临时节点自动删除");
countDownLatch.countDown();
}
}
}
public void unlock(){
try {
zk.delete(getLockPath(), -1);
if(Objects.nonNull(zk)){
zk.close();
}
} catch (Exception e) {
}
}
public String getLockPath() {
return lockPath;
}
}
上述实现方式虽能更好的理解zk来实现分布式锁的逻辑,但本身zk原生实现编码实现较多,并且很难保证是否有有问题,不太建议自己编码来实现zk原生的分布式锁,如果有兴趣的同学可自行实现可重入锁的逻辑,上述已经分析了很多实现方案,这儿不在对此深入。
客户端Curator实现
接下来我们通过zk的客户端Curator来实现zk的分布式锁。
Apache 开源框架Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。
Curator锁分类
可重入互斥锁 InterProcessMutex
不可重入互斥锁 InterProcessSemaphoreMutex
读写锁 InterProcessReadWriteLock
集合锁 InterProcessMultiLock
具体实现
接下来我们通过zk客户端Curator来实现分布式锁
InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(ApplicationContextUtil.getBean(CuratorFramework.class), String.format("/lock-space-1/%s", "KILL_LOCK"));
// InterProcessMutex mutex = new InterProcessMutex(ApplicationContextUtil.getBean(CuratorFramework.class), String.format("/lock-space-1/%s", "KILL_LOCK"));
public String kill() throws Exception {
mutex.acquire();
// mutex.acquire(15, TimeUnit.SECONDS);
try{
// 定义商品key值
String key = "goods";
// 获取商品数量
Object obj = redisTemplate.opsForValue().get(key);
Integer mount = Integer.valueOf(obj.toString());
// 如果商品被抢完,直接返回
if (mount < 0 || mount == 0) {
System.out.println("很遗憾,商品已被抢完【kill】");
return "很遗憾,商品已被抢完";
}
// 线程睡眠,目的在于放大错误
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 抢到商品后,将redis的商品数量减一
mount = --mount;
redisTemplate.opsForValue().set(key, mount.toString());
// 打印,以便观察
System.err.println(System.currentTimeMillis() + "-" + Thread.currentThread().getName() + ":抢到第" + (mount + 1) + "件商品【kill】");
} catch (Exception e) {
mutex.release();
e.printStackTrace();
} finally {
mutex.release();
}
return "恭喜,商品抢购成功";
}
Curator-InterProcessMutex可重入锁源码探究
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
basePath = PathUtils.validatePath(path);
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
// LockData存储当前持有锁的线程:为了实现可重入
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// 当前锁 重入++1
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
// 获取锁成功放到缓存中
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
...省略部分代码
// 创建临时有序节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 执行获取锁逻辑
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
...省略部分代码
return ourPath;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
// 对所有节点进行排序:从小到大
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 返回当前节点或者等待节点的上一个节点
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
// 上一个节点路径信息
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
//设置监听器,getData会判读前一个节点是否存在,不存在就会抛出异常从而不会设置监听器
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
// 等待一段时间被唤醒
wait(millisToWait);
}
else
{
// 一直等待被唤醒
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
// LockData当前线程可重入value--1
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
// value == 0 的时候才真正删除临时节点
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}
final void releaseLock(String lockPath) throws Exception
{
// 一处所有监听者
client.removeWatchers();
revocable.set(null);
// 删除临时节点
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception
{
try
{
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
分布式锁各自有何优势
基于Redis的分布式锁,适用于并发量很大、性能要求很高的、而可靠性问题可以通过其他方案去弥补的场景。
基于zk的分布式锁,适用于高可靠(高可用)而并发量不是太大的场景;因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。大家知道,ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同不到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。
而数据库来实现的分布式锁,受制于连接池资源、无锁失效机制、单点等因素,在并发量较低可靠性不那么强的时候也可以用。
分布式锁没有绝对的可靠性,只能通过人为补偿机制竟可能的提升锁可靠性。