基于Redis实现分布式定时任务

目录

1. 技术背景
2. 设计思想
3. 总结
常见问题
附录

1. 技术背景

1.1. Redis Keyspace Notifications

Redis 2.8.0+开始Redis提供了Keyspace Notifications[^1]特性; 这一特性使得客户端可以通过发布/订阅来接收redis影响数据集相关事件, 例如:

  • 新建KEY
  • 对KEY执行了LPUSH操作
  • KEY过期

1.1.1 配置

由于该特性会新增CPU消耗, keyspance events notifications是默认关闭的, 可通过修改redis.conf或CONFIG SET 配置notify-keyspace-events来开启,

K     Keyspace events, published with __keyspace@__ prefix.
E     Keyevent events, published with __keyevent@__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
A     Alias for g$lshzxe, so that the "AKE" string means all the events.

配置中至少需要出现K/E, 否则将不会接收到任何事件, 如果配置为KEA则会接收到任何可能的事件。

#  specify at least one of K or E, no events will be delivered.
notify-keyspace-events "KEA"

注意: Redis的发布/订阅阅后即焚是不支持持久化的, 故如果客户端断开重连则在这期间的消息将丢失!

1.1.2 测试

订阅事件

s1.vm.net:6379> PSUBSCRIBE __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1

过期一个KEY

SET foo val EX 10

收到通知

1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "a"

1.1.3 RedisKeyExpiredEvent

网上实际有很多其他方案, 在spring-data-redis中已提供了对上面特性的实现只是很少有人介绍到, 我推荐使用以下方案, 则每当有KEY失效则以下listener会收到消息:

public @Bean ApplicationListener redisKeyExpiredEventListener() {
        return event -> {
            System.out.println(String.format("A Received expire event for key=%s with value %s.", new String(event.getSource()), event.getValue()));
        }
}

实现原理是在org.springframework.data.redis.listener.KeyExpirationEventMessageListener中订阅事件__keyevent@*__:expired如下:


public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
        ApplicationEventPublisherAware {

  private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");

    @Override
    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
    }
  ... 
}

1.2 Distributed Locks

有多种方式去实现分布式锁, 关于使用Redis做分布式锁我推荐大家可以看看附录[^2]官方的文章, 里面详细介绍了官方推荐的正确的实现方式。

1.2.1 RedisLockRegistry

Spring Integration[^3]中从4.0开始就提供了一种基于redis的分布式锁实现RedisLockRegistry, 可用过用obtain方法直接获取到java.util.concurrent.locks.Lock也很简单:

// 1\. 创建对象
public @Bean RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {         return new RedisLockRegistry(connectionFactory, "Foo-API"); 
}

@Autowired
private RedisLockRegistry redisLockRegistry;

// 并发方法
public void foo() {
    java.util.concurrent.locks.Lock lock = null;
    try {
        lock = redisLockRegistry.obtain(DistributedLockService.createLockKey(trigger));
        if (!lock.tryLock()) {
            // 未获取到锁
            return;
        }
        // 已成功获取到分布式锁
    } finally {
         // Unlock safely
         if (lock != null) try { lock.unlock(); } catch (Exception e) { /* NOTHING */ }
    }
}

1.2.3 java.util.concurrent.locks.Lock

根据实际的需求选择使用tryLock/lock来实现我们的具体场景, java中对该对象定义如下:

public interface Lock {
    /**
     * Acquires the lock.
     *
     * If the lock is not available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until the
     * lock has been acquired.
     */
    void lock();

