Kafka Producer设计和源码分析

[TOC]

Producer源代码分析

Producer设计分析

客户端通过调用producer进行消息发送,这是消息的起源,所以我们最先分析Producer源代码。但如果一上来就切入代码,难免晦涩难以理解,在这里我先以生活中的例子作为开始,帮助理解Producer的设计理念。

我们举一个很常见的例子:发快递。为什么举这个例子呢,因为Kafka采用NIO通讯,如果大家学习过NIO,会知道经常用发快递来举例讲解NIO。另外发快递的场景确实也很像Kafka producer发送消息的场景。

快递公司一般是这样运作,由快递员上门取件,统一送回站点,站点进行分拣,同一个地区的包裹会装进一辆车,装满后发送出去。

在这个过程中,有如下几个角色

  1. 收件员,负责把快递运送回站点
  2. 分拣员,负责把快递按地区分类,比如唐山和秦皇岛,虽然具体的地址不一样但是都属于一个大区。
  3. 运输车,负责把快递运输出去,送到指定地区。

我们思考一下快递公司这样设计的好处

  1. 角色分工,收件员只负责收取快递,黏贴快递单,协助打包,然后送回站点。分拣员负责分类。运输车负责运输。分工明确,相互之间不需要知道对方在做什么。多人并行工作。

  2. 快递累积,分拣员把快递按地区分类,够一车后,发送出去。而不是收到一个件就发送出去。这是快递公司的通常做法,显然效率是更高的。然而实效性要求更高的闪送,则是收到一个件,马上发送出去,这样做是延迟最小的方式。

  3. 同区归并,真正发送的时候,唐山的快递和秦皇岛的快递会在一辆车发送出去,先统一发往河北总站。到达总站后再分车运往具体的城市。这样减少了发车的频次。如果唐山一车,秦皇岛一车,如果装不满一车,就会造成资源浪费,并且发车次数变多。

了解了快递公司的工作方式后,我们在宏观上看一下KafkaProducer的设计。

KafkaProducer的设计理念如出一辙,首先主线程把要发送的消息按照主题分区进行累积,达到一定数量后,触发发送线程进行发送。为了提高发送的效率,把发往同一个服务器的消息进行归并,一次性发往相应的服务器。

Producer设计中也有相应的角色:

  1. 收件员-KafkaProducer。实际除收件员外,它还承担了更多的工作。我们发送消息第一步就是调用KafkaProducer.send()方法
  2. 分拣员-RecordAccumulator。负责把消息按照分区分类,放入相应队列的ProducerBatch中
  3. 运输车-Sender。负责运输,把消息真正发送出去。其实它内部还很复杂,通过NIO实现网络传输。

此外还有些相关类如下:

  1. ProducerBatch,每个ProducerBatch是一个信件箱,而同一个patition的信件箱码放在一起,程序中这就是Deque<ProducerBatch>
  2. ClientRequest,可以理解为运输车的货箱,在运输前,我们会把发往同一个服务器的消息放入ClientRequest,那么只需要发送一次ClientRequest,就可以把不同主题不同分区,但发往同一台服务器上的消息,一次性发送过去。

通过以上的讲解,Producer涉及到的主要类都已经进行了简单讲解,各自负责的事情也很清晰。下面我给出一张图,通过此图来讲解producer工作的主流程:

image.png

图中可以看到,有两个线程同时在工作,一个线程负责把消息送往消息站进行分组,另外一个线程负责把消息真正发送出去。

客户端发送消息时调用KafkaProducer的send方法。内部逻辑如下:

  1. 首先对消息进行加工,如序列化,选择分区等。
  2. 然后通过RecordAccumulator把加工好的消息放入相应的ProducerBatch中。
  3. 当batch满时,触发sender线程工作
  4. sender线程首先把batche从原列表中取出来,按照发往broker进行分组,然后封装到ClientRequest中
  5. 最后通过NIO的方式把ClientRequest发往相应的broker

至此,我们应该已经理解了Kafka Producer的设计思路。可见所有软件设计都来源于生活,都是对生活中的相应场景进行抽象和面向对象的设计。软件是无形的,但是实际生活中我们所采用的工作方式是有型的。通过参照实际生活场景和解决方案做软件的设计,让你的设计贴近实际场景,这样的代码写出来易于理解和扩展。

源码分析

客户端发送消息时调用KafkaProducer的send方法,所以我们先分析KafkaProducer,再层层深入。

