Redis 延迟任务队列

背景

在业务发展过程中,会出现一些需要延时处理的场景,比如:

a.订单下单之后超过30分钟用户未支付,需要取消订单

b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论

c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。

处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。

使用 Redis 的列表结构可以实现执行一种任务的FIFO队列,也可以实现通过调用不同回调函数的来执行多重不同的任务队列,乃至可以是实现简单的优先级队列,当然也可以实现延时队列。

延时队列的基本实现有3类:

  • 在任务信息中包含任务的执行时间,工作进程发现任务时间未到,短暂的等待之后,将任务重新推入队列里面。
  • 使用一个任务列表记录所需要的执行的任务,并在每次进行 while循环的时候,扫描检查列表并执行已经到期的任务。
  • 把所所有需要执行的任务都添加到有序集合里面,并将任务执行的时间设置分值。再是有个一个额外的进程来查询有序集合里面是否有可以执行的任务,如果有,将任务从有序集合里面移除,并将任务推进适当的任务队列。

无论是短暂的等待,还是将任务从入队列,都是已经很好资源的事情,多以通常不会采用第一种方法。如果在本地维护一个任务列表,可以能会导致任务丢失,除非对任务进行持久化。其次,通过不断的扫描别表,查找合适的任务,每次都需要循环遍历,也是件浪费资源的事情,所以第二种方法也不可取。最后,采用有序结合保存任务、执行时间作为排序的依据是最简单最直接的做法。采用执行时间排序,不需要每次遍历整个队列,只需要判断队首的元素是否到了可执行时间即可。其次,只需要一个工作进程。再者,可以使用 “分布式锁”机制将任务从有序集合中个移动到任务队列。这样处理,语义简单,逻辑清晰。
Redis 的有序集合天生就适合做这件事。

功能特性
  • 消息可靠性,消息持久化,消息至少被消费一次
  • 实时性:存在一定的时间误差(定时任务间隔)
  • 支持指定消息remove
  • 高可用性
整体结构
801190-20171202153439526-1334143734.png
  • Messages Pool所有的延时消息存放,结构为KV结构,key为消息ID,value为一个具体的message(这里选择Redis Hash结构主要是因为hash结构能存储较大的数据量,数据较多时候会进行渐进式rehash扩容,并且对于HSET和HGET命令来说时间复杂度都是O(1))
  • Delayed Queue是16个有序队列(队列支持水平扩展),结构为ZSET,value为messages pool中消息ID,score为过期时间(分为多个队列是为了提高扫描的速度)
  • Timed Task定时任务,负责扫描处理每个队列过期消息
流程
801190-20171202153600120-587720598.png
redis客户端演示
127.0.0.1:6379> zadd task_set 1 task1
(integer) 1
127.0.0.1:6379> zadd task_set 2 task2
(integer) 1
127.0.0.1:6379> zadd task_set 3 task3
(integer) 1
127.0.0.1:6379> zadd task_set 4 task4
(integer) 1
127.0.0.1:6379> ZRANGE task_set 0 10 WITHSCORES
1) "task1"
2) "1"
3) "task2"
4) "2"
5) "task3"
6) "3"
7) "task4"
8) "4"
Java 模拟代码:
package me.touch.redis;

import java.util.Set;
import java.util.UUID;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Tuple;

/**
 * 延时队列
 * @author Knight-Ran
 *
 */
public class delayQueue {
    private Jedis jedis;
    private JedisPool pool;
    private static final String QUEUE_NAME = "deplay_queue";
    
    @Before
    public void setUp() {
        pool = new JedisPool(new JedisPoolConfig(), "localhost");
        jedis = pool.getResource();
    }

    @After
    public void after() {
        jedis.close();
        pool.destroy();
    }
    
    
    // 模拟任务处理队列
    public static void addToTaskQue(String taskInfo){
        System.out.println(taskInfo+"已经从延时队列中转至队列"+ "当前时间:"+ System.currentTimeMillis() );
        System.out.println();
    }
        
    public void addToDeplayQueue(Task task){
        System.out.println(task.toString()+ "已经加入延时队列");
        jedis.zadd(QUEUE_NAME, task.getTime(), task.toString());
    }
    
    public void transferFromDelayQueue() throws InterruptedException{
        while(true){
            Set<Tuple> item = jedis.zrangeWithScores(QUEUE_NAME, 0, 0);
            if(item != null && !item.isEmpty()){
                Tuple tuple = item.iterator().next();
                if(System.currentTimeMillis() >= tuple.getScore()){
                    // TODO 获取锁
                    jedis.zrem(QUEUE_NAME, tuple.getElement()); // 从延时队列中移除
                    addToTaskQue(tuple.getElement()); //任务推入延时队列,因为这里只是延时
                    // TODO 释放锁
                }
            }
            
            Thread.sleep(100);
            
        }
    }
    
    @Test
    public void test() throws InterruptedException{
         long now = System.currentTimeMillis();
         Task task = new Task(UUID.randomUUID().toString(), now+10*1000, 10*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+20*1000, 20*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+30*1000, 30*1000+"后执行");
         addToDeplayQueue(task);
         task = new Task(UUID.randomUUID().toString(), now+40*1000, 40*1000+"后执行");
         transferFromDelayQueue();
         
    }
    
    static class Task{
        // 任务id
        private String id ;
        // 任务执行时间
        private long time;
        // 描述
        private String desc;
        
        public Task(String id, long time, String desc){
            this.id = id ;
            this.time = time;
            this.desc = desc;
        }
        
        public String getId() {
            return id;
        }
        public long getTime() {
            return time;
        }
        public String getDesc() {
            return desc;
        }

        @Override
        public String toString() {
            return "Task [id=" + id + ", time=" + time + ", desc=" + desc + "]";
        }
    }
}
测试结果:
Task [id=441a900e-a4a5-44cc-bddc-117bb3f00130, time=1502006961460, desc=10000后执行]已经加入延时队列
Task [id=9982a932-3c29-4e3c-a940-5c3beb5b55c2, time=1502006971460, desc=20000后执行]已经加入延时队列
Task [id=adfdfdff-b8b0-440d-b85e-06c3432b0094, time=1502006981460, desc=30000后执行]已经加入延时队列
Task [id=441a900e-a4a5-44cc-bddc-117bb3f00130, time=1502006961460, desc=10000后执行]已经从延时队列中转至队列当前时间:1502006961481
Task [id=9982a932-3c29-4e3c-a940-5c3beb5b55c2, time=1502006971460, desc=20000后执行]已经从延时队列中转至队列当前时间:1502006971518
Task [id=adfdfdff-b8b0-440d-b85e-06c3432b0094, time=1502006981460, desc=30000后执行]已经从延时队列中转至队

此方式为1.0方案,后续需要把task计划任务修改为redis队列监听或队列元素到期回调

参考链接:https://www.jianshu.com/p/63d5c42299f9
作者:非典型程序员
來源:简书

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342