为了一些初学习者更好理解我就从简单的解释一下Rabbitmq的原理吧,首先你可以这样想RabbitMq就是一个队列,而这个队列相当于一个消息投递服务,用于应用程序可以发送和接收包裹,而数据所在的服务器也可以接收和发送。所以RabbitMq在应用程序和服务器之间扮演着路由器的角色,当应用程序连接到RabbitMq时,它就必须做个决定:我是发送者还是接收呢?
这里我们首先了解一下什么是生产者:
生产者就是创建消息,然后发送到代理服务器。这里肯定有很多人不明白消息包括什么?很好解释有效载体和标签。有效载体就是你想传输的数据(它是可以是如何内容,一个json数组或者你喜欢的字符串RabbitMq根本不会在意这些)。标签作用就是描述有效载荷(通俗点就是标记可以找到对应的消费者),并且RabbitMq用它来决定谁讲获得消息的拷贝。举例来说:和tcp协议有点不同,当你明确指定发送方和接收方时,AMQP只会用标签来表述这条消息(一个交换机的名称和可选的主题标记),然后发到队列,队列会根据标签发给感兴趣的一方(如下图)
这里接着了解消费者:
消费者看字面意思我们就很清楚了,主要就是连接到代理服务器上,并订阅到队列上,为何订阅,订阅就像上面生产者会发送给感兴趣的一方,所以订阅了代表我们感兴趣。这里有个特点就是消费者接收到消息时,它只得到消息的一部分:有效载荷(内容),在消息路由过程中,消息标签并没有随有效载荷一同传递。RabbitMq甚至不会告诉你是哪个生产者发送的信息。就好比你拿到信件时,所有的信封都是空白,想要知道到底是谁发送的就看信里是否签名了。同理,如果想知道是谁生成的AMQP消息的话,就要看生成者是否把发送方信息放入有效载荷中(内容)。
看了上面描述我们最基础的过程应该懂了:生产者创建消息,消费者接收这些消息。你的应用程序可以做为生产者,像其他应用程序发送消息。也可以做为消费者接收消息。两者可以互相装换。不过在此之前,它必须依靠一条信道。( 字面理解就是一个通道 )
你想用信息队列RabbitMq,就必须和它先建立连接吧,没有连接就不可能完成消费或者发布消息。所以我们要在应用程序和RabbitMq代理服务器之间创建一条TCP连接,一旦TCP连接打开,应用程序就可以创建一条AMQP信道,信道必须建立在“真实的”TCP连接内的虚拟连接。AMQP命令都是通过信道发送出去的。每条信道都会指派唯一的ID(AMQP会帮我们做记录,这里不用管知道就行)。不论是发布消息、订阅队列或是接收消息,这些都是通过信道完成的。这里如果学会思考的人肯定疑问有了TCP连接还要信道干嘛,不直接利用TCP完成AMQP命令呢?主要原因在于对操作系统来说建立和销毁TCP会话是非常昂贵的的开销。比如应用程序从队列消费消息,并根据服务需求合理调度线程。假设你只进行TCP连接,那么每个线程都需要自行连接到RabbitMq上。也就是说稍微高峰期不说多每秒需要上百条连接。很容易对服务器造成不可避免的问题(瓶颈)。如果我们所有线程只使用一条TCP连接以满足性能方面的要求,不仅能保证私密性,还能各自独立,这就需要引入信道概念的原因。线程启动时,会现在连接上建立一条信道,也就获得连接到RabbitMq上私密通信路径,而不会给系统的TCP造成额外负担,因此,你可以每秒成百上千次地创建信道而不会影响操作系统。在一条TCP连接上创建多少条信道是没有限制,把它想象成光光纤电缆就可以了。每条电缆中的光纤都可以传输(就像一条信道)。一条电缆有许多光纤束,允许所有连接的线程通过多条光纤束同时进行传输和接收。TCP连接就像电缆,而AMQP信道就像光纤束。
你会感觉前面内容很枯燥,但是没办法干货都是无聊的!你以对消费者和生产者有了一定理解,首先你想要知道什么是队列。从概念上来讲,AMQP消息路由必须有三部分:交换器、队列、和绑定。生产者把消息发布到交换器上;消息最终到达队列,并被消费者接收;绑定决定了消息如何从路由器到特定的队列。这里你还要先理解队列的概念和工作原理。
消费者通过以下两种方式从特定的队列中接收消息:
1:通过AMQP的basic.consume命令订阅。这样做会将信道为接收模式,直到取消对队列的订阅为止。订阅了消息后,消费者在消费最近接收的那条消息后,就能从队列自动接收下一条消息。
2:还有一种情况是我们只想接收一条消息而不是持续订阅。向队列请求单条消息时通过AMQP的basic.get命令实现的。这样做可以让消费者接收队列红的下一条消息。如果想要更多消息,想要再次发送basic.get命令。你不能把basic.get放在一个循环里来替代basic.consume。虽然放在循环里面这种方法不可取,但是比较简单易懂,只是这样做会影响性能,所以不是不可以这样做。
还有一点这里说明一下如果至少有一个消费者订阅队列的话,消息会立即发送给这些订阅的消费者。如果消息到达看无人订阅的队列,它会一直在队列等待,一旦有订阅,就会立即发送消息。
大家应该迫不及待想看代码没问题首先我会演示生产者到消费者这一块延迟会放在最后面解说:
这里同时解说一下吧:
依赖jar:
org.springframework.boot
spring-boot-starter-amqp
首先我会先写一个配置类,创建两个队列,一个是消息队列queue1和消息队列queue2,并且配置连接TCP信息的,配置消息的交换机针对消费者配置,还要让消息队列持久化防止丢失,最后是配置绑定交换机和路由。
配置:
package com.didispace.delaymq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class RabbitConfig {
public static final String EXCHANGE= "sb-exchange";//交换机
public static final String ROUTINGKEY1 = "sb-routingKey1";//路由
public static final String ROUTINGKEY2 = "sb-routingKey2";//路由
public static final String QUEUE1 = "queue1";//队列1
public static final String QUEUE2 = "queue2";//队列2
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactoryconnectionFactory = new CachingConnectionFactory("127.0.0.1",5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必须要设置这里是回调用的
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
@Bean(name = "queue1")
public Queue queue1() {
return new Queue(QUEUE1, true); //队列持久
}
@Bean(name = "queue2")
public Queue queue2() {
return new Queue(QUEUE2, true); //队列持久
}
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(defaultExchange()).with(ROUTINGKEY1);
}
@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(defaultExchange()).with(ROUTINGKEY2);
}
}
上面都是对应配置接下来是生产者和对应消费者
生产者:
package com.didispace.delaymq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
public class TestController implements RabbitTemplate.ConfirmCallback {
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate = rabbitTemplate;
}
@RequestMapping("send1")
public String send1(String msg){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY1, msg, correlationId);//交换机、路由、要发送的消息和对应的id值
return null;
}
@RequestMapping("send2")
public String send2(String msg){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTINGKEY2, msg, correlationId); //交换机、路由、要发送的消息和对应的id值
return null;
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(" 回调id:" + correlationData);
if (ack) {
System.out.println("消息成功消费");
} else {
System.out.println("消息消费失败:" + cause+"\n重新发送");
}
}
}
消费者:
package com.didispace.delaymq;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqListener {
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private Queue queue1;
@Autowired
private Queue queue2;
@Autowired
private TestController testController;
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue1);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
testController.send2("hahahahaahha");//这里做了实验启动了队列2,说明在1个队列可以启动其他队列。
System.out.println("queue1 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
}
});
return container;
}
@Bean
public SimpleMessageListenerContainer messageContainer2() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue2);
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("queue2 收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
}
});
return container;
}
}
上面我多加了一个回调,这个意思就是我发送的消息成功发送至于消费者接收没接收,消费没消费我回调根本不知道。对啦我在上面还做了一个小实验你认真看会发现我在queue1队列消费者里面调用了队列queue2嘿嘿个人爱好喜欢瞎搞!
好啦上面也可以测试,把项目启动起来那个Spring启动的那个类没写太简单我相信你们都会,直接启动,然后如下图访问页面 :http://localhost:8080/send1
好啦上面都是RabbitMq简单操作没进入正题,现在正式研究延迟队列
关于延迟队列先讲一下原理:
创建一个交换机e1(广播类型),一个交换机e2(类型看自己需求),e1绑定一个队列q1,q1设置死信交换机为e2,e2可以绑定有需求的任何队列,比如q2。当要发延迟消息的时候,直接将消息发送到交换机e1,消息会被广播到q1队列中,q1队列因为没有消费者,消息过期后会自动转发先前绑定的e2交换机,e2会正确投递到你指定的队列中。
当然还有一种方法是直接对队列操作的:
创建两个队列,一个队列标记为死信e1,一个是延迟队列设置延迟时间e2,在消费者那边,要把e1死信队列事先订阅绑定到死信路由和交换机,把延迟队列e2设置了死信的路由和交换器,e2延迟之后自然到对应的死信里面,根据死信路由和交换器找到对应绑定的队列e
上面对延迟队列应该有一定的理解,个人建议是自己画图理解比较好,毕竟我画图太丑!所以不献丑了!直接上代码:
生产者:
package com.didispace.delaymq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static String DIRECT_EXCHANGES = "amq.direct";//交换机
public static String DEAD_LETTER_ROUTING = "DEAD_LETTER_ROUTING";//死信路由
public static String DEAD_LETTER_QUEUE_NAME = "DEAD_LETTER_QUEUE";//死信队列名
public static String DELAY_QUEUE = "DELAY_QUEUE";//延迟队列
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null);
//发送的消息延迟30秒并且持久化
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties = builder.expiration("30000").deliveryMode(2).build();
//往队列中发出一条消息这时候要发送的队列不应该是QUEUE_NAME,这样才能进行转发的
for (int i=10; i > 0; i--) {
String message = "hello world!"+System.currentTimeMillis();
channel.basicPublish("", DELAY_QUEUE, properties, message.getBytes());
System.out.println("Producer Sent_DELAY_QUEUE_"+i+" : " + message + "" );
}
//关闭频道和连接
channel.close();
connection.close();
}
}
消费者:
package com.didispace.delaymq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.util.HashMap;
public class Consumer {
//private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(Producer.DEAD_LETTER_QUEUE_NAME, true, false, false, null);
// channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);
channel.queueBind(Producer.DEAD_LETTER_QUEUE_NAME, Producer.DIRECT_EXCHANGES, Producer.DEAD_LETTER_ROUTING);
//这里是创建死信并且绑定死信
HashMap arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", Producer.DIRECT_EXCHANGES);
arguments.put("x-dead-letter-routing-key", Producer.DEAD_LETTER_ROUTING);
channel.queueDeclare(Producer.DELAY_QUEUE, true, false, false, arguments);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//创建队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消费队列
channel.basicConsume(Producer.DEAD_LETTER_QUEUE_NAME, true, consumer);
while (true) {
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'"+ "'[当前系统时间戳]" +System.currentTimeMillis());
}
}
}
这里需要运行main方法当然如果想用其他方法也是可以的改一下就ok啦,这里我还是截图最后运行结果,肯定先让生产者main方法先走,在走消费者main方法。这里建议延迟时间10秒最好不多不少,太长时间也不好。如图结果:
这里开了4个消费者,因为我用循环发了是10个延迟队列,阻塞的话一个消费者处理太慢,所以开了4个消费者处理会很快,分配任务是3,3,2,2的方式。
好了基本上写完了,肯定有很多瑕疵,所以还是希望可以互相讨论,我研究这个也才2个星期,不可能写的那么完美,多多体谅!有什么不懂的或者有什么瑕疵!转载需声明地址,谢谢!
####指导qq:179061434