Kafka源码分析-Producer(2)-RecordAccumulator分析(1)

一.RecordAccumulator介绍

KafkaProducer可以有同步和异步两种方式发送消息,其实两者的底层实现相同,都是通过异步方式实现的。主线程调用KafkaProducer.send()方法发送消息的时候,先将消息放到RecordAccumulator中暂存,然后主线程就可以从send()方法中返回了,此时消息并没有真正地送给Kafka,而是缓存到了RecordAccumulator中。之后,业务线程通过KafkaProducer.send()方法不断向RecordAccumulator追加消息,当达到一定的条件,会唤醒Sender线程发送RecordAccumulator中的消息。
RecordAccumulator至少有一个业务线程和一个Sender线程并发操作,必须是线程安全的。
RecordAccumulator中有一个以TopicPartition为key的ConcurrentMap,每个value是ArrayDeque<RecordBatch>(ArrayDeque并不是线程安全的集合,以后会介绍其加锁处理过程),其中缓存了发往对应TopicPartition的消息。每个RecordBatch有一个MemoryRecords对象的引用。MemoryRecords才是消息最终存放的地方。三个类的依赖关系:


image.png

二.MemoryRecords:

MemoryRecords是最底层的类,表示多个消息的集合,其中封装了java nio 的ByteBuffer用来保存消息数据,Compressor用于对ByteBuffer中的消息进行压缩,还有其他的控制字段。
有第四个字段比较重要:

  • buffer:用于保存消息数据的Java NIO ByteBuffer。
  • writeLimit:记录buffer字段最多可以写入多少个字节的数据。
  • compressor:压缩器,对消息数据进行压缩,将压缩后的数据输出到buffer。
  • compressor:压缩器,对消息数据进行压缩,将压缩后的数据输出到buffer。
  • writable: MemoryRecords是只读模式还是,可写模式。在MemoryRecords发送前,会将其设置成只读模式。


    image.png

Compressor

Compressor类里有两个输出流类型的字段,分别是bufferStream和appendStream。bufferStream是在ByteBuffer上建立的ByteBufferOutputStream(Kafka自己实现)对象。ByteBufferOutputStream继承了java.io.OutputStream,封装了ByteBuffer,当写入的数据超出ByteBuffer容量时,ByteBufferOutputStream会进行自动扩容。appendStream是对bufferStream进行的一层装饰,为其添加了压缩的功能。MemoryRecords中的Compressor的压缩类型是由“compression.type”配置参数指定的,即KafkaProducer.compressionType字段的值。下面代码分析下创建压缩流的方式,目前KafkaProducer支持GZIP,SNAPPY三种压缩方式。

public Compressor(ByteBuffer buffer, CompressionType type) {
        this.type = type;//从KafkaProducer传过来的压缩类型。
        this.initPos = buffer.position();

        this.numRecords = 0;
        this.writtenUncompressed = 0;
        this.compressionRate = 1;
        this.maxTimestamp = Record.NO_TIMESTAMP;

        if (type != CompressionType.NONE) {
            // for compressed records, leave space for the header and the shallow message metadata
            // and move the starting position to the value payload offset
            buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
        }

        // create the stream
        bufferStream = new ByteBufferOutputStream(buffer);
//下面根据压缩类型创建合适的压缩流。
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
    }


public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        try {
            switch (type) {
                case NONE:
                    return new DataOutputStream(buffer);
                case GZIP://使用java自带的GZIP压缩方式
                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                case SNAPPY:
                    try {
                        //使用反射方式创建snappy压缩
                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
                    }
                case LZ4:
                    try {
                        OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
                    }
                default://不支持的压缩方式,抛出异常
                    throw new IllegalArgumentException("Unknown compression type: " + type);
            }
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }

GZIPOutputStream是JDK自带的包,所以直接实例化。而snappy压缩使用的是反射创建对象。好处是,避免了运行时的依赖,这里用了反射的方式动态创建。这种设计的小技巧,值得学习。

// dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
    // caching constructors to avoid invoking of Class.forName method for each batch
    private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
        @Override
        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
            return Class.forName("org.xerial.snappy.SnappyOutputStream")
                .getConstructor(OutputStream.class, Integer.TYPE);
        }
    });

