一、Docker 安装 Kafka 单机版
1、下载镜像
docker pull wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.12-2.3.0
2、启动 zookeeper 容器
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
3、启动 kafka 容器
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.9:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.9:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
这里主要设置了4个参数
KAFKA_BROKER_ID=0
KAFKA_ZOOKEEPER_CONNECT=192.168.1.9:2181
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.9:9092
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
中间两个参数的192.168.1.9改为宿主机器的 IP 地址,如果不这么设置,可能会导致在别的机器上访问不到 kafka 。
注意:若出现 kafka 容器启动后闪退,需要在防火墙中开启相应的端口,使用以下命令,在防火墙中增加2181和9092端口并重启。
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --reload
4、测试 kafka
启动消息发送者
#进入 kafka 容器的命令行
docker exec -it kafka /bin/bash
#进入kafka所在目录
cd opt/kafka_2.12-2.3.0/
启动消息发送者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkaTest
再新建一个主机会话,用来启动消费者
#进入 kafka 容器的命令行
docker exec -it kafka /bin/bash
#进入kafka所在目录
cd opt/kafka_2.12-2.3.0/
启动消息消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkaTest --from-beginning
在消息发送发送者命令行输入123456,然后再在消息接收者查看,看到123456 消息则代表 kafka 单机版搭建成功。
二、SpringBoot 项目中简单使用
1、引入 maven 依赖,版本参考官方
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置 yml 文件
spring:
kafka:
# kafka服务器地址(可以多个)
bootstrap-servers: 192.168.1.9:9092
consumer:
# 指定一个默认的组名
group-id: kafkaGroup
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取字节数
batch-size: 65536
# 缓存容量
buffer-memory: 524288
# 服务器地址
bootstrap-servers: 192.168.1.9:9092
3、写一个简单的 Controller ,里面包含发送消息和接收消息
@EnableAutoConfiguration
@RestController
@Slf4j
public class Controller {
/**
* 注入kafkaTemplate
*/
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "testTopic";
@RequestMapping("/kafkaSend")
public String testKafkaSend() {
for (int i = 1; i < 6; i++) {
kafkaTemplate.send(TOPIC,"key" + i, "data" + Math.random());
}
return "success";
}
/**
* 消费者监听消息
*/
@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<?, ?> consumer) {
log.info("{} - {}:{}", consumer.topic(), consumer.key(), consumer.value());
}
}
4、进行测试,发送请求http://localhost:8080/kafkaSend,可看到控制台打印如下: