Install Kafka directly with brew; the installation includes ZooKeeper:
brew install kafka
After installation, brew prints the startup commands:
// Starting ZooKeeper
To have launchd start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don't want/need a background service you can just run:
zkServer start
// Starting Kafka
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
The install path:
==> Summary
🍺 /usr/local/Cellar/kafka/2.6.0
The configuration files:
/usr/local/etc/kafka/zookeeper.properties
/usr/local/etc/kafka/server.properties
Run brew services start zookeeper to start ZooKeeper first, then brew services start kafka to start Kafka. The configuration files show the default ports: 2181 for ZooKeeper and 9092 for Kafka.
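Before moving on, it can help to confirm that both services are really listening. The following is only a minimal sketch and not part of the original setup: the class name PortCheck and the one-second timeout are my own choices, and it simply attempts a TCP connection to the two default ports mentioned above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // connection refused, host unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        System.out.println("zookeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("kafka (9092): " + isListening("localhost", 9092, 1000));
    }
}
```

An open port only shows that something accepted the connection; it does not prove that the broker has finished starting up.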
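Go to the Kafka install directory /usr/local/Cellar/kafka/2.6.0/bin and create a topic. Kafka 2.6.0 still accepts the --zookeeper flag used here; Kafka 3.0 and later removed it in favor of --bootstrap-server localhost:9092.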
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test1
Check that the topic was created:
kafka-topics --list --zookeeper localhost:2181
test1
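The same topic could also be created from Java instead of the CLI. This is a rough sketch under assumptions: the kafka-clients library must be on the classpath, the class name CreateTopic is hypothetical, and the broker is assumed to be reachable on the default localhost:9092.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    /** Same settings as the CLI call: topic test1, 3 partitions, replication factor 1. */
    public static NewTopic topicSpec() {
        return new NewTopic("test1", 3, (short) 1);
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The admin client talks to the broker (9092), not to ZooKeeper (2181).
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Blocks until the broker answers; throws if test1 already exists.
            admin.createTopics(List.of(topicSpec())).all().get();
            // Print all topic names, similar to kafka-topics --list.
            System.out.println(admin.listTopics().names().get());
        }
    }
}
```

If test1 was already created with the command above, the createTopics call fails with an exception, so run one or the other.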
Start a console producer:
kafka-console-producer --broker-list localhost:9092 --topic test1
Start a console consumer:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
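Type a message in the producer terminal and press Enter. The consumer terminal prints it, which confirms that messages are produced and consumed correctly.
Using Kafka in Spring Boot.
Add the Kafka dependency to pom.xml:
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>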
Add the Kafka configuration to the yml file:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      client-id: test-client
      group-id: test-consumer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
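For orientation, each spring.kafka.consumer entry above corresponds to a plain Kafka consumer property with a dotted name. The sketch below uses only java.util.Properties to spell out that mapping; the class name ConsumerProps is hypothetical and nothing here is required by Spring Boot, which builds these properties itself.

```java
import java.util.Properties;

public class ConsumerProps {
    /** Builds the raw Kafka consumer properties that match the yml settings above. */
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // bootstrap-servers
        props.setProperty("client.id", "test-client");              // client-id
        props.setProperty("group.id", "test-consumer");             // group-id
        props.setProperty("key.deserializer",                       // key-deserializer
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",                     // value-deserializer
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Outside Spring, a Properties object like this is what the KafkaConsumer constructor in kafka-clients accepts.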
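Add a producer endpoint:
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// @RestController rather than @Controller: the handler returns no view,
// so the (empty) response is written directly instead of being resolved as a view name.
@RestController
public class TestController {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    // GET /test/kafka?msg=... publishes msg to the topic test1.
    @GetMapping("/test/kafka")
    public void send(String msg) {
        kafkaTemplate.send("test1", msg);
    }
}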
Add the consumer listeners:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

// One listener per partition of test1, each starting from offset 0.
@Component
public class TestKafkaListener {
    @KafkaListener(id = "c_1", topicPartitions = {
            @TopicPartition(topic = "test1",
                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
    public void partition0(String msgData) {
        System.out.println("demo3 receive : " + msgData + ", partition: 0");
    }

    @KafkaListener(id = "c2", topicPartitions = {
            @TopicPartition(topic = "test1",
                    partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))})
    public void partition1(String msgData) {
        System.out.println("demo3 receive : " + msgData + ", partition: 1");
    }

    @KafkaListener(id = "c3", topicPartitions = {
            @TopicPartition(topic = "test1",
                    partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "0"))})
    public void partition2(String msgData) {
        System.out.println("demo3 receive : " + msgData + ", partition: 2");
    }
}
The annotation must specify a topic; the partition and the offset to start consuming from are optional.
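When only the topic is given, the partition does not have to be hard-coded in the log message, because the listener can receive the whole ConsumerRecord. The following is a sketch of that variant and is meant as an alternative to the per-partition listeners, not an addition to them (running both would print every message twice). The class name RecordLoggingListener and the listener id c_all are hypothetical.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class RecordLoggingListener {
    /** Formats a record like the listeners above, reading partition and offset from the record. */
    public static String describe(ConsumerRecord<String, String> record) {
        return "demo3 receive : " + record.value()
                + ", partition: " + record.partition()
                + ", offset: " + record.offset();
    }

    // Only the topic is named here, so Kafka assigns the partitions to this consumer.
    @KafkaListener(id = "c_all", topics = "test1")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(describe(record));
    }
}
```

Without an explicit initialOffset, a new consumer group starts according to Kafka's auto.offset.reset setting, which defaults to latest, so messages sent before the first start are not replayed.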
Start the project and open http://localhost:8080/test/kafka?msg=123 in a browser; the consumer prints the received message:
demo3 receive : 123, partition: 2
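To send more than one message without clicking through the browser, the endpoint can be called from a small client. This is a minimal sketch assuming Java 11 or later and the application running on localhost:8080 as above; the class name SendSmokeTest and the message texts are made up for illustration.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SendSmokeTest {
    /** Builds the endpoint URL, URL-encoding the message so spaces and symbols survive. */
    public static URI sendUri(String msg) {
        return URI.create("http://localhost:8080/test/kafka?msg="
                + URLEncoder.encode(msg, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        for (int i = 0; i < 5; i++) {
            HttpRequest request = HttpRequest.newBuilder(sendUri("msg " + i)).GET().build();
            // The endpoint returns no body, so the response body is discarded.
            HttpResponse<Void> response =
                    client.send(request, HttpResponse.BodyHandlers.discarding());
            System.out.println("sent msg " + i + " -> HTTP " + response.statusCode());
        }
    }
}
```

Watching the application log while this runs shows which partition listener picks up each message.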