使用RabbitMQ实现未支付订单在30分钟后自动过期

延迟队列可以实现消息在投递到Exchange之后,经过一定的时间之后再投递到相应的Queue。再被消费者监听消费。
即:生产者投递的消息经过一段时间之后再被消费者消费。

  • 业务场景:订单在30分钟内还未支付则自动取消。

该业务的其他实现方案:

  • 使用Redis,设置过期时间,监听过期事件。
  • 使用RabbitMQ的过期队列与死信队列,设置消息的存活时间,在设置的时间内未被消费,即会投递到死信队列,我们监听死信队列即可。可参考上一篇文章RabbitMQ死信队列在SpringBoot中的使用

使用RabbitMQ延迟队列实现:

# 安装延迟队列插件:

plugin
  • 重启RabbitMQ

# 业务相关代码编写

  • 订单实体(仅保留相关字段)
OrderEntity
  • 订单状态枚举(仅保留相关状态)
image.png
  • OrderMapper
/**
 * @author futao
 * @date 2020/4/14.
 */
public interface OrderMapper extends BaseMapper<Order> {
}
  • 模拟下定的接口OrderController
    为了简单起见,省略了Service层.
OrderController

# RabbitMQ相关代码编写

  • 配置文件
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: delay-vh
    connection-timeout: 15000
    # 发送确认
    publisher-confirms: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 手动签收ACK
        acknowledge-mode: manual

app:
  rabbitmq:
    # 延迟时长设置
    delay:
      order: 10S
    # 队列定义
    queue:
      order:
        delay: order-delay-queue
    # 交换机定义
    exchange:
      order:
        delay: order-delay-exchange
  • 延迟交换机,队列定义与绑定

/**
 * 队列,交换机定义与绑定
 * 延迟队列插件`rabbitmq-delayed-message-exchange`下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
 *
 * @author futao
 * @date 2020/4/10.
 */
@Configuration
public class Declare {

    /**
     * 订单队列 - 接收延迟投递的订单
     *
     * @param orderQueue 订单队列名称
     * @return
     */
    @Bean
    public Queue orderDelayQueue(@Value("${app.rabbitmq.queue.order.delay}") String orderQueue) {
        return QueueBuilder
                .durable(orderQueue)
                .build();
    }

    /**
     * 订单交换机-延迟交换机 - 消息延迟一定时间之后再投递到绑定的队列
     *
     * @param orderExchange 订单延迟交换机
     * @return
     */
    @Bean
    public Exchange orderDelayExchange(@Value("${app.rabbitmq.exchange.order.delay}") String orderExchange) {
        Map<String, Object> args = new HashMap<>(1);
        args.put("x-delayed-type", "topic");
        return new CustomExchange(orderExchange, "x-delayed-message", true, false, args);
    }

    /**
     * 订单队列-交换机 绑定
     *
     * @param orderQueue         订单队列
     * @param orderDelayExchange 订单交换机
     * @return
     */
    @Bean
    public Binding orderBinding(Queue orderDelayQueue, Exchange orderDelayExchange) {
        return BindingBuilder
                .bind(orderDelayQueue)
                .to(orderDelayExchange)
                .with("order.delay.*")
                .noargs();
    }
}

可以看出队列就是普通的队列。重点在交换机的设定上。声明延迟交换机需要设置参数x-delayed-type,值为交换机类型,可以是fanout,topic,direct。并且设置交换机的type为x-delayed-message

  • 定义完成后可以启动SpringBoot应用程序,在RabbitMQ管理后台查看Exchange和Queue。
delay-exchange

可以看到,除了默认的交换机,SpringBoot已经帮我们创建好了延迟交换机order-delay-exchange,并且此时messages delayed为0,因为我们还未向交换机投递消息。

  • 可以继续查看交换机的路由类型与绑定的队列
ExchangeDetail
  • 队列为普通的队列


    Queue
  • 回到代码中,定义消息生产者

Orderender

在消息投递之前为每条消息都设置了延迟时长setDelay()
调用消费者的代码在上面OrderController中,下定之后,订单数据落库,并且向MQ中投递延迟消息。可以回头看看。

  • 消费者-监听过期的订单信息,并且将DB中相应的订单设置为已过期。
