RocketMQ-01、RocketMQ核心

1.RocketMQ的概念模型

image
image

Topic

消息主题,一级消息类型,通过 Topic 对消息进行分类。详情请见Topic 与 Tag 最佳实践。

Message

消息,消息队列中信息传递的载体。

Message ID

消息的全局唯一标识,由消息队列 RocketMQ 系统自动生成,唯一标识某条消息。

Message Key

消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。

Tag

消息标签,二级消息类型,用来进一步区分某个 Topic 下的消息分类。详情请见Topic 与 Tag 最佳实践。

Producer

消息生产者,也称为消息发布者,负责生产并发送消息。

Producer 实例

Producer 的一个对象实例,不同的 Producer 实例可以运行在不同进程内或者不同机器上。Producer 实例线程安全,可在同一进程内多线程之间共享。

Consumer

消息消费者,也称为消息订阅者,负责接收并消费消息。

Consumer 实例

Consumer 的一个对象实例,不同的 Consumer 实例可以运行在不同进程内或者不同机器上。一个 Consumer 实例内配置线程池消费消息。

Group

一类 Producer 或 Consumer,这类 Producer 或 Consumer 通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

Group ID

Group 的标识。

Exactly-Once 投递语义

Exactly-Once 投递语义是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息也在消费端也只被消费一次。详情请见Exactly-Once 投递语义。

集群消费

一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。详情请见集群消费和广播消费。

广播消费

一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。详情请见集群消费和广播消费。

定时消息

Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。详情请见定时和延时消息。

延时消息

Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。详情请见定时和延时消息。

事务消息

消息队列 RocketMQ 提供类似 X/Open XA 的分布事务功能,通过消息队列 RocketMQ 的事务消息能达到分布式事务的最终一致。详情请见事务消息。

顺序消息

消息队列 RocketMQ 提供的一种按照顺序进行发布和消费的消息类型, 分为全局顺序消息和分区顺序消息。详情请见顺序消息。

全局顺序消息

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。详情请见顺序消息。

分区顺序消息

对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 key 是完全不同的概念。详情请见顺序消息。

消息堆积

Producer 已经将消息发送到消息队列 RocketMQ 的服务端,但由于 Consumer 消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列 RocketMQ 的服务端保存着未被消费的消息,该状态即消息堆积。

消息过滤

消费者可以根据消息标签(Tag)对消息进行过滤,确保消费者最终只接收被过滤后的消息类型。消息过滤在消息队列 RocketMQ 的服务端完成。详情请见消息过滤。

消息轨迹

在一条消息从生产者发出到订阅者消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由消息队列 RocketMQ 服务端,投递给消息消费者的完整链路,方便定位排查问题。详情请见消息轨迹简介。

重置消费位点

以时间轴为坐标,在消息持久化存储的时间范围内(默认 3 天),重新设置消息消费者对其订阅 Topic 的消费进度,设置完成后订阅者将接收设定时间点之后由消息生产者发送到消息队列 RocketMQ 服务端的消息。详情请见重置消费位点。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

消息队列 RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

详情请见死信队列。

消息路由

消息路由常用于不同地域之间的消息同步,保证地域之间的数据一致性。消息队列 RocketMQ 的全球消息路由功能依托阿里云优质基础设施实现的高速通道专线,可以高效地实现国内外不同地域之间的消息同步复制。详情请见全球消息路由。

2.RocketMQ的部署模型

image

总共有四大部分:NameServer,Broker,Producer,Consumer。

Broker 消息服务器在启动时向所有Name Server 注册,消息生产者(Producer)在发送消息之前先从Name Server 获取Broker 服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。NameServer 与每台Broker 服务器保持长连接,并间隔30s 检测Broker 是否存活,如果检测到Broker 右机, 则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,为什么要这样设计呢?这是为了降低NameServer 实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性

NameServer 本身的高可用可通过部署多台Names 巳rver 服务器来实现,但彼此之间互不通信,也就是NameServer 服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer 设计的一个亮点, RocketMQNameServer 设计追求简单高效。

2.1.NameServer

NameServer主要包括两个主要功能:

  1. 管理brokers:broker服务器启动时会注册到NameServer上,并且两者之间保持心跳监测机制,以此来保证NameServer知道broker的存活状态;
  2. 路由信息管理:每一台NameServer都存有全部的broker集群信息和生产者/消费者客户端的请求信息;

一些特点:

Namesrv用于存储Topic、Broker关系信息,功能简单,稳定性高。多个Namesrv之间相互没有通信,单台Namesrv宕机不影响其他Namesrv与集群;即使整个Namesrv集群宕机,已经正常工作的Producer,Consumer,Broker仍然能正常工作,但新起的Producer, Consumer,Broker就无法工作

Namesrv压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向Namesr发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致Namesrv误认为Broker心跳失败

2.2.Broker

Broker的四大作用:

  1. 请求分发:是client的入口,接收来自生产者消费者的请求
  2. client管理:管理客户(产品/消费者)并维护消费者的主题订阅。
  3. 数据存储:提供简单的api来查询磁盘上的临时数据
  4. 高可用:主从节点间同步数据保证高可用

一些特点:

  1. 负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。
  2. 动态伸缩能力(非顺序消息):Broker的伸缩性体现在两个维度:Topic, Broker。
    1. Topic维度:假如一个Topic的消息量特别大,但集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。
    2. Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker就可以。Broker起来后想Namesrv注册,Producer、Consumer通过Namesrv发现新Broker,立即跟该Broker直连,收发消息。
  3. 高可用&高可靠
    1. 高可用:集群部署时一般都为主备,备机实时从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。
    2. 高可靠:所有发往broker的消息,有同步刷盘和异步刷盘机制;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电

2.3.Producer

Producer启动时,需要指定Namesrv的地址,从Namesrv集群中选一台建立长连接。如果该Namesrv宕机,会自动连其他Namesrv。直到有可用的Namesrv为止。生产者每30秒从Namesrv获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。

2.3.1.Producer三种发送方式

  • 同步:在广泛的场景中使用可靠的同步传输,如重要的通知信息、短信通知、短信营销系统等。
  • 异步:异步发送通常用于响应时间敏感的业务场景,发送出去即刻返回,利用回调做后续处理。
  • 一次性:一次性发送用于需要中等可靠性的情况,如日志收集,发送出去即完成,不用等待发送结果,回调等等。

生产者端的负载均衡

生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上。

2.4.Consumer

消费客户端的连接方式和生产者类似。

消费者端的负载均衡

先讨论消费者的消费模式,消费者有两种模式消费:集群消费,广播消费

集群:使用相同 Group ID 的订阅者属于同一个集群。同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。

2.4.1.广播消费

广播消费:每个消费者消费Topic下的所有队列。当使用广播消费模式时,消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。

image

适用场景&注意事项

  • 广播消费模式下不支持顺序消息。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端第一次启动时默认从最新消息消费。客户端的消费进度是被持久化在客户端本地的隐藏文件中,因此不建议删除该隐藏文件,否则会丢失部分消息。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

2.4.2.集群消费

集群消费:一个topic可以由同一个ID下所有消费者分担消费。当使用集群消费模式时,消息队列 RocketMQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。

image

适用场景&注意事项

  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

2.4.3.使用集群模式模拟广播

如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic

image

适用场景&注意事项

  • 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
  • 消费进度在服务端维护,可靠性高于广播模式。
  • 对于一个 Group ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。 当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。 假设 Group ID 1 部署了三个消费者实例 C1、C2、C3,那么这三个实例将共同分担服务器发送给 Group ID 1 的消息。 同时,实例之间订阅关系必须保持一致。

消费者端的负载均衡,就是集群消费模式下,同一个ID的所有消费者实例平均消费该Topic的所有队列。

消费者从用户角度来看有两种类型:

  • PullConsumer:主动从brokers处拉取消息。一旦拉取到批量的数据,用户应用的消费进程初始化。
  • PushConsumer:封装消息拉取、消费进程和内部其他工作维护,留下一个回调接口让用户实现,当消息到达时即可执行用户实现逻辑。

3.消息存储

image

