RocketMQ
本文内容:描述RocketMQ的概念与术语,最下方解释各种MQ之间的区别与选型
RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:
- 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
- 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
- 支持拉(pull)和推(push)两种消息模式
- 单一队列百万消息的堆积能力
- 支持多种消息协议,如 JMS、MQTT 等
- 分布式高可用的部署架构,满足至少一次消息传递语义
- 提供 docker 镜像用于隔离测试和云集群部署
- 提供配置、指标和监控等功能丰富的 Dashboard
架构
系统架构组成:
系统启动流程:
概念模型
专业术语及其关联
1. Name Server
Name Server 为 producer 和 consumer 提供路由信息。
相对来说,nameserver的稳定性非常高。原因有二:
1. nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。无状态
2. nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
2. Broker
Broker 是 RocketMQ 系统的主要角色,其实就是MQ??topic管理?
Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。
2.1. 与NameServer关系
- 连接
- 单个broker和所有nameserver保持长连接
- 心跳
- 心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。
- 心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。
- 断开
- 时机:broker挂掉;心跳超时导致nameserver主动关闭连接
- 动作:一旦连接断开,nameserver会立即感知,更新topic与队列的对应关系,但不会通知生产者和消费者
2.2. 负载均衡
- 一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系。
- 如果某个topic消息量很大,应该给它多配置几个队列?这什么意思,并且尽量多分布在不同broker上,减轻某个broker的压力。
- topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。
2.3. 可用性
由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。
这里有两个关键点:
- 一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。
- 消费者得到master宕机通知后,转向slave消费(重定向,对于2次开发者透明),但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。
2.4. 可靠性
- 所有发往broker的消息,有同步刷盘和异步刷盘机制,总的来说,可靠性非常高
- 同步刷盘时,消息写入物理文件才会返回成功,因此非常可靠
- 异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电。
2.5. 消息清理
- 扫描间隔
- 默认10秒,由broker配置参数cleanResourceInterval决定
- 空间阈值
- 物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%
- 清理时机
- 默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值
- 文件保留时长
- 默认72小时,由broker配置参数fileReservedTime决定
2.6. 读写性能
- 文件内存映射方式操作文件,避免read/write系统调用和实时文件读写,性能非常高
- 永远一个文件在写,其他文件在读
- 顺序写,随机读
- 利用linux的sendfile???mmap+write吧机制,将消息内容直接输出到sokect管道,避免系统调用
2.7. 系统特性要求
- 大内存,内存越大性能越高,否则系统swap会成为性能瓶颈
- IO密集
- cpu load高,使用率低,因为cpu占用后,大部分时间在IO WAIT
- 磁盘可靠性要求高,为了兼顾安全和性能,采用RAID10阵列
- 磁盘读取速度要求快,要求高转速大容量磁盘
3. Producer
消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。
也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。
3.1. 与NameServer关系
- 连接
- 单个生产者者和一台NameServer保持长连接,定时查询topic配置信息,如果该NameServer挂掉,生产者会自动连接下一个NameServer,直到有可用连接为止,并能自动重连。
- 心跳
- 无心跳连接
- 轮询时间
- 默认情况下,生产者每隔30秒从NameServer获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。该时间由DefaultMQProducer的pollNameServerInteval参数决定,可手动配置。
3.2. 与Borker关系
- 连接
- 单个生产者和该生产者关联的所有broker保持长连接。
- 心跳
- 默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。
- 断开
- 移除broker上的生产者信息
3.3. 负载均衡
- 莫得均衡
4. Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。
5. Consumer
消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。
5.1. 与NameServer关系
- 连接
- 单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。
- 心跳
- 不心跳关联
- 轮询时间
- 默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。
5.2. 与Borker关系
- 连接
- 单个消费者和该消费者关联的所有broker保持长连接。
- 心跳
- 默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。
- 断开
- 时机:消费者挂掉;心跳超时导致broker主动关闭连接
- 动作:一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费
5.3. 负载均衡
- 集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列,一个队列只会被一个消费者消费。
- 如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
5.4. 消费机制
- 本地队列
- 消费者不间断的从broker拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消息队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer的pullThresholdForQueue属性控制,默认1000,可手动设置。
- 轮询间隔
- 消息拉取线程每隔多久拉取一次?间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置。
- 消息消费数量
- 监听器每次接受本地队列的消息是多少条?这个参数由DefaultMQPushConsumer的consumeMessageBatchMaxSize属性控制,默认为1,可手动设置。
5.5. 消费进度存储
- 每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置。
5.6. 连接数
- 如果一个topic在某broker上有3个队列,一个消费者消费这3个队列,那么该消费者和这个broker有几个连接?
- 一个连接,消费单位与队列相关,消费连接只跟broker相关,事实上,消费者将所有队列的消息拉取任务放到本地的队列,挨个拉取,拉取完毕后,又将拉取任务放到队尾,然后执行下一个拉取任务
6. Consumer Group
消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组(集群)。
- 标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。
- 消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。
- 注: RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,即必须要听一样的topic(并且tag也一样)。
7. Filter Server(可选)
RocketMQ可以允许消费者上传一个Java类给Filter Server进行过滤。暂不使用
8. Topic
Topic是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。
9. Tag
标签可以被认为是对Topic进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。
RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic1的时候,如果你订阅的时候指定的是tagA,那么tagB的消息将不会投递。
10. Message
Message 是消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。
Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。
也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。
11. Message Queue
简称queue或Q,消息物理管理单位。一个Topic将有若干个Q。若Topic同时创建在不同的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。
- 无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q地发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。
- 每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。
是不是可以这样理解:这个队列的存在是为了可以让一个topic通过不同路径发布出去,
consumer指定topic实际上就是关注所有的Q,
也就是说,这样操作的意义就在于可以吧Q放到不同Borker启动负载均衡的作用呗。
所以,如果要顺序消费,就让topic只有Q就可以了呗,是不是这样得存疑?
12. Offset
RocketMQ中,有很多offset的概念。但通常我们只关心暴露到客户端的offset。一般我们不特指的话,就是指逻辑Message Queue下面的offset。
- 可以认为一条逻辑的message queue是无限长的数组。一条消息进来下标就会涨1。下标就是offset。
- 一条message queue中的max offset表示消息的最大offset。注:这里从源码上看,max_offset并不是最新的那条消息的offset,而是表示最新消息的offset+1。
- 而min offset则标识现存在的最小offset。
- 由于消息存储一段时间后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长。这意味着比min offset要小的那些消息已经不在broker上了,无法被消费。
13. Consumer Offset
用于标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里了。注:从源码上看,这个数值是最新消费的那条消息的offset+1,所以实际上这个值存储的是【下次拉取的话,从哪里开始拉取的offset】。
- 消费者拉取消息的时候需要指定offset,broker不主动推送消息,而是接受到请求的时候把存储的对应offset的消息返回给客户端。这个offset在成功消费后会更新到内存,并定时持久化。在集群消费模式下,会同步持久化到broker。在广播模式下,会持久化到本地文件。
- 实例重启的时候会获取持久化的consumer offset,用以决定从哪里开始消费。
其他术语
1. 集群消费
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的1条Q。
而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下,消费进度的存储会持久化到Broker。
2. 广播消费
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度会存储持久化到实例本地。
3. 顺序消息
消费消息的顺序要同发送消息的顺序一致。
- 由于Consumer消费消息的时候是针对Message Queue顺序拉取并开始消费,且一条Message Queue只会给一个消费者(集群模式下),所以能够保证同一个消费者实例对于Q上消息的消费是顺序地开始消费(不一定顺序消费完成,因为消费可能并行)。
- 在RocketMQ中,顺序消费主要指的是都是Queue级别的局部顺序。
- 这一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息
生产者发送的时候可以用MessageQueueSelector为某一批消息(通常是有相同的唯一标示id)选择同一个Queue
或者Message Queue的数量只有1,但这样消费的实例也会只有一个,多出来的实例都会空跑。(集群模式)
4. 普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息。
这种是一旦发生异常,Broker宕机或重启,由于队列总数发生发化,消费者会触发负载均衡,而默认地负载均衡算法采取哈希取模平均,这样负载均衡分配到定位的队列会发化,使得队列可能分配到别的实例上,则会短暂地出现消息顺序不一致。
如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
5. 严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序。
牺牲了分布式 Failover 特性,即 Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低
如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前并未实现).
RocketMQ 架构
由这张图可以看到有四个集群,分别是 NameServer 集群、Broker 集群、Producer 集群和 Consumer 集群:
- NameServer:提供轻量级的服务发现和路由。 每个 NameServer 记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展。
- Broker:通过提供轻量级的 Topic 和 Queue 机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制。
- Producer:生产者,产生消息的实例,拥有相同 Producer Group 的 Producer 组成一个集群。
- Consumer:消费者,接收消息进行消费的实例,拥有相同 Consumer Group 的Consumer 组成一个集群。
简单说明一下图中箭头含义,从 Broker 开始,
Broker Master1 和 Broker Slave1 是主从结构,它们之间会进行数据同步,即 Date Sync。
同时每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer 中。
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,
定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,
且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,
但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave
建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
RocketMQ 集群部署模式
1.单master模式
也就是只有一个 master 节点,称不上是集群,
一旦这个 master 节点宕机,那么整个服务就不可用,适合个人学习使用。
2.多 master 模式
多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。
优点:所有模式中性能最高
缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
*注意*:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。
3.多 master 多 slave 异步复制模式
在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。
master节点可读可写,但是slave只能读不能写,类似于 mysql 的主备模式。
优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。
4.多 master 多 slave 同步双写模式
同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
优点:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)
*注意*:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。
对MQ产品的总结
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
设计定位 | 小型项目常规成员 | 非日志可靠信息传输 | 非日志可靠信息传输 | 系统间数据流管道,实时数据处理 |
单机吞吐量 | 万级 | 万级 | 10万级 | 10万级(Kafka目的) |
topic数量对性能的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降;这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic | Kopic从几十个到几百个的时候,吞吐量会大幅度下降;所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 | ||
时效性 | ms级 | us级 | ms级 | ms级以内 |
Available | 中 主从 | 中 主从 | 高 分布式 | 高 分布式 |
Reliability | 较低的概率丢失数据 | 极低或可为0 | 极低或可为0 | |
功能&扩展 | 功能完善&扩展一般 | 功能完善&很难扩展(erlang) | 功能完善&扩展良好 | 功能简单&为了不同环境 |
社区活跃度 | 低 | 高 | 中 | 高 |
** 以上基本排除ActiveMQ **
** 以下是其他三种精细对比 **