Rabbitmq业务流程包含容错排查

流程是这样的,订阅者,发送消息到test交换机,通过route key 分发到绑定的队列,这里涉及到交换机的类型,可以看我上一篇文章。如果没有匹配到这个routeKey就默认发送到AE交换机(fanout模式),这个交换机要设置internal:true意为内部交换机 。AE交换机再把错误的消息,发送到其绑定的队列中,如果test交换机,发送消息被匹配到的队里中,而处理该队列的订阅者,拒绝了或者超时了处理,test交换机就将该消息发送到就死信交换机,然后到死信队列中

一、 进入死信队列(进入死信的三种方式)

  • 1.消息被拒绝(basic.reject or basic.nack)并且requeue=false
  • 2.消息TTL过期
  • 3.队列达到最大长度

代码演示

- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
    - true   发送给下一个消费者
    - false  谁都不接受,从队列中删除

Rabbit设置

  • 1.设置AE交换机 设置为内部交换机,模式为fanout
    当发送到正常交换机消息,没有被匹配到route key的消息对进到改交换机
 FanoutExchange fanoutExchange=new FanoutExchange("alter");
fanoutExchange.setInternal(true);//设置为内部交换机,作为处理了非法的消息,无法匹配到route key的消息
- 为AE交换机绑定队列 `alter_message`
  • 2.设置处理正常的交换机 test

绑定参数,设置没有匹配 route key 的消息发送到AE交换机 alternate-exchange

  • 3.添加正常的队列

    • hello 测试处理正常逻辑

    • task_queue 模拟被拒绝的消息
      添加超时时间和死信交换机和rk

    x-dead-letter-exchange: dead_letter_exchange

    x-dead-letter-routing-key: task_queue.fail

    x-message-ttl: 600

  • 4.设置死信交换机 dead_letter_exchange

    • 另外创建死信队列 dead
    • 绑定 route key task_queue.fail
死信交换机
模拟死信队列

代码实例 Python

import pika

#认证,生产者
credentials = pika.PlainCredentials('guest', 'guest')


#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,'/',credentials))

#通过tcp协议获取一个连接
channel = connection.channel()

#声明一个对下列和贾环加
#channel.queue_declare(queue='hello')


#被hello接受了
channel.basic_publish(exchange='test',
                  routing_key='hello',
                  body='Hello World!')

#发送了一个没有匹配的消息,匹配到了alter_message
channel.basic_publish(exchange='test',
                  routing_key='hello12312',
                  body='Hello World!')

#模拟一条虽然能被匹配到,但是无法消费的消息,然后被发送到死信队列消息
channel.basic_publish(exchange='test',
                  routing_key='task_queue',
                  body='Hello World!')


  • 正常队列


  • 没有匹配到的到


  • 被拒绝或者超时进入私信队列的


使用代码去创建队列和交换机 Java

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
        connectionFactory.setUsername("liuxin");
        connectionFactory.setPassword("930914lx");
        connectionFactory.setVirtualHost("az");
        connectionFactory.setPublisherConfirms(true); // 必须要设置回调

        Channel channel = connectionFactory.createConnection().createChannel(false);

        //String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("internal",true);
        //String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        //设置AE交换机
        channel.exchangeDeclare("alter", "fanout", false, false, false, arguments);
        channel.queueDeclare("alter_message", false, false, false, null);
        channel.queueBind("alter_message", "alter", "");
        
        //声明死信交换机并绑定
        channel.exchangeDeclare("dead_letter_exchange", "direct", false, false, null);
        channel.queueDeclare("dead", false, false, false, null);
        channel.queueBind("dead", "dead_letter_exchange", "task_queue.fail");


        arguments = new HashMap<>();
        arguments.put("alternate-exchange", "alter");//指定AE交换机
        channel.exchangeDeclare("test", "direct", false, false, arguments);
        //声明接受正式的队列,不需要参数
        channel.queueDeclare("hello", false, false, false, null);
        channel.queueBind("hello", "test", "hello");

        arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "dead_letter_exchange");
        arguments.put("x-dead-letter-routing-key", "task_queue.fail");
        arguments.put("x-message-ttl",6000);//6s没有被处理,就死了
        //设置测试死信队列的task_queue,推送该队列里面,被拒绝会到dead_letter_exchange,并最终到dead,routeKey,task_queue.fail 为并设置死信队列参数
        channel.queueDeclare("task_queue", false, false, false, arguments);
        channel.queueBind("task_queue", "test", "task_queue");

        return connectionFactory;
    }
    
    
    
    
      /**
     * 接受消息的监听,这个监听客户交易流水的消息
     * 针对消费者配置
     *
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, PayMentConsumeImpl transactionConsume) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.addQueueNames("hello");
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(8);
        container.setConcurrentConsumers(4);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认,当设置了此模式,必须返回ACK,否则会进入死信队列
        container.setMessageListener(transactionConsume);
        container.setPrefetchCount(1000);
        return container;
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,497评论 18 139
  • 为了一些初学习者更好理解我就从简单的解释一下Rabbitmq的原理吧​,首先你可以这样想RabbitMq就是一个队...
    螃蟹和骆驼先生Yvan阅读 7,368评论 6 4
  • 1.什么是消息队列 消息队列允许应用间通过消息的发送与接收的方式进行通信,当消息接收方服务忙或不可用时,其提供了一...
    zhuke阅读 4,435评论 0 12
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,335评论 2 34
  • 背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就...
    wooyoo阅读 3,442评论 0 17