RocketMQ 主要存储的文件包括Comitlog 文件、ConsumeQueue 文件、IndexFile 文件。RocketMQ 将所有主题的消息存储在同-个文件中,确保消息发送时顺序写文件,尽仅最大的能力确保消息发送的高性能与高吞吐量。但由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率, RocketMQ 引入了ConsumeQueue 消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。Ind 巳xFile 索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从Commitlog 文件中检索消息。RocketMQ 是一款高性能的消息中间件,存储部分的设计是核心,存储的核心是IO 访问性能,本章也会重点剖析RocketMQ 是如何提高IO 访问性能的。进入RocketMQ 存储剖析之前,先看一下RocketMQ 数据流向,如图4-1 所示。

  • CommitLog:消息存储文件,所有消息主题的消息都存储在CommitLog 文件中。

  • ConsumeQueue:消息消费队列,消息到达CommitLog 文件后,将异步转发到消息消费队列,供消息消费者消费。

  • IndexFile:消息索引文件,主要存储消息Key 与Offset 的对应关系。

  • 事务状态服务: 存储每条消息的事务状态。

  • 定时消息服务:每一个延迟级别对应一个消息消费队列,存储延迟队列的消息拉取进度。

  • commitlog :消息存储目录。

  • config :运行期间一些配置信息,主要包括下列信息。

    • consumerFilter.json : 主题消息过滤信息。
    • consumerOffset.json : 集群消费模式消息消费进度。
    • delayOffset.json :延时消息队列拉取进度。
    • subscriptionGroup.json : 消息消费组配置信息。
    • topics.json: topic 配置属性。
image

image
  • consumequeue :消息消费队列存储目录。
  • index :消息索引文件存储目录。
  • abort :如果存在abort 文件说明Broker 非正常关闭,该文件默认启动时创建,正常退出之前删除。
  • checkpoint :文件检测点,存储commitlog 文件最后一次刷盘时间戳、consumequeue最后一次刷盘时间、index 索引文件最后一次刷盘时间戳。
image
image

3.1.PageCache

MappedByteBuffer

RocketMQ中的文件读写主要就是通过MappedByteBuffer进行操作,来进行文件映射。利用了nio中的FileChannel模型,可以直接将物理文件映射到缓冲区,提高读写速度。

了解了每个文件都在什么位置存放什么内容,那接下来就正式开始讨论这种存储方案为什么在性能带来的提升。

通常文件读写比较慢,如果对文件进行顺序读写,速度几乎是接近于内存的随机读写,为什么会这么快,原因就是Page Cache。

image

先来个直观的感受,整个OS有3.7G的物理内存,用掉了2.7G,应当还剩下1G空闲的内存,但OS给出的却是175M。当然这个数学题肯定不能这么算。

OS发现系统的物理内存有大量剩余时,为了提高IO的性能,就会使用多余的内存当做文件缓存,也就是图上的buff / cache,广义我们说的Page Cache就是这些内存的子集。

OS在读磁盘时会将当前区域的内容全部读到Cache中,以便下次读时能命中Cache,写磁盘时直接写到Cache中就写返回,由OS的pdflush以某些策略将Cache的数据Flush回磁盘。

但是系统上文件非常多,即使是多余的Page Cache也是非常宝贵的资源,OS不可能将Page Cache随机分配给任何文件,Linux底层就提供了mmap将一个程序指定的文件映射进虚拟内存(Virtual Memory),对文件的读写就变成了对内存的读写,能充分利用Page Cache。不过,文件IO仅仅用到了Page Cache还是不够的,如果对文件进行随机读写,会使虚拟内存产生很多缺页(Page Fault)中断。

image

每个用户空间的进程都有自己的虚拟内存,每个进程都认为自己所有的物理内存,但虚拟内存只是逻辑上的内存,要想访问内存的数据,还得通过内存管理单元(MMU)查找页表,将虚拟内存映射成物理内存。如果映射的文件非常大,程序访问局部映射不到物理内存的虚拟内存时,产生缺页中断,OS需要读写磁盘文件的真实数据再加载到内存。如同我们的应用程序没有Cache住某块数据,直接访问数据库要数据再把结果写到Cache一样,这个过程相对而言是非常慢的。

但是顺序IO时,读和写的区域都是被OS智能Cache过的热点区域,不会产生大量缺页中断,文件的IO几乎等同于内存的IO,性能当然就上去了。

说了这么多Page Cache的优点,也得稍微提一下它的缺点,内核把可用的内存分配给Page Cache后,free的内存相对就会变少,如果程序有新的内存分配需求或者缺页中断,恰好free的内存不够,内核还需要花费一点时间将热度低的Page Cache的内存回收掉,对性能非常苛刻的系统会产生毛刺。

image

3.2.刷盘

刷盘一般分成:同步刷盘和异步刷盘

image

3.2.1.同步刷盘

在消息真正落盘后,才返回成功给Producer,只要磁盘没有损坏,消息就不会丢。

image

