订阅 Redis 的 key 过期事件实现动态定时任务

一、需求

  1. 设置了生存时间的Key,在过期时能不能有所提示?
  2. 如果能对过期Key有个监听,如何对过期Key进行一个回调处理?
  3. 如何使用 Redis 来实现定时任务?

比如:

  • 处理订单过期自动取消,12306 购票系统超过30分钟没有成功支付的订单会被回收处理;
  • 购买商品15天后默认好评;
  • 外卖系统的送餐超时提醒;
  • 客服与顾客聊天,客服超过多长时间没回复,系统给客服发一个提醒消息;
    ...

这里的定时任务并不是 Crontab 这种如 0 0 23 * * ? (每日23点执行) 定死多长时间执行一次的, 而是某种特定动作触发创建的一个多长时间后执行的任务。比如有100个 用户触发了这个动作,那么就会创建100个定时任务,并且这100个任务由于触发创建的时间不同,执行的时间也很可能不在同一时间。

二、思路

在 Redis 的 2.8.0 版本之后,其推出了一个新的特性——键空间消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成这个定时任务的操作了。

Redis 的键空间通知支持 订阅指定 Key 的所有事件 与 订阅指定事件 两种方式。

Keyspace notifications are implemented sending two distinct type of events for every operation affecting the Redis data space. For instance a DEL operation targeting the key named mykey in database 0 will trigger the delivering of two messages, exactly equivalent to the following two PUBLISH commands:

PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey

通过 Redis 的键空间通知(keyspace notification)可以做到:下单时将订单 id 写入 redis,设置过期时间30分钟,利用 redis 键过期回调提醒,30分钟后可以在回调函数里检查订单状态,如果未支付,则进行处理。

三、实现

1. 修改 redis.conf 开启redis key过期提醒

By default keyspace events notifications are disabled because while not very sensible the feature uses some CPU power. Notifications are enabled using the notify-keyspace-events of redis.conf or via the CONFIG SET.

由于键空间通知比较耗CPU, 所以 Redis默认是关闭键空间事件通知的, 需要手动开启 notify-keyspace-events 后才启作用。

K:keyspace事件,事件以__keyspace@<db>__为前缀进行发布;        
E:keyevent事件,事件以__keyevent@<db>__为前缀进行发布;        
g:一般性的,非特定类型的命令,比如del,expire,rename等;       
$:String 特定命令;        
l:List 特定命令;        
s:Set 特定命令;        
h:Hash 特定命令;        
z:Sorted 特定命令;        
x:过期事件,当某个键过期并删除时会产生该事件;        
e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件;        
A:g$lshzxe的别名,因此”AKE”意味着所有事件。

notify-keyspace-events Ex 表示开启键过期事件提醒

2. 继承 JedisPubSub 实现一个消息监听器类

@Component
public class RedisKeyExpiredListener extends JedisPubSub {

    private Logger logger = LoggerFactory.getLogger(RedisKeyExpiredListener.class);

    @Override
    public void onMessage(String channel, String message) {
    
       //message.toString()可以获取失效的key
      String expiredKey = message.toString();
      if(expiredKey.startsWith("key:prefix")){
            /**
             * TODO
             * 如果是自己想要监控的KEY, 则可以在这里处理业务
             */
        }
    }
}

由于每个key过期都会回调 onPMessage 方法, 所以不建议在 onPMessage 回调方法中直接处理业务, 这里可以通过 MQ 来做缓冲,在 onPMessage 中 把消息直接扔到 MQ 里, 然后再去监听队列消费消息处理具体的业务。

改进版如下:

@Component
public class RedisKeyExpiredListener extends JedisPubSub {

    private Logger logger = LoggerFactory.getLogger(RedisKeyExpiredListener.class);

    @Resource
    private ICommonsMqService commonsMqService;

    @Override
    public void onMessage(String channel, String message) {

        try {
            commonsMqService.sendSingleMessageAsync("REDIS_TIMEOUT_KEY_QUEUE", message);
            logger.info("发送支付超时MQ消息成功:{}",message);
        }catch (Exception e){
            logger.error("发送支付超时MQ消息失败:{}",e.toString());
        }
    }
}

3. 订阅指定 db 的过期事件

@Component
@Order(value = 4)
public class SubscriberRedisKeyTimeout implements CommandLineRunner {

    private Logger logger = LoggerFactory.getLogger(SubscriberRedisKeyTimeout.class);

    @Resource
    RedisKeyExpiredListener redisKeyExpiredListener;

    @Override
    public void run(String... args) throws Exception {

        JedisPool pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 8005);
        Jedis jedis = pool.getResource();

        /**
         * 订阅线程:接收消息
         * 由于订阅者(subscriber)在进入订阅状态后会阻塞线程,
         * 因此新起一个线程(new Thread())作为订阅线程
         */
        new Thread(new Runnable() {
            public void run() {
                try {
                    logger.info("Subscribing. This thread will be blocked.");
                    //使用subscriber订阅 db0上的key过期事件消息,这一句之后,线程进入订阅模式,阻塞。
                     jedis.subscribe(redisKeyExpiredListener, "__keyevent@0__:expired");
     
                    //当unsubscribe()方法被调用时,才执行以下代码
                    logger.info("Subscription ended.");
                } catch (Exception e) {
                    logger.error("Subscribing failed.", e);
                }
            }
        }).start();

    }
}

4. 测试

public class TestJedisExpipreNotice {

    public static void main(String[] args) {
        JedisPool pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 8005);
        Jedis jedis = pool.getResource();

        jedis.setex("REDIS:EXPIPRE:NOTICE:TEST",5, "测试键过期事件回调");

    }
}

5秒后控制台打印如下:

2019-03-26 17:35:44.248  INFO 20464 --- [ Thread-127] c.p.c.r.b.r.RedisKeyExpiredListener  : 发送聊天会话超时MQ消息成功:REDIS:EXPIPRE:NOTICE:TEST

四、 subscribe/psubscibe 的区别

Redis 提供了 publish 和 subscribe/psubscibe 指令来实现发布/订阅模型,发布和订阅的目标称为通道(channel)。 subscribe/psubscribe 了一个或多个通道的客户端,可以收到其他客户端向这个通道publish的消息。subscribe和psubscribe的区别是,前者指定具体的通道名称,而后者可以指定一个正则表达式,匹配这个表达式的通道都被订阅。

20190421001716.png

上图展示了一个带有频道和模式的例子, 其中 tweet.shop.* 模式匹配了 tweet.shop.kindle 频道和 tweet.shop.ipad 频道, 并且有不同的客户端分别用 psubscibe 订阅它们三个:当有信息发送到 tweet.shop.kindle 频道时, 信息除了发送给 clientX 和 clientY 之外, 还会发送给订阅 tweet.shop.* 模式的 client123 和 client256。

五、参考文献

[1]Redis Pub/Sub

[2]Redis Keyspace Notifications

[3]Redis的Pub/Sub模式

[4]Redis设计与实现第一版-订阅与发布

[5]Redis实践操作之—— keyspace notification(键空间通知

此文首发于我的个人博客:
https://crazyfzw.github.io/2019/04/15/distributed-locks-with-redis/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容