由于具体业务场景的需求,需要保证数据在分布式环境下的正确更新,所以研究了一下Java中分布式锁的实现。
Java分布式锁的实现方式主要有以下三种:
- 数据库实现的乐观锁
- Redis实现的分布式锁
- Zookeeper实现的分布式锁
其中,较常用的是前两种方式,但是数据库实现方式需要较多的数据库操作,所以最终选择的是用Redis实现分布式锁。
最初考虑分布式锁的数据安全性的时候,只考虑到两点。第一,Redis锁需要有一个超时时间,这样即便某个持有锁的节点挂了,也不到导致其他节点死锁,保证每个锁有一个UniqueId;第二,每个锁需要有一个UniqueId,确保当一个线程执行完一个任务去释放锁的时候释放的一定是自己的锁,否则可能存在一种场景,就是一个线程释放锁的时候,它的锁可能已经超时被释放了,而因为缺少一个UniqueId,它却释放了另一个线程的锁
基于以上两点的考虑,分别设计了获取锁和释放锁的api。
public interface DistributionLockService {
/**
* @param lockName the name of the lock
* @param uniqueCode uniqueCode for the lock
* @param expireTime expire time of lock(MILLISECONDS)
* @return the result of get lock
* */
boolean getLock(String lockName, String uniqueCode, int expireTime);
/**
* @param lockName the name of the lock
* @param uniqueCode uniqueCode for the lock
* */
void releaseLock(String lockName, String uniqueCode);
}
具体的实现代码如下:
/**
* @param lockName the name of the lock
* @param uniqueCode uniqueCode for the lock
* @param expireTime expire time of lock(MILLISECONDS)
* @return the result of get lock
* */
@Override
public boolean getLock(String lockName, String uniqueCode, int expireTime) {
boolean isLock = false;
try {
Long result = jedis.setnx(lockName, uniqueCode);
isLock = result == 1 ? true : false;
if (isLock) {
jedis.expire(lockName, expireTime);
}
} catch (Exception e){
logger.error("DistributionLockService/getLock", e);
}
return isLock;
}
/**
* @param lockName the name of the lock
* @param uniqueCode uniqueCode for the lock
* */
@Override
public void releaseLock(String lockName, String uniqueCode) {
try {
String tag = jedis.get(getKey(lockName));
if (tag != null && tag.equals(uniqueCode))
jedis.del(getKey(lockName));
} catch (Exception e) {
logger.error("DistributionLockService/releaseLock", e);
}
}
上述的代码用setnx+expire实现分布式锁。调用setnx,当传入的key未被占用时,就在redis中插入一条该key的记录,返回值为1,此时为其设置超时时间。而当这个key在redis中已有记录时,则不会重新插入记录,这样的话,便可以实现分布式锁的基本功能。且为其设置过期时间,并加入UniqueId的check,避免了上述提及的两个问题。
但是,上述代码仍然存在问题,就是忽略了操作的原子性。获取锁的时候,调sexnx方法与设置超时时间expire不是原子操作,如果在sexnx方法执行成功后,节点突然down掉,没有执行expire方法,而之后的释放锁操作也没有执行,那么这个节点便会长期持有锁,尽管这种可能性很小,但是依然存在死锁的风险。为了避免这种风险,修正代码如下:
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final String IS_LOCKED = "OK";
/**
* @param lockName the name of the lock
* @param uniqueCode uniqueCode for the lock
* @param expireTime expire time of lock
* @return the result of get lock
* */
@Override
public boolean getLock(String lockName, String uniqueCode, int expireTime) {
boolean isLock = false;
try {
String result = jedis.set(lockName, uniqueCode, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
isLock = IS_LOCKED.equalsIgnoreCase(result) ? true : false;
} catch (Exception e){
logger.error("DistributionLockService/getLock", e);
}
return isLock;
}
Redis较高的版本中,有一个有五个参数的set方法,其中前两个参数就是key和value,最后一个参数是过期时间,中间两个参数表示setnx和setex,实际上就是一个可以设置过期时间的setnx方法。这个方法可以保证加锁和设置过期时间两者是作为一个请求传送到Redis服务器的,所以不会出现上述的死锁场景。
加锁的问题解决了,解锁的问题依然在。上述的解锁代码中,在解锁之前先验证了UniqueId,然后采用del方法来释放锁,但是由于get和del是两次请求,而不是一个原子操作,所以这之间仍存在并发的问题。若做check的时候,检查得到确实是这个锁的UniqueId,但是在执行del方法之前,这个锁已经超时,然后新的线程也已经获取到锁了,那么del删掉的锁,便不是自己的锁,而是下一个线程的锁。
Redis中没有直接的api处理这个问题。解决这个问题,需要使用lua脚本,来确保整个操作的原子性。代码如下:
/**
* @param lockName the name of the lock
* @param uniqueCode uniqueCode for the lock
* */
@Override
public void releaseLock(String lockName, String uniqueCode) {
try {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('del', KEYS[1]) " +
"else return 0 end";
jedis.eval(script, Arrays.asList(lockName), Arrays.asList(uniqueCode));
} catch (Exception e) {
logger.error("DistributionLockService/releaseLock", e);
}
}
jedis的eval方法支持执行lua脚本方法,所以便可利用这个方法来实现释放锁的原子操作,具体逻辑和之前的代码其实是一致的,但是由于是原子操作,所以可以避免上文中存在的问题。
至此,简单Redis锁的实现便算是成功了。但是其中依然存在许多问题,如果Redis不是单机的,而是集群分布的,那么其中的数据同步该怎么做?在有些较看重数据的正确性的场景中,即使Redis锁超时,只要检测到机器仍在正常运行Redis锁就不应该被释放,而应该被续期,这些,都是redis锁在更复杂的场景中所需要考虑的。留待以后继续研究。