一般只用于金融场景,这种方式不是本文讨论的重点,因为没有利用Page Cache的特点,RMQ采用GroupCommit的方式对同步刷盘进行了优化。

3.2.2.异步刷盘

读写文件充分利用了Page Cache,即写入Page Cache就返回成功给Producer,RMQ中有两种方式进行异步刷盘,整体原理是一样的。

刷盘由程序和OS共同控制

先谈谈OS,当程序顺序写文件时,首先写到Cache中,这部分被修改过,但却没有被刷进磁盘,产生了不一致,这些不一致的内存叫做脏页(Dirty Page)。

image

脏页设置太小,Flush磁盘的次数就会增加,性能会下降;脏页设置太大,性能会提高,但万一OS宕机,脏页来不及刷盘,消息就丢了。

image

一般不是高配玩家,用OS的默认值就好,如上图。

image

RMQ想要性能高,那发送消息时,消息要写进Page Cache而不是直接写磁盘,接收消息时,消息要从Page Cache直接获取而不是缺页从磁盘读取。

好了,原理回顾完,从消息发送和消息接收来看RMQ中被mmap后的Commit Log和Consume Queue的IO情况。

3.3.commitLog(RMQ发送逻辑)

发送时,Producer不直接与Consume Queue打交道。上文提到过,RMQ所有的消息都会存放在Commit Log中,为了使消息存储不发生混乱,对Commit Log进行写之前就会上锁。

image

消息持久被锁串行化后,对Commit Log就是顺序写,也就是常说的Append操作。配合上Page Cache,RMQ在写Commit Log时效率会非常高。

3.3.Consume Queue(RMQ消费逻辑)

CommitLog持久后,会将里面的数据Dispatch到对应的Consume Queue上。

image
image

消费时,Consumer不直接与Commit Log打交道,而是从Consume Queue中去拉取数据

image

拉取的顺序从旧到新,在文件表示每一个Consume Queue都是顺序读,充分利用了Page Cache。

上述的消息存储只是把消息主体存储到了物理文件中,但是并没有把消息处理到consumeQueue文件中,那么到底是哪里存入的?

任务处理一般都分为两种:

一种是同步,把消息主体存入到commitLog的同时把消息存入consumeQueue,rocketMQ的早期版本就是这样处理的。

另一种是异步处理,起一个线程,不停的轮询,将当前的consumeQueue中的offSet和commitLog中的offSet进行对比,将多出来的offSet进行解析,然后put到consumeQueue中的MapedFile中。

image

3.4.commitLog随机读(RMQ消费逻辑)

光拉取Consume Queue是没有数据的,里面只有一个对Commit Log的引用,所以再次拉取Commit Log。

image

Commit Log会进行随机读

image

但整个RMQ只有一个Commit Log,虽然是随机读,但整体还是有序地读,只要那整块区域还在Page Cache的范围内,还是可以充分利用Page Cache。

image

在一台真实的MQ上查看网络和磁盘,即使消息端一直从MQ读取消息,也几乎看不到进程从磁盘拉数据,数据直接从Page Cache经由Socket发送给了Consumer。

3.5.Index

3.5.1.消息索引的作用

这里的消息索引主要是提供根据起始时间、topic和key来查询消息的接口。

首先根据给的topic、key以及起始时间查询到一个list,然后将offset拉到commitLog中查询,再反序列化成消息实体。

image

构建consumeQueue的同时会buildIndex构建索引

  • 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目, 例如图中所示 slotNum=5000000) 。
  • 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是挃吐最新的一个项目开源主页:https://github.com/alibaba/RocketMQ
  • 遍历索引项列表迒回查询时间范围内的结果集(默讣一次最大迒回的 32 条记彔)
  • Hash 冲突: 寻找 key 的 slot 位置时相当亍执行了两次散列函数,一次 key 的 hash,一次 key 的 hash 值取模, 因此返里存在两次冲突的情冴;第一种,key 的 hash 值丌同但模数相同,此时查询的时候会在比较一次 key 的 hash 值(每个索引项保存了 key 的 hash 值),过滤掉 hash 值丌相等的项。第二种,hash 值相等但 key 相等, 出亍性能的考虑冲突的检测放到客户端处理(key 的原始值是存储在消息文件中的,避免对数据文件的解析), 客户端比较一次消息体的 key 是否相同。
  • 存储: 为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引文件头中), 整个索引文件是定长的,结构也是固定的。

