集群环境规划
操作系统的选型
目前部署Kafka最多的3类操作系统分别是Linux、OS X和Windows,其中部署在Linux上的最多,而Linux也是推荐的操作系统。为什么呢?且不说当前的现状的确是Linux服务器数量最多,单论它与Kafka本身的相适性,Linux也要比Windows等其他操作系统更加适合部署Kafka。这里笔者罗列出自己能想到的两个主要原因:I/O模型的使用和数据网络传输效率。
谈到I/O模型,就不能不说当前主流且耳熟能详的5种模型:阻塞I/O、非阻塞I/O、I/O多路复用、信号驱动I/O和异步I/O。每一种I/O模型都有典型的使用场景,比如Socket的阻塞模式和非阻塞模式就对应于前两种模型,而Linux中的select函数就属于I/O多路复用模型,至于第5种模型其实很少有UNIX和类UNIX系统支持,Windows的IOCP(I/O Completion Port,简称IOCP)属于此模型。至于大名鼎鼎的Linux epoll模型,则可以看作兼具第3种和第4种模型的特性。
由于篇幅有限,我们不会针对每种I/O模型进行详细的展开,但通常情况下我们会认为epoll比select模型高级。毕竟epoll取消了轮询机制,取而代之的是回调机制(callback)。这样当底层连接Socket数较多时,可以避免很多无意义的CPU时间浪费。另外,Windows的IOCP模型可以说是真正的异步I/O模型,但由于其母系统的局限性,IOCP并没有广泛应用。
说了这么多,这些和Kafka又有什么关系呢?关键就在于clients底层网络库的设计。Kafka新版本clients在设计底层网络库时采用了Java的Selector机制,而后者在Linux上的实现机制就是epoll;但是在Windows平台上,Java NIO的Selector底层是使用select模型而非IOCP实现的,只有Java NIO2才是使用IOCP实现的。因此在这一点上,在Linux上部署Kafka要比在Windows上部署能够得到更高效的I/O处理性能。
对于第二个方面,即数据网络传输效率而言,Linux也更有优势。具体来说,Kafka这种应用必然需要大量地通过网络与磁盘进行数据传输,而大部分这样的操作都是通过Java的FileChannel.transferTo方法实现的。在Linux平台上该方法底层会调用sendfile系统调用,即采用了Linux提供的零拷贝(Zero Copy)技术。
如前面章节所言,这种零拷贝技术可以有效地改善数据传输的性能。在内核驱动程序处理I/O数据的时候,它可以减少甚至完全规避不必要的CPU数据拷贝操作,避免数据在操作系统内核地址空间和用户应用程序地址空间的缓冲区间进行重复拷贝,因而可以获得很好的性能。Linux提供的诸如mmap、sendfile以及splice等系统调用即实现了这样的技术。
然而对于Windows平台而言,虽然它也提供了TransmitFile函数来支持零拷贝技术,但是直到Java 8u60版本Windows平台才正式让FileChannel的transferTo方法调用该函数。具体详见这个JDK bug:http://bugs.java.com/view_bug.do?bug_id=8064407。
鉴于很多公司目前的生产环境中还没有正式上线Java 8,因而在Windows平台上部署Kafka将很有可能无法享受到零拷贝技术带来的高效数据传输。
磁盘规划
如果问哪个因素对Kafka性能最重要?磁盘无疑是排名靠前的答案。众所周知,Kafka是大量使用磁盘的。Kafka的每条消息都必须被持久化到底层的存储中,并且只有被规定数量的broker成功接收后才能通知clients消息发送成功,因此消息越是被更快地保存在磁盘上,处理clients请求的延时越低,表现出来的用户体验也就越好。
在确定磁盘时,一个常见的问题就是选择普通的机械硬盘(HDD)还是固态硬盘(SSD)。机械硬盘成本低且容量大,而SSD通常有着极低的寻道时间(seek time)和存取时间(access time),性能上的优势很大,但同时也有着非常高的成本。因此在规划Kafka线上环境时,读者就需要根据公司自身的实际条件进行有针对性的选型。但以笔者使用Kafka的经验来看,Kafka使用磁盘的方式在很大程度上抵消了SSD提供的那些突出优势。众所周知,SSD强就强在它不是机械装置,而全部由电子芯片及电路板组成,因而可以极大地避免传统机械硬盘缓慢的磁头寻道时间。一般机械硬盘的寻道时间都是毫秒级的。若有大量的随机I/O操作,则整体的磁盘延时将是非常可观的,但SSD则不受这样的拖累。可是这点差异对于Kafka来说又显得不是那么重要。为什么?因为Kafka是顺序写磁盘的,而磁盘顺序I/O的性能,即使机械硬盘也是不弱的——顺序I/O不需要频繁地移动磁头,因而节省了耗时的寻道时间。所以从磁盘的使用这个方面来看,笔者并不认为两者有着巨大的性能差异。关于Kafka底层的持久化实现,我们会在第6章中详细讨论。因此对于预算有限且追求高性价比的公司而言,机械硬盘完全可以胜任Kafka存储的任务。
既然是资源规划和硬件选型,我们不妨看下LinkedIn公司是怎么做的。
之前提到过,LinkedIn公司目前的Kafka就搭建于RAID 10之上。他们在Kafka层面设定的副本数是2,因此根据RAID 10的特性,这套集群实际上提供了4倍的数据冗余,且只能容忍一台broker宕机(因为副本数=2)。若LinkedIn公司把副本数提高到3,那么就提供了6倍的数据冗余。这将是一笔很大的成本开销。但是,如果我们假设LinkedIn公司使用的是JBOD方案。虽然目前JBOD有诸多限制,但其低廉的价格和超高的性价比的确是非常大的优势。另外通过一些简单的设置,JBOD方案可以达到和RAID方案一样的数据冗余效果。比如说,如果使用JBOD并且设置副本数为4,那么Kafka集群依然提供4倍的数据冗余,但是这个方案中整个集群可以容忍最多3台broker宕机而不丢失数据。对比之前的RAID方案,JBOD方案没有牺牲任何高可靠性或是增加硬件成本,同时还提升了整个集群的高可用性。
事实上,LinkedIn公司目前正在计划将整个Kafka集群从RAID 10迁移到JBOD上,只不过在整个过程中JBOD方案需要解决当前Kafka一些固有缺陷,比如:
任意磁盘损坏都会导致broker宕机——普通磁盘损坏的概率是很大的,因此这个缺陷从某种程度上来说是致命的。不过社区正在改进这个问题,未来版本中只要为broker配置的多块磁盘中还有状态良好的磁盘,broker就不会挂掉。
JBOD的管理需要更加细粒度化——目前Kafka没有提供脚本或其他工具用于在不同磁盘间进行分区手动分配,但这是使用JBOD方案中必要的功能。
- JBOD也应该提供类似于负载均衡的功能——目前只是简单地依赖轮询的方式为新副本数据选择磁盘,后续需要提供更加丰富的策略。
结合JBOD和RAID之间的优劣对比以及LinkedIn公司的实际案例,笔者认为:对于一般的公司或组织而言,选择JBOD方案的性价比更高。另外推荐用户为每个broker都配置多个日志路径,每个路径都独立挂载在不同的磁盘上,这使得多块物理磁盘磁头同时执行物理I/O写操作,可以极大地加速Kafka消息生产的速度。
最后关于磁盘的一个建议就是,尽量不要使用NAS(Network Attached Storage)这样的网络存储设备。对比本地存储,人们总是以为NAS方案速度更快也更可靠,其实然。NAS一个很大的弊端在于,它们通常都运行在低端的硬件上,这就使得它们的性能很差,可能比一台笔记本电脑的硬盘强不了多少,表现为平均延时有很大的不稳定性,而几乎所有高端的NAS设备厂商都售卖专有的硬件设备,因此成本的开销也是一个需要考虑的因素。
综合以上所有的考量,笔者给硬盘规划的结论性总结如下。
- 追求性价比的公司可以考虑使用JBOD。
- 使用机械硬盘完全可以满足Kafka集群的使用,SSD更好。
磁盘容量规划
Kafka集群到底需要多大的磁盘容量?
Kafka的每条消息都保存在实际的物理磁盘中,这些消息默认会被broker保存一段时间之后清除。这段时间是可以配置的,因此用户可以根据自身实际业务场景和存储需求来大致计算线上环境所需的磁盘容量。
让我们以一个实际的例子来看下应该如何思考这个问题。假设在你的业务场景中,clients每天会产生1亿条消息,每条消息保存两份并保留一周的时间,平均一条消息的大小是1KB,那么我们需要为Kafka规划多少磁盘空间呢?如果每天1亿条消息,那么每天产生的消息会占用1亿×2×1KB/1000/1000=200GB的磁盘空间。我们最好再额外预留10%的磁盘空间用于其他数据文件(比如索引文件等)的存储,因此在这种使用场景下每天新发送的消息将占用210GB左右的磁盘空间。因为还要保存一周的数据,所以整体的磁盘容量规划是210×7≈1.5TB。当然,这是无压缩的情况。如果在clients启用了消息压缩,我们可以预估一个平均的压缩比(比如0.5),那么整体的磁盘容量就是0.75TB。
总之对于磁盘容量的规划和以下多个因素有关。
- 新增消息数。
- 消息留存时间。
- 平均消息大小。
- 副本数。
- 是否启用压缩。
内存规划
乍一看似乎关于内存规划的讨论没什么必要,毕竟用户能做的就只是分配一个合适大小的内存,其他也没有可以调整的地方了。其实不然!Kafka对于内存的使用可称作其设计亮点之一。虽然在前面我们强调了Kafka大量依靠文件系统和磁盘来保存消息,但其实它还会对消息进行缓存,而这个消息缓存的地方就是内存,具体来说是操作系统的页缓存(page cache)。
Kafka虽然会持久化每条消息,但其实这个工作都是底层的文件系统来完成的,Kafka仅仅将消息写入page cache而已,之后将消息“冲刷”到磁盘的任务完全交由操作系统来完成。另外consumer在读取消息时也会首先尝试从该区域中查找,如果直接命中则完全不用执行耗时的物理I/O操作,从而提升了consumer的整体性能。不论是缓冲已发送消息还是待读取消息,操作系统都要先开辟一块内存区域用于存放接收的Kafka消息,因此这块内存区域大小的设置对于Kafka的性能就显得尤为关键了。
Kafka对于Java堆内存的使用反而不是很多,因为Kafka中的消息通常都属于“朝生夕灭”的对象实例,可以很快地垃圾回收(GC)。一般情况下,broker所需的堆内存都不会超过6GB。所以对于一台16GB内存的机器而言,文件系统page cache的大小甚至可以达到10~14GB!
除以上这些考量之外,用户还需要把page cache大小与实际线上环境中设置的日志段大小相比较(关于日志段的描述会在第6章中详细展开)。假设单个日志段文件大小设置为10GB,那么你至少应该给予page cache 10GB以上的内存空间。这样,待消费的消息有很大概率会保存在页缓存中,故consumer能够直接命中页缓存而无须执行缓慢的磁盘I/O读操作。
总之对于内存规划的建议如下。
- 尽量分配更多的内存给操作系统的page cache。
- 不要为broker设置过大的堆内存,最好不超过6GB。
- page cache大小至少要大于一个日志段的大小。
CPU规划
比起磁盘和内存,CPU于Kafka而言并没有那么重要——严格来说,Kafka不属于计算密集型(CPU-bound)的系统,因此对于CPU需要记住一点就可以了:追求多核而非高时钟频率。简单来说,Kafka的机器有16个CPU核这件事情比该机器CPU时钟高达4GHz更加重要,因为Kafka可能无法充分利用这4GHz的频率,但几乎肯定会用满16个CPU核。Kafka broker通常会创建几十个后台线程,再加上多个垃圾回收线程,多核系统显然是最佳的配置选择。
当然,凡事皆有例外。若clients端启用了消息压缩,那么除了要为clients机器分配足够的CPU资源外,broker端也有可能需要大量的CPU资源——尽管Kafka 0.10.0.0改进了在broker端的消息处理,免除了解压缩消息的负担以节省磁盘占用和网络带宽,但并非所有情况下都可以避免这种解压缩(比如clients端和broker端配置的消息版本号不匹配)。若出现这种情况,用户就需要为broker端的机器也配置充裕的CPU资源。
基于以上的判断依据,我们对CPU资源规划的建议如下。
- 使用多核系统,CPU核数最好大于8。
- 如果使用Kafka 0.10.0.0之前的版本或clients端- 与broker端消息版本不一致(若无显式配置,这种情况多半由clients和broker版本不一致造成),则考虑多配置一些资源以防止消息解压缩操作消耗过多CPU。
带宽规划
对于Kafka这种在网络间传输大量数据的分布式数据管道而言,带宽资源至关重要,并且特别容易成为系统的瓶颈,因此一个快速且稳定的网络是Kafka集群搭建的前提条件。低延时的网络以及高带宽有助于实现Kafka集群的高吞吐量以及用户请求处理低延时。
当前主流的网络环境皆是使用以太网,带宽主要也有两种:1Gb/s和10Gb/s,即平时所说的千兆位网络和万兆位网络。无论是哪种带宽,对于大多数的Kafka集群来说都足矣了。
举一个实际的例子来说明如何规划带宽资源。假设用户网络环境中的带宽是1Gb/s,用户的业务目标是每天用1小时处理1TB的业务消息,那么在这种情况下Kafka到底需要多少台机器呢?让我们来计算一下:网络带宽是1Gb/s,即每秒传输1Gb的数据,假设分配的机器为Kafka专属使用(通常不建议与其他应用或是框架部署在同一台机器上)且为Kafka分配70%的带宽资源——考虑到机器上还有其他的进程使用网络且网卡通常不能用满,超过一定阈值可能出现网络丢包的情况,因此70%的设定实际上是很合理的——那么Kafka单台broker的带宽就是1Gb/s×0.7≈710Mb/s,但事实上这是Kafka所使用的最高带宽,用户不能奢望Kafka集群平时就一直使用如此多的带宽,毕竟万一碰到突发流量,会极容易把网卡“打满”,因此在70%的基础上,一般再截取1/3,即710Mb/s/3≈240Mb/s。这里的1/3是一个相对保守的数字,用户可以根据自身的业务特点酌情增加。好了,根据现有的网络情况,我们明确了单台broker的带宽是240Mb/s。如果要在1小时内处理1TB的业务消息,即每秒需要处理292MB左右的数据,也就是每秒2336Mb数据,那么至少需要2336/240≈10台broker机器。若副本数是2,那么这个数字还需要再翻1倍,即20台broker机器。根据万兆位网卡来评估broker机器的方法是类似的。
关于带宽资源方面的规划,用户还需要注意的是尽量避免使用跨机房的网络环境,特别是那些跨城市甚至是跨大洲的网络。因为这些网络条件下请求的延时将会非常高,不管是broker端还是clients端都需要额外做特定的配置才能适应。
综合上述内容,我们对带宽资源规划的建议如下。
- 尽量使用高速网络。
- 根据自身网络条件和带宽来评估Kafka集群机器数量。
- 避免使用跨机房网络。
典型线上配置
下面给出一份典型的线上环境配置,用户可以参考这份配置以及结合自己的实际情况进行二次调整。
- CPU 24核。
- 内存32GB。
- 磁盘1TB 7200转SAS盘两块。
- 带宽1Gb/s。
- ulimit-n 1000000.
- Socket Buffer至少64KB——适用于跨机房网络传输。
伪分布式环境安装
单节点伪分布式环境是指集群由一台ZooKeeper服务器和一台Kafka broker服务器组成,如图3.1所示。
为了搭建图3.1中的单节点Kafka集群,我们必须依次执行以下操作。
- 安装Java。
- 安装ZooKeeper。
- 安装Apache Kafka。
安装java
不论是ZooKeeper还是Kafka都需要提前安装好Java并且正确配置好Java环境。鉴于Java 7自2015年8月便不再更新,笔者强烈建议Java虚拟机版本使用Java 8,并且使用比较成熟的Oracle公司的HotSpot虚拟机。为节省篇幅,本章将以Linux CentOS 6 64位作为安装演示的操作系统,其他Linux发行版的安装方法是类似的。Windows和Mac OS平台上的安装步骤参考官网。
Java安装可以使用java -version命令来进行验证。默认情况下,该命令输出显示安装过的Java版本。若没有安装Java,该命令会提示“无法找到Java虚拟机。另外笔者推荐使用Oracle版本的虚拟机,而非OpenJDK版本的虚拟机。
安装ZooKeeper
ZooKeeper是安装Kafka集群必要的组件,并且Kafka大量地使用ZooKeeper来保存集群的元数据信息以及consumer位移信息(老版本)。虽然在伪分布式集群中直接使用Kafka自带的ZooKeeper可能更方便,但其实单独安装一个外部的ZooKeeper服务器同样很简单。
安装单节点Kafka集群
第一步我们下载Apache Kafka,官网地址是http://kafka.apache.org/downloads.html。截至笔者写稿时,当前最新的版本是1.0.0,因此我们需要下载的文件是kafka_2.11-1.0.0.tgz。笔者在这里选用由Scala 2.11编译的版本,因为社区刚刚加入对Scala 2.12的支持且Scala 2.12不支持Java 7,某些用户可能无法使用该版本编译的Kafka。
文件下载完成之后执行解压缩操作并创建保存Kafka数据的文件目录:
tar -zxvf kafka_2.11-1.0.0.tgz
mv kafka_2.11-1.0.0 kafka
mkdir -p /home/work/kafka/data-logs
cd kafka
之后打开config目录下的server.properties文件,修改下列配置:
log.dirs=/home/work/kafka/data-logs
然后保存修改并退出。最后通过下列命令启动Kafka broker:
bin/kafka-server-start.sh config/server.properties
如果你想要在后台运行Kafka broker,只需要在启动命令中加入-daemon:
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
看到以下输出证明启动成功:
INFO [Kafka Server 0], started(kafka.server.KafkaServer)
多节点环境安装
单节点的Kafka伪集群应付日常的应用开发或是功能验证绰绰有余,但若在生产环境中直接使用则无法充分利用Kafka提供的分布式特性,比如负载均衡和故障转移等。此外,Kafka还具有优秀的准线性扩容的能力,因此用户可以很容易地扩展Kafka节点数量以应对不断增长的消息处理需求。同时由于Kafka提供了完备的备份机制,多节点集群天然地为用户提供高可用保障,极大地降低了人工维护的成本。
从本质上来说,多节点Kafka集群由一套多节点ZooKeeper集群和一套多节点Kafka集群组成,如图3.5所示。
- 安装多节点ZooKeeper集群。
- 安装多节点Kafka集群。
安装多节点ZooKeeper集群
目前来说Kafka可以说是强依赖ZooKeeper的,因此生产环境中一个高可用、高可靠的ZooKeeper集群也是必不可少的。ZooKeeper集群通常被称为一个ensemble。只要这个ensemble中的大多数节点存活,那么ZooKeeper集群就能正常提供服务。显然,既然是大多数,那么最好使用奇数个服务器,即2n+1个服务器,这样整个ZooKeeper集群最多可以容忍n台服务器宕机而保证依然提供服务。如果使用偶数个服务器则通常会浪费一台服务器的资源。
下面举一个例子来说明:假设我们使用5台ZooKeeper服务器构建集群,倘若2台服务器宕机,剩下的3台服务器占了半数以上,故而ZooKeeper服务正常工作;但假如我们使用了4台服务器,若2台服务器宕机,剩下的2台服务器不满足“半数以上服务器存活”的条件,因此此时ZooKeeper集群将停止服务——由此可见,虽然使用了4台服务器,但我们依然只能容忍1台服务器崩溃,这就是为什么ZooKeeper集群节点数量通常是奇数的原因。
基于上面的规则,一个生产环境中最少的ZooKeeper集群节点数量是3,这样1个节点“挂掉”了不会影响整个集群的运作。在实际使用场景中,5台服务器构成的ZooKeeper集群也是十分常见的,而再多数量的集群则不常见。当然具体数量的确定要依据用户对高可靠性的需求。通常我们都需要在高可靠性与成本间取得适当的平衡。
在安装ZooKeeper集群之前,我们假定Java已经被正确地安装和配置到了生产环境中,这里不再赘述。本例将安装一个有3个节点的ZooKeeper集群,并假设3个节点机器的主机名分别是zk1、zk2和zk3。多节点模式中所用到的配置文件与我们在单节点ZooKeeper安装相关章节中的配置文件大部分相同,只是有些微小的差别。下面的例子给出了一份典型的多节点环境配置文件zoo.cfg:
-dataDir=/usr/zookeeper/data_dir
clientPort=2181
initLimit=5
syncLimit=2
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
tickTime:ZooKeeper最小的时间单位,用于丈量心跳时间和超时时间等。通常设置成默认值2秒即可。
dataDir:非常重要的参数!ZooKeeper会在内存中保存系统快照,并定期写入该路径指定的文件夹中。生产环境中需要注意该文件夹的磁盘占用情况。
clientPort:ZooKeeper监听客户端连接的端口,一般设置成默认值2181即可。
initLimit:指定follower节点初始时连接leader节点的最大tick次数。假设是5,表示follower必须要在5×tickTime时间内(默认是10秒)连接上leader,否则将被视为超
syncLimit:设定了follower节点与leader节点进行同步的最大时间。与initLimit类似,它也是以tickTime为单位进行指定的。
server.X=host:port:port:配置文件中的最后3行都是这种形式的。这里的X必须是一个全局唯一的数字,且需要与myid文件中的数字相对应(关于myid文件的设置稍后会做详细讨论)。一般设置X值为1~255之间的整数。这行的后面还配置了两个端口,通常是2888和3888。第一个端口用于使follower节点连接leader节点,而第二个端口则用于leader选举。
设置好配置文件,下面就该创建上面提到的myid文件。众所周知,每个ZooKeeper服务器都有一个唯一的ID。这个ID主要用在两个地方:一个就是刚刚我们配置的zoo.cfg文件,另一个则是myid文件。myid文件位于zoo.cfg中dataDir配置的目录下,其内容也很简单,仅是一个数字,即ID。
首先,在ZooKeeper的conf目录下创建3个配置文件zoo1.cfg、zoo2.cfg和zoo3.cfg。
创建好ZooKeeper配置文件,下一步就是创建myid文件了。myid文件必须位于配置文件中的dataDir中,即/mnt/disk/huxitest/data_logs/zookeeper1,2,3下,具体命令如下:
echo "1" > /mnt/disk/huxitest/data_logs/zookeeper1/myid
echo "2" > /mnt/disk/huxitest/data_logs/zookeeper2/myid
echo "3" > /mnt/disk/huxitest/data_logs/zookeeper3/myid
下一步就是启动3个控制台终端分别在ZooKeeper的安装目录下执行以下命令启动ZooKeeper服务器:
echo "1" > /mnt/disk/huxitest/data_logs/zookeeper1/myid
echo "2" > /mnt/disk/huxitest/data_logs/zookeeper2/myid
echo "3" > /mnt/disk/huxitest/data_logs/zookeeper3/myid
如果是多节点安装方案,既可以使用上面的命令启动ZooKeeper,也可以直接运行zkServer脚本启动ZooKeeper,比如:
bin/zkServer.sh(bat) start conf/zoo.cfg
当所有ZooKeeper服务器启动成功后,我们还需要检查一下整个集群的状态,分别执行以下命令:
cd /mnt/disk/huxitest/zookeeper
bin/zkServer.sh status conf/zoo1.cfg
bin/zkServer.sh status conf/zoo2.cfg
bin/zkServer.sh status conf/zoo3.cfg
~~
另外还可以执行JDK的jps命令来查看ZooKeeper进程以查看集群状态。
ZooKeeper的主进程名是QuorumPeerMain,图3.10的jps命令输出也可以证明已经成功地启动了3个ZooKeeper进程。值得一提的是,如果是在多台机器上搭建ZooKeeper集群,那么每台机器上都至少应该有一个这样的进程被启动。
安装多节点Kafka
搭建Kafka集群的第一步是要创建多份配置文件
在上面3个配置文件中我们需要每台Kafka服务器指定不同的broker ID。该ID在整个集群中必须是唯一的。而配置listeners时最好使用节点的FQDN(Fully Qualified Domain Name),即全称域名,尽量不要使用IP地址。另外配置文件中还有一个特别重要的参数,即zookeeper.connect。鉴于我们之前已经搭建了一个3节点ZooKeeper集群,此时配置zookeeper.connect必须同时指定所有的ZooKeeper节点。注意本例中使用的端口分别是8001、8002和8003。
创建Kafka配置文件之后,剩下的工作就很简单了,只需要执行下列命令启动Kafka broker服务器:
bin/kafka-server-start.sh -daemon config/server1.properties
查看位于Kafka logs目录下的server.log,确认Kafka broker已经启动成功,如图3.14所示。
另外,也可以使用jps命令来确认broker进程启动成功,如图3.15所示。
至此,一个3节点的Kafka分布式集群就搭建成功了。如果需要为集群增加更多的Kafka broker节点,只需要配置一份类似的配置文件,然后利用该文件直接运行启动命令即可。应该说,Kafka对于集群扩展操作是很友好的,不需要用户承担太多的维护和管理成本。
验证部署
成功搭建起多节点的Kafka集群还不够,我们还需要验证线上环境是没有错误且可以使用的。下面将从以下几个方面分别来验证Kafka集群部署-的正确性。
- 测试topic创建与删除。
- 测试消息的生产与发送。
- 生产者吞吐量测试。
- 消费者吞吐量测试。
测试topic创建与删除
topic能够正确地创建与删除才能说明Kafka集群在正常工作,因为通常它都表明了Kafka的控制器(controller)已经被成功地选举出来并开始履行自身的职责。很多用户喜欢在集群搭建后就立即开始运行producer和consumer,甚至完全依靠Kafka的自动topic创建功能,而不去手动创建并做验证。笔者其实并不推荐这种做法,以笔者多年使用Kafka的经验,建议读者在开始使用Kafka集群前最好提前把所需的topic创建出来,并执行对应的命令做验证。这样既可以测试整个集群的运行状况,也保证了producer和consumer运行时不会因为topic分区leader的各种问题导致短暂停顿现象。
接下来,我们首先创建一个测试topic,名为test-topic。为了充分利用搭建的3台服务器,我们创建3个分区,每个分区都分配3个副本,执行如下命令:
bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 --create --topic test-topic --partitions 3 --replication-factor 3
Created topic "test-topic".
上面的输出表明topic已经被成功创建,但我们还需要运行一些命令来做详细验证,如下:
bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 –list
test-topic
bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 --describe --topic test-topic
topic详细分区信息如图3.16所示。
显而易见,该topic下有3个分区,每个分区有3个副本。每个分区的leader分别是0、1、2,表明Kafka将该topic的这3个分区均匀地在3台broker上进行了分配。
下面来测试删除topic,执行如下命令:
>bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 --delete --topic test-topic
Topic test-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
上面的输出仅仅表示该topic被成功地标记为“待删除”,至于topic是否会被真正删除取决于broker端参数delete.topic.enable。该参数在当前Kafka 1.0.0版本中被默认设置为true,即表明Kafka默认允许删除topic。事实上,该参数在旧版本中默认值一直是false,故若用户显式设置该参数为false,或使用了1.0.0之前版本的默认值,那么即使运行了上面的命令,Kafka也不会删除该topic。
由于在之前搭建Kafka集群时我们配置了该参数为true,因此Kafka会将所有与该topic相关的数据全部删除,不过这是一个异步过程,具体的删除操作对命令执行者来说是完全透明的。我们可以查询底层的文件系统来验证topic分区的日志目录是否被删除
另外,也可以再次执行kafka-topics脚本来列出当前的topic列表,如果test-topic不在该列表中,则表明该topic被删除成功,
有些时候,用户可能发现topic没有被成功删除,这可能是topic分区数过多或数据过多的原因。Kafka当前只能一个分区一个分区地删除数据,无法做到同时删除,因此用户可以查询底层的文件系统来判断删除操作是否在正常执行。如果发现删除操作停滞了,这通常表明该Kafka集群有问题,此时便需要进一步地详查才能确定无法删除的真正原因,比如是否有分区正处于被分配过程中等。
测试消息发送与消费
成功测试了topic之后接下来我们测试一下Kafka集群是否可以正常地发送消息和读取消息。为了实现这一目的,我们需要用到Kafka默认提供的kafka-console-producer和kafka-console-consumer脚本。这对脚本可以很方便地用来测试消息的发送和读取。发送消息时,用户从键盘输入消息,按回车键后即表示发送该条消息。下面我们打开两个终端,一个用于发送消息,另一个用于接收消息,如图3.17和图3.18所示。
生产者吞吐量测试
除了基本的console-producer和console-consumer脚本可以用于测试简单的消息发送与接收,Kafka还提供了性能吞吐量测试脚本,它们分别是kafka-producer-perf-test脚本和kafka-consumer-perf-test脚本。
kafka-producer-perf-test脚本是Kafka提供的用于测试producer性能的脚本,该脚本可以很方便地计算出producer在一段时间内的吞吐量和平均延时,具体使用说明如图3.20所示。
图3.20中该脚本的输出结果表明在这台测试机上运行一个Kafka producer的平均吞吐量是8MB/s,即占用64Mb/s左右的带宽,平均每秒能发送41963条消息,平均延时是2.36秒,最大延时是3.51秒,平均有50%的消息发送需要花费2.79秒,95%的消息发送需要花费3.14秒,99%的消息发送需要花费3.36秒,而99.9%的消息发送需要花费3.5秒。本例测试出来的producer占用的带宽是64Mb/s(8 × 8.0.0 MB/s),和千兆位网卡相比,这点带宽远远没有达到网卡的上限,因此这也说明producer还有很大的优化空间。
在kafka-producer-perf-test脚本的实际使用过程中,最好让该脚本长时间稳定地运行一段时间,这样测试出来的结果才能准确。毕竟脚本运行之初会执行很多初始化工作,运行之处的吞吐量不能真实反映系统的实际情况。
消费者吞吐量测试
和kafka-producer-perf-test脚本类似,Kafka为consumer也提供了方便、便捷的性能测试脚本,即kafka-consumer-perf-test脚本。我们首先用它在刚刚搭建的Kafka集群环境中测试一下新版本consumer的吞吐量,如图3.21所示。
图3.21的例子中我们测试消费50万条消息的consumer的吞吐量。结果表明,在该环境中consumer在1秒多的时间内总共消费了95MB的消息,因此吞吐量大约是92MB/s,即736Mb/s。这样我们就对本机consumer的性能有了一个大致的了解,为后续评估整体consumer性能目标提供了一个很有力的基础。
图3.21测试的是新版本consumer的吞吐量,下面依然使用该脚本测试一下旧版本consumer,如图3.22所示。
图3.22中运行consumer指定的是ZooKeeper的连接信息,说明我们使用的是旧版本consumer。结果表明本次测试共消费190MB左右的消息,吞吐量大约是147MB/
参数设置
broker端参数
Kafka broker端提供了很多参数用于调优系统的各个方面,有一些参数是所有Kafka环境都需要考虑和配置的,不论是单机环境还是分布式环境。这些参数都是Kafka broker的基础配置,一定要明确它们的含义。
broker端参数需要在Kafka目录下的config/server.properties文件中进行设置。当前对于绝大多数的broker端参数而言,Kafka尚不支持动态修改——这就是说,如果要新增、修改,抑或是删除某些broker参数的话,需要重启对应的broker服务器。
- broker.id——Kafka使用唯一的一个整数来标识每个broker,这就是broker.id。该参数默认是-1。如果不指定,Kafka会自动生成一个唯一值。总之,不管用户指定什么都必须保证该值在Kafka集群中是唯一的,不能与其他broker冲突。在实际使用中,推荐使用从0开始的数字序列,如0、1、2……
- log.dirs——非常重要的参数!该参数指定了Kafka持久化消息的目录。若待保存的消息数量非常多,那么最好确保该文件夹下有充足的磁盘空间。该参数可以设置多个目录,以逗号分隔,比如/home/kafka1,/home/kafka2。在实际使用过程中,指定多个目录的做法通常是被推荐的,因为这样Kafka可以把负载均匀地分配到多个目录下。若用户机器上有N块物理硬盘(并且假设这台机器完全给Kafka使用),那么设置N个目录(须挂载在不同磁盘上的目录)是一个很好的选择。N个磁头可以同时执行写操作,极大地提升了吞吐量。注意,这里的“均匀”是根据目录下的分区数进行比较的,而不是根据实际的磁盘空间。值得一提的是,若不设置该参数,Kafka默认使用/tmp/kafka-logs作为消息保存的目录。把消息保存在/tmp目录下,在实际的生产环境中是极其不可取的。
zookeeper.connect——同样是非常重要的参数。如果说前两个参数还有默认值可以使用的话(虽然极其不推荐将默认值应用到线上环境),那么此参数则完全没有默认值,是必须要设置的。该参数也可以是一个CSV(comma-separated values)列表,比如在前面的例子中设置的那样:zk1:2181,zk2:2181,zk3:2181。如果要使用一套ZooKeeper环境管理多套Kafka集群,那么设置该参数的时候就必须指定ZooKeeper的chroot,比如zk1:2181,zk2:2181,zk3:2181/kafka_cluster1。结尾的/kafka_cluster1就是chroot,它是可选的配置,如果不指定则默认使用ZooKeeper的根路径。在实际使用过程中,配置chroot可以起到很好的隔离效果。这样管理Kafka集群将变得更加容易。
listeners——broker监听器的CSV列表,格式是[协议]://[主机名]:[端口],[[协议]]://[主机名]:[端口]]。该参数主要用于客户端连接broker使用,可以认为是broker端开放给clients的监听端口。如果不指定主机名,则表示绑定默认网卡;如果主机名是0.0.0.0,则表示绑定所有网卡。Kafka当前支持的协议类型包括PLAINTEXT、SSL及SASL_SSL等。对于新版本的Kafka,笔者推荐只设置listeners一个参数就够了,对于已经过时的两个参数host.name和port,就不用再配置了。对于未启用安全的Kafka集群,使用PLAINTEXT协议足矣。如果启用了安全认证,可以考虑使用SSL或SASL_SSL协议。
advertised.listeners——和listeners类似,该参数也是用于发布给clients的监听器,不过该参数主要用于IaaS环境,比如云上的机器通常都配有多块网卡(私网网卡和公网网卡)。对于这种机器,用户可以设置该参数绑定公网IP供外部clients使用,然后配置上面的listeners来绑定私网IP供broker间通信使用。当然不设置该参数也是可以的,只是云上的机器很容易出现clients无法获取数据的问题,原因就是listeners绑定的是默认网卡,而默认网卡通常都是绑定私网IP的。在实际使用场景中,对于配有多块网卡的机器而言,这个参数通常都是需要配置的。
unclean.leader.election.enable——是否开启unclean leader选举。何为unclean leader选举?在第1章中我们提到了ISR的概念。ISR中的所有副本都有资格随时成为新的leader,但若ISR变空而此时leader又宕机了,Kafka应该如何选举新的leader呢?为了不影响Kafka服务,该参数默认值是false,即表明如果发生这种情况,Kafka不允许从剩下存活的非ISR副本中选择一个当leader。因为如果允许,这样做固然可以让Kafka继续提供服务给clients,但会造成消息数据的丢失,而在一般的用户使用场景中,数据不丢失是基本的业务需求,因此设置此参数为false显得很有必要。事实上,Kafka社区在1.0.0版本才正式将该参数默认值调整为false,这表明社区在高可用性与数据完整性之间选择了后者。
delete.topic.enable——是否允许Kafka删除topic。默认情况下,Kafka集群允许用户删除topic及其数据。这样当用户发起删除topic操作时,broker端会执行topic删除逻辑。在实际生产环境中我们发现允许Kafka删除topic其实是一个很方便的功能,再加上自Kafka 0.9.0.0新增的ACL权限特性,以往对于误操作和恶意操作的担心完全消失了,因此设置该参数为true是推荐的做法。
log.retention.{hours|minutes|ms}——这组参数控制了消息数据的留存时间,它们是“三兄弟”。如果同时设置,优先选取ms的设置,minutes次之,hours最后。有了这3个参数,用户可以很方便地在3个时间维度上设置日志的留存时间。默认的留存时间是7天,即Kafka只会保存最近7天的数据,并自动删除7天前的数据。当前较新版本的Kafka会根据消息的时间戳信息进行留存与否的判断。对于没有时间戳的老版本消息格式,Kafka会根据日志文件的最近修改时间(last modified time)进行判断。可以这样说,这组参数定义的是时间维度上的留存策略。实际线上环境中,需要根据用户的业务需求进行设置。保存消息很长时间的业务通常都需要设置一个较大的值。
log.retention.bytes——如果说上面那组参数定义了时间维度上的留存策略,那么这个参数便定义了空间维度上的留存策略,即它控制着Kafka集群需要为每个消息日志保存多大的数据。对于大小超过该参数的分区日志而言,Kafka会自动清理该分区的过期日志段文件。该参数默认值是-1,表示Kafka永远不会根据消息日志文件总大小来删除日志。和上面的参数一样,生产环境中需要根据实际业务场景设置该参数的值。
min.insync.replicas——该参数其实是与producer端的acks参数配合使用的。关于acks含义的介绍,我们留到第4章中详细展开。这里只需要了解acks=-1表示producer端寻求最高等级的持久化保证,而min.insync.replicas也只有在acks=-1时才有意义。它指定了broker端必须成功响应clients消息发送的最少副本数。假如broker端无法满足该条件,则clients的消息发送并不会被视为成功。它与acks配合使用可以令Kafka集群达成最高等级的消息持久化。在实际使用中如果用户非常在意被发送的消息是否真的成功写入了所有副本,那么推荐将参数设置为副本数-1。举一个例子,假设某个topic的每个分区的副本数是3,那么推荐设置该参数为2,这样我们就能够容忍一台broker宕机而不影响服务;若设置参数为3,那么只要任何一台broker宕机,整个Kafka集群将无法继续提供服务。因此用户需要在高可用和数据一致性之间取得平衡。
num.network.threads——一个非常重要的参数。它控制了一个broker在后台用于处理网络请求的线程数,默认是3。通常情况下,broker启动时会创建多个线程处理来自其他broker和clients发送过来的各种请求。注意,这里的“处理”其实只是负责转发请求,它会将接收到的请求转发到后面的处理线程中。在真实的环境中,用户需要不断地监控NetworkProcessorAvgIdlePercent JMX指标。如果该指标持续低于0.3,笔者建议适当增加该参数的值。在第8章中我们将会详细探讨各种JMX监控。
num.io.threads——这个参数就是控制broker端实际处理网络请求的线程数,默认值是8,即Kafka broker默认创建8个线程以轮询方式不停地监听转发过来的网络请求并进行实时处理。Kafka同样也为请求处理提供了一个JMX监控指标Request HandlerAvgIdlePercent。如果发现该指标持续低于0.3,则可以考虑适当增加该参数的值。
message.max.bytes——Kafka broker能够接收的最大消息大小,默认是977KB,还不到1MB,可见是非常小的。在实际使用场景中,突破1MB大小的消息十分常见,因此用户有必要综合考虑Kafka集群可能处理的最大消息尺寸并设置该参数值。
上面这些参数是笔者认为最重要的broker端参数,但不可否认的是,Kafka broker端的参数远远不止这些。笔者粗略数了一下,以Kafka 1.0.0版本为例,broker端的参数有167个之多。显然我们不可能涵盖所有的参数,其他的参数含义以及使用方法详见Kafka官网https://kafka.apache.org/documentation/#brokerconfigs。
topic级别参数
除broker端参数之外,Kafka还提供了一些topic级别的参数供用户使用。所谓的topic级别,是指覆盖broker端全局参数。每个不同的topic都可以设置自己的参数值。举一个例子来说,上面提到的日志留存时间。显然,在实际使用中,在全局设置一个通用的留存时间并不方便,因为每个业务的topic可能有不同的留存策略。如果只能设置全局参数,那么势必要取所有业务留存时间的最大值作为全局参数值,这样必然会造成空间的浪费。因此Kafka提供了很多topic级别的参数,常见的包括如下几个。
delete.retention.ms——每个topic可以设置自己的日志留存时间以覆盖全局默认值。
max.message.bytes——覆盖全局的message.max.bytes,即为每个topic指定不同的最大消息尺寸。
retention.bytes——覆盖全局的log.retention.bytes,每个topic设不同的日志留存尺寸。
GC参数
kafka broker端代码虽然使用scala语言编写,但终归要编译为.class文件在JVM 上运行。既然是JVM上运行,垃圾回收参数设置就显得非常重要
对于使用java7的用户 ,那么在选择GC收集器时可以根据以下法则进行确认
如果用户机器上的CPU资源非常充裕,那么推荐使用CMS收集器。这样可以充分利用多CPU执行并发垃圾收集。启用方法 -XX:+UseCurrentMarkSweepGC.
相反地,则使用吞吐量收集器,及所谓的throughput collector。这样不会挤占紧张的CPU资源,使kafka broker达到最大的吞吐量。启用方法-XX:UseParallel GC
若用户使用java8——这是推荐的版本。实际上如果用户在kafka官网上下载使用scala 2.22编译的kafka二进制压缩包。那么就必须安装并使用java8-推荐使用G1垃圾收集器。根据笔者的实际使用经验,在没有任何调优的情况下,G1收集器本身回比CMS表现出更好的性能,主要体现在Full GC的次数更少、需要微调的参数更少等方面。因此推荐用户始终使用G1收集器,不论实在broker端还是在clients端
除此之外,我们还需要打开GC日志的监控,并实时确保不会出现“G1HR #StartFullGC”.
只有uG1的其他擦书,可以根据使用实际情况酌情做微小调整。
JVM参数
kafka推荐用户使用 oracle JKD版本使java8.另外由于kafka broker主要使用的是堆外内存,即大量使用操作系统的页缓存,因此其实并不需要为JVM分配太多的内存。在实际中,通常为broker设置不超过6GB的堆空间。一下就是一份典型的生产环境中的JVM参数列表:
-Xmx6g -Xmx6g -XX:metaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinmetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
OS参数
kafka虽然支持很多平台,但到目前为止被广泛使用并已被证明表现良好的平台依然是linux平台。
通常情况下,kafka并不需要太多的os级别的参数调优,但依然有一些OS参数是必须调整的。
文件描述符限制:kafka会频繁的创建并修改文件系统的中文件,这包括消息的日志文件、索引文件及各种元数据管理文件等。因此如果一个broker上面有很多topic的分区,那么这个broker势必就需要打开很多个文件——大致数量约等于分区数 × (分区总大小、日志段大小)× 3.举一个例子,假设broker上保存了50个分区,每个分区平均尺寸是10GB,每个日志段大小是1GB,那么这个broker需要维护1500个左右的文件描述符。因此在实际使用场景中最好首先增大进程能够打开的最大文件描述符上限,比如设置一个很大的值,如100000.具体设置方法为ulimit -n 100000.
Socket缓冲区大小:这里指的是OS级别的socker缓冲区大小,而非kafka自己提供的socker缓冲区参数。事实上。kafka自己的参数将其设置为64KB,这对于普通的内网环境而言通常是足够的,因为内网环境下往返时间(RTT)一般都很低,不会产生过多的数据堆积在socket缓冲区中,但对于那些跨地区的数据传输而言,仅仅增加kafka参数就不够了,因为前者也受限于OS级别的设置。因此如果是做远距离的数据传输,那么建议将OS级别的socket缓冲区调大,比如增加到128KB,甚至更大。
最好使用Ext4 或XFS文件系统:其实kafka操作的都是普通文件,并没有依赖于特定的文件系统,但依然推荐使用Ext4或XFS文件系统,特别是XFS通常有着更好的性能。这种性能的提升主要影响的是kafka的写入功能。根据官网的测试报告,使用XFS的写入时间大约是160ms,而使用Ext4大约是250ms。因此生产环境中最好使用XFS文件系统。
关闭swap:其实这是很多使用磁盘的应用程序的常规调优手段。具体命令为sysctl vm.swappiness = <一个较小的数>,即大幅度江都swap空间的使用,以免极大地拉低性能,后面章节会详细讨论为何不显示设置该值为0
设置更长的flush时间: 我们知道kafka依赖OS页缓存的“刷盘”功能实现消息真正的写入物理磁盘,默认刷盘间隔是5秒。通常情况下,这个间隔时间太短了,适当增加该值可以在很大程度上提升OS物理写入操作的性能,LinkedIn公司自己将该值设置为2分钟以增加整体的物理写入吞吐量。
总结
本章着重讲述了如何搭建kafka生产环境以及设置kafka集群需要考虑的各方面参数,并且针对如何评估kafka线上集群分别从各个方面进行了探讨。
下一章节:producer开发