KafkaProducer相当于整个快递公司的总控员,操作收件员收件,命令分拣员进行分拣,最终通知货车可以发车了。货车发货则在另外一个线程中进行。

消息发送的顶层逻辑都在KafkaProducer中。在看代码前,我先宏观介绍下KafkaProducer中send方法的主流程,帮助代码理解,括号中以发快递类比:

  1. 拦截器处理(对快递进行发送前的预处理)
  2. 判断producer是否可用。依据是负责io的线程是否工作。(车队罢工了还如何发送快递?)
  3. 判断metadata是否可用。metadata相当于调度员的指挥图,存储了kafka集群的各种信息,包括topic、分区等等。(如果没有快递网点的信息,如何进行调度派发?)
  4. 对key和value进行序列化。(对快递进行包装)
  5. 获取要发往的分区编号(快递目的地部分地址)
  6. 计算序列化后大小(快件称重)
  7. 通过RecordAccumulator把消息加入batch中(分拣员进行分拣)
  8. 如果batch满了,或者创建了新batch来容纳消息,则唤醒sender线程执行发送。(已经有封箱的货物了,发货吧!)

send方法的整体逻辑如上,每一步其实都和我们真实场景相对应,这就是设计的巧妙之处。

接下来我们进入代码部分,我们先看下KafkaProducer中的部分重要属性

private final Partitioner partitioner;
说明:分区选择器,根据一定策略,选择出消息要发往的分区
private final int maxRequestSize;
说明:消息最大程度,包括了消息头、序列化后的key和value
private final long totalMemorySize;
说明:单个消息发送的缓冲区大小
private final Metadata metadata;
说明:kafka集群元数据
private final RecordAccumulator accumulator;
说明:前面所说的分拣员,维护不同分区的batch,负责分拣消息。等待sender来发送
private final Sender sender;
说明:发送消息,实现Runable,ioThread中启动运行
private final Thread ioThread;
说明:运行sender的线程,负责发送消息到kafka集群
private final CompressionType compressionType;
说明:压缩算法类型,gzip等。消息数量越多,压缩效果越好
private final Serializer<K> keySerializer;
说明:key的序列化器
private final Serializer<V> valueSerializer;
说明:value的序列化器
private final ProducerConfig producerConfig;
说明:生产者的配置
private final long maxBlockTimeMs;
说明:等待更新kafka集群元数据的最大时长
private final ProducerInterceptors<K, V> interceptors;
说明:发送前消息钱,要先通过一组拦截器的处理。也可以先于用户的callback预处理

KafkaProducer构造器会初始化上面的属性。另外在构造函数最后可以看到启动了ioThread。

this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

接下来我们看一下send方法代码:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

ProducerInterceptors对象中维护了List<ProducerInterceptor<K, V>> interceptors。在onSend方法中,会循环这个列表,调用每个ProducerInterceptor的onSend方法做预处理。

拦截器处理后,再调用doSend方法。发送的主要逻辑都在doSend方法中,我按照上面介绍的发送主流程结合代码来讲解。这里不粘贴doSend方法的整段代码,大家自行参考源代码。

1、判断producer是否可用。

可以看到一进来就调用了throwIfProducerClosed()方法,这个方法里逻辑如下:

private void throwIfProducerClosed() {
    if (ioThread == null || !ioThread.isAlive())
        throw new IllegalStateException("Cannot perform operation after producer has been closed");
}

很简单,就是在检查io线程状态。

2、判断这个topic的metadata是否可用。

代码如下:

ClusterAndWaitTime clusterAndWaitTime;
try {
    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
    if (metadata.isClosed())
        throw new KafkaException("Producer closed while send in progress", e);
    throw e;
}

主要逻辑在waitOnMetadata方法中。这里不展开讲,方法中做的事情是从缓存获取元数据中topic的信息,判断是否可用,如果缓存中存在分区并且请求分区在范围内,则直接返回cluster信息,否则发送更新请求。再判断分区是否正常并且请求更新的分区在此topic分区的范围内。这个方法的返回值是Cluster原数据以及所花费的时长。

3、对key和value进行序列化。

主要是如下两行代码

serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

4、获取消息要发往的分区编号

int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

TopicPartition对象实际上只是封装了topic和partition,那么消息的发送地址就齐全了。

4、计算序列化后大小

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
        compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);