OrderConsumer

为了方便查看到延迟投递的效果,我在消息投递和接收处都打印了日志,测试时可以看到消息投递和消息的时间间隔。

# 测试

  • 把订单过期时长设置为10S
app:
  rabbitmq:
    delay:
      order: 10S
  • 下定
log
DB100

可以看到,打印出了投递日志,订单主键为666ae86aabe2a1b3120b34bb5f447bbe的订单在2020-04-14 22:22:04.307进行了投递,此时数据库中该订单的状态为100,待支付。

  • 此时查看Exchange详情可以发现,messages delayed:1,即目前有一条消息处于延迟状态。
ExchangeDetail
  • 等待10S后。
log

可以看到OrderConsumer在10S后2020-04-14 22:22:14.320接收到了主键为666ae86aabe2a1b3120b34bb5f447bbe的订单消息。距离投递时间2020-04-14 22:22:04.307为10S。此时查看DB中订单状态:

DB

订单状态为200已过期,且过期时间为2020-04-14 22:22:14

  • 达到了订单在我们指定的时间后过期。

  • 再测试几条一分钟的场景
app:
  rabbitmq:
    delay:
      order: 1M
Exchange
log

消息都在延迟1分钟后投递到了队列-消费者。

建议收藏,当然我只是建议。



# 严重风险提示:

在实际业务使用中,如果消费者的消费能力比较低下,会存在已经过期的消息阻塞积压在队列,无法在指定的时间内过期,导致业务出现异常。
实际上,按照我们业务意图,队里Queue里是不应该有大量消息存在的,因为投递到过期队列的消息已经是过期了的,应该立即被消费掉。

  • 进行测试:
    为了降低消费者的消费能力,进行如下处理:
    1. 设置消费者的最大并发数为1,并进行手动签收。
    listener:
      simple:
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        acknowledge-mode: manual

app:
  rabbitmq:
    delay:
      # 订单过期时间为1分钟
      order: 1M
  1. 消费者在处理消息时休眠5S


    sleep
  2. 向MQ投递两条消息,预期两条消息都在1分钟后正常过期。

  3. 结果(去除了无关信息):

result
2020-04-15 20:18:05.269  OrderSender       : 订单[d6fd965b11f8db0fafb762d305db83b0]投递到MQ
2020-04-15 20:18:05.765  OrderSender       : 订单[77ceb7f1bfbbcaf627224ac75e96b0e5]投递到MQ

2020-04-15 20:19:05.279  OrderConsumer     : 消费者接收到延迟订单[d6fd965b11f8db0fafb762d305db83b0]
2020-04-15 20:19:15.316  OrderConsumer     : 订单业务处理结束.....进行消息ack签收

2020-04-15 20:19:15.318  OrderConsumer     : 消费者接收到延迟订单[77ceb7f1bfbbcaf627224ac75e96b0e5]
2020-04-15 20:19:25.330  OrderConsumer     : 订单业务处理结束.....进行消息ack签收

第一个订单d6fd965b11f8db0fafb762d305db83b0投递时间为2020-04-15 20:18:05.269。1分钟后2020-04-15 20:19:05.279接收到了通知,并且处理了10S后进行了签收ack。
第二个订单77ceb7f1bfbbcaf627224ac75e96b0e5投递时间为2020-04-15 20:18:05.765。1分钟过后并没有收到通知,而是在第一个订单处理完毕之后,2020-04-15 20:19:15.318才收到了通知,比预期的时间长了10秒,实际延迟时间为1分钟+10秒。出现了业务异常。

  • 导致这个问题的原因就是消费者无法及时消费消息并更新订单状态。所以我们在进行开发时,需要考虑实际的数据量大小,消费者消费能力。及时关注队列消息积压情况,灵活调整消费者并发数量,优化消费者代码,提高消费者消费能力。

# 系列文章

任何技术的使用都不可生搬硬套,需要结合自己实际的业务场景进行相应的调整优化。在平时的工作中应该多关注程序在实际的运行过程中的结果是否符合我们的预期

本文涉及的源代码:https://github.com/FutaoSmile/springboot-learn-integration/releases/tag/v_rabbitmq_delay_queue

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容