背景
功能描述:会议开始前五分钟通过消息提醒参会人员。
为什么用redis:其实算是一个前期调研的失误,之前一直使用的是阿里的RocketMQ,后面公司把RocketMQ做了升级,升级到了5.x版本。但是官方并未提供对应的node版sdk。当然再使用旧版本或者其他消息队列服务都是不现实的。只能通过一写其他方案来实现。
预选方案:当时调研了三四种方案,比作了优缺点对比,最终综合考量选择了redis key的方式。
方案一:最简单的利用
node-schedule
库,去指定一个时间执行任务,该方案任务只存在内存之中,一旦服务重启,之前的定时任务就会消失。这显然是不合适的。方案二:把任务存到数据库中,然后在代码中轮询去查库,查询到指定时间范围的执行就去执行。但是该方案会有时间误差,主要看轮询频率。
方案三:通过redis的key过期事件监听实现定时任务,代码中提前监听好事件,当key过期时就会执行监听函数。该方案既没有明显的时间误差(具体特别小的误差有没有并未调研),也不用担心服务重启的问题。因为任务相当于是存在redis中的。
代码实现
接下来我们就用redis key的方式来实现一下定时任务:
首先需要运维同事帮忙设置一下redis的配置项
notify-keyspace-event Ex
,这里设置成Ex即可,E表示键事件通知,x表示过期事件(默认情况下收不到通知)。想了解其他值可以自行扩展。-
在项目启动后,创建redis监听。
先创建一个工厂函数用于生成redis clientconst redis = require('redis'); async function initRedisClient(){ return await redis.createClient({ url: 'redis://xxxx' }); }
事件监听:
async function subRedisKey(){ // 创建一个用于监听事件的client let redisSubClient = initRedisClient() await redisSubClient.connect() // 订阅数据库0的key expired事件 redisSubClient.subscribe('__keyevent@0__:expired', (e) => { // e就是过期的key,去做自己的业务逻辑 } }
-
在创建会议的时候通过设置一个key,并设置其过期时间为会议开始前5分钟即可。需要注意的是key种需要包含需要使用到的数据和唯一性标识,因为过期后就拿不到value中的数据了,所以不能存在value中。同时还要根据key知道对应的会议是哪个。
async function createMeeting(data){ // 创建一个用于创建key的client const redisClient = initRedisClient(); await redisClient.connect() // key自己定义,value可以随意设置 redisClient.PSETEX(`redis_key_${data.id}__${data}`, 指定时间毫秒值, '') }
当然,如果会议开始时间发生了修改或者会议被删除,也可以删除对应的key来清除没用的任务。
redisClient.DEL(key)
分布式问题解决
通过一顿敲代码,功能算是实现了,但是还有一个重要的问题。就是在分布式情况下,会出现多次通知的情况。因为分布式是同时启动了多个服务。然后redis key的过期事件相当于是广播式的。多个服务监听了事件,就会多个服务同时触发事件监听。就出现了多次通知的问题。
为了解决该问题,一般情况下就会想到利用锁的性质。接下来尝试查到的第一个方法:GETSET
。该方法第一次执行,因为之前没有set过,所以会返回null。一但设置了之后再执行就会返回设置的值。
监听函数中:
const value = await redisClient.GETSET('key', 1);
// 如果返回值是1就表示某一台服务器已经执行过,直接退出。反之,执行通知操作。
if(value){
return;;
}
// todo 执行通知
测试之后发现,该方法并不行,两台服务GETSET得到的值都是null,依旧都会通知。接下来尝试第二个方法INCR
。该方法会将key的值增加一,如果key不存在则会先初始化为0再增一。
监听函数中:
const value = await redisClient.INCR('key');
// 如果返回值大于1就表示某一台服务器已经执行过,直接退出。反之,执行通知操作。
if(value > 1){
return;;
}
// todo 执行通知
测试结果:该方法可行。任务完成!