ensureValidRecordSize方法中验证size是否未超过maxRequestSize及totalMemorySize。

5、通过RecordAccumulator把消息加入batch中

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);

accumulator.append方法中做分拣逻辑处理,后面会重点讲解RecordAccumulator。这里我们只需要知道通过这个方法处理,你的消息已经缓存到待发送Batch中。

6、如果batch正好满了,或者创建了新batch来容纳消息,则唤醒sender线程执行发送。

if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}

至此Producer中send方法的主要代码逻辑已经分析完毕。下一小节我们看一下分拣员RecordAccumulator是如何实现的。

附KafkaProducer类部分注释翻译:

producer 线程安全,跨线程共享一个producer对象,通常会比多个对象更快。

producer由一个buffer空间池组成,它容纳了没有被传输到server的数据。同时有一个后台 I/O线程负责把这些数据记录转化为reqeust,然后把他们发送给集群。如果producer用后未成功关闭,这些资源将被泄漏。

send()方法是异步的。当调用他添加记录到等待发送的数据缓冲区会立即得到返回。这允许producer把独立的消息打包起来,这样会更为高效。

ack配置项控制认为请求完成的条件。设置为“all”,数据全部提交前,是不会返回结果的,这是最慢,但是持久性最好的。

producer维护了每个partition对于未发送消息的缓冲区。缓冲区的大小通过batch.size配置项指定。设置的大一点,可以让batch更大,但是也需要更多的内存。

RecordAccumulator

我们知道RecordAccumulator是缓存待发送消息的地方,KafkaProducer把消息放进来,当消息满了的时候,通知sender来把消息发出去,释放空间。RecordAccumulator就相当于货运站的仓储,货物不断的往里放,每装满一箱就会通知发货者来取货运走。如下图所示:

image.png

从上图可以看到,至少有一个业务主线程和一个sender线程同时操作RecordAccumulator,所以他必须是线程安全的。

下面我们来详细分析RecordAccumulator。

RecordAccumulator设计

我们直接一头扎入程序的设计和代码,会有一定的理解难度。我还是先以真实世界的某个事物做类比来入手。

前文说RecordAccumulator是一个累积消息的仓库,那么我们就拿快递仓库来类比,看看RecordAccumulator是个怎样的仓库,看下图:


image.png

上图是一个快递站的仓库,堆满了货物。分拣员在这里工作。我们可以看到发往不同目的地的大货箱放置在各自对应的区域,分拣员把不同目的地的包裹放入对应目的地的大货箱,每装满一箱就放置在对应的堆放区域。

分拣员工作流程如下:

  1. 分拣员收到一个包裹,先查看目的地是哪里。假设是北京朝阳,他需要找到目的地为北京朝阳的大货箱装进去。
  2. 当这个大箱子装满后,分拣员会把它封箱,然后搬运到挂有北京朝阳牌子的区域,堆放起来。
  3. 当分拣员再拿到北京朝阳的包裹时,由于没有可用的北京朝阳大货箱,他需要再拿来一个北京朝阳的大货箱来放置包裹。

以上就是分拣员所做的工作,分拣员是谁呢?分拣员就是RecordAccumulator!而那些大货箱以及各自所属的堆放区域,就是RecordAccumulator中缓存消息的地方。所有封箱的大货箱都会等待sender来取货发送出去。

如果你看懂了上面这张图,那么你已经充分理解了RecordAccumulator的设计,后面已经不用继续看了!开个玩笑~不过上图确实完全反映了RecordAccumulator设计和运行的基本逻辑。

我们总结下仓库里有什么:

  1. 分拣员
  2. 货物
  3. 目的地
  4. 货箱
  5. 货箱堆放区域

记住这些概念,这些仓库里的东西最终都会体现在代码里。

下面我们来真正讲解RecordAccumulator的设计。

RecordAccumulator实现了接收消息,然后以主题分区为单元,把消息以ProducerBatch为单位累积缓存。多个ProducerBatch保存在Deque队列中。当Deque中最新的batch已不能容纳消息时,就会创建新的batch来继续缓存,并将其加入Deque。

RecordAccumulator缓存消息的存储结构如下:

image.png

RecordAccumulator内部存储消息使用的容器是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,通过上图可以看到消息以主题分区划分存储单元。消息实际是放在ProducerBatch中。ProducerBatch相当于一个个箱子,箱子上写着收件地址:xx主题xx分区。当一个ProducerBatch箱子装满时,就会封箱贴上封条,然后在对应的队列里生成一个新的ProducerBatch,来放置本主题分区新的消息。

