Redis 单线程:
网络IO和键值对读写是由一个线程完成的(网络请求模块和数据操作模块是单线程的)
1,使用set加锁。
1)set key value [EX seconds] [PX milliseconds] [NX|XX]
NX:set if not exists
XX:set if already exists
EX:seconds
PX:milliseconds
set zq 3 EX 10000 NX
支持设置EX失效时间,NX表示key不存在时才设置。
2)在删除key时,将内容对比(确定是否是自身线程拥有的锁),一致才进行删除。
3)支持可重入:循环+唯一id
2,MyRedisLock
/**
* RedisLock 分布式锁
*/
public class MyRedisLock {
public static final String RELEASE_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
private static final int DEFAULT_LOCK_EXPIRE_TIME_MILLIS = 10 * 1000;//默认的lock key的过期时间
private static final int DEFAULT_TRY_LOCK_TIMEOUT_MILLIS = 5 * 1000;//尝试加锁的超时时间
private static final int CHECK_RELEASE_MAX_INTERVAL_MILLIS = 100;//检查过期的间隔
private Jedis jedis;//节点客户端
private String lockKey;//加锁的key
private String lockId;//加锁的value
private volatile long lockHoldingTime = 0;//锁的持有时间
public MyRedisLock(Jedis jedis, String lockKey, String lockId){
this.jedis = jedis;
this.lockKey = lockKey;
this.lockId = lockId;
}
/**
* 主动释放锁
*/
public synchronized boolean release(){
Object result = jedis.eval(RELEASE_LUA_SCRIPT, Collections.singletonList(lockKey), Collections.singletonList(lockId));
lockHoldingTime = 0;
return result != null && (Integer)result > 0;
}
/**
* 判断是否加锁成功
*/
public boolean isAcquired(){
return lockHoldingTime >= System.currentTimeMillis();
}
/**
* 阻塞尝试使用默认超时
*/
public synchronized boolean acquire() throws InterruptedException{
return acquire(DEFAULT_TRY_LOCK_TIMEOUT_MILLIS);
}
/**
* 阻塞尝试加锁
*/
public synchronized boolean acquire(int lockAcquireTimeoutMillis) throws InterruptedException {
if (lockAcquireTimeoutMillis <= 0) {
throw new IllegalArgumentException("lockAcquireTimeoutMillis <= 0");
}
boolean isAcquired = false;
long startTimeNanos = System.nanoTime();
try {
int retry = 0;
while (lockAcquireTimeoutMillis > 0) {
String reply = jedis.set(lockKey, lockId, "NX", "PX", DEFAULT_LOCK_EXPIRE_TIME_MILLIS);
if ("OK".equals(reply)) {
lockHoldingTime = System.currentTimeMillis() + DEFAULT_LOCK_EXPIRE_TIME_MILLIS;
isAcquired = true;
break;
}
//支持可重入特性
if (lockId.equals(jedis.get(lockKey))) {
isAcquired = true;
break;
}
// exponential backoff sleep starting at 16 milliseconds and maximum 100 milliseconds
int sleep = 8 << ++retry;
if (sleep > CHECK_RELEASE_MAX_INTERVAL_MILLIS || sleep < 0) {
sleep = CHECK_RELEASE_MAX_INTERVAL_MILLIS;
}
lockAcquireTimeoutMillis -= sleep;
Thread.sleep(sleep);
}
return isAcquired;
} finally {
long elapsed = System.nanoTime() - startTimeNanos;
System.out.println("lockKey:" + lockKey + ", lockId:" + lockId + ", elapsed:" + elapsed);
}
}
}
3,eval执行lua脚本
1)
eval script numkeys key [key ...] arg [arg ...]
eval 脚本 key的数量 key列表 附加参数列表
KEYS key数组:KEYS[1]获取第一个key
;
ARGV 参数数组:ARGV[1]获取第一个附加参数
。
2)eval可以返回多个数据
eval "return {KEYS[1], KEYS[2], ARGV[1], ARGV[2]}" 2 key1 key2 arg1 arg2
3)脚本中使用redis.call()调用redis命令
eval "return redis.call('set','testEval','evalValue')" 0
eval "return redis.call('get', 'testEval')" 0
4)redis原子操作,分布式解锁。
如果调用redis.get(${Key})获取到的值,和附加参数中ARGV[1]相等,则删除${Key},返回1,否则返回0
4,使用上述方案,潜在问题
1)超时时间设置
过长:如果线程执行时挂掉(如机器宕机、程序挂掉)等,未释放锁。则会导致其他线程的【锁等待时间】过长。【合理评估业务执行时间,宁大勿小】
过短:如果线程还在执行中,锁超时被释放了,则有可能会造成其他线程获取到锁,造业务故障。(不可取)
2)使用Redisson(Watch Dogs)实现锁续期
internalLockLeaseTime
:默认key的超时时间30s
锁续期定时任务间隔
:internalLockLeaseTime / 3
5,Redisson实现原理
1)原子性设计:
加锁、解锁、续约等都是通过Lua脚本中发送给redis
2)存储结构与加锁脚本
key为唯一id+threadId,值为调用次数
3)线程阻塞与解除阻塞
redisson_lock__channel:{my-redisson-key}
4)Watch Dog锁续期
leaseTime=-1(使用默认的加锁时间为 30s),才会生效
1.删除锁;重入锁则减1
2.使用redis广播锁释放消息,向redisson_lock__channel发送UNLOCK_MESSAGE
3.取消 Watch Dog。RedissonLock.EXPIRATION_RENEWAL_MAP中线程id删除,并取消掉netty的定时任务
5)redis分布式锁的最大缺陷
master-slave架构中,当A写入master成功后,在slave异步复制过程中,发生了主从切换,导致B在新的master上也能加锁成功。多个客户端加锁成功,可能会造成脏数据产生