这篇文章我们来重点分析producer端一个重要组件:BufferPool
。
总览
从源码分析2这篇文章中可以了解到,每当我们想要新创建一个ProducerBatch
的时候,都需要向BufferPool
申请一块内存,这块内存其实就是一个ByteBuffer
,这块内存的大小只有两种情况:
- 如果申请的内存小于
batch.size
(默认16KB),那么分配一个batch.size
大小的ByteBuffer
- 如果申请的内存大于了
batch.size
,那么就按需分配一块正好的内存
但是BufferPool
也不能无脑的给所有请求分内存,我们回想一下,producer端有一个很重要的参数,buffer.memory
(默认32MB),这个参数代表的是RecordAccumulator
所能容纳的最大的recordSize
,其实这个参数经过层层传递,最终传给了BufferPool
,由BufferPool
来实际管理RecordAccumulator
的容量。RecordAccumulator
只需要关注自身的一些append、ready
等等逻辑就好啦,职能很明确。
所以这里引出了BufferPool
的第一个职责:
管理内存的使用情况
我们再继续来看一下BufferPool
这个名字,xxx池,让人不禁联想到线程池、连接池等等。作为一个池子,他也得具备另外一个很重要的功能:
资源复用
这里所提到的资源,指的就是ByteBuffer
,但是所有的ByteBuffer
都需要复用吗?上面我们提到了两种不同大小的ByteBuffer
,一种大小固定为batch.size
,一种大小超过batch.size
。我们想下,一般消息大小是不会超过batch.size
的,所以理论上来讲,由BufferPool
分配出来的ByteBuffer
,应该大部分都是batch.size
大小,只有极少数的消息很大,超过了batch.size
,那么BufferPool
会给他们单独分配一个“特殊的”ByteBuffer
。所以从这点看,超过batch.size
的ByteBuffer
复用并没有什么意义,即使复用了之后也不一定用得上,所以在BufferPool
内部,复用的都是大小为batch.size
的ByteBuffer
。
内部结构
文章的最后会贴上BufferPool的源代码,以及一大串带有我自己理解的注释。这里呢,我们先画一张简图,简单描述下BufferPool
内部都有哪些值得我们重点关注的东西。
-
totalMemory
:上面提到了buffer.memory
这个参数最终会传入到BufferPool
当中,没错,就是totalMemory这个内部成员变量; -
poolableSize
:这个变量的意义也很明确的。batch.size
这个参数最终也是需要传入到BufferPool
中的,没错,就是poolableSize
这个变量。而且这个名字大家可以感受下,batch.size
,一个batch的大小,很好理解;poolableSize
,“可池化的大小”?可能稍微得思考一下。不过想清楚之后发现这个命名真的是精准!因为只有这个大小的ByteBuffer
才可以进入下面即将提到的free这个“池子”,所以在BufferPool
内部称之为“可池化的大小”是真的很准确。 -
free
: 这是一个容器,实际上是一个队列,Deque<ByteBuffer> free
,上面也提到了BufferPool
需要做到ByteBuffer
的复用,就体现在这个队列上。这个队列中的所有ByteBuffer
的大小都是batch.size
,都是可以拿来直接使用的; -
nonPooledAvaliableMemory
:依然是一个很精准的命名:“未被池化的可使用空间”。首先,它是BufferPool
一部分可以使用的内存空间,注意,我这里提到的是一部分,因为它仅仅代表“未被池化的”这一部分,也就是说,BufferPool
内部还有另外一部分可使用的空间是“已被池化的”。那就有问题了,既然有nonPooledAvaliableMemory
,为什么没有一个pooledAvaliableMemory
变量呢?因为'pooledAvaliableMemory' = free.size() * poolableSize
。 -
lock
: 这就是一个很简单的可重入锁,那为什么还要单独拿出来作为BufferPool
一个重要的成员变量单独说一下呢?我们想一下,假如分配内存空间的过程没有加锁的控制,在多线程下的计算逻辑必定会出错,所以lock
在BufferPool
中是一个不可或缺的部分。同时,BufferPool
分配内存空间实际上是在业务线程中做的,所以加锁也不能对业务线程有太大的影响,BufferPool
内部加锁以及释放锁的位置都很巧妙,也很值得我们去学习,有关加锁和释放锁的部分都在后面贴的代码的注释中有所体现。
waiters
:这个变量翻译为“等待者”,也是一个队列,Deque<Condition> waiters
,具体怎么使用会在后面贴的源代码中有解释,这里不准备展开详细说,但是可以简单介绍一下它和我们调用的producer.send(xxx)
这个方法之间的联系、。我们知道send
这个方法是异步的,也就是说调用send
的线程只要将消息追加到RecordAccumualator
中就可以立即返回,而不需要等待消息真正被发送到broker
上。但是这个线程一定不会被阻塞吗?实际上不是的,从哪可以证实呢?producer
端有一个参数叫做:max.block.ms
(默认是60s),代表这个发方法可以阻塞的最长时间。其中有一个会阻塞的地方就是假如Accumualtor
满了,装不下新到来的ProducerRecord
,那么就会阻塞。根据上面重要成员变量的分析,“Accumualtor
满了,装不下新到来的ProducerRecord
”实际上指的就是BufferPool
无法给分配新的内存空间了,那么这个线程就得在这等待,waiters
就是存储了所有因为无法分配内存空间而等待的线程,只不过实际存的不是线程本身,而是给每个线程分配的一个Condition
对象。
这里简单说下,为什么需要等待,而不是直接返回分配失败?
实际上,只要我们producer参数配置合理,应该是不存在内存空间分配不足的情况的。一旦出现了这种情况,如果我们让这个线程稍微等一等,大概率是会有其他被发送出去的record
来归还ByteBuffer
,这样等一等的这次请求就也可以追加到RecordAccumulator
中了,而且等待的时间一般不会很长。但是可能有一些业务就是容忍不了block的这段时间,所以kafka把这个等待时间设置为一个参数,如果一点block的时间都容忍不了,可以将max.block.ms
设置为0。
分配内存空间流程
这里我简单整理了一下BufferPool
分配内存空间的主干流程,其中没有包含加锁解锁的过程,看起来仍然是很繁琐...因为这个分配过程确实需要判断的内容很多。
大家可以自行查看上图中的一些判断流程,下面会针对一些比较重要或者难以理解的步骤来做详细说明;
- 计算可分配容量:这里主要想简单说明一下可分配容量应该如何计算,其实上面在讲解
BufferPool
内部结构的一节中就简单谈过这个问题。可分配容量分为两部分,一部分是“已被池化的”可分配容量(free.size() * poolableSize
),一部分是“未被池化的”可分配容量(nonPooledAvaliableMemory
),两者求和就是总可分配容量; - 碎片整理:这个名字是我自己起的,不是官方给出的定义,大家可以尝试理解一下哈。不过在代码中的体现是一个方法的名字(
freeUp()
)。这个方法完成了什么事情呢?就是循环比较size
和nonPooledAvaliableMemory
的大小,如果size大于了nonPooledAvaliableMemory
,那么将free
中的头部第一个ByteBuffer
释放,将释放的空间加入到nonPooledAvaliableMemory
中,继续循环比较,直到size
小于了nonPooledAvaliableMemory
或者free
为空。整理一下思路,其实就是想给待分配的请求整理出一块连续的内存空间,所以我觉得和“碎片整理”的思路很相似,具体实现可以看下面源代码中的freeUp()
方法。 -
accmulated
:主要是说明一下这个变量的含义。从图中可以看出,这个字段的出现是在判断“size超过了可分配容量”成立之后,那后面就有可能会需要通过多次的“碎片整理”来给这次请求分配一个完整的内存空间,所以需要一个字段来记录一下当前已经给这次请求分配了多大的内存了。 -
await()
方法:就是在可分配内存不够时,阻塞当前的线程,等待其他内存空间的归还,再依次唤醒等待的线程; - 这是一个多条件的判断,综合来看,就是在等待被唤醒之后,先判断一下这次能不能从已经池化的空间中拿到一块。所以从这个判断上可以看出,整个分配的就成就是能尽量不分配新的就别分配新的,因为开辟一块内存空间相较来说还是一个比较重的操作;
- 这点简单说一下,是在阻塞线程被唤醒之后,发现当前所需空间是没办法从
free
中直接拿来用的,那就得给“整理碎片”,然后拼凑出来一块完整的内存,但是有可能“碎片整理”完之后还是不够,那就将目前能分配出来的最大内存存到acumulated
字段中,继续循环wait()
。
源代码分析
/*
这就是分配空间的入口,传入两个参数,size代表将要分配的内存空间,maxTimeToBlockMs代表如果现在空间不足以分配,最大的block时长
所以如果maxTimeToBlockMs这个参数大于0,BufferPool无法及时的分配出来需要的空间大小,那么调用的线程会被阻塞。
这里就要再延伸一点,调到这里的线程就是我们的业务线程。所以即使调的producer.send(xxx)是一个异步的接口,但是如果分配不出来新的ByteBuffer了,业务线程也是会被阻塞的。
*/
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
/*
这里先判断下所需要分配的内存空间是不是超过了最大容量,如果超过了,直接抛异常结束,这个异常会直接抛给业务线程
*/
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
// 这个buffer对象就是最终要分配出来的buffer对象,跟住这个对象
ByteBuffer buffer = null;
// 先上锁,代表同一时刻只能有一个线程在分配内存
this.lock.lock();
// 如果在分配过程中发现BufferPool已经close了,那也是直接抛异常出去
if (this.closed) {
this.lock.unlock();
throw new KafkaException("Producer closed while allocating memory");
}
try {
/*
首先检查需要分配的内存大小是不是poolableSize,如果是,且池子中恰好也有可用的内存空间,直接拿去用就好
要记得free这个队列中存的ByteBuffer大小都是poolableSize大小的
*/
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
/*
代码走到这里,说明要么待分配的内存空间大小>poolableSize,要么正好等于poolableSize,但是池子中已经没有分配好的ByteBuffer可以用了
那接下来就需要我们判断BufferPool剩余的内存空间还够不够这次分配,如果够,就给划分一个ByteBuffer,如果不够了,就阻塞当前线程等待,直到内存空间又充足了或者到了最大的阻塞时间
如何计算当前BufferPool中还有多少的剩余内存呢?
有两部分,就是下面if语句的条件:this.nonPooledAvailableMemory + this.free.size() * this.poolableSize
一部分是未被分配的内存大小nonPooledAvailableMemory,一部分是已经分配了但是还没有用的ByteBuffer的总大小
*/
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
/*
走到这儿,代表现在剩余的内存空间是够的
那么接下来的freeUp方法所要做事情就是要凑出来一个size大小的内存空间
什么意思呢?我们可以看一下这个方法的实现
*/
freeUp(size);
/*
到这里,我们是一定可以凑足size大小的内存空间的,还没有弄清楚的同学可以理一下上面的逻辑
所以这里直接将这部分内存空间从nonPooledAvailableMemory中划出。具体分配ByteBuffer的操作是在finally块中完成的
*/
this.nonPooledAvailableMemory -= size;
} else {
/*
走到这儿,说明当前BufferPool中剩余的内存已经不足以分配了,现在需要阻塞待分配的线程,等待BufferPool的内存重新充足。
说一下accumulated这个int值的含义:如果我们要分配一个比较大的内存空间(大于了poolableSize),走到这儿代表BufferPool剩余的内存不够,所以需要等着内存慢慢回收回来。
但是不一定一次就都回收回来,所以每次回收一点这个值就加一点,直到accumulated达到了size大小,认为内存已经回收够了
*/
int accumulated = 0;
/*
从这里开始,到while循环这段代码统一说一下。
首先就是每个在上面获取到锁的线程,到这里都会给分配一个Condition,这个东西就是类似于monitor的作用,
在这里Condition的作用就是当一个线程因为BufferPool内存空间不足以分配自己所需的空间大小时,就开始等待,并释放锁,让其他想要来分配内存空间的的线程也可以继续分配。
如果其他线程所需的内存空间比你小,BufferPool足够,那这个线程就直接带着分配好的内存空间离开,当前线程还是继续等待,这就是这个Condition的意义。
所以kafka对外宣称producer是线程安全的,支持多个线程调用同一个producer发送数据。都是得益于BufferPool这里的lock,因为每一个想要发送数据的操作都绕不开分配内存空间。
那这里有一个问题:
为什么每个线程都要个分配一个Condition呢?Condition的用法不应该是多个线程竞争一个对象吗?
这里说一下我自己的猜测,因为BufferPool需要自己管理起来当前正在有哪些线程正在wait,在等待分配内存空间,同时需要自己管理这些线程分配空间的顺序。
举一个例子,当BufferPool需要销毁的时候,会调用close()方法将close这个状态置为true。那么此时会遍历waiters列表将所有的waiter唤醒,让这些线程感知到close状态的变化及时地处理(抛异常出去)。这一点是一个Condition实例无法做到的。
所以每到有线程走到这里,等待BufferPool的剩余内存达到自己的要求,都会获取一个condition然后加到waiters队列中管理起来。然后进入一个循环,开始wait,等待被唤醒。
*/
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
// 进入循环体之后就开始wait,直到被唤醒,具体唤醒时机这里就不详细展开说了
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
recordWaitTime(timeNs);
}
// 每次循环时都需要检查一下当前的closed状态。我们上边提到过了,如果这个BufferPool销毁了,会唤醒所有在wait的线程然后走到这里,抛出异常
if (this.closed)
throw new KafkaException("Producer closed while allocating memory");
// 这个状态代表是说当前线程从wait状态醒来不是因为其他线程的唤醒,而是因为到了超时时间,说明到了超时时间还没有得到足够的内存空间,那么直接抛异常,这次发送失败
if (waitingTimeElapsed) {
this.metrics.sensor("buffer-exhausted-records").record();
throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
/*
循环走到这里说明是被其他线程唤醒,之后需要再检查一下当前的BufferPool的内存空间是否已经满足。
因为是存在仍然不满足的情况的,如果仍然不满足,那么再一次循环的时候需要继续休眠,所以这里需要计算一下再次休眠的超时时长。
*/
remainingTimeToBlockNs -= timeNs;
/*
下面开始判断当前BufferPool中是否有足够的空间。
首先if表达式中的条件判断的是假如当前待分配的空间大小就是poolableSize,而且当前free队列中已经有了可使用的ByteBuffer了,那就直接把这个拿出去用就可以了
*/
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// 这里注意buffer这个对象,buffer这个对象之前一直都是空,在这里直接指向了free出栈的第一个ByteBuffer
buffer = this.free.pollFirst();
accumulated = size;
} else {
/*
否则,不管当前需要的空间大小是不是poolableSize,都得从nonPooledAvailableMemory这一块中向外划分
所以这里得需要再整理一下未分配的内存空间,相当于是碎片整理
*/
freeUp(size - accumulated);
/*
然后开始给当前线程实际分配,如果nonPooledAvailableMemory已经可以满足当前的需求,那直接取所需即可,如果还不能满足,那先把当前nonPooledAvailableMemory已有的容量先都拿着,再来一次循环。
*/
int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -= got;
accumulated += got;
}
}
/*
到这是跳出循环了,然后将accumulated变量清零。这个逻辑要结合finally块中的代码一起看,finally中又将accumulated变量加回到了nonPooledAvailableMemory中。
如果在循环中没有抛异常,正常是会执行到这行代码的,说明此时我们已经给这个线程的需求分配了所需的内存空间,那么内存就实际已经给分配出来了,我们不需要将内存归还给nonPooledAvailableMemory。
但是如果循环中抛了异常,直接进finally代码块,不管抛了什么异常,都代表此次内存分配失败,我们现在都需要将之前分配出来的内存归还回去。
*/
accumulated = 0;
} finally {
/*
这个方法中有很多finally块,分别使用在了不同地方。这种写法也很值得我们学习。
比如说这个finally块,try保护的范围基本就是while循环,换句话说就是如何等待足够的内存空间这一部分代码。
我们将这一部分代码作为一个整体来看,不管这部分代码有没有出现异常,我们终归是要做一些后置工作,比如这两行代码:
1. 重置一下nonPooledAvailableMemory,因为while循环中会修改这个成员变量
2. 修改waiters中的成员
*/
this.nonPooledAvailableMemory += accumulated;
this.waiters.remove(moreMemory);
}
}
} finally {
/*
这个finally对应的try的范围比上一个finally对应的try的范围要大一些,基本囊括了整个分配内存空间的过程,
那这里就有一个问题了,为什么需要两个finally块分别做不同的事情呢?
首先我们需要认识到,在finally块内的逻辑,是要求不管代码中是否有抛异常都必须要执行的逻辑。
我们来观察这两个finally对应的try块所包括的范围就可以发现端倪。
上一个try+finally的代码是在一个else中的,这个else分支代表的是当前BufferPool内的容量不够当前的分配需求。所以搞了一个Condition,又走了一个while循环,所以针对这个分支,最后需要做一些后置工作,比如重置nonPooledAvailableMemory,比如更新waiters,但这些后置处理对于另一个if分支是不需要处理的。
但是当前这个finally块就不一样了,这个finally可以理解为只要开始分配内存,就一定会走到这里。
所以这里完成了一些更加基础的操作,我们来看一下:
1. 如果当前还有内存可用,且还有waiter在等待,那么继续唤醒下一个waiter。一个简单的使用场景就是当前有很多waiter排队等着分配内存,这时BufferPool回收了一块内存,然后唤醒了一个线程,这个线程取走了自己所需的内存之后发现还有剩余,那就顺便再唤醒下一个等待的线程。
2. 解锁,解锁这个不用多说了,而且解锁还需要再套一个finally才行,因为上一个操作也可能抛异常,十分严谨。
*/
try {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
lock.unlock();
}
}
/*
接近这个方法的尾声了。我们回头想一下,上面一通操作,除了待分配的内存大小是poolableSize且正好有一个buffer可用的情况,其余从nonPooledAvailableMemory分配内存的情况,就只是减去了一个数量,还没有实际分配呢。
是这样的,所以在这里,通过判断buffer是不是空,才真正做到了ByteBuffer的创建。
这里可以看出,分配内存实际上是一个相对来说比较重的操作,所以kafka把它包成一个方法,并尽量少的使用。这个方法前面通过各种判断,到这里才终于确认可以创建一块新内存了。
而且注意一下,方法到这里已经解锁了,说明实际分配内存这个比较耗时的地方并没有在同步代码块中,不过上面的同步代码块已经可以保证线程安全性,所以这里即使不加锁也不会有问题的。这样极大地提高了同步代码的执行效率。
但是创建的时候不是简单的allocate就完了,我们看下这个方法都做了些什么。
*/
if (buffer == null)
return safeAllocateByteBuffer(size);
else
return buffer;
}
/*
方法很简单,如果nonPooledAvailableMemory > size,那么方法都不会执行,因为本身剩余的空间就够分配size大小的内存空间了
但是如果nonPooledAvailableMemory不够了,就需要把free队列中的一些ByteBuffer让出来增加nonPooledAvailableMemory的大小,知道nonPooledAvailableMemory > size
这里可以看到,循环体中做了两件事,一个是将一个ByteBuffer从列表中移除(本来分配的也是堆上内存,所以会被GC),然后将让出来的这部分空间大小加到nonPooledAvailableMemory上
这里加的值不是直接加的poolableSize,而且ByteBuffer真实的capacity(),具体为什么之后会讲到
*/
private void freeUp(int size) {
while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
/**
* Allocate a buffer. If buffer allocation fails (e.g. because of OOM) then return the size count back to
* available memory and signal the next waiter if it exists.
*/
private ByteBuffer safeAllocateByteBuffer(int size) {
boolean error = true;
try {
ByteBuffer buffer = allocateByteBuffer(size);
error = false;
return buffer;
} finally {
if (error) {
// 如果分配内存没有分配成功,这个地方涉及到nonPooledAvailableMemory和waiters的修改,所以又要加锁了
this.lock.lock();
try {
this.nonPooledAvailableMemory += size;
if (!this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
this.lock.unlock();
}
}
}
}