SpringBoot使用RabbitMQ及RabbitMQ介绍

title: SpringBoot MQ参考资料
date: 2019-06-24
author: maxzhao
tags:
  - SpringBoot
  - MQ
categories:
  - DevelopTools
  - SpringBoot
  - 中间件

SpringBoot使用RabbitMQ

RabbitMQ 是

  • 消息队列
  • 实现AMQP(高级消息队列协议Advanced Message Queuing Protocol)的消息中间件的一种

作用:主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

流程:

一般消息队列都是生产者将消息发送到队列,消费者监听队列进行消费。

rabbitmq中一个虚拟主机(vhost默认 /)持有一个或者多个交换机(Exchange)。 用户只能在虚拟主机的粒度进行权限控制,交换机根据一定的策略(RoutingKey)绑定(Binding)到队列(Queue)上, 这样生产者和队列就没有直接联系,而是将消息发送的交换机,交换机再把消息转发到对应绑定的队列上。

交换机(Exchange)为rabbitmq独特的概念,用到的最常见的是4中类型:

  1. Direct: 先匹配, 再投送。即在绑定时设定一个routing_key, 消息的routing_key匹配时, 才会被交换器投送到绑定的队列中去. 交换机跟队列必须是精确的对应关系,这种最为简单。
  2. Topic: 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息 这种可以认为是Direct 的灵活版
  3. Headers: 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routingkey , headers则是一个自定义匹配规则的类型, 在队列与交换器绑定时会设定一组键值对规则,消息中也包括一组键值对( headers属性),当这些键值对有一对或全部匹配时,消息被投送到对应队列
  4. Fanout : 消息广播模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routingkey会被忽略

举例说明

创建 2 个交换机directExchangefanoutExchange,3个 队列 queueAqueueBqueueC

队列directExchange作为定点发送,包含队列 A B

队列fanoutExchange作为广播发送,包含队列 A B C

@Configuration
public class RabbitConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 声明 Direct 交换机 支持持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }
    /**
     * 声明 fanout 交换机.
     *
     * @return the exchange
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
    }
    /**
     * 声明一个队列 支持持久化.
     *
     * @return the queue
     */
    @Bean("queueA")
    public Queue directQueue() {
        return QueueBuilder.durable("QUEUE_A").build();
    }
    @Bean("queueB")
    public Queue directQueue() {
        return QueueBuilder.durable("QUEUE_B").build();
    }
    @Bean("queueC")
    public Queue directQueue() {
        return QueueBuilder.durable("QUEUE_C").build();
    }
    /**
     * 绑定队列A 到 direct 交换机.
     *
     * @param queue          the queue
     * @param exchange       the  exchange
     * @return the binding
     */
    @Bean
    public Binding bindingA(@Qualifier("queueA") Queue queue,
                            @Qualifier("directExchange") exchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange).with("DIRECT_ROUTING_KEY_A").noargs();
    }
    @Bean
    public Binding bindingA(@Qualifier("queueB") Queue queue,
                            @Qualifier("directExchange") exchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange).with("DIRECT_ROUTING_KEY_B").noargs();
    }
    /**
     * 绑定队列A 到 fanout 交换机.
     *
     * @param queue          the queue
     * @param exchange       the  exchange
     * @return the binding
     */
    @Bean
    public Binding bindingA(@Qualifier("queueA") Queue queue,
                            @Qualifier("fanoutExchange") exchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
    @Bean
    public Binding bindingA(@Qualifier("queueB") Queue queue,
                            @Qualifier("fanoutExchange") exchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange).;
    }
    @Bean
    public Binding bindingA(@Qualifier("queueC") Queue queue,
                            @Qualifier("fanoutExchange") exchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange).;
    }

}

消息发送类

@Service
public class SenderService {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 测试广播模式.
     *
     * @param p the p
     * @return the response entity
     */
    public void broadcast(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
    }

    /**
     * 测试 Direct 模式.
     *
     * @param p the p
     * @return the response entity
     */
    public void directA(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_A", p, correlationData);
    }
    public void directB(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_B", p, correlationData);
    }
    public void directNull(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "", p, correlationData);
    }

}

消息接收类

