rabbit mq 学习

% rabbitMQ learn
% qijun
% 19/01/2018

mq 的一些概念

  • mq: mq 是一个message broker (消息中介)
  • AMQP (Advanced Message Queue ) 一个标准的消息队列标准
  • RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现

rabbit mq 的一些概念

rabbit mq 的适用场景架构图

image.png
  • Client A &Client B 为消息的producer 消息由payload 和 label 组成,label是exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer
  • client 1 & client2 & client3 消息的consumer, 消息的接受者 接收到的消息是去除label 的消息,紧包含消息的内容,消费者通过订阅队列获取消息。
  • 中间是的 rabbit server 由 交换器,routingKey 和queue 组成,交换器和queue 通过routingKey 绑定,消息通过交换器和routingKey 路由到相应的queue
  • Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。
  • Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

四种交换器

由上面可知,消息通过交换器,通过对应的routekey 路由到queue, 交换器的类型一共有三种

  1. direct 如果 routing key 匹配, 那么Message就会被传递到相应的queue中
  2. fanout 广播到所有绑定的queue(假设你有一个消息需要发送给a和b,如果现在还需要发送给c,使用fanout 交换器,只需要在c的代码中创建一个队列,然后绑定到fanout 交换器即可)
  3. topic 对key进行模式匹配,比如ab.1,ab.2都可以传递到所有routingkey 为ab.*的queue
    基于topic类型交换器的routing key不是唯一的,而是一系列词,基于点区分。
    例如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"
    binding key也是。*表示只匹配一个关键字 #可以匹配0或者多个关键字。
    比如*.a.b的队列接受1.a.b 或者2.a.b等等
  4. header header交换器和 direct几乎一样,性能更差,基本不会用到

匿名交换器(默认)

事实上,你在代码中不创建交换器也是可以通过rabbit mq 发送消息的,因为rabbit 提供了默认的交换器。

image.png

如图中空白字符串名字的交换器为默认的交换器,类型为direct
本质上所有的消息发送都要送往exchange(可以没有队列,但不能没有交换机,没有队列时消息直接被丢弃)。
RabbitMQ提供了一种直接向Queue发送消息的快捷方法:直接使用未命名的exchange,不用绑定routing_key,直接用它指定队列名。

  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发送消息
        String message = "Hello World!";
        // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        // 参数1 exchange :交换器
        // 参数2 routingKey : 路由键
        // 参数3 props : 消息的其他参数
        // 参数4 body : 消息体
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

消息的确认和拒绝

使用ack确认Message的正确传递
默认情况下,如果Message 已经被某个Consumer正确的接收到了,那么该Message就会被从queue中移除。当然也可以让同一个Message发送到很多的Consumer
如果一个queue没被任何的Consumer Subscribe(订阅),那么,如果这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有Consumer时,这个数据会被立即发送到这个Consumer,这个数据被Consumer正确收到时,这个数据就被从queue中删除。
那么什么是正确收到呢?通过ack。每个Message都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。
如果在收到数据后处理数据时程序发生错误,无法正确处理数据,而是被reject。reject 参数设为true时RabbitMQ Server会把这个信息发送到下一个Consumer,设为false也可以从队列中把这条消息删除。
如果这个app有bug,忘记了ack,那么RabbitMQ Server不会再发送数据给它,因为Server认为这个Consumer处理能力有限。
而且ack的机制可以起到限流的作用(Benefitto throttling):在Consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance Consumer的load。

在什么地方创建queue

Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

那么谁应该负责创建这个queue呢?是Consumer,还是Producer?

如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,还是为了数据不丢失,Consumer和Producer都try to create the queue!反正不管怎么样,这个接口都不会出问题。
queue对load balance的处理是完美的。对于多个Consumer来说,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不同的Consumer。

VirtualHost

在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
在RabbitMQ中无法通过AMQP创建VirtualHost,可以通过以下命令来创建。
rabbitmqctl add_vhost [vhostname]

