多节点集群可以提供高可用保障,充分利用 kafka 的分布式特性,例如负载均衡、故障转移等。
我们需要安装:
- 多节点 zookeeper 集群。
- 多节点 kafka 集群。
安装 zookeeper 集群
这里我们使用3个节点安装 zk。
(1)准备工作
在3台服务器上都安装好 java,并下载解压 zookeeper。
配置好 hosts:
$ vim /etc/hosts
# 添加
10.20.12.191 server191
10.20.12.192 server192
10.20.12.195 server195
(2)创建数据目录
在3台服务器上都执行:
$ mkdir /data/zk
(3)创建配置文件
在3台服务器上都创建文件 conf/zoo.cfg,内容:
# zk 最小时间单位,丈量心跳时间、超时时间等
tickTime=2000
# zk 会在内存中保存系统快照,定期写入此文件夹
dataDir=/data/zk
# zk 监听client连接的端口
clientPort=2181
# follower 初始连接 leader 时的最大 tick 次数
# 例如设为5,表示 follower 要在 5xtickTime 时间内连接成功,否则超时
initLimit=5
# follower 与 leader 同步的最大时间
syncLimit=2
# zk 节点列表,形式:server.X=host:port:port
# X 是节点ID,需要与后面设置的 myid 一致
# host 是节点地址
# 第1个port是 follower 连接 leader 的端口
# 第2个port用于 leader 选举
server.1=server195:2888:3888
server.2=server192:2888:3888
server.3=server191:2888:3888
(4)创建 myid 文件
在3台服务器的 dataDir
目录下创建 myid 文件,文件内容只是数字。
例如第一个节点上 /data/zk/myid 的内容就是 1
。
# 节点1
$ echo "1" > /data/zk/myid
# 节点2
$ echo "2" > /data/zk/myid
# 节点3
$ echo "3" > /data/zk/myid
(4)启动
在3台服务器上都执行:
$ nohup bin/zkServer.sh start conf/zoo.cfg >> /data/zk/nohup.out 2>&1 &
在后台启动,输出信息在 /data/zk/nohup.out 文件中。
查看 zk 状态:
$ bin/zkServer.sh status
# 输出结果
ZooKeeper JMX enabled by default
Using config: /data/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: leader
Mode 有 leader、follower。
如果是失败状态,例如:
Error contacting service. It is probably not running.
可以等待一会儿,多节点的沟通需要时间,大概30秒左右。
还可以使用 jps
命令查看 zk 是否启动:
$ jps
# 输出结果
16065 QuorumPeerMain
...
QuorumPeerMain 就是 zk 的主进程。
如果启动失败,可以查看错误信息:
$ bin/zkServer.sh start-foreground
停止 zk 的命令:
$ bin/zkServer.sh stop
安装 kafka 集群
(1)准备工作
还是使用zk那3台服务器,每个服务器中都下载kafka,解压,并创建log目录(具体位置自己定义):
$ mkdir -p /data/kafka/data-logs
(2)修改配置文件
在3台服务器中进入kafka安装目录下,修改配置文件 config/server.properties,修改之前先备份一份:
$ cp config/server.properties config/server.properties.bak
打开配置文件,找到以下几项并修改:
- server1
broker.id=0
listeners=PLAINTEXT://server191:9092
advertised.listeners=PLAINTEXT://server191:9092
log.dirs=/data/kafka/data-logs
zookeeper.connect=server191:2181,server192:2181,server195:2181
- server2
broker.id=1
listeners=PLAINTEXT://server192:9092
advertised.listeners=PLAINTEXT://server192:9092
log.dirs=/data/kafka/data-logs
zookeeper.connect=server191:2181,server192:2181,server195:2181
- server3
broker.id=2
listeners=PLAINTEXT://server195:9092
advertised.listeners=PLAINTEXT://server195:9092
log.dirs=/data/kafka/data-logs
zookeeper.connect=server191:2181,server192:2181,server195:2181
只有 broker.id 和 listeners 的地址不同。
(3)启动
3台服务上都执行:
$ echo > /data/kafka/nohup.out
$ nohup bin/kafka-server-start.sh config/server.properties >> /data/kafka/nohup.out 2>&1 &
在 /data/kafka/nohup.out 中查看启动信息:
$ tail -f /data/kafka/nohup.out
# 或者
$ cat /data/kafka/nohup.out
(4)消息测试
- 创建 topic
$ bin/kafka-topics.sh --create --zookeeper server191:2181,server192:2181,server195:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic
# 查看 topic 列表
$ bin/kafka-topics.sh --list --zookeeper server191:2181,server192:2181,server195:2181
# 查看 topic 描述
$ bin/kafka-topics.sh --describe --zookeeper server191:2181,server192:2181,server195:2181 --topic my-replicated-topic
- 生成消息
$ bin/kafka-console-producer.sh --broker-list server191:9092,server192:9092,server195:9092 --topic my-replicated-topic
# 出现提示符,输入信息,回车,继续输入
>my test message 1
>my test message 2
# ctrl+c 退出
- 消费消息
$ bin/kafka-console-consumer.sh --bootstrap-server server191:9092,server192:9092,server195:9092 --topic my-replicated-topic --from-beginning