@Component
public class Receiver {
    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    /**
     * FANOUT广播队列监听一.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"QUEUE_A"})
    public void on(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_A " + new String(message.getBody()));
    }
    @RabbitListener(queues = {"QUEUE_B"})
    public void t(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_B " + new String(message.getBody()));
    }
    @RabbitListener(queues = {"QUEUE_C"})
    public void t(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_C " + new String(message.getBody()));
    }

    /**
     * DIRECT模式.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"DIRECT_QUEUE"})
    public void message(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("DIRECT_QUEUE " + new String(message.getBody()));
    }
}

测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class SenderServiceTest {
    @Autowired
    private SenderService senderService;

    @Test
    public void testCache() throws InterruptedException {
        // 测试广播模式
        senderService.broadcast("   Test    同学们集合啦!");
        // 测试Direct模式
        senderService.directA("   Test    定点消息 A ");
        senderService.directB("   Test    定点消息 B ");
        senderService.directNull("   Test    定点消息 null key ");
        Thread.sleep(5000L);
    }
}

结果

DIRECT_QUEUE_A "   Test    同学们集合啦!"
FANOUT_QUEUE_B "   Test    同学们集合啦!"
FANOUT_QUEUE_C "   Test    同学们集合啦!"
DIRECT_QUEUE_A"   Test    定点消息 A "
DIRECT_QUEUE_B"   Test    定点消息 B "

null key的并没有出现,所以在 directExchange中没有可以广播的队列(都绑定了routingkey)。

Maven 依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: maxzhao
    password: maxzhao
    #支持发布确认
    publisher-confirms: true 
    #支持发布返回
    publisher-returns: true      
    # 默认 /
    virtual-host: maxzhao_vhost 
    listener:
      simple:
        acknowledge-mode: manual #采用手动应答
        concurrency: 1 #指定最小的消费者数量
        max-concurrency: 1 #指定最大的消费者数量
        retry:
          enabled: true #是否支持重试

其他参考

JMS介绍和使用场景

简介:

讲解什么是消息队列,JMS的基础知识和使用场景

  1. 什么是JMS: Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口
  2. JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API
  3. 使用场景:
    1. 跨平台
    2. 多语言
    3. 多项目
    4. 解耦
    5. 分布式事务
    6. 流量控制
    7. 最终一致性
    8. RPC调用
    9. 上下游对接,数据源变动->通知下属

概念

  1. JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
  2. JMS生产者(Message Producer)
  3. JMS消费者(Message Consumer)
  4. JMS消息
  5. JMS队列
  6. JMS主题
  7. JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe

术语

  1. Broker - 简单来说就是消息队列服务器的实体。
  2. Exchange - 消息路由器,转发消息到绑定的队列上,指定消息按什么规则,路由到哪个队列。
  3. Queue - 消息队列,用来存储消息,每个消息都会被投入到一个或多个队列。
  4. Binding - 绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
  5. RoutingKey - 路由关键字,Exchange 根据这个关键字进行消息投递。
  6. Producter - 消息生产者,产生消息的程序。
  7. Consumer - 消息消费者,接收消息的程序。
  8. Channel - 消息通道,在客户端的每个连接里可建立多个Channel,每个channel代表一个会话。

编程模型

  1. ConnectionFactory :连接工厂,JMS 用它创建连接
  2. Connection :JMS 客户端到JMS Provider 的连接
  3. Session: 一个发送或接收消息的线程
  4. Destination :消息的目的地;消息发送给谁.
  5. MessageConsumer / MessageProducer: 消息接收者,消费者

RabbitMQ

RabbitMQ是一个出色的消息代理中间件(Message Broker):接受和转发消息。你可以将它看作是一个邮局, 你把自己的信件写上收件人地址,然后放到邮筒里面就不用管了,由邮局负责将这个信件送到目的地。来自

一、安装

我这里使用的是 ArchLInux

sudo pacman -S rabbitmq
# 会自动安装 socat  erlang-nox rabbitmq 

RPM 安装

# 先安装erlang
rpm -Uvh https://mirrors.ustc.edu.cn/epel/7/x86_64/Packages/e/epel-release-7-11.noarch.rpm
yum install erlang
#安装RabbitMQ
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-3.6.15-1.el7.noarch.rpm
wget ftp://195.220.108.108/linux/centos/7.4.1708/os/x86_64/Packages/socat-1.7.3.2-2.el7.x86_64.rpm
yum localinstall -C -y --disablerepo=* *.rpm

安装web管理界面

sudo rabbitmq-plugins enable rabbitmq_management
# 查看所有的插件
sudo rabbitmq-plugins list

启动

# 设置开机自启服务
systemctl enable rabbitmq-server
# 启动RabbitMQ
systemctl start rabbitmq-server
# 重启
systemctl restart rabbitmq-server

配置

vim /etc/rabbitmq/rabbitmq.config

配置端口,备注:消息端口5672,则web访问端口为 15672

[
    {rabbit,
      [
        {loopback_users, []},
        {tcp_listeners, [5672]}
      ]
    }
]

用户管理

# 修改guest的密码
sudo rabbitmqctl list_users
sudo rabbitmqctl change_password guest guest

# 创建其他管理员账号比如test/test:
sudo rabbitmqctl add_user maxzhao maxzhao
# 
sudo rabbitmqctl set_user_tags maxzhao administrator
# /是 vhost的目录  Configure regexp   Write regexp  Read regexp
sudo rabbitmqctl set_permissions -p / maxzhao ".*" ".*" ".*"
# Sets user topic permissions for an exchange,默认使用 AMQP default 的exchange
# sudo rabbitmqctl set_topic_permissions

添加vhost

# 查看帮助
sudo rabbitmqctl  --help
# 查看创建 vhost 的帮助
sudo rabbitmqctl add_vhost --help
# 创建 
sudo rabbitmqctl add_vhost maxzhao_vhost
# 查看
sudo rabbitmqctl  list_vhosts
# 赋权
sudo rabbitmqctl set_permissions -p /maxzhao_vhost maxzhao ".*" ".*" ".*"

删除 vhost

sudo rabbitmqctl add_vhost maxzhaoTest
sudo rabbitmqctl delete_vhost maxzhaoTest

二、任务队列等引用

本文地址:SpringBoot使用RabbitMQ及RabbitMQ介绍

推荐

官方示例

下面2017年写的,但是比较全面的新手教材。

RabbitMQ简易教程 - 任务队列

RabbitMQ简易教程 - 发布订阅

RabbitMQ简易教程 - 路由

RabbitMQ简易教程 - 主题

RabbitMQ简易教程 - RPC

RabbitMQ简易教程 - WebSocket

RabbitMQ简易教程 - 并发调度

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342