常见秒杀方案设计:
1.数据库行锁
2.分布式锁+分段锁提升效率
3.Redis单线程机制,将库存放在Redis里面使用
set count 1000
decrby count 1 扣减库存,返回正数就可扣减库存
4.Redis+Lua脚本,查询库存和扣减库存放到Lua脚本里面去执行
这是一个原子操作,解决高并发下线程安全问题
总结:简单利用redis的LUA脚本功能,一次性操作,实现原子性
Redis+Lua实现高并发秒杀功能
1、导入相关依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
2、RedisConfig Bean初始化配置
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.redisson.config.SingleServerConfig;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
final Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer()
.setAddress("redis://" + host + ":" + port);
if (StringUtils.isNotBlank(password)) {
singleServerConfig.setPassword(password);
}
System.out.println("------------- redisson -----------------------");
System.out.println(config.getTransportMode());
return Redisson.create(config);
}
/**
* 重写Redis序列化方式,使用Json方式:
* 数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的
* RedisTemplate默认使用的是JdkSerializationRedisSerializer
* StringRedisTemplate默认使用的是StringRedisSerializer
* <p>
* Spring Data JPA为我们提供了下面的Serializer:
* GenericToStringSerializer、Jackson2JsonRedisSerializer
* JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。
* 在此我们将自己配置RedisTemplate并定义Serializer
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 设置值(value)的序列化采用Jackson2JsonRedisSerializer。
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
// 设置键(key)的序列化采用StringRedisSerializer。
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
3、Redis+Lua脚本实现秒杀扣减库存
public interface IStockCallback {
/**
* 扣减Redis库存
*
* @param batchNo 商品唯一编号
* @param expire 过期时间
* @param num 扣减库存数量
* @return 剩余库存数量
*/
long getStock(String batchNo, long expire, int num);
/**
* 初始化库存
*
* @param commodityId 业务
* @return 库存
*/
int initStock(long commodityId);
}
下面是下单扣减库存业务的代码块,在扣库存的时候,不能超发,也不能扣到负数,
然后再同步到MYSQL里,初始化库存数量,这个可以从DB里取实际的量,
LUA脚本保证原子性,查询剩余库存和扣减逻辑是一个原子性操作
@Slf4j
@Service
public class RedisStockService implements IStockCallback {
/**
* 库存还未初始化
*/
public static final long UNINITIALIZED_STOCK = -3L;
/**
* 判断商品是否存在KEY标识
*/
public static final long EXIST_FLAG = -2L;
/**
* 配置库存Redis缓存Key前缀
*/
public static final String REDIS_KEY = "REDIS_KEY:STOCK:";
/**
* 执行扣库存的Lua脚本
*/
public static final String STOCK_LUA;
/**
* Redisson 客户端
*/
@Resource
private RedissonClient redissonClient;
/**
* Redis 客户端
*/
@Resource
private RedisTemplate<String, Integer> redisTemplate;
static {
/*
* @desc 扣减库存Lua脚本
* 库存(stock)-1:表示不限库存
* 库存(stock) 0:表示没有库存
* 库存(stock)大于0:表示剩余库存
*
* @params 库存key
* @return
* -3:库存未初始化
* -2:库存不足
* -1:不限库存
* 大于等于0: 剩余库存(扣减之后剩余的库存), 直接返回-1
*/
final StringBuilder strBuilder = new StringBuilder();
strBuilder.append("if (redis.call('exists', KEYS[1]) == 1) then");
strBuilder.append(" local stock = tonumber(redis.call('get', KEYS[1]));");
strBuilder.append(" local num = tonumber(ARGV[1]);");
strBuilder.append(" if (stock == -1) then");
strBuilder.append(" return -1;");
strBuilder.append(" end;");
strBuilder.append(" if (stock >= num) then");
strBuilder.append(" return redis.call('incrby', KEYS[1], 0 - num);");
strBuilder.append(" end;");
strBuilder.append(" return -2;");
strBuilder.append("end;");
strBuilder.append("return -3;");
STOCK_LUA = strBuilder.toString();
}
/**
* 执行扣减库存业务
*
* @param batchNo 库存唯一标识
* @param expire 库存过期时间
* @param num 扣减库存的数量
* @return 返回扣减库存后剩余库存数量
*/
@Override
public long getStock(String batchNo, long expire, int num) {
// 商品库存唯一标识
final String key = REDIS_KEY + batchNo;
/*
* 从redis中获取key对应的过期时间;
* 1、如果该值有过期时间,就返回相应的过期时间;
* 2、如果该值没有设置过期时间,就返回-1;
* 3、如果没有该值,就返回-2;
*
* 注意:这里为了方便模拟,实际线上。通过缓存预热的方式通过DB查询实际的库存数据
* 添加到Redis中
*/
Long expire1 = redisTemplate.opsForValue().getOperations().getExpire(key);
if (Objects.equals(EXIST_FLAG, expire1)) {
redisTemplate.opsForValue().set(key, 100, expire, TimeUnit.SECONDS);
System.out.println("Redis无初始库存,设置库存数据 = " + expire1);
}
// 初始化商品库存
Integer stock = redisTemplate.opsForValue().get(key);
// 设置分布式锁
final RLock rLock = redissonClient.getLock(REDIS_KEY + ":LOCK");
try {
if (rLock.tryLock(1, TimeUnit.SECONDS)) {
stock = redisTemplate.opsForValue().get(key);
log.info("--- 当前Key:[{}]加锁成功,当前最新库存:{}---", key, stock);
// 调一次扣库存的操作
Long stock1 = stock(key, num);
System.out.println("stock1 = " + stock1);
stock = redisTemplate.opsForValue().get(key);
int batchNoLock = Objects.requireNonNull(stock);
log.info("--- 当前剩余库存:{}", batchNoLock);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
if (rLock != null && rLock.isHeldByCurrentThread()) {
rLock.unlock();
}
}
return stock;
}
/**
* 扣库存这步特别注意,分布式连接有问题,需要依赖包里,去掉lettuce组件
* 初始化库存数量,这个可以从DB里取实际的量
*
* @param key 库存key
* @param num 扣减库存数量
* @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
*/
private Long stock(String key, int num) {
// 脚本里的KEYS参数
List<String> keys = new ArrayList<>();
keys.add(key);
// 脚本里的ARGV参数
List<String> argvList = new ArrayList<>();
argvList.add(Integer.toString(num));
// 执行扣减库存LUA脚本
return redisTemplate.execute((RedisCallback<Long>) connection -> {
Object nativeConnection = connection.getNativeConnection();
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, argvList);
}
// 单机模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, argvList);
}
return UNINITIALIZED_STOCK;
});
}
/**
* 获取初始的库存
* 初始化库存数量,这个可以从DB里取实际的量
*
* @param commodityId 业务ID
* @return 初始库存
*/
@Override
public int initStock(long commodityId) {
// TODO 这里做一些初始化库存的操作
return 30;
}
}
3、调用接口并发Controller,测试分布式库存扣减
@Resource
private RedisStockService redisStockService;
/**
* 下单扣减库存
*
* @param stockDTO 下单请求参数
* @return 扣减库存结果
*/
@PostMapping("/buyProductCreateOrder")
public Object buyProductCreateOrder(@RequestBody StockDTO stockDTO) {
try {
return redisStockService.getStock(stockDTO.getBatchNo(), stockDTO.getExpire(), stockDTO.getNum());
} catch (Exception e) {
return e.getMessage();
}
}