1. Setting up Kafka
2. Versions
Spring Boot version:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
Spring Kafka version:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>
3. Basic configuration (application.yml)
In practice, only bootstrap-servers is strictly required.
spring:
  kafka:
    bootstrap-servers: 192.168.31.10:9092,192.168.31.10:9093,192.168.31.10:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      bootstrap-servers: 192.168.31.10:9092,192.168.31.10:9093,192.168.31.10:9094
    topic:
      Name: home.bus.log        # custom topic name
      numPartitions: 2          # custom partition count
      replicationFactor: 2      # custom replication factor
    consumer:
      group-id: home.bus.log.group.1
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 20000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
4. Custom topic
If you don't configure the topic properties, the system falls back to defaults; the name, of course, still needs to be configured somewhere, or you can simply use a constant in the producer.
@Configuration
@EnableKafka
public class KafkaTopicConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${spring.kafka.topic.Name}")
    private String topicName;

    @Value("${spring.kafka.topic.numPartitions}")
    private int numPartitions;

    @Value("${spring.kafka.topic.replicationFactor}")
    private int replicationFactor;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic myTopic() {
        // The third argument is the replication factor; make sure the cluster
        // has at least that many brokers.
        return new NewTopic(topicName, numPartitions, (short) replicationFactor);
    }
}
With this in place, if the topic does not yet exist on the Kafka server, it is created with the custom properties. If a topic with that name already exists, NewTopic creates nothing and changes nothing, regardless of whether the partition and replica settings match.
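If you want to confirm what actually exists on the broker, the kafka-clients AdminClient (which ships with spring-kafka) can describe the topic. The sketch below is my own addition, not part of the original setup; it reuses the broker address and topic name from the yml above:

public void describeLogTopic() throws Exception {
    // Verification sketch (my addition): since NewTopic leaves an existing
    // topic untouched, ask the broker how many partitions the topic really has.
    Map<String, Object> conf = new HashMap<>();
    conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.10:9092");
    try (AdminClient admin = AdminClient.create(conf)) {
        TopicDescription desc = admin.describeTopics(Collections.singleton("home.bus.log"))
                .all().get().get("home.bus.log");
        System.out.println("partitions on the broker: " + desc.partitions().size());
    }
}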
5. Custom producer and consumer
For simple usage no customization is needed, since the yml file already provides the basic configuration. If you do need to customize, refer to the following configuration:
//@Configuration
//@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServer;

    /* -------------- producer configuration ----------------- */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /* -------------- kafka template configuration ----------------- */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
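Note that kafkaTemplate.send() is asynchronous and returns a ListenableFuture<SendResult<K, V>>. If you want confirmation that a message actually reached the broker, you can attach a callback. A minimal sketch (my addition, reusing the topic name from the yml above):

// Hedged sketch: attach success/failure callbacks to an otherwise
// fire-and-forget send.
kafkaTemplate.send("home.bus.log", "hello kafka").addCallback(
        result -> System.out.println("sent to partition "
                + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));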
//@Configuration
//@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
}
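A listener picks up this factory through the containerFactory attribute of @KafkaListener; since the bean above is named kafkaListenerContainerFactory, which is also the default name the annotation looks for, the attribute is optional here. A hypothetical listener (not from the original article):

// Minimal sketch of a listener bound to the factory above.
@KafkaListener(topics = "home.bus.log", containerFactory = "kafkaListenerContainerFactory")
public void onMessage(ConsumerRecord<String, String> record) {
    System.out.println(record.key() + " -> " + record.value());
}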
6. Creating the producer
Just call it directly wherever you need it.
@Service
public class KafkaServiceImpl implements MQService {

    @Value("${spring.mqconfig.mq-enable}")
    private boolean mqEnable;

    @Value("${spring.kafka.topic.Name}")
    private String topicName;

    private Logger logger = LoggerFactory.getLogger(KafkaServiceImpl.class);

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public boolean isMqEnable() {
        return mqEnable;
    }

    public void setMqEnable(boolean mqEnable) {
        this.mqEnable = mqEnable;
    }

    @Override
    @Async("logThread") // async; can be commented out -- unless the network has problems, sending to Kafka is very fast
    public void sendMessage(String msg) {
        if (!isMqEnable()) return;
        long start = System.currentTimeMillis();
        kafkaTemplate.send(topicName, msg);
        long end = System.currentTimeMillis();
        logger.info("Wrote to kafka in " + (end - start) + " ms");
    }
}
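@Async("logThread") only takes effect if async support is enabled and an executor bean named logThread exists. The article doesn't show that configuration, so the following is a guess at what it might look like; the class name and pool sizes are my assumptions:

@Configuration
@EnableAsync
public class AsyncConfig {

    // Hypothetical executor backing @Async("logThread"); the bean name must
    // match the annotation value, and the pool sizes are arbitrary.
    @Bean("logThread")
    public Executor logThread() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("log-");
        executor.initialize();
        return executor;
    }
}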
7. Creating the consumer
Because messages are wrapped as JSON when produced, they are deserialized correspondingly when consumed.
@Service
public class MQConsumerServiceImpl implements MQConsumerService {

    private ObjectMapper objectMapper;
    private List<MQMessage> mqMessageList;
    private long maxMessageCount = 100;

    @Override
    public List<MQMessage> getMessage() {
        return mqMessageList;
    }

    @KafkaListener(topics = "${spring.kafka.topic.Name}")
    private void consumer(ConsumerRecord<String, String> record) {
        if (objectMapper == null) objectMapper = new ObjectMapper();
        if (mqMessageList == null) mqMessageList = new ArrayList<>();
        Optional<String> mqMessage = Optional.ofNullable(record.value());
        if (mqMessage.isPresent()) {
            String message = mqMessage.get();
            try {
                // keep at most maxMessageCount entries, dropping the oldest
                if (mqMessageList.size() >= maxMessageCount) {
                    mqMessageList.remove(0);
                }
                MQMessage mq = objectMapper.readValue(message, MQMessage.class); // deserialize
                mqMessageList.add(mq);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
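One caveat: with a concurrent listener container, consumer() can run on several threads while web requests read the list, and a lazily created ArrayList is not thread-safe. A safer variant (my suggestion, not in the original code) initializes the field eagerly with a synchronized wrapper:

// Suggested replacement for the lazily-initialized field above; a synchronized
// LinkedList also makes remove(0) cheap when trimming the oldest entry.
private final List<MQMessage> mqMessageList =
        Collections.synchronizedList(new LinkedList<>());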
8. Displaying messages on a page
In most scenarios there is no need to pull messages out and display them; this is just a convenient way to test the consumer. You can also inspect the topic directly on the Kafka server:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.10:9092 --topic home.bus.log --from-beginning
@Controller
@RequestMapping(value = "/mq")
public class mqController {

    @Resource(name = "MQConsumerServiceImpl")
    private MQConsumerService mqConsumerService;

    @RequestMapping(value = "/list")
    public String list() {
        return "/mq/mqList";
    }

    @RequestMapping(value = "/getMsgList")
    @ResponseBody
    public Object getMsgList(HttpServletRequest request) {
        int pageSize = 50;
        try {
            pageSize = Integer.parseInt(request.getParameter("pageSize"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        int pageNumber = 0;
        try {
            pageNumber = Integer.parseInt(request.getParameter("pageNumber")) - 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        Map<String, Object> map = new HashMap<>();
        String sortName = request.getParameter("sortName") == null ? "roleId" : request.getParameter("sortName");
        String sortOrder = request.getParameter("sortOrder") == null ? "asc" : request.getParameter("sortOrder");
        Sort sortLocal = new Sort(sortOrder.equalsIgnoreCase("asc") ? Sort.Direction.ASC : Sort.Direction.DESC, sortName);
        Pageable pageable = PageRequest.of(pageNumber, pageSize, sortLocal);
        Page<MQMessage> mqMessagePage = new PageImpl<>(mqConsumerService.getMessage(), pageable,
                this.mqConsumerService.getMessage().size());
        map.put("total", mqMessagePage.getTotalElements());
        map.put("rows", mqMessagePage.getContent());
        return map;
    }
}