绍圣--kafka之生产者(一)

学习kafka有一段时间了。关于它里面的知识还是需要总结一下,一来是能让自己对kafka能有一个比较成型的理解,二来是对知识的一种回顾。多总结加深理解。

前言

kafka作为应用最为广泛的消息中间件,其内部各个组件是怎么来协调工作的?其内部的设计思想是怎么样的?这些都很值得我们去细细的分析和研究。此处一系列的分析以kafka-0.10.1.0版本为基础进行解读。

新版本kafka的C端是采用java实现的(S端是由Scala ),对于java开发人员很好上手。在使用kafka的时候,最开始接触的就是它的生产者和消费者客户端。这些客户端对外提供的接口非常简洁,使用起来简单方便。这得益于kafka在幕后为我们做了很多工作。

注:系列文章中的代码均为代码

生产者(KafkaProducer)

KafkaProducer producer = new KafkaProducer(Map);

ProducerRecordrecord = new ProducerRecord(Topic, Key, Value); 

Future fu = producer.send(record);

producer.flush();

RecordMetadata rm = fu.get(1, TimeUnit.MINUTES);

可以看出,生产者的API确实非常简单。以上的伪代码是同步发送消息,kafka的生产者还提供异步提交消息的方法:

KafkaProducer producer = new KafkaProducer(Map);

ProducerRecord record = new ProducerRecord(Topic, Key, Value);

producer.send(record, new Callback(){

 public void onCompletion(RecordMetadata metadata, Exception e) {

}

});

KafkaProducer对象代表一个客户端进程,KafkaProducer.send()提交到服务端的消息,并不会直接发送到服务端,而是KafkaProducer先把消息存放到内存队列中。再由一个消息发送线程从队列中拉取出消息,以批量的方式发送给服务端。因此在kafka中有一个专门收集消息的地方叫记录收集器(RecordAccumulator):负责缓存生产者的消息。发送线程(Sender):负责读取记录收集器(RecordAccumulator)的批量消息,通过网络发送给服务端。

ProducerRecord

需要提交到服务器的消息都会封装成ProducerRecord对象。ProducerRecord对象中定义了消息相关信息:

private final String topic; // 要发送的topic名称

private final Integer partition; // 要发送的分区ID

private final K key; // 消息的KEY值

private final V value; // 消息的value

private final Long timestamp;

生产者中的拦截器(interceptor)

在发送消息之前,kafka允许用户对消息进行操作,因此拦截器(interceptor)孕育而生。并且允许用户指定多个interceptor从而形成一个拦截器链路来对用户所要发送的消息进行操作。

在构造KafkaProducer对象的时候,可以指定自定义的拦截链:

// 构建拦截链

 List interceptors = new ArrayList();

interceptors.add("xx.xx....");

interceptors.add("xx.xx....");

HashMap config = new HashMap();

config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

KafkaProducer producer = new KafkaProducer(config);

在KafkaProducer构造函数中,会给KafkaProducer中的拦截器变量进行赋值。

private final ProducerInterceptors interceptors;

private KafkaProducer() {

List interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);

this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

}

在调用发送方法进行消息发送的时候,会首先调用在初始化中设置的拦截器,来对消息进行处理。

public Future send(ProducerRecord record, Callback callback) {

ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);

return doSend(interceptedRecord, callback);

}

调用拦截器里面的方法,对消息进行处理:

public ProducerRecord onSend(ProducerRecord record) {

ProducerRecord interceptRecord = record;

for (ProducerInterceptor interceptor : this.interceptors) {

interceptRecord = interceptor.onSend(interceptRecord);

}

return interceptRecord;

}

onSend()方法里面调用用户自定义的拦截器。用户自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口。

调用了用户自定义的拦截器后,就进入了KafkaProducer.doSend()。

KafkaProducer.doSend()

private Future<RecordMetadata> doSend(ProducerRecord record, Callback callback) {

        TopicPartition tp = null;

        try {

// 发送之前首先需要确认topic对应的metadata(元数据)可用(topic的partition的主副本可用),如果没有metadata就要获取相应的metadata,获取metadata是阻塞的。总之必须metadata可用才会发送生产信息

ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);

            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);

            Cluster cluster = clusterAndWaitTime.cluster; // 获取集群信息

            byte[] serializedKey;

// 序列化ProducerRecord中的KEY和VALUE

           serializedKey = keySerializer.serialize(record.topic(), record.key());


            byte[] serializedValue;

                serializedValue = valueSerializer.serialize(record.topic(), record.value());

// 根据ProducerRecord中的partition和KEY计算出要发送到的partition

            int partition = partition(record, serializedKey, serializedValue, cluster);

            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

// 根据ProducerRecord中计算出字节如果超出限制,则会抛出异常

            ensureValidRecordSize(serializedSize);

            tp = new TopicPartition(record.topic(), partition);

            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

// 追加数据到RecordAccumulator中(缓存)

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

// 如果追加数据后,RecordAccumulator中的数据已经达到限制,或者空间不足,则会唤醒发送线程(sender) ,把消息批量的提交到服务端。

            if (result.batchIsFull || result.newBatchCreated) {

                this.sender.wakeup();

            }

            return result.future;

        } catch (Exception e) {

            if (this.interceptors != null)

                this.interceptors.onSendError(record, tp, e);

            throw e;

        }

    }

发送过程分解分析

获取topic对应的metadata(元数据)

发送过程中,通过waitOnMetadata()来获取topic对应的metadata。因为metadata涉及的内容比较多。所以后面单独来写。总之必须metadata可用才会发送生产信息。

序列化ProducerRecord的Key和Value

KafkaProducer在发送消息之前需要对ProducerRecord中的Key和Value进行序列化操作,在KafkaComsumer端将对消息中的Key和Value进行反序列化操作。在kafka内部提供了序列化和反序列化默认实现类:

我们也可以自定义序列化(实现Serializer接口)和反序列化(实现Deserializer接口)类。然后同加载拦截器实现一样,使用自定义序列化和反序列化类。

HashMap config = new HashMap();

config.put("key.serializer", "xx.xx....");

config.put("value.serializer", "xx.xx....");

KafkaProducer  producer = new KafkaProducer(new Map(config));

接下来就应该选择把消息发送到topic的哪个分区上了

计算partition值

int partition = partition(record, serializedKey, serializedValue, cluster);