snappyOutputStreamSupplier对snappy压缩算法的Constructor进行cache,避免每个batch都要Class.forName();
Compressor提供了一系列的putXXX()方法,向
appendStream流写入数据,这是一个典型的装饰器模式,通过bufferStream装饰ByteBuffer,给ByteBuffer添加扩容能力,通过appendStream装饰bufferStream添加压缩能力。

image.png

Compressor.estimatedBytesWritten()方法根据指定压缩方式的压缩率,写入的未压缩数据的字节数(writtenUncompressed字段记录), 估算因子COMPRESSION_RATE_ESTIMATION_FACTOR,估计已写入的(压缩后的)字节数,此方法主要用于在判断MemoryRecords是否写满的逻辑中使用。
我们继续分析MemoryRecords,MemoryRecords的构造方法是私有的,只能通过emptyRecords()方法得到其对象。MemoryRecords有四个比较重要的方法。

  • append()方法:先判断MemoryRecord是否为可写模式,然后调用Compressor.put*()方法,将消息写入ByteBuffer中。
    类MemoryRecords:
/**
     * Append the given record and offset to the buffer
     */
    public void append(long offset, Record record) {
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");

        int size = record.size();
        compressor.putLong(offset);
        compressor.putInt(size);
        compressor.put(record.buffer());
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        record.buffer().rewind();
    }

出现ByteBuffer扩容的情况时,MemoryRecords.buffer字段与ByteBufferOutputStream.buffer字段所指向的不再是同一个ByteBuffer对象,如下代码和图:
类ByteBufferOutputStream:

private void expandBuffer(int size) {
        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
        ByteBuffer temp = ByteBuffer.allocate(expandSize);
        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
        buffer = temp;
    }
image.png
  • close()方法:会将MemoryRecords.buffer指向扩容后的ByteBufferOutputStream.buffer指向的ByteBuffer。


    image.png
  • hasRoomFor()方法:根据Compressor估算的字节数,估计MemoryRecords剩余空间是否足够写入指定的数据。注意,这里是估算,通过hasRoomFor()判断之后写入数据,也可能会导致底层ByteBuffer出现扩容的情况。

*sizeInBytes()方法:对于可写的MemoryRecords,返回的是ByteBufferOutputStream.buffer大小;对于只读MemoryRecords,返回的是MemoryRecords.buffer的大小。(因为如果ByteBuffer扩容,扩容后的ByteBuffer在ByteBufferOutputStream里)

/**
     * The size of this record set
     */
    public int sizeInBytes() {
        if (writable) {
            return compressor.buffer().position();
        } else {
            return buffer.limit();
        }
    }

MemoryRecords还提供了迭代器,用于Consumer端读取其中的消息。

三.RecordBatch:

每个RecordBatch对象中封装了一个MemoryRecords对象,同时还封装了很多控制信息和统计信息:

  • recordCount:记录了保存的Record个数。
  • maxRecordSize:最大Record的字节数。
  • attempts:尝试发送当前RecordBatch的次数。
  • lastAttemptMs:最后一次尝试发送的时间戳。
  • records:指向用来存储数据的MemoryRecords对象。
  • topicPartition:当前RecordBatch中缓存的消息都会发送给此topicPartition。
  • produceFuture:ProduceRequestResult类型,标识RecordBatch状态的Future对象。
  • lastAppendTime:最后一次向RecordBatch追加消息的时间戳。
  • thunks:Thunk对象的集合,后面介绍。
  • offsetCounter:用来记录某消息在RecordBatch的偏移量。
  • retry:是否正在重试。如果RecordBatch中的数据发送失败,则会重新尝试发送。
    RecordBatch与相关类的关系:
image.png

ProduceRequestResult

