从不知道到了解—RabbitMQ 基础概念及 Spring 的配置和使用

序言

你在系统中是否写过这样的接口:客户端访问服务器,服务器进行了大量逻辑/耗时操作之后,才能将结果返回给客户端,而这时,客户端的连接或许已经因为超时而关闭了。为了能够及时的给客户端返回数据, 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。RabbitMQ 用 Erlang 语言编写,支持多种消息协议,消息队列,传送确认,可以将消息灵活的路由到队列,并支持多种交换类型。同时也可部署为负载均衡的集群,实现高可用性和吞吐量,联跨多个可用性区域和地区。RabbitMQ 支持 Java, .NET, PHP, Python, JavaScript, Ruby, Go 等多种语言,方便开发者使用。

基础介绍

在使用 RabbitMQ 之前,需要了解一些 RabbitMQ 的基础知识,这些基础知识最好还是要了解的,否则在面对 Spring 中配置文件的时候会有很多疑惑。

Queue

MQ 就是 Message Queuing,Queue 就是队列了。在 RabbitMQ 中,Queue 是最主要的地方,Message Queuing 就是存储 Message 的队列。生产者 Producer(以下简写为 p)将消息投递到 Message Queuing(以下简写为 q)中,消费者(以下简写为 c)从 queue 中获取消息进行处理。

如上图中所画(图中缺少 exchange,下面会说到)的那样,多个 p 将消息交给了 q,然后就不关心系统怎么处理这些消息了。而 q 在收到这些消息后,会分发给在它这里注册了的 c,让他们去分别处理。每一条消息只会分发给一个 c,而不是每一个 c 接受到所有的消息然后全部去处理。

Producer

Producer 负责产生消息并将消息发送给 Queue,它不关心消息怎么处理,只把消息按照指定的规则(下面会说)发送出去。

Consumer

系统在启动的时候,会将所有的消费者 Consumer 注册到 RabbitMQ 中并对指定的 Queue(有规则,下面会说)进行监听,如果 Queue 中产生了数据,queue 会将消息平摊发送给所有注册到当前 quene 中的 consumer 中,由 consumer 进行处理。

Exchange

Exchange 在 RabbitMQ 中是一个很重要的组件。上面我们说 Producer 负责产生消息并将消息发送给 Queue,实际上 Producer 并不直接连接 Queue,而是连接了 Exchange,由 Exchange 将消息路由到指定的 Queue 中。也就是在 producer 和 queue 中间插入了一个 Exchange 来分发消息给 queue。
ExchangeType、binding 和 ExchangeName 三者确定了一个 Exchange,ExchangeName 确定了这个 Exchange 的名字,ExchangeType 决定了这是一个什么类型的 Exchange,binding 决定了它以什么规则连接一个 queue。

  • ExchangeName 标记了这个 Exchange,在 producer 发送消息的时候需要指定,消息会发送到指定 ExchangeName 的 Exchange 中。比如说你去参加婚礼宴会,主人之前告诉你到了酒店去大厅找服务员 A,她会带你到包厢。这里的“服务员 A”这个名字就是 ExchangeName
  • binding(其实是一个字符串或者说一种特别的正则)会将 Exchange 和 Queue 联系起来,而 Exchange 路由到 Queue 的规则就是这个 binding/binding-key。比如说你去参加一个婚宴,请帖写着 302 包厢,那服务员就会领着你到 302 包厢。在这个例子中,服务员就是 Exchange,包厢是 queue,而服务员脑中这个“根据你的请柬领你到哪个包厢”的规则就是 binding。而你的请柬是 routing-key,这个后面说
  • ExchangeType 是 Exchange 的类型,这决定了这个 Exchange 会执行哪种规则 (binding) 将消息路由到不同的 queue。ExchangeType 分为多种,其中最为常用的就是 direct,fanout 和 topic。
  • 还是比如你去参加婚宴,婚宴上可能有很多服务员。服务员 A 是个耿直 boy,你请柬上写着哪个包厢他就领你去哪个包厢,也就是你的请柬上的“302 包厢”和他脑中“302 包厢(302 包厢在 3 楼第二个包厢)”是相等的,那他就领着你去了,这个服务员就叫“direct”。
  • 而服务员 B 他负责领路到 4 楼的 401、402 包厢,另外他是个呆子但是会一点魔法,不管是谁找到他,他都会念一句魔法把你复制两份(有几个包厢就复制几份),然后把你分别领到他负责的包厢去,401、402 包厢每一个都有一个你,这个服务员是“fanout”,你在这个例子中就是消息(意即如果消息发送给 type 是 fanout 的 exchange,exchange 会把消息发送给所有和他绑定的 queue 中)。
  • 最后有一个服务员 C 特别聪明,老板交给他的任务是如果找你的是男人,领到 501,如果是女人,领到 502,如果是小孩子,领到 503,即按照不同的规则(这里的规则更详细的解释可以解释成:. 男人. 人路由到 501,. 女人. 人路由到 502,*. 小孩. 人路由到 503),这个服务员就叫“topic”,topic 的详细规则是这样的:

routing key 为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key 与 routing key 一样也是句点号“. ”分隔的字符串
binding key 中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)


以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到 Q1 与 Q2,
routingKey=”lazy.orange.fox”的消息会路由到 Q1 与 Q2,
routingKey=”lazy.brown.fox”的消息会路由到 Q2,
routingKey=”lazy.pink.rabbit”的消息会路由到 Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配);
routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何 bindingKey。
参考自:RabbitMQ 基础概念详细介绍

介绍 Producer 和 Consumer 的规则

上面在介绍 Producer 的时候都把规则给略过了,现在介绍完了 Exchange,可以再回过头来介绍 Producer 的规则。

当时说 Producer 会将消息发送给 queue,现在我们知道 Producer 是将消息发送给了 Exchange。Producer 在发送消息的时候需要指定 Exchange,也就是需要声明 ExchangeName,同时根据指定的这个 Exchange 的类型,来设置一个 routing-key。我们知道 Exchange 和 queue 是通过一个叫 binding 的 key 来完成的,在 Exchange 中会记录所有它连接的 queue 所对应的 binding(暂且叫 bs 吧),消息通过设置的 ExchangeName 发送到指定的这个 Exchange,同时携带过去的 routing-key 会与 bs 中的 binding 进行匹配,匹配条件满足就会将消息发送过去(匹配到几个就发送几个)。

Consumer 连接 queue 的规则相对来说就简单很多了,指定 queue 的名字,设置几个连接参数就好了(比如说:exclusive、Auto-delete、Durable)。

exclusive:排他,该队列仅对首次声明它的连接可见,并在连接断开时自动删除 Auto-delete: 自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 Durable: 持久化

另外对于 Producer、Exchange、Queue,和 Consumer 都还有一些可设置的参数用来个性化配置,这个自己去搜索一下吧。

一张图来解释整个过程


图中 Producer 发送消息给 message,routing-key 未指定是指没有固定一个 routing-key,这个 key 是动态可变的。

而根据 message 这个 Exchange 的 type 可知,这个 message 是那么聪明的服务员,会根据规则将发送来的消息“聪明”的交给相应的 Queue 处理。

假如这个 Producer 发送的 Message 所指定的 routing-key 是 order.log.phbj,那么根据 Exchange 与 Queue 间对应的橙色线,这条消息会被发送给 queue2, 并由 Consumer 接收处理。

如果这个 Producer 发送的 Message 所指定的 routing-key 是 weixin.order.phbj,那么这个消息会被发送 queue,并由对应注册的 Consumer2 接收(注意这里的 Consumer 可能不止一个,但是只会有一个 Consumer 来接收)

过程大概就是这样了,接下来配合 Spring 来使用。

Spring 中使用 RabbitMQ