windows下如何安装rabbit mq

  1. rabbit mq 运行于erlang之上,需要先安装erlang http://www.erlang.org/downloads 下载,并使用管理员运行安装
  2. 安装rabbit mq https://www.rabbitmq.com/download.html
  3. 新增环境变量 ERLANG_HOME= C:\Program Files\erl9.2
    RABBITMQ_SERVER = C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.2
    配置环境变量
    Path=%ERLANG_HOME%\bin;%RABBITMQ_SERVER%\sbin
  4. 替换 erlang cookie
    拷贝C:\WINDOWS 下的.erlang.cookie (还有可能在C:\Windows\System32\config\systemprofile)文件替换 C:\Users%USERNAME%.erlang.cookie 或者 C:\Documents and
    Settings%USERNAME%.erlang.cookie
  5. 通过startMenu 启动erlang 服务 和停止 rabbit mq 可以以服务的方式和按进程的方式启动,建议使用服务方式启动,然后在rabbit mq的命令行(RabbitMQ Command Prompt 开始菜单中) 执行 rabbitmq-plugins enable rabbitmq_management
    最后就可以通过 http://localhost:15672/ 账号guest 密码guest 访问rabbit mq的控制台 /是默认的VirtualHost

常用命令

停止 broker
查询 broker 状态 rabbitmqctl status
更多的命令请查阅 https://www.rabbitmq.com/man/rabbitmqctl.8.html

实战

下面会通过两个例子,演示如何使用rabbitmq,第一个原生的java api 使用direct 交换器演示 routing,第二个例子使用topic 交换器。spring mvc,spring boot 中的注解和接口本质上是对原生接口的包装,spring 会隐藏一些操作,对理解rabbit mq的工作流程会造成阻碍,先使用原生api做演示一般的工作流程,而后结合springboot 演示在项目中如何使用rabbit mq。

rabbitmq 消费者和生产者两端的在处理消息时经历的步骤

  1. 创建连接工厂ConnectionFactory
  2. 通过连接获取通信通道Channel
  3. 声明交换机Exchange(可选)
  4. 申明队列(可选)
  5. 绑定交换机和队列(可选)
    之后生产者通过channel发送消息,消费者获取并处理消息

rabbitmq comsumer 消息获取方式

rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message,
Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息。

使用原生rabbitmq api 的例子

代码发送三种类型的日志到交换器,交换器通过routingkey 分发到不同的queue

maven 依赖

   <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.3</version>
        </dependency>

消息发送

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"};

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 指定一个交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 发送消息
        for (int i = 0; i < 10; i++)  {
            int rand = new Random().nextInt(3);
            String severity  = LOG_LEVEL_ARR[rand];
            String message = "Qijun-MSG log : [" +severity+ "]" + UUID.randomUUID().toString();
            // 发布消息至交换器
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

消息接收

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"};

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 指定一个交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 设置日志级别
        int rand = new Random().nextInt(3);

        // 创建三个非持久的、唯一的、自动删除的队列,分别接收不同的日志信息
        String debugQueueName = channel.queueDeclare().getQueue();
        String InfoQueueName = channel.queueDeclare().getQueue();
        String ErrorQueueName = channel.queueDeclare().getQueue();
        // 绑定交换器和队列
        // queueBind(String queue, String exchange, String routingKey)
        // 参数1 queue :队列名
        // 参数2 exchange :交换器名
        // 参数3 routingKey :路由键名
        channel.queueBind(debugQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[0]);
        channel.queueBind(InfoQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[1]);
        channel.queueBind(ErrorQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[2]);

        // rabbit mq 消息的推送支持poll 也支持订阅,先创建一个poll 方式的comsumer
        QueueingConsumer pollConsumer = new QueueingConsumer(channel);
        channel.basicConsume(ErrorQueueName, true, pollConsumer);

        // 创建订阅类型的消费者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received '" + message + "' from "+envelope.getRoutingKey()+ " by subscribe" );
            }
        };
        channel.basicConsume(debugQueueName, true, consumer);
        channel.basicConsume(InfoQueueName, true, consumer);

        // 通过 循环poll 获取队列中的所有消息  
        while (true) {
            QueueingConsumer.Delivery delivery = null;
            try {
                delivery = pollConsumer.nextDelivery();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();


            System.out.println("Received '" + message + "' from "+routingKey +" by poll");
        }

    }
}

源码

springboot 中使用rabbit mq 的例子

maven 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

ConnectionFactory配置

// 项目中可通过配置文件读取来获取 connect 参数
 @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost("localhost");
        cachingConnectionFactory.setPort(5672);
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

CachingConnectionFactory 内部通过com.rabbitmq.client.ConnectionFactory 去设置 connect的参数