private int partition(ProducerRecord record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {

Integer partition = record.partition();

return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

}

KafkaProducer中默认使用org.apache.kafka.clients.producer.internals.DefaultPartitioner来实现计算partition的值。

上面介绍过ProducerRecord,它里面的变量会用来计算partition。

算法如下:

1,如果指定了partition,那么消息会被发送到指定的分区(partition)中。

2,如果没有指定partition,但指定了key,那么会使用key进行hash计算,根据计算出来的值发送到对应的分区(partition)中。

3,如果没有指定partition,没有指定key,那么会使用round-robin模式(轮询模式)发送消息到分区(partition)中。

4,如果同时指定了partition和key。那么partition起作用(key无效),发送到partition指定的分区中。

代码如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

List partitions = cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();

if (keyBytes == null) {

// 没有指定 key 的情况下

int nextValue = counter.getAndIncrement(); // counter(final类型的AtomicInteger)在计算的时候在此随机数的基础上自增;

// 获取topic中有效的分区信息(有效的分区代表这个分区的leader可以正常的提供读写服务)

List availablePartitions = cluster.availablePartitionsForTopic(topic);

if (availablePartitions.size() > 0) { // 根据可用分区数量和随机数来计算partition

int part = Utils.toPositive(nextValue) % availablePartitions.size();

return availablePartitions.get(part).partition();

} else { // 没有可用的分区,就使用topic下所有的分区数量来计算partition

return Utils.toPositive(nextValue) % numPartitions;

}

} else { // 有key的情况下,使用key的hash值进行计算partition

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

有了partition后就要向RecordAccumulator中追加消息了

向RecordAccumulator(记录收集器)追加消息

首先了解RecordAccumulator相关的生态类:

RecordAccumulator生态类

生产者向RecordAccumulator添加记录的原理如下:

就像向一个车间中各个不同的传送带上的箱子添加物品一样(一条传送带的名称代表一个TopicPartition,传送带代表一个队列,传送带上的箱子代表RecordBatch)。生产者把属于同一个传送带的物品,放到传送带的箱子里面,当箱子存满,或者当前箱子无法装下物品时,就会放在新的一个箱子里面,那个装满的箱子里面的物品就会等待被一次性的处理。如果传送带上面的箱子已经超出了传送带所能承受的容量,生产者后面生产的物品就必须等待,等待传送带释放出新的空间能放上新的箱子来装物品,如果等待一段时间后还没有空间来装物品,就放弃此物品。

看看RecordAccumulator周围的类:

RecordBatch

生产的批量记录的一个封装类,表示正在或者将要被发送的的一批记录(传送带上的箱子)。会拥有一个MemoryRecords的引用。

MemoryRecords

生产者发送的记录在内存中的一个记录集。记录最终存放的地方。

Compressor

负责执行追加写操作。

ProduceRequestResult

ProduceRequestResult是在初始化RecordBatch时建立。属于批次级的实例。ProduceRequestResult内置了CountdownLatch并且count为1。在外围调用FutureRecordMetadata的get方法中获取到记录的元数据时,会阻塞当前线程,必须等到ProduceRequestResult完成,也就是CountdownLatch变为0(在FutureRecordMetadata.get()中会调用ProduceRequestResult.CountdownLatch.await())。在Sender线程完成RecordBatch中的全部消息的发送并且收到服务端的响应后,会把CountdownLatch中的count变为0,这样外围阻塞的线程就会继续往下走获取到记录的元数据信息。

baseOffset变量:记录服务端响应客户端中的RecordBatch中第一个消息分配的的offset。

ReadyCheckResult

在发送线程(Sender)读取RecordAccumulator的消息进行发送的时候,首先会进行就绪检查:遍历所有的RecordAccumulator.ConcurrentMap中各个TopicPartition的Deque中的批记录集(RecordBatch),获取可以发送的RecordBatch对应的主节点(leader)信息,封装成ReadyCheckResult对象ReadyCheckResult:RecordAccumulator的内部类,有三个变量:

final Set<Node> readyNodes:可以发送的RecordBatch对应的主节点(leader)信息。

final long nextReadyCheckDelayMs:下一次就绪检查点的时间。

final Set<String> unknownLeaderTopics:分区的leader未知的topic信息。

FutureRecordMetadata

保存生产记录的元数据,包括:ProduceRequestResult实例,RecordBatch中保存的记录个数,产生出记录时候的时间,key的大小,value的大小,checksum(就是crc)等。每次追加完消息后返回FutureRecordMetadata实例,属于消息级的实例。提供get方法来让外围程序获取记录元数据信息,只是必须等到Sender线程完成RecordBatch的发送并且收到服务端的响应后,才返回元数据信息-RecordMetadata。

RecordMetadata

对批次元数据信息的封装。

RecordAppendResult

RecordAccumulator内部类持有FutureRecordMetadata对象实例。

final boolean batchIsFull:标识RecordBatch是否已满。

final boolean newBatchCreated:是否需要重新创建新的RecordBatch。

batchIsFull和newBatchCreated在调用RecordAccumulator.append()方法后来判断是否需要唤醒Sender线程进行发送消息。如果batchIsFull为true:代表双向队列里面有RecordBatch满了,可以唤醒发送线程发送消息了。如果newBatchCreated为true:代表旧的RecordBatch满了或者是装不下新的消息了,可以唤醒发送线程发送消息了。

BufferPool

每实例化一个KafkaProducer,对应一个RecordAccumulator实例,每一个RecordAccumulator实例对应一个BufferPool实例,BufferPool提供分配内存存放消息空间的方法:allocate和释放消息空间的方法:deallocate。每个BufferPool实例中包括:

totalMemory:池总的内存的总量。

poolableSize:控制Deque队列中每个ByteBuffer的大小。

lock(ReentrantLock):保证每次分配和释放空间线程安全。

Deque(free<ByteBuffer>):已经申请未使用的空间。

waiters(Deque):记录申请不到足够空间而阻塞的线程,队列中记录的是阻塞线程对应的Condition对象。

availableMemory:未申请未使用的空间。

分配空间的流程:

RecordAccumulator.BufferPool分配空间大小

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {

if (size > this.totalMemory) // 申请空间的大小已经超过了BufferPool总的字节大小抛出异常 throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");

this.lock.lock();

try {

// 申请的大小和Deque队列中ByteBuffer的大小一样,并且队列有存在已经申请未使用的内存,就直接使用队列中的ByteBuffer实例

if (size == poolableSize && !this.free.isEmpty())

return this.free.pollFirst();

// 计算出已经申请未使用的内存大小

int freeListSize = this.free.size() * this.poolableSize;

if (this.availableMemory + freeListSize >= size) { // 可用空间大于或等于要申请的空间 freeUp(size);

this.availableMemory -= size; // 减少size大小的未申请未使用的空间

lock.unlock();

return ByteBuffer.allocate(size); // 返回size大小的ByteBuffer

else { // 没有空间来分配申请的空间

int accumulated = 0;

ByteBuffer buffer = null;

Condition moreMemory = this.lock.newCondition();

long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory);

while (accumulated < size) {

long startWaitNs = time.nanoseconds();

long timeNs;

boolean waitingTimeElapsed;

try { // 如果累计的空间大小小于申请的空间大小,释放当前线程占有的锁,阻塞当前线程 // 阻塞remainingTimeToBlockNs时间后,没有被唤醒(unlock或者是await),返回false, waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);

} catch (InterruptedException e) {

this.waiters.remove(moreMemory); throw e;

} finally {

long endWaitNs = time.nanoseconds();

timeNs = Math.max(0L, endWaitNs - startWaitNs); t

his.waitTime.record(timeNs, time.milliseconds());

}

if (waitingTimeElapsed) {

this.waiters.remove(moreMemory);

}

remainingTimeToBlockNs -= timeNs;

// 累计空间大小是0并且申请大小等于Deque队列中ByteBuffer的大小并且已经申请未使用的空间队列不为空

if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {

buffer = this.free.pollFirst(); // 使用Deque队列中的第一个ByteBuffer

accumulated = size; // accumulated=size 会跳出循环

} else {

freeUp(size - accumulated); // 释放空间

int got = (int) Math.min(size - accumulated, this.availableMemory);

this.availableMemory -= got; // 减少未申请未使用的空间的值

accumulated += got; // 增加累计大小的值,直到累计的大小大于申请的大小

}

}

Condition removed = this.waiters.removeFirst();

if (removed != moreMemory)

throw new IllegalStateException("Wrong condition: this shouldn't happen.");

if (this.availableMemory > 0 || !this.free.isEmpty()) {

if (!this.waiters.isEmpty())

this.waiters.peekFirst().signal(); // 修改Condition队列中节点的状态,让其中的节点可以被唤醒

}

lock.unlock();

if (buffer == null)

return ByteBuffer.allocate(size);

else

return buffer;

}

} finally {

if (lock.isHeldByCurrentThread())

lock.unlock();

}

}

IncompleteRecordBatches

在记录收集器添加发送数据的方法中(RecordAccumulator.append()),把那些发送尚未完成的RecordBatch保存到此集合中,作用是对所有的RecordBatch的监控和管理。在Sender线程把生产请求提交到服务端后(实际就是提交RecordBatch中的数据),服务端正常响应给客户端,客户端在处理响应的回调函数中,如果是成功提交到服务端,就从IncompleteRecordBatches集合中删除。(删除还有以下情况:Sender强制退出时,超时。此类操作的所有RecordBatch都加锁,保证线程安全。)

以上分析了记录收集器的生态圈,在KafkaProducer.doSend方法中把信息追加到记录收集器后,根据RecordAccumulator.append方法返回的RecordAccumulator.RecordAppendResult实例的batchIsFull和newBatchCreated变量来判断是否唤醒发送线程,进行信息发送。客户端可以从doSend方法返回的FutureRecordMetadata实例中获取到请求完成(收到服务器响应)后的元数据信息。

小结:以上就是客户端使用KafkaProducer发送消息,最后保存在记录收集器的过程。接下来将重点看看消息怎么进入记录收集器的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,699评论 13 425
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O(1)的方...
    Alukar阅读 3,074评论 0 43
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,424评论 0 34
  • 狼的动态
    JMLCY阅读 107评论 0 0