pom 文件放在文章末尾,这里先来说 RabbitMQ 的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
 http://www.springframework.org/schema/rabbit
 http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
 <!--配置 connection-factory,指定连接 rabbit server 参数 -->
 <rabbit:connection-factory id="connectionFactory" username="admin" password="123456"
 host="192.168.1.198"
 port="5672"
 virtual-host="/"/>
 <bean id="rabbitConnectionFactory"
 class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
 <property name="host" value="192.168.1.198"/>
 <property name="username" value="admin"/>
 <property name="password" value="123456"/>
 <property name="port" value="5672"/>
 <property name="virtualHost" value="/"/>
 <property name="channelCacheSize" value="5"/>
 <property name="publisherConfirms" value="true"/>
 </bean>
 <!--
 这里是两个 queue 的定义
 exclusive:排他,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
 Auto-delete: 自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
 Durable: 持久化
 -->
 <rabbit:queue id="queue" name="queue" durable="true" auto-delete="false" exclusive="false"/>
 <rabbit:queue id="queue2" name="queue2" durable="true" auto-delete="false" exclusive="false"/>
 
 <!--RabbitMQ 的 Consumer-->
 <bean id="messageReceiver" class="net.sumile.consumer.MessageConsumer"></bean>
 <bean id="messageReceiver2" class="net.sumile.consumer.MessageConsumerQueue2"></bean>
 
 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="1"
 prefetch="5">
 <rabbit:listener queues="queue" ref="messageReceiver"/>
 <rabbit:listener queues="queue2" ref="messageReceiver2"/>
 </rabbit:listener-container>
 
 
 <!--通过指定下面的 admin 信息,当前 producer 中的 exchange 和 queue 会在 rabbitmq 服务器上自动生成 -->
 <rabbit:admin connection-factory="connectionFactory"/>
 
 
 <!-- 两种 Exchange 的定义 -->
 <rabbit:direct-exchange name="order" durable="true" auto-delete="false">
 <rabbit:bindings>
 <rabbit:binding queue="queue" key="order"></rabbit:binding>
 </rabbit:bindings>
 </rabbit:direct-exchange>
 
 <rabbit:direct-exchange name="exchange" durable="true" auto-delete="false">
 <rabbit:bindings>
 <rabbit:binding queue="queue2" key="exchange"></rabbit:binding>
 </rabbit:bindings>
 </rabbit:direct-exchange>
 
 <rabbit:topic-exchange name="message" durable="true" auto-delete="false">
 <rabbit:bindings>
 <rabbit:binding queue="queue2" pattern="*.log.phbj"/>
 <rabbit:binding queue="queue" pattern="*.order.phbj"/>
 <rabbit:binding queue="queue" pattern="*.pay.phbj"/>
 </rabbit:bindings>
 </rabbit:topic-exchange>
 
 <!--定义是否使用序列化传输-->
 <!--<bean id="jsonMessageConverter"-->
 <!--class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"></bean>-->
 <!--message-converter="jsonMessageConverter"-->
 
 <!--定义 rabbit template 用于数据的发送 -->
 <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
 exchange="exchange" routing-key="exchange"/>
 <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory"
 exchange="order" routing-key="order"/>
 <bean id="amqpTemplate3"
 class="org.springframework.amqp.rabbit.core.RabbitTemplate">
 <property name="connectionFactory" ref="rabbitConnectionFactory"/>
 <property name="confirmCallback" ref="confirmCallBackListener"/>
 <property name="returnCallback" ref="returnCallBackListener"/>
 <property name="mandatory" value="true"/>
 <!--只有在关闭事务的情况下 mandatory 才起作用-->
 <property name="channelTransacted" value="false"/>
 <property name="exchange" value="message"/>
 </bean>
 <!--发送确认监听-->
 <bean id="confirmCallBackListener" class="net.sumile.producer.ConfirmCallBackListener"/>
 <bean id="returnCallBackListener" class="net.sumile.producer.ReturnCallBackListener"/>
</beans>

在程序启动之前,首先来确认下 RabbitMQ 的状态


图中可以看出,现在没有连接链接到 RabbitMQ,Exchange 和 Queue 也都是空的。
然后启动程序
RabbitMQStart

