定义
正常的单机状态的,共享资源都是在通过一个数据库下的,可以在单机中进行加锁,保证共享数据的线程安全,分布式环境下,因为不是在同一虚拟机进程的,全局的某些唯一资源需要进行锁定,这时候就需要分布式锁。
现如今都是分布式系统,需要部署多台服务器,进行负载均衡。如图:
上图可以看到,变量A存在JVM1、JVM2、JVM3三个JVM内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象,例如:UserController控制器中的一个整形类型的成员变量),如果不加任何控制的话,变量A同时都会在JVM分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同JVM内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的!
如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题!
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。在单机环境中,Java中提供了很多并发处理相关的API。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。
为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
满足条件
1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁;
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
具体例子
-
交易订单锁定
需要处理防止重复下单。
解决业务层面的幂等问题
-
MQ消息消费的幂等性
发送的消息重复。
消息消费端去重。
比如手机提现,不能重复提现。
在用户对商品下单后,订单状态为待支付,在某一时刻用户正在对该订单做支付操作,商家正在进行改价操作??? 这时候,该状态需要做串行处理,避免出现数据错乱。
解决方式
1.基于数据库实现分布式锁;
创建一个表:
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
`desc` varchar(255) NOT NULL COMMENT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
(2)想要执行某个方法,就使用这个方法名向表中插入数据
INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');
因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
(3)成功插入则获取锁,执行完成后删除对应的行数据释放锁:
delete from method_lock where method_name ='methodName';
注意:这只是使用基于数据库的一种方法,使用数据库实现分布式锁还有很多其他的玩法!
使用基于数据库的这种实现方式很简单,但是对于分布式锁应该具备的条件来说,它有一些问题需要解决及优化:
1、因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;
2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;
4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。
2.基于redis做分布式锁
为什么?
redis本身是单线程,唯一线程串行处理。
实现方式
Redis Setnx命令,在指定的key不存在时,为key设置指定的值.多个线程并发的请求去设置时,只有一个可以设置成功。其他的会返回失败。一般设置五秒钟。
//设置成功,返回1,设置失败,返回 0
Setnx KEY_NAME VALUE Expire Time
分析存在问题:
-
单点问题
单机模式,设置T1 T2两个线程.如果T1刚设置成功,单机挂了,重启,请求丢了,T2去请求再去拿锁,会获取不到,这时候会获取不到key。(因为分布式锁一般不考虑做持久化,所以这里不考虑持久化。)
主从模式,主从数据异步,会存在锁失效的问题,主服务器还未同步到从服务器,这时候主挂了,从服务器获取不到锁。
锁时间不可以控制,无法续租期
Redis本身建议:使用RedLock算法来保证,但是问题是需要至少三个Redis主从实例来完成,维护成本很高。这个等同于自己简单实现的一致性协议,细节繁琐,且容易出错。
是否能使用
业务场景来规定,在设计交易时,只能发一次交易请求,这时候不适合。如果是MQ消息消费场景,依次获取不到,可以在发送一次消息保证能被消费。
CAP问题
分布式锁,主要选择满足C P模型,而redis实现的主要满足AP模型。不太ok。
代码实现
使用命令介绍:
(1)SETNX
SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
(2)expire
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
1
(3)delete
delete key:删除key
实现思想:
(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
/**
* 分布式锁的简单实现代码
* Created by liuyang on 2017/4/20.
*/
public class DistributedLock {
private final JedisPool jedisPool;
public DistributedLock(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
/**
* 加锁
* @param lockName 锁的key
* @param acquireTimeout 获取超时时间
* @param timeout 锁的超时时间
* @return 锁标识
*/
public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
Jedis conn = null;
String retIdentifier = null;
try {
// 获取连接
conn = jedisPool.getResource();
// 随机生成一个value
String identifier = UUID.randomUUID().toString();
// 锁名,即key值
String lockKey = "lock:" + lockName;
// 超时时间,上锁后超过此时间则自动释放锁
int lockExpire = (int) (timeout / 1000);
// 获取锁的超时时间,超过这个时间则放弃获取锁
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {
if (conn.setnx(lockKey, identifier) == 1) {
conn.expire(lockKey, lockExpire);
// 返回value值,用于释放锁时间确认
retIdentifier = identifier;
return retIdentifier;
}
// 返回-1代表key没有设置超时时间,为key设置一个超时时间
if (conn.ttl(lockKey) == -1) {
conn.expire(lockKey, lockExpire);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return retIdentifier;
}
/**
* 释放锁
* @param lockName 锁的key
* @param identifier 释放锁的标识
* @return
*/
public boolean releaseLock(String lockName, String identifier) {
Jedis conn = null;
String lockKey = "lock:" + lockName;
boolean retFlag = false;
try {
conn = jedisPool.getResource();
while (true) {
// 监视lock,准备开始事务
conn.watch(lockKey);
// 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
if (identifier.equals(conn.get(lockKey))) {
Transaction transaction = conn.multi();
transaction.del(lockKey);
List<Object> results = transaction.exec();
if (results == null) {
continue;
}
retFlag = true;
}
conn.unwatch();
break;
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return retFlag;
}
}
测试刚才实现的分布式锁
例子中使用50个线程模拟秒杀一个商品,使用–运算符来实现商品减少,从结果有序性就可以看出是否为加锁状态。
模拟秒杀服务,在其中配置了jedis线程池,在初始化的时候传给分布式锁,供其使用.
/**
* Created by liuyang on 2017/4/20.
*/
public class Service {
private static JedisPool pool = null;
private DistributedLock lock = new DistributedLock(pool);
int n = 500;
static {
JedisPoolConfig config = new JedisPoolConfig();
// 设置最大连接数
config.setMaxTotal(200);
// 设置最大空闲数
config.setMaxIdle(8);
// 设置最大等待时间
config.setMaxWaitMillis(1000 * 100);
// 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
config.setTestOnBorrow(true);
pool = new JedisPool(config, "127.0.0.1", 6379, 3000);
}
public void seckill() {
// 返回锁的value值,供释放锁时候进行判断
String identifier = lock.lockWithTimeout("resource", 5000, 1000);
System.out.println(Thread.currentThread().getName() + "获得了锁");
System.out.println(--n);
lock.releaseLock("resource", identifier);
}
}
模拟线程进行秒杀服务
public class ThreadA extends Thread {
private Service service;
public ThreadA(Service service) {
this.service = service;
}
@Override
public void run() {
service.seckill();
}
}
//这里推荐使用 countDownLatch
public class Test {
public static void main(String[] args) {
Service service = new Service();
for (int i = 0; i < 50; i++) {
ThreadA threadA = new ThreadA(service);
threadA.start();
}
}
}
3.基于zookeeper
ZooKeeper是一个为分布式应用提供一致性服务的开源组件,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能有一个唯一文件名。基于ZooKeeper实现分布式锁的步骤如下:
(1)创建一个目录mylock;
(2)线程A想获取锁就在mylock目录下创建临时顺序节点;
(3)获取mylock目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁;
(4)线程B获取所有节点,判断自己不是最小节点,设置监听比自己次小的节点;
(5)线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是不是最小的节点,如果是则获得锁。
这里推荐一个Apache的开源库Curator,它是一个ZooKeeper客户端,Curator提供的InterProcessMutex是分布式锁的实现,acquire方法用于获取锁,release方法用于释放锁。
优点:具备高可用、可重入、阻塞锁特性,可解决失效死锁问题。
缺点:因为需要频繁的创建和删除节点,性能上不如Redis方式。
具体代码可以看这篇文章:
https://blog.csdn.net/qiangcuo6087/article/details/79067136
自己设计一个分布式锁
设计的目标
- 强一致性
- 服务高可用、系统稳健
- 锁自动续约及其自动释放
- 代码高度抽象业务接入极简
- 可视化管理凭他、监控及管理
对存储模型进行选型
N+1 代表部署奇数个
由于redis实现无法保证一致性,zookeeper对锁实现使用创建临时节点和watch机制,执行效率,扩展能力、社区活跃度等方面低于etcd,所以我们会选择基于etcd实现。
etcd优势
- 简单KV(key Value)
- 强一致性
- 高可用
- 无单点
- 数据可靠性
- 持久化
整体方案
分布式Client + etcd
Client TTL模式
Server TTL模式
拿锁的时候,选择key,ttl是超时时间,value可以忽略,uuid为该锁的唯一凭证,后面对锁的操作都是对uuid做操作。需要uuid才能做操作。etcd会保证只有一个线程能拿到锁。
使用场景1.申请锁
使用场景2.申请锁,锁已经被占用
使用场景3.锁的清理
业务接入
JDK7以上,建议9.
获取锁实例:
释放锁示例
兼容性考虑
ETCD恢复/版本
分布式锁的特殊场景
特殊场景一:
分布式锁只是在同一自然时间的互斥锁,本省不解决幂等性问题。
接入业务需要完善从获得锁到释放锁中间的数据幂等逻辑。
特殊场景二: 锁没有按照日期续约
心跳续约没有成功
马上启动GC,GCs时间太长
特殊场景三: etcd内部协调发生问题
Leader节点挂了,选主中,
raft日志数据同步发生错误或者不一致问题。
待续。。。
部分摘自: