- 列出所有消费者组
$ /path/to/bin/kafka-consumer-groups.sh --bootstrap-server xx.xx.xx.xx:9091 --list
$ /path/to/bin/kafka-consumer-groups.sh --bootstrap-server xx.xx.xx.xx:9091,xx.xx.xx.xx:9091,xx.xx.xx.xx:9091 --list
- 查看具体消费者组详情
$ /path/to/bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --group <group 名称 > --describe
$ /path/to/bin/kafka-consumer-groups.sh --bootstrap-server xx.xx.xx.xx:9091,xx.xx.xx.xx:9091,xx.xx.xx.xx:9091 --group test --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic01 0 714 714 0 test-consumer-0-59aac8c1-73fb-4212-863b-c11d65a2947c /192.168.169.131 test-consumer-0
topic02 1 1866 1866 0 test-consumer-0-9bac7bf5-9b4a-4160-ae2d-9b11b9626e8d /192.168.169.130 test-consumer-0
topic03 2 6406 6406 0 test-consumer-0-8490dfda-c7bf-4e5d-a79f-20fba1716070 /192.168.169.130 test-consumer-0
Kafka 连接信息就是 < 主机名:端口 > 对,而 group 名称就是你的消费者程序中设置的 group.id 值。
kafka-consumer-groups 脚本的输出信息很丰富。
首先,它会按照消费者组订阅主题的分区进行展示,每个分区一行数据;
其次,除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(即 LOG-END-OFFSET 列值)、该消费者组当前最新消费消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前两者的差值)、消费者实例 ID、消费者连接 Broker 的主机名以及消费者的 CLIENT-ID 信息。
毫无疑问,在这些数据中,我们最关心的当属 LAG 列的值了,图中每个分区的 LAG 值大约都是 60 多万,这表明,在我的这个测试中,消费者组远远落后于生产者的进度。理想情况下,我们希望该列所有值都是 0,因为这才表明我的消费者完全没有任何滞后。
有的时候,你运行这个脚本可能会出现下面这种情况,如下图所示:
简单比较一下,我们很容易发现它和前面那张图输出的区别,即 CONSUMER-ID、HOST 和 CLIENT-ID 列没有值!
如果碰到这种情况,你不用惊慌,这是因为我们运行 kafka-consumer-groups 脚本时没有启动消费者程序。
请注意我标为橙色的文字,它显式地告诉我们,当前消费者组没有任何 active 成员,即没有启动任何消费者实例。
虽然这些列没有值,但 LAG 列依然是有效的,它依然能够正确地计算出此消费者组的 Lag 值。
除了上面这三列没有值的情形,还可能出现的一种情况是该命令压根不返回任何结果。此时,你也不用惊慌,这是因为你使用的 Kafka 版本比较老,kafka-consumer-groups 脚本还不支持查询非 active 消费者组。一旦碰到这个问题,你可以选择升级你的 Kafka 版本了!
$ cat stat_kafka_lag.sh
#!/bin/bash
kafka_server="xx.xx.xx.xxx:9091,xx.xx.xx.xx:9091,xx.xx.xx.xx:9091"
kafka_bin_path="/kingdee/kafka/bin"
groups=`${kafka_bin_path}/kafka-consumer-groups.sh --bootstrap-server ${kafka_server} --list`
for group in ${groups}
do
echo "####################${group}####################"
${kafka_bin_path}/kafka-consumer-groups.sh --bootstrap-server ${kafka_server} --group ${group} --describe | awk '{print $5}'
done
参考
消费者组消费进度监控都怎么实现?