rabbitmq中间件搭建在本地虚拟机上,详情搭建过程可查看:rabbitmq安装部署
使用上次搭建的dubbo项目补充rabbitmq实现,代码可参考:20分钟springboot搭建dubbo服务
首先查看virtual-host配置(VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。)
rabbitmq原理结构
生产者/消费者模型,类似于交换机。Exchange交换器,共有四种类型,不同的类型对应不同的路由策略。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机,一方面接收生产者发送来的消息。另一方面知道如何处理消息,例如交给特别的队列,或者全部的队列,或者将消息丢弃。到底如何操作取决于Exchange是哪种类型:
根据交换机类型不同,分为3种发布模式:
Direct<定向>:1对1-----一个消息只能被一个消费者消费;把消息交给符合特定routing key(queue与exchange的关系key) 的队列。
Topic<通配符>:1对多-----一个消息可以被多个消费者消费(轮询);把消息交给符合routing pattern (路由模式)的队列。
Fanout<广播>:将消息分发给所有绑定到交换机的队列。
消息队列内生产者添加消息队列数据,消费者接收并使用队列中的数据,上次搭建的简单的dubbo服务中consumer发出请求,provider提供查询数据库的服务,具体如下图:
继续完成代码实现
consumer主体结构如下:
补充consumer的pom文件rabbitmq配置
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
补充consumer内yml的rabbitmq配置
spring:
application:
name: consumer
profiles:
active: test
#配置rabbitMq 服务器
rabbitmq:
host: 10.1.31.199
port: 5672
username: admin
password: admin
#虚拟host 可以不设置,使用server默认host
virtual-host: /
注意1) rabbitmq的默认web端口号是15672,接扣访问端口是5672
2)rabbitmq的默认virtualhost配置为"/"
在config文件夹添加DirectRabbitConfig类,配置rabbitmq的配置信息如下:
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
创建测试接口SendMessageController类,完成消息队列数据的添加
package com.example.consumer.openapi;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RestController
@RequestMapping("/demo")
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
注意:1)convertAndSend的方法中exchange是Virtual host的name决定了在哪个queue存放消息,routingKey则确定了queue与exchange的绑定,不填写时自动为exchange的name。
2)rabbitTemplate与amqpTemplate方法,rabbitTemplate实现自amqpTemplate接口,使用起来并无区别
启动项目,访问url,执行rabbitmq消息写入:
写入成功:
provider主体结构:
首先同理consumer,修改provider的pom文件及yml文件
在service文件夹内添加DirectReceiver类如下:
package com.example.provider.service.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
}
}
启动provider项目,查看监听到的消息如下:
简单的消息队列完成。
RabbitTemplate和AmqpTemplate的使用区别:
两者都能调用convertAndSend方法向队列发送消息,而
API:amqpTemplate.convertAndSend("队列名",“消息内容”)此处队列名必须与创建的队列一致。
API:amqpTemplate.convertAndSend("交换机名",“路由键”,“消息内容”)
具体实现可详看使用方法。