分析下ProduceRequestResult这个类的功能。ProduceRequestResult并未实现java.util.concurrent.Future接口,但是通过包含一个count值为1的CountDownLatch对象,实现了,类似Future的功能。
当RecordBatch中全部的消息被正常响应,或超时,或关闭生产者时,会调用ProduceRequestResult.done()这个方法,将produceFuture标记为完成并通过ProduceRequestResult.done()方法,将producerFuture标记为完成并通过ProduceRequestResult.error字段区分“异常完成”还是“正常完成”,之后调用CountDownLatch对象的countDown()方法。此时,会唤醒阻塞在CountDownLatch对象的await()方法的线程(这些线程通过c的await方法等待上述三个事件的发生)。

  • baseOffset
    前面提到过,分区会为记录的消息分配一个offset并通过此offset维护消息顺序。ProduceRequestResult内的一个字段baseOffset表示服务端为此RecordBatch中第一条消息分配的offset,每个信息可以根据baseOffset 和自身在此RecordBatch中的相对偏移量,计算出其在服务器端分区中的偏移量了。
  • Thunk
    KafkaProducer.send()方法的第二个参数,是一个Callback对象,它是针对单个消息的回调函数(每个消息都会对应一个Callback对象作为回调)。RecordBatch.thunks字段可以理解为消息的回调对象队列,Thunk中的callback字段指向对应消息的Callback对象,另一个字段future是FutureRecordMetadata类型。FutureRecordMetadata类有两个关键字段。
  1. result:ProduceRequestResult类型,指向对应消息所在RecordBatch的produceFuture字段。
  2. relativeOffset:long类型,记录了对于消息在RecordBatch中的偏移量。
    FutureRecordMetadata实现了java.util.concurrent.Future接口,但是实现都由引用的ProduceRequestResult对应的方法实现。所以相信消息是按照RecordBatch进行发送和确认的。
    当生产者已经收到某消息的响应时,FutureRecordMetadata.get()方法会返回RecordMetadata对象,包含消息所在Partition中的offset等其他元数据,供用户自定义的Callback使用。
    分析完RecordBatch依赖的组件,再来看RecordBatch类的核心方法。tryAppend()是最核心的方法,功能是尝试将消息添加到当前的RecordBatch中缓存:
/**
     * Append the record to the current record set and return the relative offset within that record set
     * 
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        //估算剩余空间是否不足,是个估计值,不是准确值。
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
           //向MemoryRecords中添加数据,offsetCounter是
           //在RecordBatch中的偏移量
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            //相关统计信息
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
           //将用户自定义Callback和FutureRecordMetadata
           //封装成Thunk,保存到thunks集合中
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

当RecordBatch成功收到正常响应,超时,关闭生产者时,会调用RecordBatch的done()方法。在done()方法中,会回调RecordBatch中全部消息的Callback回调,并调用produceFuture字段的done()方法。RecordBatch.done()方法的调用场景如下:


image.png

1.发送超时。
2.收到服务端发回的正常响应。
3.生产者关闭时被丢弃。

/**
     * Complete the request
     * 
     * @param baseOffset The base offset of the messages assigned by the server
     * @param timestamp The timestamp returned by the broker.
     * @param exception The exception that occurred (or null if the request was successful)
     */
    public void done(long baseOffset, long timestamp, RuntimeException exception) {
        log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                  topicPartition,
                  baseOffset,
                  exception);
        // execute callbacks
        //循环执行每个消息的callback
        for (int i = 0; i < this.thunks.size(); i++) {
            try {
                Thunk thunk = this.thunks.get(i);
                if (exception == null) {
                    //将服务端返回的信息(offset和timestamp)和消息的其他信息封装成RecordMetadata
                    // If the timestamp returned by server is NoTimestamp, that means CreateTime is used. Otherwise LogAppendTime is used.
                    RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset(),
                                                                 timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                                                                 thunk.future.checksum(),
                                                                 thunk.future.serializedKeySize(),
                                                                 thunk.future.serializedValueSize());
                    //调用消息对应的自定义Callback
                    thunk.callback.onCompletion(metadata, null);
                } else {
                    thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
            }
        }
        //标识整个RecordBatch都已经处理完成了
        this.produceFuture.done(topicPartition, baseOffset, exception);
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,082评论 1 32
  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,292评论 8 265
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,378评论 25 707
  • 自从去年离开那个公司,总是不断在梦中出现断断续续的片段,这一年了没有一丝一毫的后悔,但在家总是不断反省,自己在工作...
    幸运的xiaotang阅读 333评论 0 0
  • 叙: 铃兰族族长——我铃兰族在华夏土地生生不息,一直秉持着“维我独尊”的戒训,想沾染我族的任何外人,死! 铃兰族“...
    赵凡一阅读 322评论 1 1