由此可见,RecordAccumulator累积消息的过程,就是把消息装进不同收件地址箱子(ProducerBatch),装满封箱,堆放起来(加入Deque<ProducerBatch>),然后继续产生新箱子装消息的过程。

每次封箱操作后都会返回可以发货的结果给调用者,调用者KafkaProducer再唤醒sender把已经封箱的ProducerBatch发送出去。

图中可以看到,消息真实存储的地方是DataOutputStream。ProducerBatch内部有一个MemoryRecordsBuilder对象,图中未画,而DataOutputStream在MemoryRecordsBuilder中。三者关系:ProducerBatch-->MemoryRecordsBuilder-->DataOutputStream。

接下来对RecordAccumulator的代码分析,主要围绕以下三个类:

1、RecordAccumulator

消息累积器的顶层逻辑,维护存放消息的容器

2、ProducerBatch

封装MemoryRecordsBuilder,并且有很多控制用的信息及统计信息

3、MemoryRecordsBuilder
消息真正累积存放的地方

RecordAccumulator代码分析

append()方法是RecordAccumulator暴露的累积消息入口,KafkaProducer通过此接口累积消息。我们也先从此方法开始层层递进,分析累积消息的逻辑。

append()方法

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }
 
        // we don't have an in-progress record batch try to allocate a new batch
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
 
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
 
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
 
            dq.addLast(batch);
            incomplete.add(batch);
 
            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

代码的主要逻辑如下:

1、appendsInProgress.incrementAndGet();

首先通过原子操作,把追加消息的线程计数器+1

2、 Deque<ProducerBatch> dq = getOrCreateDeque(tp);

获取主题分区对应的ProducerBatch队列。getOrCreateDeque方法中判断如果没有队列,则新建队列。

3、RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);

同步代码块中尝试去做一次追加操作tryAppend(),如果成功就直接返回追加的结果对象。tryAppend方法中逻辑是:
a)如果dq中有ProducerBatch则往新一个batch中追加
a.1)追加不成功,说明最新的batch空间不足,返回null。需要外层逻辑创建新的batch
a.2)追加成功,返回RecordAppendResult

b)dq中无producerBatch,返回null,代表没有能追加成功。

第一个到达此方法的线程肯定是返回了null,因为还没有消息累积进来,也不存在ProducerBatch对象。

如果tryAppend返回null,说明没能直接在现有的batch上追加成功(也可能还没有batch),此时需要初始化新的ProducerBatch

4、预估size大小,从BufferPool申请ByteBuffer。

如果BufferPool空间不足就轮询等待。大家可以自己看一下BufferPool的代码,这里就不展开讲了,他的目的是控制总的内存消耗以及实现ByteBuffer复用。

5、再次tryAppend

由于第4步代码不在同步代码块中,所以接下来的同步代码块中,首先需要再次调用tryAppend,因为可能别的线程已经创建了本主题分区新的ProducerBatch,那么消息直接追加成功。

6、创建ProducerBatch

如果上一步返回null,说明还是没有可用的ProducerBatch,我们需要创建新的ProducerBatch。首先构建MemoryRecordsBuilder对象,真正做消息累加的就是这个对象,每个ProducerBatch都有一个MemoryRecordsBuilder的引用。

7、真正去追加消息

FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

dq.addLast(batch);

首先调用ProducerBatch的tryAppend()方法,这是真正做消息追加的地方,内部通过MemoryRecordsBuilder实现。后续再详细分析。
然后把新的ProducerBatch放入队列中

8、把batch放入incomplete集合

incomplete本质是个存放未完成发送batch的Set

9、释放BufferPool空间

10、累积消息完成后的处理

finally代码块中再最终确保释放BufferPool空间,然后通过原子操作把追加消息的线程计数器-1

通过以上分析,我们捋清楚了RecordAccumulator追加消息的主逻辑,其中有两个步骤还需要展开代码说一下,我们先说RecordAccumulator中的tryAppend方法,也就是上面步骤中的第3步。

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}

逻辑上文已经讲过,这里再对照代码多说两句:

方法如果返回null,说明没有可以成功追加此消息的ProducerBatch,有两种情况:

  1. deque是空的,可能是第一次被进入,也可能是batch都被发送完了。
  2. deque存在batch,但是所剩空间已经不足以容纳此消息。

如果能取得队列中最新的batch,并且能够成功追加消息,那么就会返回RecordAppendResult。

append方法返回对象RecordAppendResult,代码如下:

public final static class RecordAppendResult {
    public final FutureRecordMetadata future;
    public final boolean batchIsFull;
    public final boolean newBatchCreated;
 
    public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
        this.future = future;
        this.batchIsFull = batchIsFull;
        this.newBatchCreated = newBatchCreated;
    }
}

我们可以看到里面有异步发送的Future对象,此外还有两个标识,batchIsFull代表batch是否满了,newBatchCreated代表是否本次append增加了新的batch。

大家是否还记得KafkaProducer调用accumulator的apend方法后的逻辑是什么吗?不记得也没有关系,代码如下:

if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}

KafkaProducer通过这两个标识位来决定是否唤醒sender。翻译过来就是,“已经有封箱的消息了!sender快点把消息发走吧!不要再睡了”

讲到这里,其实整个消息追加的流程已经讲通了。不过消息追加的具体实现我们还没有讲解,那么接下来我们讲解发生消息追加的真正地方:ProducerBatch和MemoryRecordsBuilder。

ProducerBatch类分析

前文说过ProducerBatch可以理解为存放消息的大货箱。此类中的主要方法是tryAppend,也就是把消息放入箱子的操作,它是消息追加的顶层逻辑,代码如下:

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length,
                                                               Time.SYSTEM);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}

主要做了三件事

  1. 检查是否有足够空间容纳消息,这是通过调用MemoryRecordsBuilder的hasRoomFor()方法。
  2. 追加消息,通过调用MemoryRecordsBuilder的append()方法
  3. 保存存放了callback和对应FutureRecordMetadata对象的thunk到List<Thunk> thunks中。

另外还有一个重要的方法就是closeForRecordAppends(),当batch无空间容纳新的消息的时候会调用此方法封箱,这里不展开来讲。

MemoryRecordsBuilder类分析

讲到这里,终于讲到消息追加真正落地的地方了。每个ProducerBatch都维护了一个MemoryRecordsBuilder。ProducerBatch追加消息,实际是调用MemoryRecordsBuilder完成的。

消息最终通过MemoryRecordsBuilder的append方法,追加到MemoryRecordsBuilder的DataOutputStream中。

上一节我们可以看到它有两个主要的方法hasRoomFor()和append()。

1、hasRoomFor()

这个方法比较简单,通过计算消息的预估大小,以及剩余空间,返回true或者false。代码就不贴了,感兴趣的话自行查看。

2、append()

这个方法我们需要仔细分析一下,消息的追加最终发生在这里。我先简述下逻辑:

  • 把字节数组形式的key和value转为HeapByteBuffer
  • 计算写入的offset,如果不是第一次写入,那么lastOffset+1,否则是baseOffset
  • 如果magic号大于1,那么调用appendDefaultRecord(),否则调用appendLegacyRecord()

我们继续看一下appendDefaultRecord()方法,在此方法中最终调用了DefaultRecord.writeTo()来写入appendStream

最后做检查,更新一些统计状态,如消息的数量、lastOffset等。

在DefaultRecord.writeTo()方法中,通过调用Utils.writeTo(DataOutput out, ByteBuffer buffer, int length),往appendStream写入key,value,header。

Utils.writeTo()代码如下:

if (buffer.hasArray()) {
    out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
} else {
    int pos = buffer.position();
    for (int i = pos; i < length + pos; i++)
        out.writeByte(buffer.get(i));
}

我们总结一下MemoryRecordsBuilder:它内部维护了一个buffer,并记录能写入的大小,以及写入的位置在哪里,而每条消息都被追加到DataOutput对象appendStream中。appendStream是消息在RecordAccumulator中最终的去处。

小结

1、RecordAccumulator使用ProducerBatch缓存消息。每个主题分区拥有一个ProducerBatch的队列。

2、当ProducerBatch队列的队尾batch不能再容纳新消息时,对其进行封箱操作,同时新建ProducerBatch放入队尾来存放新消息。

3、ProducerBatch对消息追加的操作都是通过MemoryRecordsBuilder进行的。消息最终被追加到MemoryRecordsBuilder中的DataOutputStream appendStream中

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342