前言
从本章开始,将会以系列的方式,分章节讲解Rocket Mq是如何实现消息的消费模型。
消息的广播或单播
不管是JMS规范中的Topic/Queue,还是Kafka的Topic/Paratiton/ConsumerGroup,或者是RabbitMq里的Exchange等,从本质上来说,无非是广播与单播的区别。广播,指的是单点对多点;而单播,则是点对点。当然,对于互联网中的大部分应用来说,组间广播,组内单播是最常见的事情。例如,在kafka中,消息会广播给所有的不同ConsumerGroup,而在相同的ConsumerGroup的消费者中,只能由其中一个消费者消费某条消息队列。RocketMq 使用同样的方式来实现组间广播,组内单播。
对于RocketMq 来说,它使用了 消息模式来 划分 广播与单播的概念,BROADCASTING
对应广播,CLUSTERING
对应单播。当Cosnumer客户端指定消息模式为BROADCASTING
时,该Consumer可以消费所订阅topic对应的所有队列。当Cosnumer客户端指定消息模式为CLUSTERING
时,只能消费所订阅topic的指定队列,而CLUSTERING
模式中,又有并发消费和顺序消费之分。我们通过【图1-1】来详细讲解上述几种模式:
在【图1-1】中,有一个名为[topic_x]的topic
,并且在broker1与broker2均创建了4条队列,换言之,对于所有的客户端来说,[topic_x] 的视图共有8条队列。因此,对于同一个nameser集群中,可以通过topicName、brokerName以及queueId 可以确定集群中一条消费队里。
有一个ConsumerGroup 为 [ConsumerGroup1]的消费者客户端[consumer_client3],其消息模式 为BROADCASTING
,并且订阅了topic_x,因此,消费者客户端[consumer_client3]可以消费topic_x 在[broker1]中的4条队列【queue0,queue1,queue2,queue4】以及[broker2]中的4条队列【queue0,queue1,queue2,queue4】。
而组内单播是 是针对 设置了相同的[ConsumerGroup] 以及 消息模式为CLUSTERING
的 消费者客户端来说的。
如 [ConsumerGroup1] 中的消费者客户端[consumer_client1] 以及 消费者客户端[consumer_client2] ,他们均为CLUSTERING
模式,因此根据队列的分配算法,[consumer_client1] 消费[broker1]中的4条队,[consumer_client2] 消费[broker2]中的4条队。当然对于消费组[ConsumerGroup2]里的消费客户端[consumer_client1]一样可以消费[broker1]中的4条队列,从而到达了组内单播,组外广播的目的。
对于CLUSTERING
模式的消费者来说,又分顺序消费与并发消费。以[ConsumerGroup1]中的[consumer_client1]为例,图中,它消费了队列[queu-0]。当[consumer_client1]为顺序消费时,它的消费模式为,通过一条线程逐条消息消费,这样一来,就可以保证了这条队列里,消息被消费的顺序性了,换言之,当业务系统希望依赖rmq的顺序消费时,即可使用该特性。当[consumer_client1]为并发消费时,它的消费模式为,例如,这次消费端一次拉取了三条消息,那么,消费端会开启三条线程,并发处理这三条消息。
消息的拉取方式
消息的获取本质就两种,一种是推送push,另一种是拉取pull。两种方式各有什么优劣势、业界是如何选择的、我们来详细分析一下。
push方式,就是由消息的存储方broker主动把消息推给消费者,传统的实现JMS标准的mq,例如rabbit mq,就是使用这种方式。这种方式好处就是消息的实时性很强,正常情况下,只要broker一接受到生产者发送的新消息,即可立即把消息推给消费者;并且,实现方式相对来说会简单很多。但push方式有一个比较严重的问题,就是慢消费。怎么说?就是在消息的生产速度远大于消费速度,并且这些消息都是无法丢失时,势必会造成消息在broker里堆积。更严重的是,broker会推送一大堆consumer无法处理的消息,而consuemr不是拒绝reject就是出现错误。满消费的情况多了,就会严重影响broker的整性能及吞吐量。
对于pull方式,即消费者主动向broker拉取消息,换言之,consumer可以实现按需消费,更不用考虑应为自身消费慢而受到处理不了的消息影响。相对的,broker处理消息堆积的方式也会简单很多,broker只需维护所有的消息队列以及对应的消息偏移量即可,无需记录每条消息的发送情况。所以,对于慢消费、消息生产速度不均匀以及需要强大的消息堆积能力等情况下,使用pull模式就很合适。
pull模式也有一个很大的短板,就是消费方很难准确地决定拉取消息的时机。如果消费者在一次pull过程中取到了新的消息,那么,消费者可以继续发起pull请求。但没有拉取到消息时,则需要重新等待一段时间后,才会继续拉取,因为我们不可能不断向broker发起拉取消息请求,如果这样做,一来会导致许多无用功,二来很可能会导致broker的性能下降。换言之,我们只能采取等待一段时间的方案。
但等多久就比较难以判断了。业界比较成熟的方案是,等待从比较短的时间开始,然后指数级增长等待的时间。比如第一次等待5s,然后10s,在然后是20s...直到有新的消息到来,在重设等待时间为5s。但是不管怎么样,还是会存在长时间等待或者做无用功的情况。例如,在等待10s后,消息立马过来,可消息下次拉取的时间是20s,换言之,消息会存在20s的延时。又或者消息确实一个小时后在过来,那么在一个消息内,一样会存在多次无用功。是不是比较沮丧。
我们来看看RocketMq是如何解决这个问题的。rmq独辟蹊径,使用了一种长轮询的做法,来平衡推拉模型各自的缺点。何为为长轮询,正常情况下,发起拉取消息请求,只要broker有消息返回,那么,消费者还会不等待,继续发起拉取消息请求。直到拉取消息失败,即无最新消息。这时,对于消费者来说,不是直接return掉,而是通过broker把拉取的请求await在那里,直到broker接受到新的消息后,在把请求notify。这也是一种很不错的思路。但海量的阻塞拉取请求对于broker来说,开销还是不小的,rmq通过合理的时间评估,给await加上一个合适的时间。
基于长轮询的做法,Rocket Mq封装了Push模式和Pull模式的客户端,但核心还是pull模式。
我们将在接下来的章节中,从源码上分析Rocket Mq 是如何实现上述所提到的全部细节