小白一枚,第一次写博客,请大家多多包涵,有什么错误,请大家不吝指教。
rabbitmq基础详解:https://blog.csdn.net/lyhkmm/article/details/78775369
先简单的讲下rabbitmq 吧,RabbitMQ是一个开源的AMQP实现,AMQP,即Advanced Message Queuing Protocol,高级消息队列协议;
1. RabbitMQ的exchange有四种类型:
fanout : 会把所有发送到该exchange的消息路由到都与它绑定的Queue中;
direct : 把消息路由到 binding key 与routing key完全匹配的queue中;
topic : 把消息匹配到 bingding key 与 routing key 通过通配符匹配到 *匹配一个单词,#匹配多个;
headers : 通过发送到消息的头部匹配;
2. 下面简单演示下rabbitmq在springboot中的使用
(1). pom.xml 导入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2) application.properties 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest #如果rabbitmq装在本机 就用guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
server.port=8080
3.fanout 类型 demo
(1) 新建一个FanoutRabbitConfig 类
package com.study.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import con.study.rabbitConsumer.FanoutMessageConsumer;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
/**
* @author: Nice
* @Description :
* @date: 创建时间:2018年8月29日 上午9:54:37
* @version: 1.0
*/
@Configuration
public class FanoutRabbitConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
public static final String F_EXCHANGE = "fanoutexchange";// 消息交换机的名字
public static final String ROUTINGKEY1 = "fanoutqueue"; // 队列的名字
/**
* @date:2018/8/29
* @className:ConnectionFactory
* @author:nice
* @description: 创建连接工厂
*/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.host, this.port);
connectionFactory.setUsername(this.username);
connectionFactory.setPassword(this.password);
connectionFactory.setVirtualHost(this.virtualHost);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
/**
*
* @author: nice
* @time: 2018年8月29日 上午9:44:58
* @Description: 定义交换机类型为fanout
* @return: FanoutExchange
*/
@Bean
public FanoutExchange fanoutExchange() {
FanoutExchange exchange = new FanoutExchange(FanoutRabbitConfig.F_EXCHANGE, true, false);
return exchange;
}
/**
*
* @author: nice
* @time: 2018年8月29日 上午9:45:59
* @Description: 定义队列名字
* @return: Queue
*/
@Bean
public Queue queue_one() {
/*
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列 auto-delete 表示消息队列没有在使用时将被自动删除
* 默认是false exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
Queue queue = new Queue(FanoutRabbitConfig.ROUTINGKEY1, true, false, false);
return queue;
}
/**
*
* @author: nice
* @time: 2018年8月29日 上午9:48:33
* @Description: 交换机与队列绑定,直接绑定即可
* @return: Binding
*/
@Bean
public Binding bind_one() {
return BindingBuilder.bind(queue_one()).to(fanoutExchange());
}
接下来是要将队列与消费者进行绑定,有两种方法,我先演示第一种方法,直接绑定,每种类型我都演示一中方法;在FanoutRabbitConfig类中接着写:
/**
*
* @author: nice
* @time: 2018年8月29日 上午10:02:02
* @Description: 定义一个消费者
* @return: FanoutMessageConsumer 新建的消费者类
*/
@Bean
public FanoutMessageConsumer messageConsumer() {
return new FanoutMessageConsumer();
}
/**
*
* @author: nice
* @time: 2018年8月29日 上午10:03:43
* @Description: 监听模式
* @return: SimpleMessageListenerContainer
*/
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer_one() {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(
connectionFactory());
simpleMessageListenerContainer.addQueues(queue_one());
simpleMessageListenerContainer.setExposeListenerChannel(true);
simpleMessageListenerContainer.setMaxConcurrentConsumers(1);
simpleMessageListenerContainer.setConcurrentConsumers(1);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置确认模式手工确认
simpleMessageListenerContainer.setMessageListener(messageConsumer());
return simpleMessageListenerContainer;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
(2) 配置生产者 RabbitController
package com.study.ExcessController;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.study.config.FanoutRabbitConfig;
/**
* @author: Nice
* @Description :
* @date: 创建时间:2018年8月29日 上午10:14:41
* @version: 1.0
*/
@RestController
public class RabbitController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/fanout")
public String fanout() {
rabbitTemplate.convertAndSend(FanoutRabbitConfig.F_EXCHANGE,FanoutRabbitConfig.ROUTINGKEY1,"fanoutssss");//参数:exchange, queue,携带的消息;
return "ok";
}
}
(3) 配置消费者 FanoutMessageConsumer
package con.study.rabbitConsumer;
import java.util.Date;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
/**
* @author: Nice
* @Description :
* @date: 创建时间:2018年8月29日 上午10:01:22
* @version: 1.0
*/
public class FanoutMessageConsumer implements ChannelAwareMessageListener{
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("Consumer收到消息 : " + new String(body));
}
}
(4) 还可以配置确认机制,我这里就不配置了,感兴趣的可以去网上搜一下。好了,在浏览器中输入http://localhost:8080/fanout 如果控制台出现:
Consumer收到消息 : "fanoutssss"
则代表成功。