Broker将消息存储抽象成MessageStore
接口,默认实现类是DefaultMessageStore
。主要提供如下方法:
- 保存消息,包括单条和批量保存
- 根据topic、queue和offset批量获取消息,consumer使用该方法来拉取消息
- 根据消息offset读取消息详情,根据messageId查询消息时使用该方法
- 根据messageKey查询消息,可提供给终端用户使用
下面我们根据一个MessageStore
的数据结构图来看下消息是如何存储的
数据结构图
【注】以上图片转载自博客RocketMQ消息存储流程图及数据结构
数据结构
通过上面的图可以看到消息存储涉及到一下几个数据结构:
CommitLog,存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会有一个offset,代表在commitLog中的偏移量。举个例子,当前commitLog文件的大小是12413435字节,那下一条消息到来后它的offset就是12413436。这个说法不是非常准确,但是offset大概是这么计算来的。commitLog并不是一个文件,而是一系列文件(上图中的MappedFile)。每个文件的大小都是固定的(默认1G),写满一个会生成一个新的文件,新文件的文件名就是它存储的第一条消息的offset。
ConsumeQueue,既然所有消息都是存储在一个commitLog中,但是consumer是按照topic+queue的维度来消费消息的,没有办法直接从commitLog中读取,所以针对每个topic的每个queue都会生成consumeQueue文件。ConsumeQueue文件中存储的是消息在commitLog中的offset,可以理解成一个按queue建的索引,每条消息占用20字节(上图中的一个cq)。跟commitLog一样,每个Queue文件也是一系列连续的文件组成,每个文件默认放30w个offset。
IndexFile,CommitLog的另外一种形式的索引文件,只是索引的是messageKey,每个MsgKey经过hash后计算存储的slot,然后将offset存到IndexFile的相应slot上。根据msgKey来查询消息时,可以先到IndexFile中查询offset,然后根据offset去commitLog中查询消息详情。
线程服务
MessageStore
除了上面的数据结构以外,还需要相应的服务来对数据做操作。
IndexService,负责读写IndexFile的服务
ReputMessageService,消息存储到commitLog后,MessageStore
的接口调用就直接返回了,后续由ReputMessageService
负责将消息分发到ConsumeQueue
和IndexService
HAService,负责将master-slave之间的消息数据同步
以上就是MessageStore
的整体结构了,下面看下它的启动过程。
MessageStore启动
启动入口在DefaultMessageStore.start()
方法:
public void start() throws Exception {
//1、写lock 文件,尝试获取lock文件锁,保证磁盘上的文件只会被一个messageStore读写
lock = lockFile.getChannel().tryLock(0, 1, false);
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
//2、启动FlushConsumeQueueService,是一个单线程的服务,定时将consumeQueue文件的数据刷新到磁盘,周期由参数flushIntervalConsumeQueue设置,默认1sec
this.flushConsumeQueueService.start();
//3、启动CommitLog
this.commitLog.start();
//4、消息存储指标统计服务,RT,TPS...
this.storeStatsService.start();
//5、针对master,启动延时消息调度服务
if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
this.scheduleMessageService.start();
}
//6、启动reputMessageService,该服务负责将CommitLog中的消息offset记录到cosumeQueue文件中
if (this.getMessageStoreConfig().isDuplicationEnable()) {
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();
//7、启动haService,数据主从同步的服务
this.haService.start();
//8、对于新的broker,初始化文件存储的目录
this.createTempFile();
//9、启动定时任务
this.addScheduleTask();
this.shutdown = false;
}
以上就是整个MessageStore
服务启动的过程,其中有几项下面解释一下:
- 第2步,数据写入文件后,因为多级缓存的原因不会马上写到磁盘上,所以会有一个单独的线程定时调用flush,这里是flush consumeQueue文件的。
CommitLog
和IndexFile
的也有类似的逻辑,只是不是在这里启动的 - 第3步,启动CommitLog,
CommitLog.start()
代码如下:
public void start() {
//加载刷盘服务
this.flushCommitLogService.start();
//storePool flush
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
FlushCommitLogService
,跟第2步类似的,该服务负责将CommitLog的数据flush到磁盘,针对同步刷盘和异步刷盘,有两种实现方式
CommitLogService
,这个service只有在采用内存池缓存消息的时候才需要启动。在使用内存池的时候,这个服务会定时将内存池中的数据刷新到FileChannel中,这个我们后面讲CommitLog的文章中再详细讲。
- 第5步,在consumer的时候讲过,如果消息失败,broker会延时重发。对于延时重发消息(
topic=SCHEDULE_TOPIC_XXXX
),这个服务负责检查是否有消息到了发送时间,到了时间则从延时队列中取出后重新发送 - 第7步,如果是Master,
HAService
默认监听10912端口,接收slave的连接请求,然后将消息推送给slave;如果是Slave,则通过该服务连接Master接收数据 - 第9步,这里的定时任务主要有以下几个:
- 定时清理过期的commitLog、cosumeQueue和Index数据文件, 默认文件写满后会保存72小时
- 定时自检commitLog和consumerQueue文件,校验文件是否完整。主要用于监控,不会做修复文件的动作。
- 定时检查commitLog的Lock时长(因为在write或者flush时侯会lock),如果lock的时间过长,则打印jvm堆栈,用于监控。
以上就是整个启动的过程了,后续的文章开始讲解Broker是怎样接收Producer消息,还有怎样将消息交给Consumer的。