public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware {
    private static final String BAD_URI = "setUri() was passed an invalid URI; it is ignored";
    protected final Log logger = LogFactory.getLog(this.getClass());
    private final com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory;

通过 RabbitAdmin 配置队列,交换机和binding

   public static final String  ROUTER_KEY_1 = "*.orange.*";
 @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory());
       //申明一个 一个topic类型的交换机,routingkey 使用通配符
        TopicExchange topicExchange =(TopicExchange)ExchangeBuilder.topicExchange(QUEUE_EXCHANGE_NAME).durable(true).build();
        rabbitAdmin.declareExchange(topicExchange);
        Queue firstQueue = new Queue(QUEUE_NAME);
        rabbitAdmin.declareQueue(firstQueue);
        rabbitAdmin.declareBinding(BindingBuilder.bind(firstQueue).to(topicExchange).with(ROUTER_KEY_1));
        return rabbitAdmin;
    }

消息消费的两种方法(推荐使用第二种,更灵活)

  1. 通过SimpleMessageListenerContainer 绑定特定的messageListener
@Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receive2");
    }
 @Bean
    SimpleMessageListenerContainer container(MessageListenerAdapter messageListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(messageListenerAdapter);
        return container;
    }
@Service
public class Receiver {

    public void receiveMessage(String message) {
        System.out.println("Received<" + message + ">");
    }

    public void receive2(String in) throws InterruptedException {
        System.out.println("in message"+in);
    }
}
  1. 使用 SimpleRabbitListenerContainerFactory 和 @RabbitListener 方式接收mq 的消息
  @Bean
    public SimpleRabbitListenerContainerFactory myContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置了每个消费者再不回ack的情况下最大可接收消息的条数
        factory.setPrefetchCount(100);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
/**
 * @author 祁军
 * 使用 SimpleRabbitListenerContainerFactory 和 @RabbitListener 方式接收mq 的消息
 */
@Service
public class Receiver1 {
    @RabbitListener(queues = "${rabbitConfiguration.queue}", containerFactory = "myContainerFactory")
    public void processMessage(String msg){
        System.out.println("Receiver1 got message" + msg);
    }
}

sender

@Service
public class Sender {
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public Sender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void send() {
       // 发送两次routing key不同 由于 是topic exchange routing key 为通配符可达到同一队列
        System.out.println("sender is sending message");
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_EXCHANGE_NAME,"aaa.orange.bbb", "hello,world1");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_EXCHANGE_NAME,"aaa.orange.ccc", "hello,world2");
    }
}

测试

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitMQTest {

    @Autowired
    private Sender sender;

    @Test
    public void send() throws Exception {
        sender.send();
    }

}

源码

rabbit mq 的其他应用场景

working queue

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程来完成。应用场景就是RabbitMQ Server会将queue的Message分发给不同的Consumer以处理计算密集型的任务。

image.png

RPC

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

image.png

RabbitMQ中实现RPC的机制是:

  1. 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
  2. 服务器端收到消息并处理
  3. 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
  4. 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

rabbitmq 消息的可靠性

  1. 发送端的comfirm 机制,通过注册回调,我们可以知道消息是否已经发送到exchange 或者queue,如果没有正确发送,我们可以通过replycode来判断进行后续什么操作,然后根据业务场景
    比如发送告警,或者重发来应对。
  2. 消息的持久化,通过交换机,队列和消息的持久化来实现
  3. rabbitmq 从queue 发消息给消费者,如果消费者选择no ack 则queue每发一条消息,rabbitmq 就会把消息删除,如果cosumer 由于某种问题消费消息出错,rabbitmq也会把消息删除。
    我们需要在comsumer 关闭自动ack,使用basic ack 手工应答保证消息被正确消费,如果消费失败,basic nack 可以删除队列消息或者重新入原队列,可能导致死循环
    如果不希望把有问题的消息删除或者重新入原来的队列,可以指定一个死信队列,错误的消息重新入死信对列,然后再次被消费。

发送端的ack

rabbitmq提供了确认ack机制,可以用来确认消息是否到broker 或者queue。

/**confirmcallback用来确认消息是否到达broker*/     
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
        //log error
    } else {
        //maybe delete msg in db
    }
});
 /**若消息不能正确的达到指定的队列会调用 */
rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
    log.info("send message failed: " + replyCode + " " + replyText);
    // resend message
   
});

