springboot rabbitmq高可用消息确认消费实战

RabbitMQ的高可用主要体现在消息的发送、传输和接收的过程中,可以保证消息成功发送、不会丢失,以及被确认消费/不重复消费。

  • 对于消息是否发送成功,主要是针对生产者端的消息生产确认机制;
  • 对于消息不会丢失,主要是rabbitmq消息持久化机制;
  • 对于消息确认消费/不重复消费,主要是针对消费者端对消息的确认消费机制。
一、消息生产确认机制

对于消息是否发送成功,在rabbitmq自定义操作组件中可以统一设置消息生产确认相关逻辑rabbitTemplate.setConfirmCallback和rabbitTemplate.setReturnCallback。

@Slf4j
@Configuration
public class RabbitmqConfig {
    //自定义配置RabbitMQ发送消息的操作组件RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置“发送消息后进行确认”
        connectionFactory.setPublisherConfirms(true);
        //设置“发送消息后返回确认信息”
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件实例对象
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出“消息发送成功”的反馈信息
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData,ack,cause));
        //发送消息后,如果发送失败,则输出“消息发送失败-消息丢失”的反馈信息
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message));
        //定义消息传输的格式为JSON字符串格式
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //最终返回RabbitMQ的操作组件实例RabbitTemplate
        return rabbitTemplate;
    }
}
二、消息持久化
  1. 在创建交换机和队列的时候,有个durable的参数,即是否持久化,如果设置为true,当rabbitmq服务器重启的时候,创建的交换机和队列均还存在着,不会丢失;
  2. 在发送消息的时候可以选择为该消息设置持久化,即消息体Message的deliveryMode设置为MessageDeliveryMode.PERSISTENT持久化,当消息来不及消费rabbitmq服务器重启,那么消息依旧存在,如果将所有消息都设置持久化,那么会影响性能,内存和磁盘的读写速度差异很大。
三、消息确认消费机制
  • 如何保证消息能够被准备消费、不重复消费,RabbitMQ提供了消息确认机制,即ACK模式。RabbitMQ的消息确认机制有3种,分别是NONE(无须确认)、AUTO(自动确认)和MANUAL(手动确认)。

  • 无须确认流程图如下图所示,对于该模式,消息是否消费成功生产者端是不知道的,存在可能重复消费/消息消费失败的情况:


    无需确认.jpeg
  • 代码目录如图所示,演示自动确认和手动确认:


    自动确认和手动确认.png

    对于设置ACK模式,可以在yaml配置文件中设置spring.rabbitmq.listener.simple.acknowledge-mode: xxx,也可以在声明的监听器Bean中设置,用简单监听器SimpleRabbitListenerContainerFactory即可:

