Redis实现高并发扣减库存,秒杀功能(可线上使用)

常见秒杀方案设计:
1.数据库行锁
2.分布式锁+分段锁提升效率
3.Redis单线程机制,将库存放在Redis里面使用
set count 1000
decrby count 1 扣减库存,返回正数就可扣减库存
4.Redis+Lua脚本,查询库存和扣减库存放到Lua脚本里面去执行
这是一个原子操作,解决高并发下线程安全问题
总结:简单利用redis的LUA脚本功能,一次性操作,实现原子性

Redis+Lua实现高并发秒杀功能

1、导入相关依赖
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

       <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.4</version>
        </dependency>

2、RedisConfig Bean初始化配置

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.redisson.config.SingleServerConfig;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;

    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        final Config config = new Config();

        SingleServerConfig singleServerConfig = config.useSingleServer()
                .setAddress("redis://" + host + ":" + port);
        if (StringUtils.isNotBlank(password)) {
            singleServerConfig.setPassword(password);
        }
        System.out.println("------------- redisson -----------------------");
        System.out.println(config.getTransportMode());
        return Redisson.create(config);
    }


    /**
     * 重写Redis序列化方式,使用Json方式:
     * 数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的
     * RedisTemplate默认使用的是JdkSerializationRedisSerializer
     * StringRedisTemplate默认使用的是StringRedisSerializer
     * <p>
     * Spring Data JPA为我们提供了下面的Serializer:
     * GenericToStringSerializer、Jackson2JsonRedisSerializer
     * JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。
     * 在此我们将自己配置RedisTemplate并定义Serializer
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
        new Jackson2JsonRedisSerializer<Object>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // 设置值(value)的序列化采用Jackson2JsonRedisSerializer。
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        // 设置键(key)的序列化采用StringRedisSerializer。
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}
3、Redis+Lua脚本实现秒杀扣减库存
public interface IStockCallback {

    /**
     * 扣减Redis库存
     *
     * @param batchNo 商品唯一编号
     * @param expire  过期时间
     * @param num     扣减库存数量
     * @return 剩余库存数量
     */
    long getStock(String batchNo, long expire, int num);

    /**
     * 初始化库存
     *
     * @param commodityId 业务
     * @return 库存
     */
    int initStock(long commodityId);
}

下面是下单扣减库存业务的代码块,在扣库存的时候,不能超发,也不能扣到负数,
然后再同步到MYSQL里,初始化库存数量,这个可以从DB里取实际的量,
LUA脚本保证原子性,查询剩余库存和扣减逻辑是一个原子性操作

@Slf4j
@Service
public class RedisStockService implements IStockCallback {

    /**
     * 库存还未初始化
     */
    public static final long UNINITIALIZED_STOCK = -3L;
    /**
     * 判断商品是否存在KEY标识
     */
    public static final long EXIST_FLAG = -2L;
    /**
     * 配置库存Redis缓存Key前缀
     */
    public static final String REDIS_KEY = "REDIS_KEY:STOCK:";
    /**
     * 执行扣库存的Lua脚本
     */
    public static final String STOCK_LUA;
    /**
     * Redisson 客户端
     */
    @Resource
    private RedissonClient redissonClient;
    /**
     * Redis 客户端
     */
    @Resource
    private RedisTemplate<String, Integer> redisTemplate;

    static {
        /*
         * @desc 扣减库存Lua脚本
         * 库存(stock)-1:表示不限库存
         * 库存(stock) 0:表示没有库存
         * 库存(stock)大于0:表示剩余库存
         *
         * @params 库存key
         * @return
         *      -3:库存未初始化
         *      -2:库存不足
         *      -1:不限库存
         *      大于等于0: 剩余库存(扣减之后剩余的库存), 直接返回-1
         */
        final StringBuilder strBuilder = new StringBuilder();
        strBuilder.append("if (redis.call('exists', KEYS[1]) == 1) then");
        strBuilder.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        strBuilder.append("    local num = tonumber(ARGV[1]);");
        strBuilder.append("    if (stock == -1) then");
        strBuilder.append("        return -1;");
        strBuilder.append("    end;");
        strBuilder.append("    if (stock >= num) then");
        strBuilder.append("        return redis.call('incrby', KEYS[1], 0 - num);");
        strBuilder.append("    end;");
        strBuilder.append("    return -2;");
        strBuilder.append("end;");
        strBuilder.append("return -3;");
        STOCK_LUA = strBuilder.toString();
    }