这是启动之后立刻就截的图,可以看到,有一个连接链接上了 RabbitMQ,同时创建了三个 Exchange 以及两个 queue。
然后打开每一个 Exchange 来看看它的对应规则是什么,着重看红色框中的
RabbitMQExchangeBinding

可以看到,红色框中的设置正是在 xml 中配置的那部分:

<rabbit:direct-exchange name="order" durable="true" auto-delete="false">
 <rabbit:bindings>
 <rabbit:binding queue="queue" key="order"></rabbit:binding>
 </rabbit:bindings>
</rabbit:direct-exchange>
 
<rabbit:direct-exchange name="exchange" durable="true" auto-delete="false">
 <rabbit:bindings>
 <rabbit:binding queue="queue2" key="exchange"></rabbit:binding>
 </rabbit:bindings>
</rabbit:direct-exchange>
 
<rabbit:topic-exchange name="message" durable="true" auto-delete="false">
 <rabbit:bindings>
 <rabbit:binding queue="queue2" pattern="*.log.phbj"/>
 <rabbit:binding queue="queue" pattern="*.order.phbj"/>
 <rabbit:binding queue="queue" pattern="*.pay.phbj"/>
 </rabbit:bindings>
</rabbit:topic-exchange>

其中配置文件中的“<rabbit:” 后面的 topic 和 direct 指该 Exchange 的类型。而绑定的 queue 则在 bindings 中,规则是通过 binding 配置的 key 或者 pattern。这些在图中都有显示。


图中 Bindings 与上面的 Exchange 的配置有对应,而 Consumers 则对应了 xml 中的以下配置:

<!--RabbitMQ 的 Consumer-->
<bean id="messageReceiver" class="net.sumile.consumer.MessageConsumer"></bean>
<bean id="messageReceiver2" class="net.sumile.consumer.MessageConsumerQueue2"></bean>
 
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="1"
 prefetch="5">
 <rabbit:listener queues="queue" ref="messageReceiver"/>
 <rabbit:listener queues="queue2" ref="messageReceiver2"/>
</rabbit:listener-container>

Ack required 通过配置的 acknowledge 来配置,表示需要消息确认
Prefetch count 通过配置中的 prefetch 标签配置,来限制 Queue 每次发送给每个消费者的消息数。

如果有多个消费者同时订阅同一个 Queue 中的消息,Queue 中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置 prefetchCount 来限制 Queue 每次发送给每个消费者的消息数,比如我们设置 prefetchCount=1,则 Queue 每次给每个消费者发送一条消息;消费者处理完这条消息后 Queue 会再给该消费者发送一条消息。参考:RabbitMQ 基础概念详细介绍

发送者的相关配置不会在上面的图中体现,他是通过代码来调用运行的。

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
 exchange="exchange" routing-key="exchange"/>
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory"
 exchange="order" routing-key="order"/>
<bean id="amqpTemplate3"
 class="org.springframework.amqp.rabbit.core.RabbitTemplate">
 <property name="connectionFactory" ref="rabbitConnectionFactory"/>
 <property name="confirmCallback" ref="confirmCallBackListener"/>
 <property name="returnCallback" ref="returnCallBackListener"/>
 <property name="mandatory" value="true"/>
 <!--只有在关闭事务的情况下 mandatory 才起作用-->
 <property name="channelTransacted" value="false"/>
 <property name="exchange" value="message"/>
</bean>
<!--发送确认监听-->
<bean id="confirmCallBackListener" class="net.sumile.producer.ConfirmCallBackListener"/>
<bean id="returnCallBackListener" class="net.sumile.producer.ReturnCallBackListener"/>