消息的持久化

// 交换机的持久化
// 参数1 name :交互器名
// 参数2 durable :是否持久化
// 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列
new TopicExchange(name, durable, autoDelete)

// 队列是持久化
// 参数1 name :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
new Queue(name, durable, exclusive, autoDelete);

springAMQP  的消息持久化是默认的

消费者端的手工确认

如果一直不回ack,mq会block 这个消费者

      @Bean
    SimpleMessageListenerContainer container() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueNames(QUEUE_NAME);
        //设定单次可分发给消费则的消息个数
        container.setPrefetchCount(1);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                try {
                    log.info("receive msg: " + new String(body));
                    //do something
                } catch (Exception e) {
                } finally {
//                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
                }

            }

        });
        return container;
    }

springAMQP 提供的确认方式

很明显上述代码提供的手工确认方式(使用ChannelAwareMessageListener)很不优雅,你需要创建多个bean 然后绑定queue。
当setDefaultRequeueRejected(true) (默认情况下),如果消息被正常消费,container 会ack,然后队列删除消息,如果消费者抛出异常,container会reject这个消息,然后这个消息会requeue到原来的消息队列,如果业务一直处在这个异常情况下,requeue的消息会再次回到消费者,然后死循环,这种情况很显然不行,spring AMQP 提供的替代方式:listener抛出AmqpRejectAndDontRequeueException,则这个消息会被抛弃,或者进入死信队列,Listener抛出AmqpRejectAndDontRequeueException还可以通过配置factory 的ErrorHandler 把你抛出的异常 转换为AmqpRejectAndDontRequeueException,如下式例,如果你的listener 抛出了XMLException 则这个消息会被discard(在没有配置死信队列的情况下)。

factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof XMLException));

factory.setDefaultRequeueRejected(false); 则只要listener 抛出异常,message就会被discard或者转入死信队列,如果需要针对不同的异常(比如可短时间内恢复的异常,需要重入原队列,不可恢复的异常discard 或者入死信队列)建议设置成true,然后配置ErrorHandler 如上

springAMQP 如何配置死信队列

当然你可以通过创建一个死信队列,然后在listener端消费时重新发送到死信队列,但springAMQP 提供了更好的方式如下

@Bean
TopicExchange exchange()
{
    return new TopicExchange(DEFAULT_EXCHANGE);
}

@Bean
Queue deadLetterQueue()
{
    return new Queue(DEAD_LETTER_QUEUE,true);
}

@Bean
Queue queue()
{
    // 通过args参数为当前队列绑定一个死信队列
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", DEFAULT_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE);
    return new Queue(WORKORDER_QUEUE,true,false,false,args);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange)
{
    return BindingBuilder.bind(queue).to(exchange).with(WORKORDER_QUEUE);
}

@Bean
Binding bindingDeadLetter(Queue deadLetterQueue, TopicExchange exchange)
{
    return BindingBuilder.bind(deadLetterQueue).to(exchange).with(DEAD_LETTER_QUEUE);
}



消费者抛出AmqpRejectAndDontRequeueException 异常时则会进入死信队列


  @RabbitListener(queues = RabbitConfig.WORKORDER_QUEUE)
    public void processMessage(String msg) throws Exception
    {
        
            throw new AmqpRejectAndDontRequeueException("to dead-letter");
        
    }

死信队列的消费者

@Service
public class ErrorHandler {
    @RabbitListener(queues = "dead_queue", containerFactory = "myContainerFactory")
    public void handleError(Object message){
        System.out.println("XXXXXXX"+message);
    }
}

其他高级主题

rabbit mq的消息确认机制(包括producer到broker 和broker 到 consumer的确认),集群等等。

参考

https://www.rabbitmq.com/getstarted.html
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/spring-amqp
https://docs.spring.io/spring-amqp/reference/html/
http://blog.720ui.com/2017/springboot_06_mq_rabbitmq/
http://www.cnblogs.com/xingzc/p/5945030.html
https://www.cnblogs.com/diegodu/p/4971586.html
http://blog.csdn.net/column/details/rabbitmq.html
http://blog.csdn.net/u013256816/article/category/6532725/1

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,340评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,762评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,329评论 0 329
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,678评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,583评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,995评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,493评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,145评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,293评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,250评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,267评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,973评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,556评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,648评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,873评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,257评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,809评论 2 339