4.事务消息

4.1.业务逻辑

image
image
  1. Producer发送Half(prepare)消息到broker;

  2. half消息发送成功之后执行本地事务;

  3. (由用户实现)本地事务执行如果成功则返回commit,如果执行失败则返回roll_back。

  4. Producer发送确认消息到broker(也就是将步骤3执行的结果发送给broker),这里可能broker未收到确认消息,下面分两种情况分析:

  5. 如果broker收到了确认消息:

    1. 如果收到的结果是commit,则broker视为整个事务过程执行成功,将消息下发给Conusmer端消费;
    2. 如果收到的结果是rollback,则broker视为本地事务执行失败,broker删除Half消息,不下发给consumer。
  6. 如果broker未收到了确认消息:

    1. broker定时回查本地事务的执行结果;
    2. (由用户实现)如果本地事务已经执行则返回commit;如果未执行,则返回rollback;
    3. Producer端回查的结果发送给broker;
    4. broker接收到的如果是commit,则broker视为整个事务过程执行成功,将消息下发给Conusmer端消费;如果是rollback,则broker视为本地事务执行失败,broker删除Half消息,不下发给consumer。如果broker未接收到回查的结果(或者查到的是unknow),则broker会定时进行重复回查,以确保查到最终的事务结果。

4.2.实现细节

image
image
image

4.3.使用细节

image

4.4.多分支事务

image

5.顺序消息

5.1.引入

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!——沈询

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。从这个角度来看消息的顺序问题,我们可以得出两个结论:

  • 不关注乱序的应用实际大量存在
  • 队列无序并不意味着消息无序

所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?

RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:

image

在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的肯定是同一个队列。

image

RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。

之所以出现你这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)。如图:

image

而消费端消费的时候,是会分配到多个queue的,多个queue是同时拉取提交消费。如图:

image

但是同一条queue里面,RocketMQ的确是能保证FIFO的。那么要做到顺序消息,应该怎么实现呢——把消息确保投递到同一条queue。

5.2.使用细节

下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。


/**
 * Producer,发送顺序消息
 */
public class Producer {
    
    public static void main(String[] args) throws IOException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
 
            producer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");
 
            producer.start();
 
            String[] tags = new String[] { "TagA", "TagC", "TagD" };
            
            // 订单列表
            List<OrderDemo> orderList =  new Producer().buildOrders();
            
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String dateStr = sdf.format(date);
            for (int i = 0; i < 10; i++) {
                // 加个时间后缀
                String body = dateStr + " Hello RocketMQ " + orderList.get(i);
                Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes());
 
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Long id = (Long) arg;
                        long index = id % mqs.size();
                        return mqs.get((int)index);
                    }
                }, orderList.get(i).getOrderId());//订单id
 
                System.out.println(sendResult + ", body:" + body);
            }
            
            producer.shutdown();
 
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.in.read();
    }
    
    /**
     * 生成模拟订单数据 
     */
    private List<OrderDemo> buildOrders() {
        List<OrderDemo> orderList = new ArrayList<OrderDemo>();
 
        OrderDemo orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        
        orderDemo = new OrderDemo();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        
        return orderList;

image

从图中红色框可以看出,orderId等于15103111039的订单被顺序放入queueId等于7的队列。queueOffset同时在顺序增长。

发送时有序,接收(消费)时也要有序,才能保证顺序消费。如下这段代码演示了普通消费(非有序消费)的实现方式。

/**
 * 普通消息消费
 */
public class Consumer {
 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
 
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            Random random = new Random();
 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
                for (MessageExt msg: msgs) {
                    System.out.println(msg + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}
image

可见,订单号为15103111039的订单被消费时顺序完成乱了。所以用MessageListenerConcurrently这种消费者是无法做到顺序消费的,采用下面这种方式就做到了顺序消费

/**
 * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
 */
public class ConsumerInOrder {
 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("10.11.11.11:9876;10.11.11.12:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
 
        consumer.registerMessageListener(new MessageListenerOrderly() {
 
            Random random = new Random();
 
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
                for (MessageExt msg: msgs) {
                    System.out.println(msg + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
 
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}
image

MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。图中的输出是只启动了一个消费者时的输出,看起来订单号还是混在一起,但是每组订单号之间是有序的。因为消息发送时被分配到了三个队列(参见前面生产者输出日志),那么这三个队列的消息被这唯一消费者消费。

image
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342