我们在配置文件中配置了三个发送消息的模板,其中 amqpTemplate 和 amqpTemplate2 是两个普通的发送模板,它定义了 exchange 和 routing-key。
而 amqpTemplate3 比较特殊,它定义了 mandatory,这个用来标识这个模板发送的消息是否需要回执(当 mandatory 标志位设置为 true 时,如果 exchange 根据自身类型和消息 routeKey 无法找到一个符合条件的 queue,那么会调用 basic.return 方法将消息返还给生产者;当 mandatory 设为 false 时,出现上述情形 broker 会直接将消息扔掉。
这就是为什么需要使用 bean 标签而不使用<rabbit:template 标签的原因。同时我们看到 amqpTemplate 和 amqpTemplate2 使用的 connection-factory 是 connectionFactory,而 amqpTemplate3 使用的是 rabbitConnectionFactory,这是因为如果需要回执的话,需要在 Connection-Factory 中指定一个参数 publisherConfirms,从下面两个 connection-factory 的配置就可以看出:

<rabbit:connection-factory id="connectionFactory" username="admin" password="123456"
 host="192.168.1.198"
 port="5672"
 virtual-host="/"/>
<bean id="rabbitConnectionFactory"
 class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
 <property name="host" value="192.168.1.198"/>
 <property name="username" value="admin"/>
 <property name="password" value="123456"/>
 <property name="port" value="5672"/>
 <property name="virtualHost" value="/"/>
 <property name="channelCacheSize" value="5"/>
 <property name="publisherConfirms" value="true"/>
</bean>

至此,配置文件与 RabbitMQ 的关系算是稍微的讲完了,解下来,我们实测一下。
实测运行
先放 Controler 文件以及其他一些文件 Controler

package net.sumile.controler;
 
import net.sumile.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
 
 
/**
 * Created by Administrator on 2017/6/14 0014.
 */
@Controller
public class ControlerM {
 @Autowired
 MessageProducer messageProducer;
 
 @RequestMapping(value = "/home")
 public void send(@RequestParam(value = "type", defaultValue = "1") String type,
 @RequestParam(value = "routing_key", defaultValue = "default.order.phbj") String routing_key,
 @RequestParam(value = "message", defaultValue = "defaultMessage") String message) {
 if ("1".equals(type)) {
 messageProducer.sendMessage(message);
 } else if ("2".equals(type)) {
 messageProducer.sendMessage2(message);
 } else if ("3".equals(type)) {
 messageProducer.sendMessage3(routing_key, message);
 }
 }
}

以及 MessageProducer

package net.sumile.producer;
 
import javax.annotation.Resource;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
 
/**
 * 功能概要:消息产生, 提交到队列中去
 */
@Service
public class MessageProducer {
 
 private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
 
 @Resource
 private RabbitTemplate amqpTemplate;
 @Resource
 private RabbitTemplate amqpTemplate2;
 @Resource
 private RabbitTemplate amqpTemplate3;
 
 public void sendMessage(Object message) {
 System.out.println("1 发送:" + message);
 amqpTemplate.convertAndSend(message);
 }
 
 public void sendMessage2(Object message) {
 System.out.println("2 发送:" + message);
 amqpTemplate2.convertAndSend(message);
 }
 
 public void sendMessage3(final String routingKey, final Object message) {
 System.out.println("3 发送:[" + message + "] routing-key = [" + routingKey + "]");
 amqpTemplate3.convertAndSend(routingKey, message);
// for (int i = 1; i < 3000000; i++) {
// System.out.println("3 发送:[" + message + "] routing-key = [" + routingKey + "]" + i);
// amqpTemplate3.convertAndSend(routingKey, message);
// }
 }
}

以及两个接收者中的一个,另外一个类似,只是输出的文字由 Queue 替换为了 Queue2 用来区分接收的是哪个 queue 中的 message

package net.sumile.consumer;
 
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 
import static java.lang.Thread.sleep;
 
/**
 * 功能概要:消费接收
 *
 * @author sumile
 * @since 2017 年 6 月 23 日 15:38:07
 */
public class MessageConsumer implements ChannelAwareMessageListener {
 
 private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
 
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 System.out.println("接收到 Queue:" + new String(message.getBody()));
 try {
 sleep(3000);
 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 } catch (InterruptedException e) {
 
 }
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 }
}

