环境:Windows
任务:完成用 kafka-python 库操作 kafka 完成生产和消费Demo
在 Windows 环境下安装 kafka
下载地址:https://kafka.apache.org/downloads.html
点击链接下载到本地,解压就可以了。
文件夹树形结构如下,Windows 平台下可以用的命令放在 bin/windows 下
D:\devSoftware\kafka_2.12-2.5.0>tree
├─bin
│ └─windows
├─config
├─libs
├─logs
└─site-docs
启动 kafka
运行 kafka 需要使用 Zookeeper,如果没有安装 Zookeeper,你可以使用 Kafka 自带打包和配置好的 Zookeeper。
# 进入 windows 脚本目录
cd bin\windows
# 启动 zookeeper,注意配置文件位置
zookeeper-server-start.bat ..\..\config/zookeeper.properties
再打开一个 cmd 窗口,启动 kafka 服务
kafka-server-start.bat ..\..\config/server.properties
这样,就在本机启动了一个默认端口为 9092 的 kafka 服务
安装 kafka-python
pip install kafka-python
生产者代码
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic' , value= b'aaaaaaaaa', partition= 0)
result = future.get(timeout= 10)
print(result)
在 my_topic 主题下,发布消息为 aaaaaaaaa 的消息;
消费者代码
# 引入 KafkaConsumer
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_topic', bootstrap_servers= ['localhost:9092'])
for msg in consumer:
print(msg)
运行消费者代码,他就一直监听生产者发布的消息,只要生产者新产生了一条消息,他就会 print 输出 msg。
其他验证
cmd 中输入命令检查 my_topic 主题下的消息:
# bin\windows 下命令
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic --from-beginning
参考资料:
https://www.orchome.com/6
https://zhuanlan.zhihu.com/p/38330574