IndexFile作用
MessageStore
中存储的消息除了通过ConsumeQueue
提供给consumer消费之外,还支持通过MessageID或者MessageKey来查询消息。使用ID查询时,因为ID就是用broker+offset生成的,所以很容易就找到对应的commitLog
文件来读取消息。对于用MessageKey来查询消息,MessageStore
通过构建一个index来提高读取速度。
Index文件组织方式
在了解源码之前,最重要的是要知道Index文件的存储结构是怎么样的,下面的图引用自CSDN斩秋的文章RocketMQ原理解析
上图中下半部分是索引文件的结构,一个索引文件时有文件头,slotTable,和index List组成的。
Header中存储的信息有:文件中第一个和最后一个索引对应的消息的存储时间,第一个和最后一个索引对应消息的offset的最大和最小值,文件中的索引个数。
整个
slotTable+indexLinkedList
可以理解成java的HashMap
。每当放一个新的消息的index进来,首先取MessageKey的hashCode,然后用hashCode对slot总数取模,得到应该放到哪个slot中,slot总数系统默认500W个。只要是取hash就必然面临hash冲突的问题,跟HashMap
一样,IndexFile
也是使用一个链表结构来解决hash冲突。只是这里跟HashMap
稍微有点区别的地方是,slot中放的是最新index的指针。这个是因为一般查询的时候肯定是优先查最近的消息。每个slot中放的指针值是索引在indexFile中的偏移量,如上图,每个索引大小是20字节,所以根据当前索引是这个文件中的第几个(偏移量),就很容易定位到索引的位置。然后每个索引都保存了跟它同一个slot的前一个索引的位置,以此类推形成一个链表的结构。下面通过代码来看下新建一个索引的过程:
Index生成过程
上一篇讲ConsumeQueue
的时候,有一个ReputMessageService
在分发消息的时候还会调用CommitLogDispatcherBuildIndex
用来创建index。这个类实现就是直接调用的IndexService.buildIndex()
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
直接看IndexService的实现如下:
public void buildIndex(DispatchRequest req) {
//1、获取或者新建当前可写入的index file
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
//2、获取当前indexFile中记录的最大offset
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
//3、新来消息是之前的,不应该出现
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE://rollback消息不更新index
return;
}
//4、单条消息
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
//5、多个msg,循环逐个存入
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
- 第1步,跟
CommitLog
和ConsumeQueue
一样,IndexFile也是用MappedFile来存储数据,每个MappedFile大小也是固定的。所以这里第一步获取当前正在写入的文件,没有的话则新建,indexFile是用当前时间戳作为文件名的。 - 第2、3步,判断是否是新的消息,不是则跳过
- 第4、5步,这里分为单条消息和批量消息,其实最终的逻辑是一样的,都是调用putKey的方法
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
return indexFile;
}
这里就是调用IndexFile的putKey的方法,包含了重试逻辑,因为有可能在写index的时候,上一个文件已经写满了,需要创建一个新的文件写入。
IndexFile数据写入
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
//1、判断index是否已满,已满返回失败,由调用方来处理
if (this.indexHeader.getIndexCount() < this.indexNum) {
//2、计算key的非负hashCode,调用的java String的hashcode方法
int keyHash = indexKeyHashMethod(key);
//3、key应该放的slot
int slotPos = keyHash % this.hashSlotNum;
//4、slot的数据存储位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//获取slot的position
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
//5、如果存在hash冲突,获取这个slot存的前一个index的计数,如果没有则值为0
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
//6、计算当前msg的存储时间和第一条msg相差秒数
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//7、获取该条index实际存储position
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//8、生成一个index的unit内容
this.mappedByteBuffer.putInt(absIndexPos, keyHash);//key的hash,不会记录完整的key
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//消息在commitlog中偏移
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//时间差
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//相同hashcode的前一条index的顺序号
//9、更新slot中的值为本条消息的顺序号
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
//10、如果是第一条消息,更新header中的起始offset和起始time
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
//11、更新header中的计数
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();//增加index计数
this.indexHeader.setEndPhyOffset(phyOffset);//最后一条消息的offset
this.indexHeader.setEndTimestamp(storeTimestamp);//最后一个index的时间
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
以上的逻辑中,第5步获取slot之前是否已经有value,如果有代表hash冲突了,则在第8步中会把value设置成当前index的前一个index,同时将slot中的value更新成当前消息的序号。这样整个索引的生成就结束了,我们看下使用MsgKey来查询消息的时候是怎么使用索引文件的。
通过IndexFile查询消息
MessageStore
中提供了根据MessageKey查询消息的接口,接口如下:
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
QueryMessageResult queryMessageResult = new QueryMessageResult();
long lastQueryMsgTime = end;
for (int i = 0; i < 3; i++) {
//1、从indexService中查询所有offset
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
if (queryOffsetResult.getPhyOffsets().isEmpty()) {
break;
}
Collections.sort(queryOffsetResult.getPhyOffsets());
queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
long offset = queryOffsetResult.getPhyOffsets().get(m);
try {
boolean match = true;
MessageExt msg = this.lookMessageByOffset(offset);
if (0 == m) {
lastQueryMsgTime = msg.getStoreTimestamp();
}
if (match) {
//2、根据offset,到CommitLog读取消息详情
SelectMappedBufferResult result = this.commitLog.getData(offset, false);
if (result != null) {
int size = result.getByteBuffer().getInt(0);
result.getByteBuffer().limit(size);
result.setSize(size);
queryMessageResult.addMessage(result);
}
} else {
log.warn("queryMessage hash duplicate, {} {}", topic, key);
}
} catch (Exception e) {
log.error("queryMessage exception", e);
}
}
...
}
return queryMessageResult;
}
以上的接口输入参数如下:
topic:只能按topic维度来查询消息,因为索引生成的时候key是用的topic+MessageKey
key: messageKey
maxNum : 最多返回的消息数,因为key是由用户设置的,并不保证唯一,所以可能取到多个消息;同时index中只存储了hash,所以hash相同的消息也会取出来
begin,end:起始和结束时间,只会查询指定时间段的消息
这个接口的具体实现就是,先从IndexService中读取到offset,然后到CommitLog中读取消息详情。下面主要看下indexService中的实现:
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
List<Long> phyOffsets = new ArrayList<Long>(maxNum);
long indexLastUpdateTimestamp = 0;
long indexLastUpdatePhyoffset = 0;
maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
try {
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
//1、从最新的index文件开始向前找
for (int i = this.indexFileList.size(); i > 0; i--) {
IndexFile f = this.indexFileList.get(i - 1);
boolean lastFile = i == this.indexFileList.size();
if (lastFile) {
indexLastUpdateTimestamp = f.getEndTimestamp();
indexLastUpdatePhyoffset = f.getEndPhyOffset();
}
//2、index文件的时间包含了begin和end的全部或者部分
if (f.isTimeMatched(begin, end)) {
//3、从文件中读取index中的offset
f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
}
if (f.getBeginTimestamp() < begin) {
break;
}
if (phyOffsets.size() >= maxNum) {
break;
}
}
}
} catch (Exception e) {
log.error("queryMsg exception", e);
} finally {
this.readWriteLock.readLock().unlock();
}
return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
}
- 第1步,循环扫描indexFile,先扫描最近的
- 第2步,从header中读出最小和最大的timestamp,然后看是否符合请求的时间范围
- 第3步,从符合条件的index文件中获取offset。下面看下indexFile的读取步骤
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {//根据key来获取消息offset
if (this.mappedFile.hold()) {
//1、跟生成索引时一样,找到key的slot
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
}
//2、获取该槽位上的最后一条索引的序号
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {//到达最大返回条数
break;
}
//3、找到index的位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
//4、在commitlog中偏移
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
//5、相同hashcode的前一条消息的序号
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
//6、Hash和time都符合条件,加入返回列表
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
//7、前一条不等于0,继续读入前一条
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
- 第1步,首先根据Key获取索引对应的slot,这里逻辑跟存入索引时时一样的
- 第2步,slot中value存储的是最后一个index的序号
- 第3-6步,将符合条件的offset加入返回列表
- 第7步,如果存在相同hash的前一条index,并且返回列表没到最大值,则继续向前搜索
总结
从通过index查询消息的逻辑可以看出,相同的hashCode的message都会返回客户端,如果调用这个接口通过key来查询消息,需要在客户端再做一次过滤。为了提高查询效率,在发送消息时应该在保证便于查询的同时,尽量在一段时间内让消息有不同key。