最重要的就是这几个文件,来捋一遍过程:首先通过浏览器访问一个我们的服务器地址,进入 controler,根据传过来的 type 去调用不同的模板发送不同的消息,然后由对应的 Consumer 接收并打印出来。

请求地址:http://localhost:8080/home?type=1&routing_key=myO.pay.phbj&message=k打印:
1 发送:k
接收到 Queue2:k

因为 template 对应的 Exchange 是 exchange,而 exchange 对应的 queue 是 queue2,所以打印出来的接受者是 queue2
请求地址:http://localhost:8080/home?type=2&routing_key=myO.pay.phbj&message=65打印:
2 发送:65
接收到 Queue:65

原理同上
请求地址:http://localhost:8080/home?type=3&routing_key=myO.pay.phbj&message=65打印:
2 发送:65
接收到 Queue:65

原理同上
请求地址:http://localhost:8080/home?type=3&routing_key=myO.pay.phbj&message=65打印:
3 发送:[65] routing-key = [myO.pay.phbj]
接收到 Queue:65

这里 routing_key 匹配到了*.pay.phbj,所以发送到 queue 中并由 queue 的 Consumer 接收
请求地址:http://localhost:8080/home?type=3&routing_key=myO.log.phbj&message=65打印:
3 发送:[65] routing-key = [myO.log.phbj]
接收到 Queue2:65

这里 routing_key 匹配到了*.log.phbj,所以发送到 queue2 中并由 queue2 的 Consumer 接收

confirmCallback 和 returnCallback

接下来我们来看一组请求:请求地址:http://localhost:8080/home?type=3&touting_key=myO.l2og.phbj&message=65看这组请求,我们知道是调用 amqpTemplate3 来发送的,但是并没有 binding-key 与之对应,所以这个 Message 发送到 Exchange 之后 Exchange 不知道该交给哪个 Queue。但是由于我们设置了

<property name="returnCallback" ref="returnCallBackListener"/>

所以如果 Exchange 没有找到匹配的 queue 的时候,就会进入到这个类的方法中,由我们来处理这条迷路的 Message,不至于将这条消息丢失了。
那这个配置是做什么的呢?

<property name="confirmCallback" ref="confirmCallBackListener"/>

简单来说,如果发送出去的消息找不到 Exchange,到 confirmCallback 中,如果找到了 Exchange 找不到 Queue,到 returnCallback 中。其余的可以参考这个 RabbitMQ(四) 消息确认 (发送确认, 接收确认),如果要测试的话我们可以不停的发送消息,然后手动的将要路由到的 queue 删掉,就会出现这种情况了。

Consumer 的回复

我们在 Consumer 的配置中设置了 acknowledge 参数,表示需要手动回复“已经接收到该消息”然后 queue 才会删除该 Message。下面来实测下如果不回复会怎么样
我们将 MessageConsumer 中的回复代码注释掉,并请求地址:http://localhost:8080/home?type=2&routing_key=myO.pay.phbj&message=65。看看 RabbitMQ 中 queue 里面 Message 是怎么变化的

public class MessageConsumer implements ChannelAwareMessageListener {
 
 private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
 
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 System.out.println("接收到 Queue:" + new String(message.getBody()));
// try {
// sleep(3000);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// } catch (InterruptedException e) {
//
// }
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 }
}

我们请求三遍上面的那个地址,然后刦看控制台,会看到回复:

2 发送:65
接收到 Queue:65
2 发送:65
接收到 Queue:65
2 发送:65
接收到 Queue:65

接收到了。然后再去看 RabbitMQ 的网页控制端:http://192.168.1.198:15672


queue 中累计了三条消息,而这三条消息已经是处理过的,如果有消息不停的进入,结果就是堆满内存
这是最需要注意的一点

都是自己在实际了解学习过程中遇到的一些问题以及感悟,看了很多博客,感谢各位大牛。有错误请指出,望不吝赐教。

RabbitMQ-Demo

我的博客—sumile

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,092评论 3 51
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1
  • 1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/down...
    Bobby0322阅读 2,219评论 0 11