    /**
     * Acquires the lock unless the current thread is
     * {@linkplain Thread#interrupt interrupted}.
     *
     * Acquires the lock if it is available and returns immediately.
     *
     * If the lock is not available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * one of two things happens:
     *
     * 
     * The lock is acquired by the current thread; or
     * Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread, and interruption of lock acquisition is supported.
     * 
     *
     * If the current thread:
     * 
     * has its interrupted status set on entry to this method; or
     * is {@linkplain Thread#interrupt interrupted} while acquiring the
     * lock, and interruption of lock acquisition is supported,
     * 
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is
     *         interrupted while acquiring the lock (and interruption
     *         of lock acquisition is supported)
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * Acquires the lock only if it is free at the time of invocation.
     *
     * Acquires the lock if it is available and returns immediately
     * with the value {@code true}.
     * If the lock is not available then this method will return
     * immediately with the value {@code false}.
     *
     * A typical usage idiom for this method would be:
     *   {@code
     * Lock lock = ...;
     * if (lock.tryLock()) {
     *   try {
     *     // manipulate protected state
     *   } finally {
     *     lock.unlock();
     *   }
     * } else {
     *   // perform alternative actions
     * }}
     *
     * This usage ensures that the lock is unlocked if it was acquired, and
     * doesn't try to unlock if the lock was not acquired.
     *
     * @return {@code true} if the lock was acquired and
     *         {@code false} otherwise
     */
    boolean tryLock();

    /**
     * Acquires the lock if it is free within the given waiting time and the
     * current thread has not been {@linkplain Thread#interrupt interrupted}.
     *
     * If the lock is available this method returns immediately
     * with the value {@code true}.
     * If the lock is not available then
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     * 
     * The lock is acquired by the current thread; or
     * Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread, and interruption of lock acquisition is supported; or
     * The specified waiting time elapses
     * 
     *
     * If the lock is acquired then the value {@code true} is returned.
     *
     * If the current thread:
     * 
     * has its interrupted status set on entry to this method; or
     * is {@linkplain Thread#interrupt interrupted} while acquiring
     * the lock, and interruption of lock acquisition is supported,
     * 
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then the value {@code false}
     * is returned.
     * If the time is
     * less than or equal to zero, the method will not wait at all.
     *
     * @param time the maximum time to wait for the lock
     * @param unit the time unit of the {@code time} argument
     * @return {@code true} if the lock was acquired and {@code false}
     *         if the waiting time elapsed before the lock was acquired
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while acquiring the lock (and interruption of lock
     *         acquisition is supported)
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * Releases the lock.
     */
    void unlock();

    ...
}

2. 设计思想

流程图

2.1 任务管理

定义任务管理服务, 用于受理其他服务程序通过RPC/DB/MQ等任务创建指令, 该服务根据任务等元数据(META DATA)判断任务是需要立即执行或是延时执行。

  • 立即执行 - 立即把任务交接给任务执行立即开始执行。
  • 延时执行 - 将任务数据存入Redis并设置TTL = (执行时间 - 当前时间)

2.2 执行任务

根据不同等任务数据调用不用等任务具体实方法去执行任务, 例如执行一条SQL、执行一个RPC调用等, 执行成功则任务调度完成, 执行不成功则根据任务元数据(META DATA)来控制任务执行情况, 例如可约定以下数据:

RETRY_INTERVAL = 3000 # 任务失败重试间隔
MAX_RETRIES = 3 # 任务失败最大重试次数

当任务执行失败且还满足可执行条件, 则根据配置RETRY_INTERVAL将任务数据放入Redis并设置TTL = RETRY_INTERVAL, 则任务则会在TTL之后重新被执行。

根据前面技术背景中提到当Redis现有当特性, 以及前面我们根据KEY的TTL来控制任务的执行, 则收到KEY过期事件即代表任务达到执行时间了; 但在分布式环境中, 多个JVM会同时监听到KEY过期, 为了防止任务重复执行, 所以在可执行任务前需要再结合分布式锁获取到锁的JVM方可执行任务, 否则直接忽略该事件, 因为其他JVM已经执行了该任务。

3. 总结

本文描述的方案主要结合了Redis两大特性:

  • Keyspace Notifications[^1]
  • 基于Redis的分布式锁

来实现来分布式任务调度, 都基于Redis来实现, 较大程度发挥了其自身优势, 相较于quartz[^4]更加轻量级。

常见问题

  • KEY过期没有触发失效事件
    检查redis中notify-keyspace-events配置情况, 或者直接通过redis-cli连接到redis执行MONITOR指令观察消息情况。

附录

[1] Redis Keyspace Notifications
https://redis.io/topics/notifications
[2] Distributed locks with Redis
https://redis.io/topics/distlock
[3] Spring Integration
https://spring.io/projects/spring-integration
[4] Quartz Enterprise Job Scheduler
http://www.quartz-scheduler.org/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335