title: SpringBoot MQ参考资料
date: 2019-06-24
author: maxzhao
tags:
- SpringBoot
- MQ
categories:
- DevelopTools
- SpringBoot
- 中间件
SpringBoot使用RabbitMQ
RabbitMQ 是
- 消息队列
- 实现AMQP(高级消息队列协议Advanced Message Queuing Protocol)的消息中间件的一种
作用:主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
流程:
一般消息队列都是生产者将消息发送到队列,消费者监听队列进行消费。
rabbitmq中一个虚拟主机(vhost默认 /)持有一个或者多个交换机(Exchange)。 用户只能在虚拟主机的粒度进行权限控制,交换机根据一定的策略(RoutingKey)绑定(Binding)到队列(Queue)上, 这样生产者和队列就没有直接联系,而是将消息发送的交换机,交换机再把消息转发到对应绑定的队列上。
交换机(Exchange)为rabbitmq
独特的概念,用到的最常见的是4中类型:
- Direct: 先匹配, 再投送。即在绑定时设定一个routing_key, 消息的routing_key匹配时, 才会被交换器投送到绑定的队列中去. 交换机跟队列必须是精确的对应关系,这种最为简单。
- Topic: 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息 这种可以认为是Direct 的灵活版
- Headers: 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routingkey , headers则是一个自定义匹配规则的类型, 在队列与交换器绑定时会设定一组键值对规则,消息中也包括一组键值对( headers属性),当这些键值对有一对或全部匹配时,消息被投送到对应队列
- Fanout : 消息广播模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routingkey会被忽略
举例说明
创建 2 个交换机directExchange
、fanoutExchange
,3个 队列 queueA
、 queueB
、 queueC
。
队列directExchange
作为定点发送,包含队列 A B
队列fanoutExchange
作为广播发送,包含队列 A B C
@Configuration
public class RabbitConfig {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 声明 Direct 交换机 支持持久化.
*
* @return the exchange
*/
@Bean("directExchange")
public Exchange directExchange() {
return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
}
/**
* 声明 fanout 交换机.
*
* @return the exchange
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
}
/**
* 声明一个队列 支持持久化.
*
* @return the queue
*/
@Bean("queueA")
public Queue directQueue() {
return QueueBuilder.durable("QUEUE_A").build();
}
@Bean("queueB")
public Queue directQueue() {
return QueueBuilder.durable("QUEUE_B").build();
}
@Bean("queueC")
public Queue directQueue() {
return QueueBuilder.durable("QUEUE_C").build();
}
/**
* 绑定队列A 到 direct 交换机.
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding bindingA(@Qualifier("queueA") Queue queue,
@Qualifier("directExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange).with("DIRECT_ROUTING_KEY_A").noargs();
}
@Bean
public Binding bindingA(@Qualifier("queueB") Queue queue,
@Qualifier("directExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange).with("DIRECT_ROUTING_KEY_B").noargs();
}
/**
* 绑定队列A 到 fanout 交换机.
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding bindingA(@Qualifier("queueA") Queue queue,
@Qualifier("fanoutExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingA(@Qualifier("queueB") Queue queue,
@Qualifier("fanoutExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange).;
}
@Bean
public Binding bindingA(@Qualifier("queueC") Queue queue,
@Qualifier("fanoutExchange") exchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange).;
}
}
消息发送类
@Service
public class SenderService {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 测试广播模式.
*
* @param p the p
* @return the response entity
*/
public void broadcast(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
}
/**
* 测试 Direct 模式.
*
* @param p the p
* @return the response entity
*/
public void directA(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_A", p, correlationData);
}
public void directB(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_B", p, correlationData);
}
public void directNull(String p) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "", p, correlationData);
}
}
消息接收类
@Component
public class Receiver {
private static final Logger log = LoggerFactory.getLogger(Receiver.class);
/**
* FANOUT广播队列监听一.
*
* @param message the message
* @param channel the channel
* @throws IOException the io exception 这里异常需要处理
*/
@RabbitListener(queues = {"QUEUE_A"})
public void on(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.debug("FANOUT_QUEUE_A " + new String(message.getBody()));
}
@RabbitListener(queues = {"QUEUE_B"})
public void t(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.debug("FANOUT_QUEUE_B " + new String(message.getBody()));
}
@RabbitListener(queues = {"QUEUE_C"})
public void t(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.debug("FANOUT_QUEUE_C " + new String(message.getBody()));
}
/**
* DIRECT模式.
*
* @param message the message
* @param channel the channel
* @throws IOException the io exception 这里异常需要处理
*/
@RabbitListener(queues = {"DIRECT_QUEUE"})
public void message(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
log.debug("DIRECT_QUEUE " + new String(message.getBody()));
}
}
测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class SenderServiceTest {
@Autowired
private SenderService senderService;
@Test
public void testCache() throws InterruptedException {
// 测试广播模式
senderService.broadcast(" Test 同学们集合啦!");
// 测试Direct模式
senderService.directA(" Test 定点消息 A ");
senderService.directB(" Test 定点消息 B ");
senderService.directNull(" Test 定点消息 null key ");
Thread.sleep(5000L);
}
}
结果
DIRECT_QUEUE_A " Test 同学们集合啦!"
FANOUT_QUEUE_B " Test 同学们集合啦!"
FANOUT_QUEUE_C " Test 同学们集合啦!"
DIRECT_QUEUE_A" Test 定点消息 A "
DIRECT_QUEUE_B" Test 定点消息 B "
null key的并没有出现,所以在 directExchange
中没有可以广播的队列(都绑定了routingkey)。
Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: maxzhao
password: maxzhao
#支持发布确认
publisher-confirms: true
#支持发布返回
publisher-returns: true
# 默认 /
virtual-host: maxzhao_vhost
listener:
simple:
acknowledge-mode: manual #采用手动应答
concurrency: 1 #指定最小的消费者数量
max-concurrency: 1 #指定最大的消费者数量
retry:
enabled: true #是否支持重试
其他参考
JMS介绍和使用场景
简介:
讲解什么是消息队列,JMS的基础知识和使用场景
- 什么是JMS: Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口
- JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API
- 使用场景:
- 跨平台
- 多语言
- 多项目
- 解耦
- 分布式事务
- 流量控制
- 最终一致性
- RPC调用
- 上下游对接,数据源变动->通知下属
概念
- JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
- JMS生产者(Message Producer)
- JMS消费者(Message Consumer)
- JMS消息
- JMS队列
- JMS主题
- JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe
术语
- Broker - 简单来说就是消息队列服务器的实体。
- Exchange - 消息路由器,转发消息到绑定的队列上,指定消息按什么规则,路由到哪个队列。
- Queue - 消息队列,用来存储消息,每个消息都会被投入到一个或多个队列。
- Binding - 绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
- RoutingKey - 路由关键字,Exchange 根据这个关键字进行消息投递。
- Producter - 消息生产者,产生消息的程序。
- Consumer - 消息消费者,接收消息的程序。
- Channel - 消息通道,在客户端的每个连接里可建立多个Channel,每个channel代表一个会话。
编程模型
- ConnectionFactory :连接工厂,JMS 用它创建连接
- Connection :JMS 客户端到JMS Provider 的连接
- Session: 一个发送或接收消息的线程
- Destination :消息的目的地;消息发送给谁.
- MessageConsumer / MessageProducer: 消息接收者,消费者
RabbitMQ
RabbitMQ是一个出色的消息代理中间件(Message Broker):接受和转发消息。你可以将它看作是一个邮局, 你把自己的信件写上收件人地址,然后放到邮筒里面就不用管了,由邮局负责将这个信件送到目的地。来自
一、安装
我这里使用的是 ArchLInux
sudo pacman -S rabbitmq
# 会自动安装 socat erlang-nox rabbitmq
RPM 安装
# 先安装erlang
rpm -Uvh https://mirrors.ustc.edu.cn/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpm
yum install erlang
#安装RabbitMQ
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
wget ftp://195.220.108.108/linux/centos/7.4.1708/os/x86_64/Packages/socat-1.7.3.2-2.el7.x86_64.rpm
yum localinstall -C -y --disablerepo=* *.rpm
安装web管理界面
sudo rabbitmq-plugins enable rabbitmq_management
# 查看所有的插件
sudo rabbitmq-plugins list
启动
# 设置开机自启服务
systemctl enable rabbitmq-server
# 启动RabbitMQ
systemctl start rabbitmq-server
# 重启
systemctl restart rabbitmq-server
配置
vim /etc/rabbitmq/rabbitmq.config
配置端口,备注:消息端口5672
,则web访问端口为 15672
[
{rabbit,
[
{loopback_users, []},
{tcp_listeners, [5672]}
]
}
]
用户管理
# 修改guest的密码
sudo rabbitmqctl list_users
sudo rabbitmqctl change_password guest guest
# 创建其他管理员账号比如test/test:
sudo rabbitmqctl add_user maxzhao maxzhao
#
sudo rabbitmqctl set_user_tags maxzhao administrator
# /是 vhost的目录 Configure regexp Write regexp Read regexp
sudo rabbitmqctl set_permissions -p / maxzhao ".*" ".*" ".*"
# Sets user topic permissions for an exchange,默认使用 AMQP default 的exchange
# sudo rabbitmqctl set_topic_permissions
添加vhost
# 查看帮助
sudo rabbitmqctl --help
# 查看创建 vhost 的帮助
sudo rabbitmqctl add_vhost --help
# 创建
sudo rabbitmqctl add_vhost maxzhao_vhost
# 查看
sudo rabbitmqctl list_vhosts
# 赋权
sudo rabbitmqctl set_permissions -p /maxzhao_vhost maxzhao ".*" ".*" ".*"
删除 vhost
sudo rabbitmqctl add_vhost maxzhaoTest
sudo rabbitmqctl delete_vhost maxzhaoTest
二、任务队列等引用
本文地址:SpringBoot使用RabbitMQ及RabbitMQ介绍
推荐
下面2017年写的,但是比较全面的新手教材。