1、简介
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
2、创建一个springboot的项目
3、添加RabbitMQ依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4、在application.yml中配置RabbitMQ
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirms: true
virtual-host: /
5、创建一个rabbitMQ配置类(这个一定要看明白)
/**
* 说明:〈该类初始化创建队列、转发器,并把队列绑定到转发器〉
*/
@Configuration
public class ApplicationConfig {
private static Logger log = LoggerFactory.getLogger(ApplicationConfig.class);
@Autowired
private CachingConnectionFactory connectionFactory;
final static String queueName = "helloQuery";
@Bean
public Queue helloQueue() {
return new Queue(queueName);
}
@Bean
public Queue userQueue() {
return new Queue("user");
}
@Bean
public Queue dirQueue() {
return new Queue("direct");
}
//===============以下是验证topic Exchange的队列==========
// Bean默认的name是方法名
@Bean(name="message")
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queueMessages() {
return new Queue("topic.messages");
}
//===============以上是验证topic Exchange的队列===========
//===============以下是验证Fanout Exchange的队列==========
@Bean(name="AMessage")
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
//===============以上是验证Fanout Exchange的队列==========
/**
* exchange是交换机交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.
*
* Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.
*
* topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
*
* headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.
*
* Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
*/
@Bean
DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
@Bean
TopicExchange exchange() {
// 参数1为交换机的名称
return new TopicExchange("exchange");
}
/**
* //配置广播路由器
* @return FanoutExchange
*/
@Bean
FanoutExchange fanoutExchange() {
// 参数1为交换机的名称
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue, DirectExchange directExchange){
return BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
}
/**
* 将队列topic.message与exchange绑定,routing_key为topic.message,就是完全匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
// 如果参数名和上面用到方法名称一样,可以不用写@Qualifier
Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 将队列topic.messages与exchange绑定,routing_key为topic.#,模糊匹配
* @param queueMessages
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
@Bean
Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
/**
* rabbitTemplate是thread safe的,主要是channel不能共用,但是在rabbitTemplate源码里channel是threadlocal的,所以singleton没问题。
* 但是rabbitTemplate要设置回调类,如果是singleton,回调类就只能有一个,所以如果想要设置不同的回调类,就要设置为prototype的scope。
* @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(){
//若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true
//每个rabbitTemplate只能有一个confirm-callback和return-callback,如果这里配置了,那么写生产者的时候不能再写confirm-callback和return-callback
//使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
//将对象序列化为json串
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
/**
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* exchange到queue成功,则不回调return
* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("消息发送成功: correlationData:{}, ack{}, cause:{}", correlationData, ack, cause);
}else{
log.error("消息发送失败: correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息丢失: exchange:{}, route:{}, replyCode:{}, replyText:{}, message:{}", exchange, routingKey, replyCode, replyText, new String(message.getBody()));
}
});
return rabbitTemplate;
}
}
rabbitMQ配置类大约就这些内容,里面我基本上都做了注释。
下面我们就开始写rabbitMQ的用法了
6、单生产者和单消费者
6.1、生产者
@Component
public class HelloSender1 {
/**
* AmqpTemplate可以说是RabbitTemplate父类,RabbitTemplate实现类RabbitOperations接口,RabbitOperations继承了AmqpTemplate接口
*/
@Autowired
private AmqpTemplate rabbitTemplate;
@Autowired
private RabbitTemplate rabbitTemplate1;
/**
* 用于单生产者-》单消费者测试
*/
public void send() {
String sendMsg = "hello1 " + new Date();
System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate1.convertAndSend("helloQueue", sendMsg);
}
}
名为helloQueue的队列在配置类创建好了,项目启动的时候会自动创建
6.2、消费者
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1 : " + hello);
}
}
@RabbitListener注解是监听队列的,当队列有消息的时候,它会自动获取。
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
注意
消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)
消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
application/octet-stream:二进制字节数组存储,使用 byte[]
application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
text/plain:文本数据类型存储,使用 String
application/json:JSON 格式,使用 Object、相应类型
6.3、controller
/**
* 最简单的hello生产和消费实现(单生产者和单消费者)
*/
@RequestMapping("/hello")
public void hello() {
helloSender1.send();
}
6.4、结果
控制台的结果:
Sender1 : hello1 Mon Feb 18 10:13:35 CST 2019
2019-02-18 10:13:35,831 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
Receiver1 : hello1 Mon Feb 18 10:13:35 CST 2019
7、单生产者对多消费者
7.1、生产者
/**
* 用于单/多生产者-》多消费者测试
*/
public void send(String msg) {
String sendMsg = msg + new Date();
System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}
7.2、消费者
消费者1
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1 : " + hello);
}
}
消费者2
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {
@RabbitHandler public void process(String hello) {
System.out.println("Receiver2 : " + hello);
}
}
7.3、controller
/**
* 单生产者-多消费者
*/
@RequestMapping("/oneToMany")
public void oneToMany() {
for(int i=0;i<10;i++){
helloSender1.send("hellomsg:"+i);
}
}
7.4、结果:
Sender1 : hellomsg:0Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:2Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:4Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:6Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019
Sender1 : hellomsg:8Mon Feb 18 10:19:10 CST 2019
Sender1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019
Receiver2 : hellomsg:0Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:2Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:4Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:6Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:8Mon Feb 18 10:19:10 CST 2019
Receiver1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019
Receiver1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019
2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,044 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
8、实体类的传输,必须格式化
8.1、实体类
public class User implements Serializable {
private String name;
private String pass;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", pass='" + pass + '\'' +
'}';
}
}
8.2、生产者
/**
* 实体类的传输(springboot完美的支持对象的发送和接收,不需要格外的配置。实体类必须序列化)
* @param user
*/
public void send(User user) {
System.out.println("user send : " + user.getName()+"/"+user.getPass());
this.rabbitTemplate.convertAndSend("userQueue", user);
}
8.3、消费者
@Component
@RabbitListener(queues = "userQueue")
public class HelloReceiver3 {
@RabbitHandler
public void process(User user){
System.out.println("user receive : " + user.getName()+"/"+user.getPass());
}
}
8.4、controller
/**
* 实体列的传输
*/
@RequestMapping("/userTest")
public void userTest(){
User user=new User();
user.setName("黄义波");
user.setPass("123456");
userSender.send(user);
}
8.5、结果
user send : 黄义波/123456
2019-02-18 10:24:24,251 INFO (Application.java:169)- 消息发送成功:correlationData(null),ack(true),cause(null)
user receive : 黄义波/123456
9、directExchange
Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.
9.1、在rabbitMQ配置类中添加内容
@Bean
public Queue dirQueue() {
return new Queue("direct");
}
@Bean
DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
/**
* 将队列dirQueue与directExchange交换机绑定,routing_key为direct
* @param dirQueue
* @param directExchange
* @return
*/
@Bean
Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue,DirectExchange directExchange){
return BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
}
9.2、生产者
@Component
public class DirectSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msgString="directSender :hello i am hzb";
System.out.println(msgString);
this.rabbitTemplate.convertAndSend("direct", msgString);
}
}
9.3、消费者
@Component
@RabbitListener(queues = "direct")
public class DirectReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("directReceiver : " + msg);
}
}
9.4、controller
@RequestMapping("/directTest")
public void directTest() {
directSender.send();
}
9.5、结果
directSender :hello i am hyb
directReceiver : directSender :hello i am hyb
2019-02-18 10:33:25,974 INFO (Application.java:175)- 消息发送成功:correlationData(null),ack(true),cause(null)
10、topicExchange
topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.
10.1、在rabbitMQ配置类中添加内容
// Bean默认的name是方法名
@Bean(name="message")
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queueMessages() {
return new Queue("topic.messages");
}
@Bean
TopicExchange exchange() {
// 参数1为交换机的名称
return new TopicExchange("exchange");
}
/**
* 将队列topic.message与exchange绑定,routing_key为topic.message,就是完全匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
// 如果参数名和上面用到方法名称一样,可以不用写@Qualifier
Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 将队列topic.messages与exchange绑定,routing_key为topic.#,模糊匹配
* @param queueMessages
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
10.2、生产者
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msg1 = "I am topic.mesaage msg======";
System.out.println("sender1 : " + msg1);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);
String msg2 = "I am topic.mesaages msg########";
System.out.println("sender2 : " + msg2);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
}
}
10.3、消费者
消费者1
@Component
@RabbitListener(queues = "topic.message")
public class TopicMessageReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("topicMessageReceiver : " +msg);
}
}
消费者2
@Component
@RabbitListener(queues = "topic.messages")
public class TopicMessagesReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("topicMessagesReceiver : " +msg);
}
}
10.4、controller
/**
* topic exchange类型rabbitmq测试
*/
@RequestMapping("/topicTest")
public void topicTest() {
topicSender.send();
}
10.5、结果
sender1 : I am topic.mesaage msg======
sender2 : I am topic.mesaages msg########
topicMessageReceiver : I am topic.mesaage msg======
topicMessagesReceiver : I am topic.mesaage msg======
topicMessagesReceiver : I am topic.mesaages msg########
2019-02-18 10:39:46,150 INFO (Application.java:175)- 消息发送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:39:46,206 INFO (Application.java:175)- 消息发送成功:correlationData(null),ack(true),cause(null)
11、fanoutExchange
Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.
11.1、在rabbitMQ配置类中添加内容
//===============以下是验证Fanout Exchange的队列==========
@Bean(name="AMessage")
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
// 参数1为交换机的名称
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
11.2、生产者
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msgString="fanoutSender :hello i am hzb";
System.out.println(msgString); // 参数2被忽略
this.rabbitTemplate.convertAndSend("fanoutExchange","", msgString);
}
}
11.3、消费者
消费者A
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverA : " + msg);
}
}
消费者B
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverB : " + msg);
}
}
消费者C
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverC : " + msg);
}
}
11.4、controller
/**
* fanout exchange类型rabbitmq测试
*/
@RequestMapping("/fanoutTest")
public void fanoutTest() {
fanoutSender.send();
}
11.5、结果
fanoutSender :hello i am hzb
FanoutReceiverA : fanoutSender :hello i am hyb
FanoutReceiverC : fanoutSender :hello i am hyb
FanoutReceiverB : fanoutSender :hello i am hyb
2019-02-18 10:45:38,760 INFO (Application.java:175)- 消息发送成功:correlationData(null),ack(true),cause(null)
12、配置类中的rabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(){
//若使用confirm-callback或return-callback,必须要配置publisherConfirms或publisherReturns为true
//每个rabbitTemplate只能有一个confirm-callback和return-callback,如果这里配置了,那么写生产者的时候不能再写confirm-callback和return-callback
//使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
/**
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* exchange到queue成功,则不回调return
* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}else{
log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
好好看看注释
13、不在配置类中配置callback
方法一:
13.1、配置一个接口
/**
* 说明:〈定义一个名为SendMessageService 的接口,这个接口继承了RabbitTemplate.ConfirmCallback,
* ConfirmCallback接口是用来回调消息发送成功后的方法,当一个消息被成功写入到RabbitMQ服务端时,
* 会自动的回调RabbitTemplate.ConfirmCallback接口内的confirm方法完成通知
*/
public interface SendMessageService extends RabbitTemplate.ConfirmCallback{
void sendMessage(String exchange,String routekey,Object message);
}
13.2、实现这个接口
/**
* 说明:〈该类注入了RabbitTemplate,RabbitTemplate封装了发送消息的方法,我们直接使用即可。
* 可以看到我们构建了一个回调返回的数据,并使用convertAndSend方法发送了消息。同时实现了confirm回调方法,
* 通过判断isSendSuccess可以知道消息是否发送成功,这样我们就可以进行进一步处理。
*/
@Service
public class SendMessageServiceImpl implements SendMessageService{
private static Logger logger = LoggerFactory.getLogger(SendMessageServiceImpl.class);
@Autowired private RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(String exchange,String routekey,Object message) {
//设置回调对象
//rabbitTemplate.setConfirmCallback(this);
//rabbitTemplate.setMandatory(true);
//构建回调返回的数据
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData);
logger.info("SendMessageServiceImpl() >>> 发送消息到RabbitMQ, 消息内容: " + message);
}
/**
* 消息回调确认方法
* @param correlationData 回调数据
* @param isSendSuccess 是否发送成功
* @param
*/
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {
logger.info("confirm回调方法>>>>>>>>>>>>>回调消息ID为: " + correlationData.getId());
if (isSendSuccess) {
logger.info("confirm回调方法>>>>>>>>>>>>>消息发送成功");
} else {
logger.info("confirm回调方法>>>>>>>>>>>>>消息发送失败" + s);
}
}
}
方法二:
直接在生产者发送信息的时候修改rabbitTemplate
@Service
public class SendMessage1 {
private static Logger log = LoggerFactory.getLogger(SendMessage1.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routekey, Object message) {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
} else {
log.info("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
}
}
13、有时候消费者出现错误,需要人工处理
//构建回调返回的数据
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
//rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData);
// 将 CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,例如放到缓存中,然后人工处理
// 当confirm或return回调时,根据ack类别等,分别处理. 例如return或者ack=false则说明有问题,报警, 如果ack=true则删除关系
// (因为return在confirm前,所以一条消息在return后又ack=true的情况也是按return处理)
Message message1 = MessageBuilder.withBody(message.toString().getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setCorrelationId(correlationData.getId()).build();
rabbitTemplate.send(exchange, routekey, message1, correlationData);
将 CorrelationData的id 与 Message的correlationId绑定,然后关系保存起来,例如放到缓存中,然后人工处理
我们可以看到,这两条消息关联起来了。
14、事务消息
消息确认机制
RabbitMQ提供了transaction、confirm两种消息确认机制。transaction即事务机制,手动提交和回滚;confirm机制提供了Confirmlistener和waitForConfirms两种方式。confirm机制效率明显会高于transaction机制,但后者的优势在于强一致性。如果没有特别的要求,建议使用confrim机制。
- 1、从实验来看,消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。
- 2、而为了确认消息是否被发送给queue,应该在发送消息中启用参数mandatory=true,使用ReturnListener接收未被发送成功的消息。
- 3、接下来就需要确认消息是否被有效消费。publisher端目前并没有提供监听事件,但提供了应答机制来保证消息被成功消费,应答方式:
- basicAck:成功消费,消息从队列中删除
- basicNack:requeue=true,消息重新进入队列,false被删除
- basicReject:等同于basicNack
- basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer
RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制。RabbitMQ中,与事务机制有关的方法有三个:
- txSelect():将当前channel设置成transaction模式。
- txCommit():提交事务。
- txRollback():回滚事务。
- channel.basicPublish:发送消息,可以是多条,可以是消费消息提交ack。
- autoAck=false,手动提交ack,以事务提交或回滚为准。
- autoAck=true,不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了。
RabbitConfig
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
@Data
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
}
RabbitUtil
- 通过@PostConstruct注解,在依赖注入之后,初始化ConnectionFactory。
- 提供创建ConnectionFactory的方法和创建Connection的方法。
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Component
public final class RabbitUtil {
@Resource
private RabbitConfig rabbitConfig;
private ConnectionFactory factory;
@PostConstruct
public void init() {
factory = new ConnectionFactory();
factory.setHost(rabbitConfig.getHost());
factory.setPort(rabbitConfig.getPort());
factory.setUsername(rabbitConfig.getUsername());
factory.setPassword(rabbitConfig.getPassword());
factory.setVirtualHost(rabbitConfig.getVirtualHost());
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(60000L);
}
public ConnectionFactory newConnectionFactory() {
return factory;
}
public Connection newConnection() throws IOException, TimeoutException {
return factory.newConnection();
}
}
TransactionProducer
import com.panda.rabbitmq.config.RabbitUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@Component
public class TransactionProducer {
private final static String EXCHANGE_NAME = "publisherconfirm-exchange";
@Resource
private RabbitUtil rabbitUtil;
public void send(String routingKey, String message) throws IOException, TimeoutException {
Channel channel = null;
try {
Connection connection = rabbitUtil.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 声明一个direct交换机
// 开启事务
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
//int i = 1 / 0;
// 提交事务
channel.txCommit();
} catch (Exception e) {
// 回滚事务
if (channel != null) {
channel.txRollback();
}
} finally {
if (channel != null) {
channel.close();
}
}
}
}
https://zhuanlan.zhihu.com/p/582787505