RabbitMQ
1. 什么是mq?
消息队列(MessageQueue,简称MQ):是在消息的传输过程中保存消息的容器。
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ.
2. MQ选型和对比?
1).从社区活跃度
按照目前网络上的资料,RabbitMQ 、activeMQ 、ZeroMQ 三者中,综合来看,RabbitMQ是首选。
2.)持久化消息比较
ZeroMq不支持,ActiveMq 和RabbitMq 都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。
3.)综合技术实现
可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。当然ZeroMq 也可以做到,不过自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。
4.)高并发
毋庸置疑,RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言。
5.)比较关注的比较, RabbitMQ 和Kafka
RabbitMq比Kafka 成熟,在可用性上,稳定性上,可靠性上, RabbitMq 胜于 Kafka (理论上)。
另外,Kafka 的定位主要在日志等方面,因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以如果业务方面还是建议选择 RabbitMq 。
还有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出来很多
2什么是RabbitMQ?
RabbitMQ是一个使用 erlang 编写的 AMQP(高级消息队列协议)的服务实现,简单来说,就是一个功能强大的消息队列服务。
3.为什么使用RabbitMQ?
1)解耦合,避免代码的出错
2)同步变异步,提升效率
3)解决高并发
4.安装RabbitMQ
4.1安装RabbitMQ的语言环境--Erlang
1) 下载Erlang
https://packages.erlang-solutions.com/erlang/rpm/centos/6/x86_64/esl-erlang_21.0-1~centos~6_amd64.rpm
2) 安装Erlang依赖:
yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
3) 安装erlang:rpm -ivh esl-erlang_21.0-1_centos_6_amd64.rpm
4)测试: erl -version
4.2 安装RabbitMQ
1)下载RabbitMQ
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.18/rabbitmq-server-3.7.18-1.el6.noarch.rpm
2)安装RabbitMQ依赖:'
rpm -ivh esl-erlang-compat-R14B-1.el6.noarch.rpm
rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm
3) 安装RabbitMQ:rpm -ivh rabbitmq-server-3.7.18-1.el6.noarch.rpm
4) 测试 : rabbitmq-server start
5)设置开机启动:chkconfig rabbitmq-server on
4.3安装Web管理界面插件
rabbitmq-plugins enable rabbitmq_management
4.4 创建用户
1)创建用户:rabbitmqctl add_user admin 1111
2) 给用户分配角色: rabbitmqctl set_user_tags admin administrator
3)给用户分配权限 : rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
4)测试:http://192.168.75.140:15672
5. 消息队列的运行原理:
5.1 相关术语:
Provider : 消息生产者,就是投递消息的程序。
Broker : 表示消息队列服务器实体。
Virtual Hosts: 相当于分组,每个vhost下数据是隔离的虚拟主机。
Exchange:
用来接收生产者发送的消息并将这些消息路由给服务器中的队列
direct:
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “test”,则只有被标记为“test”的消息才被转发,不会转发test.aaa,也不会转发dog.123,只会转发test。
topic:
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
fanout:
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
bindind:
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息
队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
RoutingKey:
路由键。RabbitMQ 决定消息该投递到哪个队列的规则、队列通过路由键绑定到交换器。
消息发送到 MQ 服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ 也会将其和绑定使用的路由键进行匹配。如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞。
Queue: 队列,用于存储消息
Connection:就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。
Channel(信道)
1,Channel 中文叫做信道,是 TCP 里面的虚拟链接
2,TCP 一旦打开,就会创建 AMQP 信道。
3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
Consumer : 消息消费者,就是接受消息的程序。
6. 使用RabbitMq 实现简单的发送,接收消息
6.1 添加依赖:
6.2 消息生产者previder发送消息
6.3 消息消费者(consumer)接收消息
7. 多个消费者
7.1 轮询分发
7.1.1 消息生产者previder发送消息
7.1.2 消息消费者(consumer)接收消息
7.2 公平分发
7.2.1 消息生产者previder发送消息
7.1.3 测试结果:
消费者1比消费者2获取的消息一样多。
7.2.2消息消费者(consumer)接收消息
7.2.3 测试结果:
消费者1比消费者2获取的消息更多。
8 . Rabbit交换器
8.1 交换器类型
1. fanout(广播)
sender
receive
2. Direct交换器( 发布与订阅)
sender
receive
3. topic(主题,规则匹配)
sender
receive
9.RabbitMQ 中的消息确认模式
1) 确认消息值接收一条: channel.basicQos(1);
2) 手动为false,自动为true
channel.basicConsume(queueName, false, consumer);
3)返回确认的状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
10.RabbitMQ的消息持久化处理
消息的可靠性是RabbitMQ 的一大特色,那么 RabbitMQ 是如何保证消息可靠性的呢——消息持久化
持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。
10.1设置路由器的持久化
false:服务器宕机,或重启服务器,exchange自动销毁
true:服务器宕机,或重启服务器,exchange保存
10.2 设置序列的持久化
10.2.1 设置autodelete属性,true:没有消费者 自动删除,false 没有消费者 不删除,一般为false
11.Spring Boot 整合RabbitMQ
11.1 需求1 (fanout类型)
步骤如下:
1)创建maven 项目
Consumer
sender:
2) 修改pom.xml文件,添加RabbitMQd的坐标
sender与consumer 一致
3) 修改全局配置文件application.properties
consumer
sender
4) 编写代码
consumer:
MessageReceive类
PushReceive类
sender:
5)创建启动类
sender与consumer一致
5)测试
sender:
结果:
11.2 需求2 (direct类型)
步骤如下:
1)创建maven 项目
consumer:
sender:
2) 修改pom.xml文件,添加RabbitMQd的坐标
consumer与sender一致
3) 修改全局配置文件application.properties
consumer:
sender:
4) 编写代码
consumer:
ErrorReceive类
InfoReceive类
sender:
5)创建启动类
sender与consumer一致
6)测试
sender
结果:
11.3 需求3 ( topic类型)
步骤如下:
1)创建maven 项目
consumer
sender:
2) 修改pom.xml文件,添加RabbitMQd的坐标
consumer与sender一致、
3) 修改全局配置文件application.properties
consumer:
sender:
4) 编写代码
consumer:
AllReceive类
ErrorReceive类
InfoReceive类
sender:
OrderSender类
StoreSender类
UsersSender类
5)创建启动类
sender与consumer一致
6)测试
结果:
routingkey为*.log.*的接收所有的log
routingkey为*.log.error的只接收error的log
routingkey为*.log.info的只接收info的log