@Slf4j
@Configuration
public class RabbitmqConfig {
    /**
     * 确认消费模式为自动确认机制-AUTO,采用直连传输directExchange消息模型
     */
    @Bean
    public SimpleRabbitListenerContainerFactory singleListenerContainerAuto(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用JSON的格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量。在这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量。在这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        //确认消费模式为自动确认机制
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    /**
     * 确认消费模式为手动确认机制-MANUAL,采用直连传输directExchange消息模型
     */
    @Bean
    public SimpleRabbitListenerContainerFactory singleListenerContainerManual(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用JSON的格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量。在这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量。在这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        //确认消费模式为自动确认机制
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

(1)自动确认模式
自动确认模式流程图如图所示,RabbitMQ内置组件通知生产者端,当消息成功消费/消费失败都会通知:


auto确认.jpeg

对于自动确认模式,在消费者端可以看到和普通的消息队列没什么区别,而手工确认消费模式则比较灵活。

  • 确认消费模式为自动确认机制-AUTO,采用直连传输directExchange消息模型-生产者
@Slf4j
@Component
public class AutoAckPublisher {
    //定义RabbitMQ消息操作组件RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送消息
     */
    public void sendMsg(Order order) {
        try {
            //设置交换机
            rabbitTemplate.setExchange(RabbitMqConstants.AUTO_ACKNOWLEDGE_EXCHANGE);
            //设置路由
            rabbitTemplate.setRoutingKey(RabbitMqConstants.AUTO_ACKNOWLEDGE_ROUTING_KEY);
            //发送消息
            rabbitTemplate.convertAndSend(order);
            log.info("确认消费模式为自动确认机制-消息模型DirectExchange-one-生产者-发送消息:{} ",order);
        }catch (Exception e){
            log.error("确认消费模式为自动确认机制-消息模型DirectExchange-one-生产者-发送消息:{},发生异常:{} ",order, e);
        }
    }
}
  • 确认消费模式为自动确认机制-AUTO,采用直连传输directExchange消息模型-消费者
@Slf4j
@Component
public class AutoAckConsumer {

    @RabbitListener(queues = RabbitMqConstants.AUTO_ACKNOWLEDGE_QUEUE, containerFactory = "singleListenerContainerAuto")
    public void consumeMsg(Order order) {
        try {
            log.info("基于AUTO的自动确认消费模式-消费者监听消费消息-内容为:{} ",order);
        }catch (Exception e){
            log.error("基于AUTO的自动确认消费模式-消费者监听消费消息:{},发生异常:", order, e);
        }
    }
}

(2)手工确认流程图如图所示,当消息处理过程中出现异常的时候,需要手工确认处理该异常消息,该消息是否重新归入队列等处理。


manual确认.jpeg
  • 确认消费模式为手动确认机制-MANUAL,采用直连传输directExchange消息模型-生产者
@Slf4j
@Component
public class ManualAckPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送消息
     */
    public void sendMsg(Order order) {
        try {
            rabbitTemplate.setExchange(RabbitMqConstants.MANUAL_ACKNOWLEDGE_EXCHANGE);
            rabbitTemplate.setRoutingKey(RabbitMqConstants.MANUAL_ACKNOWLEDGE_ROUTING_KEY);
            rabbitTemplate.convertAndSend(order);
            log.info("确认消费模式为手动确认机制-消息模型DirectExchange-one-生产者-发送消息:{} ", order);
        }catch (Exception e){
            log.error("确认消费模式为手动确认机制-消息模型DirectExchange-one-生产者-发送消息:{},发生异常:{} ", order, e);
        }
    }
}
  • 确认消费模式为手动确认机制-MANUAL,采用直连传输directExchange消息模型-消费者
    在监听到消息并且消息成功处理完之后,通过basicAck来确认消息成功消费,当捕获到异常的时候即该消息处理失败的时候,有两种方式,一种是拒绝该消息并且消息重新归入队列中,另一种是拒绝该消息并且丢弃掉,一般情况下重新归入队列,还是会出现异常没法消费掉,除非把异常修复了才行,并且在未修复该异常的情况下,后面的消息会被堵塞住没办法消费,将消息重新归入队列中或许不是一个好的选择。
    一般情况下可以保留该消息的信息然后把消息丢弃掉,最后重新发送消息;或者把该消息丢入到死信队列中,不对该死信队列进行监听,最后在rabbitmq管理后台取出该消息/重新监听该消息重新发送到原先队列进行消费,修复好异常情况再发送消息进行处理,保证消息成功消费。
@Slf4j
@Component
public class ManualAckConsumer {
    @RabbitListener(queues = RabbitMqConstants.MANUAL_ACKNOWLEDGE_QUEUE, containerFactory = "singleListenerContainerManual")
    public void consumeMsg(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
        try {
            log.info("基于MANUAL的手工确认消费模式-消费者监听消费消息,消息投递标记:{},内容为:{} ", tag, order);
            //抛异常,归入使得消息重新归入队列
            //int num = 1 / 0;
            //执行完业务逻辑后,手动进行确认消费,其中第一个参数为:消息的分发标识(全局唯一);第二个参数:是否允许批量确认消费
            channel.basicAck(tag, false);
        }catch (Exception e){
            //第二个参数reueue重新归入队列,true的话会重新归入队列,需要人为地处理此次异常消息,重新归入队列也会继续异常
            channel.basicReject(tag, true);
            log.error("基于MANUAL的手工确认消费模式-消费者监听消费消息:{},消息投递标签:{},发生异常:", order, tag, e);
        }
    }
}

出现异常重新归入队列的情况,如图所示,显示有unacked 1条消息,下面有get messages,当点击的时候发现提示queue is empty队列为空,确实准备消费的消息为0条,正在消费的消息一直是unacked状态无法取出。


unacked消息.png
取不出来.png

这个时候只能停止监听重启项目,这个在线上不是好的办法,停止监听之后消息变为ready状态,这个时候可以取出,可以看到提示“取出消息是毁灭性的操作”。


ready状态消息.png
取出消息.png

四种取出消息的模式,分别为:不确认消息重新归入队列、确认消息不重新归入队列、拒绝该消息重新归入队列、拒绝该消息不重新归入队列。当取出消息可以看到消息的内容。


取出消息模式.png

消息内容.png

对于确认消息消费,避免消息异常出现上述情况,可以用死信队列来处理,捕获异常消息,发送消息到死信队列,不监听该队列的消息,最后修复异常重新发送消息到原先队列进行消费,详情请看下篇博文

参考资料:
《分布式中间件实战》
《rabbitmq实战指南》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容