    /**
     * 执行扣减库存业务
     *
     * @param batchNo 库存唯一标识
     * @param expire  库存过期时间
     * @param num     扣减库存的数量
     * @return 返回扣减库存后剩余库存数量
     */
    @Override
    public long getStock(String batchNo, long expire, int num) {
        // 商品库存唯一标识
        final String key = REDIS_KEY + batchNo;

        /*
         * 从redis中获取key对应的过期时间;
         * 1、如果该值有过期时间,就返回相应的过期时间;
         * 2、如果该值没有设置过期时间,就返回-1;
         * 3、如果没有该值,就返回-2;
         *
         * 注意:这里为了方便模拟,实际线上。通过缓存预热的方式通过DB查询实际的库存数据
         * 添加到Redis中
         */
        Long expire1 = redisTemplate.opsForValue().getOperations().getExpire(key);
        if (Objects.equals(EXIST_FLAG, expire1)) {
            redisTemplate.opsForValue().set(key, 100, expire, TimeUnit.SECONDS);
            System.out.println("Redis无初始库存,设置库存数据 = " + expire1);
        }

        // 初始化商品库存
        Integer stock = redisTemplate.opsForValue().get(key);

        // 设置分布式锁
        final RLock rLock = redissonClient.getLock(REDIS_KEY + ":LOCK");
        try {
            if (rLock.tryLock(1, TimeUnit.SECONDS)) {
                stock = redisTemplate.opsForValue().get(key);
                log.info("--- 当前Key:[{}]加锁成功,当前最新库存:{}---", key, stock);
                // 调一次扣库存的操作
                Long stock1 = stock(key, num);
                System.out.println("stock1 = " + stock1);

                stock = redisTemplate.opsForValue().get(key);
                int batchNoLock = Objects.requireNonNull(stock);
                log.info("--- 当前剩余库存:{}", batchNoLock);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            if (rLock != null && rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }
        return stock;
    }

    /**
     * 扣库存这步特别注意,分布式连接有问题,需要依赖包里,去掉lettuce组件
     * 初始化库存数量,这个可以从DB里取实际的量
     *
     * @param key 库存key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
     */
    private Long stock(String key, int num) {
        // 脚本里的KEYS参数
        List<String> keys = new ArrayList<>();
        keys.add(key);

        // 脚本里的ARGV参数
        List<String> argvList = new ArrayList<>();
        argvList.add(Integer.toString(num));

        // 执行扣减库存LUA脚本
        return redisTemplate.execute((RedisCallback<Long>) connection -> {
            Object nativeConnection = connection.getNativeConnection();
            // 集群模式
            if (nativeConnection instanceof JedisCluster) {
                return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, argvList);
            }
            // 单机模式
            else if (nativeConnection instanceof Jedis) {
                return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, argvList);
            }
            return UNINITIALIZED_STOCK;
        });
    }

    /**
     * 获取初始的库存
     * 初始化库存数量,这个可以从DB里取实际的量
     *
     * @param commodityId 业务ID
     * @return 初始库存
     */
    @Override
    public int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 30;
    }
}
3、调用接口并发Controller,测试分布式库存扣减
   @Resource
    private RedisStockService redisStockService;

    /**
     * 下单扣减库存
     *
     * @param stockDTO 下单请求参数
     * @return 扣减库存结果
     */
    @PostMapping("/buyProductCreateOrder")
    public Object buyProductCreateOrder(@RequestBody StockDTO stockDTO) {
        try {
            return redisStockService.getStock(stockDTO.getBatchNo(), stockDTO.getExpire(), stockDTO.getNum());
        } catch (Exception e) {
            return e.getMessage();
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容