[TOC]
Producer源代码分析
Producer设计分析
客户端通过调用producer进行消息发送,这是消息的起源,所以我们最先分析Producer源代码。但如果一上来就切入代码,难免晦涩难以理解,在这里我先以生活中的例子作为开始,帮助理解Producer的设计理念。
我们举一个很常见的例子:发快递。为什么举这个例子呢,因为Kafka采用NIO通讯,如果大家学习过NIO,会知道经常用发快递来举例讲解NIO。另外发快递的场景确实也很像Kafka producer发送消息的场景。
快递公司一般是这样运作,由快递员上门取件,统一送回站点,站点进行分拣,同一个地区的包裹会装进一辆车,装满后发送出去。
在这个过程中,有如下几个角色
- 收件员,负责把快递运送回站点
- 分拣员,负责把快递按地区分类,比如唐山和秦皇岛,虽然具体的地址不一样但是都属于一个大区。
- 运输车,负责把快递运输出去,送到指定地区。
我们思考一下快递公司这样设计的好处
角色分工,收件员只负责收取快递,黏贴快递单,协助打包,然后送回站点。分拣员负责分类。运输车负责运输。分工明确,相互之间不需要知道对方在做什么。多人并行工作。
快递累积,分拣员把快递按地区分类,够一车后,发送出去。而不是收到一个件就发送出去。这是快递公司的通常做法,显然效率是更高的。然而实效性要求更高的闪送,则是收到一个件,马上发送出去,这样做是延迟最小的方式。
同区归并,真正发送的时候,唐山的快递和秦皇岛的快递会在一辆车发送出去,先统一发往河北总站。到达总站后再分车运往具体的城市。这样减少了发车的频次。如果唐山一车,秦皇岛一车,如果装不满一车,就会造成资源浪费,并且发车次数变多。
了解了快递公司的工作方式后,我们在宏观上看一下KafkaProducer的设计。
KafkaProducer的设计理念如出一辙,首先主线程把要发送的消息按照主题分区进行累积,达到一定数量后,触发发送线程进行发送。为了提高发送的效率,把发往同一个服务器的消息进行归并,一次性发往相应的服务器。
Producer设计中也有相应的角色:
- 收件员-KafkaProducer。实际除收件员外,它还承担了更多的工作。我们发送消息第一步就是调用KafkaProducer.send()方法
- 分拣员-RecordAccumulator。负责把消息按照分区分类,放入相应队列的ProducerBatch中
- 运输车-Sender。负责运输,把消息真正发送出去。其实它内部还很复杂,通过NIO实现网络传输。
此外还有些相关类如下:
- ProducerBatch,每个ProducerBatch是一个信件箱,而同一个patition的信件箱码放在一起,程序中这就是Deque<ProducerBatch>
- ClientRequest,可以理解为运输车的货箱,在运输前,我们会把发往同一个服务器的消息放入ClientRequest,那么只需要发送一次ClientRequest,就可以把不同主题不同分区,但发往同一台服务器上的消息,一次性发送过去。
通过以上的讲解,Producer涉及到的主要类都已经进行了简单讲解,各自负责的事情也很清晰。下面我给出一张图,通过此图来讲解producer工作的主流程:
图中可以看到,有两个线程同时在工作,一个线程负责把消息送往消息站进行分组,另外一个线程负责把消息真正发送出去。
客户端发送消息时调用KafkaProducer的send方法。内部逻辑如下:
- 首先对消息进行加工,如序列化,选择分区等。
- 然后通过RecordAccumulator把加工好的消息放入相应的ProducerBatch中。
- 当batch满时,触发sender线程工作
- sender线程首先把batche从原列表中取出来,按照发往broker进行分组,然后封装到ClientRequest中
- 最后通过NIO的方式把ClientRequest发往相应的broker
至此,我们应该已经理解了Kafka Producer的设计思路。可见所有软件设计都来源于生活,都是对生活中的相应场景进行抽象和面向对象的设计。软件是无形的,但是实际生活中我们所采用的工作方式是有型的。通过参照实际生活场景和解决方案做软件的设计,让你的设计贴近实际场景,这样的代码写出来易于理解和扩展。
源码分析
客户端发送消息时调用KafkaProducer的send方法,所以我们先分析KafkaProducer,再层层深入。
KafkaProducer相当于整个快递公司的总控员,操作收件员收件,命令分拣员进行分拣,最终通知货车可以发车了。货车发货则在另外一个线程中进行。
消息发送的顶层逻辑都在KafkaProducer中。在看代码前,我先宏观介绍下KafkaProducer中send方法的主流程,帮助代码理解,括号中以发快递类比:
- 拦截器处理(对快递进行发送前的预处理)
- 判断producer是否可用。依据是负责io的线程是否工作。(车队罢工了还如何发送快递?)
- 判断metadata是否可用。metadata相当于调度员的指挥图,存储了kafka集群的各种信息,包括topic、分区等等。(如果没有快递网点的信息,如何进行调度派发?)
- 对key和value进行序列化。(对快递进行包装)
- 获取要发往的分区编号(快递目的地部分地址)
- 计算序列化后大小(快件称重)
- 通过RecordAccumulator把消息加入batch中(分拣员进行分拣)
- 如果batch满了,或者创建了新batch来容纳消息,则唤醒sender线程执行发送。(已经有封箱的货物了,发货吧!)
send方法的整体逻辑如上,每一步其实都和我们真实场景相对应,这就是设计的巧妙之处。
接下来我们进入代码部分,我们先看下KafkaProducer中的部分重要属性
private final Partitioner partitioner;
说明:分区选择器,根据一定策略,选择出消息要发往的分区
private final int maxRequestSize;
说明:消息最大程度,包括了消息头、序列化后的key和value
private final long totalMemorySize;
说明:单个消息发送的缓冲区大小
private final Metadata metadata;
说明:kafka集群元数据
private final RecordAccumulator accumulator;
说明:前面所说的分拣员,维护不同分区的batch,负责分拣消息。等待sender来发送
private final Sender sender;
说明:发送消息,实现Runable,ioThread中启动运行
private final Thread ioThread;
说明:运行sender的线程,负责发送消息到kafka集群
private final CompressionType compressionType;
说明:压缩算法类型,gzip等。消息数量越多,压缩效果越好
private final Serializer<K> keySerializer;
说明:key的序列化器
private final Serializer<V> valueSerializer;
说明:value的序列化器
private final ProducerConfig producerConfig;
说明:生产者的配置
private final long maxBlockTimeMs;
说明:等待更新kafka集群元数据的最大时长
private final ProducerInterceptors<K, V> interceptors;
说明:发送前消息钱,要先通过一组拦截器的处理。也可以先于用户的callback预处理
KafkaProducer构造器会初始化上面的属性。另外在构造函数最后可以看到启动了ioThread。
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
接下来我们看一下send方法代码:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
ProducerInterceptors对象中维护了List<ProducerInterceptor<K, V>> interceptors。在onSend方法中,会循环这个列表,调用每个ProducerInterceptor的onSend方法做预处理。
拦截器处理后,再调用doSend方法。发送的主要逻辑都在doSend方法中,我按照上面介绍的发送主流程结合代码来讲解。这里不粘贴doSend方法的整段代码,大家自行参考源代码。
1、判断producer是否可用。
可以看到一进来就调用了throwIfProducerClosed()方法,这个方法里逻辑如下:
private void throwIfProducerClosed() {
if (ioThread == null || !ioThread.isAlive())
throw new IllegalStateException("Cannot perform operation after producer has been closed");
}
很简单,就是在检查io线程状态。
2、判断这个topic的metadata是否可用。
代码如下:
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
主要逻辑在waitOnMetadata方法中。这里不展开讲,方法中做的事情是从缓存获取元数据中topic的信息,判断是否可用,如果缓存中存在分区并且请求分区在范围内,则直接返回cluster信息,否则发送更新请求。再判断分区是否正常并且请求更新的分区在此topic分区的范围内。这个方法的返回值是Cluster原数据以及所花费的时长。
3、对key和value进行序列化。
主要是如下两行代码
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
4、获取消息要发往的分区编号
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
TopicPartition对象实际上只是封装了topic和partition,那么消息的发送地址就齐全了。
4、计算序列化后大小
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
ensureValidRecordSize方法中验证size是否未超过maxRequestSize及totalMemorySize。
5、通过RecordAccumulator把消息加入batch中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
accumulator.append方法中做分拣逻辑处理,后面会重点讲解RecordAccumulator。这里我们只需要知道通过这个方法处理,你的消息已经缓存到待发送Batch中。
6、如果batch正好满了,或者创建了新batch来容纳消息,则唤醒sender线程执行发送。
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
至此Producer中send方法的主要代码逻辑已经分析完毕。下一小节我们看一下分拣员RecordAccumulator是如何实现的。
附KafkaProducer类部分注释翻译:
producer 线程安全,跨线程共享一个producer对象,通常会比多个对象更快。
producer由一个buffer空间池组成,它容纳了没有被传输到server的数据。同时有一个后台 I/O线程负责把这些数据记录转化为reqeust,然后把他们发送给集群。如果producer用后未成功关闭,这些资源将被泄漏。
send()方法是异步的。当调用他添加记录到等待发送的数据缓冲区会立即得到返回。这允许producer把独立的消息打包起来,这样会更为高效。
ack配置项控制认为请求完成的条件。设置为“all”,数据全部提交前,是不会返回结果的,这是最慢,但是持久性最好的。
producer维护了每个partition对于未发送消息的缓冲区。缓冲区的大小通过batch.size配置项指定。设置的大一点,可以让batch更大,但是也需要更多的内存。
RecordAccumulator
我们知道RecordAccumulator是缓存待发送消息的地方,KafkaProducer把消息放进来,当消息满了的时候,通知sender来把消息发出去,释放空间。RecordAccumulator就相当于货运站的仓储,货物不断的往里放,每装满一箱就会通知发货者来取货运走。如下图所示:
从上图可以看到,至少有一个业务主线程和一个sender线程同时操作RecordAccumulator,所以他必须是线程安全的。
下面我们来详细分析RecordAccumulator。
RecordAccumulator设计
我们直接一头扎入程序的设计和代码,会有一定的理解难度。我还是先以真实世界的某个事物做类比来入手。
前文说RecordAccumulator是一个累积消息的仓库,那么我们就拿快递仓库来类比,看看RecordAccumulator是个怎样的仓库,看下图:
上图是一个快递站的仓库,堆满了货物。分拣员在这里工作。我们可以看到发往不同目的地的大货箱放置在各自对应的区域,分拣员把不同目的地的包裹放入对应目的地的大货箱,每装满一箱就放置在对应的堆放区域。
分拣员工作流程如下:
- 分拣员收到一个包裹,先查看目的地是哪里。假设是北京朝阳,他需要找到目的地为北京朝阳的大货箱装进去。
- 当这个大箱子装满后,分拣员会把它封箱,然后搬运到挂有北京朝阳牌子的区域,堆放起来。
- 当分拣员再拿到北京朝阳的包裹时,由于没有可用的北京朝阳大货箱,他需要再拿来一个北京朝阳的大货箱来放置包裹。
以上就是分拣员所做的工作,分拣员是谁呢?分拣员就是RecordAccumulator!而那些大货箱以及各自所属的堆放区域,就是RecordAccumulator中缓存消息的地方。所有封箱的大货箱都会等待sender来取货发送出去。
如果你看懂了上面这张图,那么你已经充分理解了RecordAccumulator的设计,后面已经不用继续看了!开个玩笑~不过上图确实完全反映了RecordAccumulator设计和运行的基本逻辑。
我们总结下仓库里有什么:
- 分拣员
- 货物
- 目的地
- 货箱
- 货箱堆放区域
记住这些概念,这些仓库里的东西最终都会体现在代码里。
下面我们来真正讲解RecordAccumulator的设计。
RecordAccumulator实现了接收消息,然后以主题分区为单元,把消息以ProducerBatch为单位累积缓存。多个ProducerBatch保存在Deque队列中。当Deque中最新的batch已不能容纳消息时,就会创建新的batch来继续缓存,并将其加入Deque。
RecordAccumulator缓存消息的存储结构如下:
RecordAccumulator内部存储消息使用的容器是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,通过上图可以看到消息以主题分区划分存储单元。消息实际是放在ProducerBatch中。ProducerBatch相当于一个个箱子,箱子上写着收件地址:xx主题xx分区。当一个ProducerBatch箱子装满时,就会封箱贴上封条,然后在对应的队列里生成一个新的ProducerBatch,来放置本主题分区新的消息。
由此可见,RecordAccumulator累积消息的过程,就是把消息装进不同收件地址箱子(ProducerBatch),装满封箱,堆放起来(加入Deque<ProducerBatch>),然后继续产生新箱子装消息的过程。
每次封箱操作后都会返回可以发货的结果给调用者,调用者KafkaProducer再唤醒sender把已经封箱的ProducerBatch发送出去。
图中可以看到,消息真实存储的地方是DataOutputStream。ProducerBatch内部有一个MemoryRecordsBuilder对象,图中未画,而DataOutputStream在MemoryRecordsBuilder中。三者关系:ProducerBatch-->MemoryRecordsBuilder-->DataOutputStream。
接下来对RecordAccumulator的代码分析,主要围绕以下三个类:
1、RecordAccumulator
消息累积器的顶层逻辑,维护存放消息的容器
2、ProducerBatch
封装MemoryRecordsBuilder,并且有很多控制用的信息及统计信息
3、MemoryRecordsBuilder
消息真正累积存放的地方
RecordAccumulator代码分析
append()方法是RecordAccumulator暴露的累积消息入口,KafkaProducer通过此接口累积消息。我们也先从此方法开始层层递进,分析累积消息的逻辑。
append()方法
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
代码的主要逻辑如下:
1、appendsInProgress.incrementAndGet();
首先通过原子操作,把追加消息的线程计数器+1
2、 Deque<ProducerBatch> dq = getOrCreateDeque(tp);
获取主题分区对应的ProducerBatch队列。getOrCreateDeque方法中判断如果没有队列,则新建队列。
3、RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
同步代码块中尝试去做一次追加操作tryAppend(),如果成功就直接返回追加的结果对象。tryAppend方法中逻辑是:
a)如果dq中有ProducerBatch则往新一个batch中追加
a.1)追加不成功,说明最新的batch空间不足,返回null。需要外层逻辑创建新的batch
a.2)追加成功,返回RecordAppendResult
b)dq中无producerBatch,返回null,代表没有能追加成功。
第一个到达此方法的线程肯定是返回了null,因为还没有消息累积进来,也不存在ProducerBatch对象。
如果tryAppend返回null,说明没能直接在现有的batch上追加成功(也可能还没有batch),此时需要初始化新的ProducerBatch
4、预估size大小,从BufferPool申请ByteBuffer。
如果BufferPool空间不足就轮询等待。大家可以自己看一下BufferPool的代码,这里就不展开讲了,他的目的是控制总的内存消耗以及实现ByteBuffer复用。
5、再次tryAppend
由于第4步代码不在同步代码块中,所以接下来的同步代码块中,首先需要再次调用tryAppend,因为可能别的线程已经创建了本主题分区新的ProducerBatch,那么消息直接追加成功。
6、创建ProducerBatch
如果上一步返回null,说明还是没有可用的ProducerBatch,我们需要创建新的ProducerBatch。首先构建MemoryRecordsBuilder对象,真正做消息累加的就是这个对象,每个ProducerBatch都有一个MemoryRecordsBuilder的引用。
7、真正去追加消息
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
首先调用ProducerBatch的tryAppend()方法,这是真正做消息追加的地方,内部通过MemoryRecordsBuilder实现。后续再详细分析。
然后把新的ProducerBatch放入队列中
8、把batch放入incomplete集合
incomplete本质是个存放未完成发送batch的Set
9、释放BufferPool空间
10、累积消息完成后的处理
finally代码块中再最终确保释放BufferPool空间,然后通过原子操作把追加消息的线程计数器-1
通过以上分析,我们捋清楚了RecordAccumulator追加消息的主逻辑,其中有两个步骤还需要展开代码说一下,我们先说RecordAccumulator中的tryAppend方法,也就是上面步骤中的第3步。
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
return null;
}
逻辑上文已经讲过,这里再对照代码多说两句:
方法如果返回null,说明没有可以成功追加此消息的ProducerBatch,有两种情况:
- deque是空的,可能是第一次被进入,也可能是batch都被发送完了。
- deque存在batch,但是所剩空间已经不足以容纳此消息。
如果能取得队列中最新的batch,并且能够成功追加消息,那么就会返回RecordAppendResult。
append方法返回对象RecordAppendResult,代码如下:
public final static class RecordAppendResult {
public final FutureRecordMetadata future;
public final boolean batchIsFull;
public final boolean newBatchCreated;
public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
this.future = future;
this.batchIsFull = batchIsFull;
this.newBatchCreated = newBatchCreated;
}
}
我们可以看到里面有异步发送的Future对象,此外还有两个标识,batchIsFull代表batch是否满了,newBatchCreated代表是否本次append增加了新的batch。
大家是否还记得KafkaProducer调用accumulator的apend方法后的逻辑是什么吗?不记得也没有关系,代码如下:
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
KafkaProducer通过这两个标识位来决定是否唤醒sender。翻译过来就是,“已经有封箱的消息了!sender快点把消息发走吧!不要再睡了”
讲到这里,其实整个消息追加的流程已经讲通了。不过消息追加的具体实现我们还没有讲解,那么接下来我们讲解发生消息追加的真正地方:ProducerBatch和MemoryRecordsBuilder。
ProducerBatch类分析
前文说过ProducerBatch可以理解为存放消息的大货箱。此类中的主要方法是tryAppend,也就是把消息放入箱子的操作,它是消息追加的顶层逻辑,代码如下:
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
// we have to keep every future returned to the users in case the batch needs to be
// split to several new batches and resent.
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
主要做了三件事
- 检查是否有足够空间容纳消息,这是通过调用MemoryRecordsBuilder的hasRoomFor()方法。
- 追加消息,通过调用MemoryRecordsBuilder的append()方法
- 保存存放了callback和对应FutureRecordMetadata对象的thunk到List<Thunk> thunks中。
另外还有一个重要的方法就是closeForRecordAppends(),当batch无空间容纳新的消息的时候会调用此方法封箱,这里不展开来讲。
MemoryRecordsBuilder类分析
讲到这里,终于讲到消息追加真正落地的地方了。每个ProducerBatch都维护了一个MemoryRecordsBuilder。ProducerBatch追加消息,实际是调用MemoryRecordsBuilder完成的。
消息最终通过MemoryRecordsBuilder的append方法,追加到MemoryRecordsBuilder的DataOutputStream中。
上一节我们可以看到它有两个主要的方法hasRoomFor()和append()。
1、hasRoomFor()
这个方法比较简单,通过计算消息的预估大小,以及剩余空间,返回true或者false。代码就不贴了,感兴趣的话自行查看。
2、append()
这个方法我们需要仔细分析一下,消息的追加最终发生在这里。我先简述下逻辑:
- 把字节数组形式的key和value转为HeapByteBuffer
- 计算写入的offset,如果不是第一次写入,那么lastOffset+1,否则是baseOffset
- 如果magic号大于1,那么调用appendDefaultRecord(),否则调用appendLegacyRecord()
我们继续看一下appendDefaultRecord()方法,在此方法中最终调用了DefaultRecord.writeTo()来写入appendStream
最后做检查,更新一些统计状态,如消息的数量、lastOffset等。
在DefaultRecord.writeTo()方法中,通过调用Utils.writeTo(DataOutput out, ByteBuffer buffer, int length),往appendStream写入key,value,header。
Utils.writeTo()代码如下:
if (buffer.hasArray()) {
out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
} else {
int pos = buffer.position();
for (int i = pos; i < length + pos; i++)
out.writeByte(buffer.get(i));
}
我们总结一下MemoryRecordsBuilder:它内部维护了一个buffer,并记录能写入的大小,以及写入的位置在哪里,而每条消息都被追加到DataOutput对象appendStream中。appendStream是消息在RecordAccumulator中最终的去处。
小结
1、RecordAccumulator使用ProducerBatch缓存消息。每个主题分区拥有一个ProducerBatch的队列。
2、当ProducerBatch队列的队尾batch不能再容纳新消息时,对其进行封箱操作,同时新建ProducerBatch放入队尾来存放新消息。
3、ProducerBatch对消息追加的操作都是通过MemoryRecordsBuilder进行的。消息最终被追加到MemoryRecordsBuilder中的DataOutputStream appendStream中