First, hand-write an interface for the utility class:
package com.ky.kafka.consumer;
/**
 * @Author: xwj
 * @Date: 2019/3/12 09:48
 * @Version 1.0
 */
public interface ConsumerInterface {

    /**
     * Plain consumer: print every record from the topic.
     *
     * @param topic the topic to consume from
     */
    void consumer(String topic);

    /**
     * Batch consumption: commit offsets once every num records.
     *
     * @param topic the topic to consume from
     * @param num   number of records per offset commit
     */
    void consumerBatch(String topic, int num);

    /**
     * Consume starting from a specific position.
     *
     * @param topic the topic to consume from
     * @param line  the exact offset to start from
     */
    void consumerFromDetailPosition(String topic, long line);
}
Next, write a factory class that produces consumer instances:
package com.ky.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;
/**
 * @Author: xwj
 * @Date: 2019/3/12 09:56
 * @Version 1.0
 */
public class ConsumerFactory {

    /**
     * Create a consumer instance.
     *
     * @return a configured KafkaConsumer
     */
    public static KafkaConsumer<String, String> getConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertyUtil.getInstance().getString("brokerList", ""));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, PropertyUtil.getInstance().getString("groupid", "123"));
        // Offsets are committed manually in the consumer methods below.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Has no effect while auto commit is disabled.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        System.out.println("Initializing the Kafka consumer configuration...");
        return new KafkaConsumer<String, String>(props);
    }
}
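The factory reads brokerList and groupid through a PropertyUtil helper whose source is not shown in the original. Below is a minimal sketch of what such a helper could look like; only getInstance() and getString(key, default) are taken from the code above, while the file name and internals are assumptions:

package com.ky.kafka.consumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Hypothetical stand-in for the PropertyUtil used by ConsumerFactory:
 * a singleton that loads kafka.properties from the classpath.
 */
public final class PropertyUtil {

    private static final PropertyUtil INSTANCE = new PropertyUtil();

    private final Properties props = new Properties();

    private PropertyUtil() {
        // Assumption: the settings live in kafka.properties on the classpath.
        try (InputStream in = PropertyUtil.class.getClassLoader().getResourceAsStream("kafka.properties")) {
            if (in != null) {
                props.load(in);
            }
        } catch (IOException e) {
            throw new IllegalStateException("failed to load kafka.properties", e);
        }
    }

    public static PropertyUtil getInstance() {
        return INSTANCE;
    }

    /**
     * Return the value for key, or defaultValue when the key is absent.
     */
    public String getString(String key, String defaultValue) {
        return props.getProperty(key, defaultValue);
    }
}

The matching kafka.properties would then contain something like:

brokerList=localhost:9092
groupid=test-consumer-group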
Finally, write the utility class itself with the common access methods:
package com.ky.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @Author: xwj
 * @Date: 2019/3/12 09:59
 * @Version 1.0
 */
public class Consumer implements ConsumerInterface {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Override
    public void consumer(String topic) {
        KafkaConsumer<String, String> consumer = ConsumerFactory.getConsumer();
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(300);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("partition = %d , offset = %d, key = %s, value = %s",
                            record.partition(), record.offset(), record.key(), record.value()));
                }
                // Commit asynchronously after each poll; failures are only logged.
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                        if (e != null) {
                            logger.error("commit failed for offset {}", offsets, e);
                        }
                    }
                });
            }
        } finally {
            try {
                // One final synchronous commit before releasing the consumer.
                consumer.commitSync();
            } finally {
                release(consumer);
            }
        }
    }

    @Override
    public void consumerBatch(String topic, int num) {
        KafkaConsumer<String, String> consumer = ConsumerFactory.getConsumer();
        ConcurrentHashMap<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
        int count = 0;
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                final ConsumerRecords<String, String> consumerRecords = consumer.poll(300);
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                    // Track the next offset to commit for each partition.
                    currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                            new OffsetAndMetadata(consumerRecord.offset() + 1, "no metadata"));
                    count++;
                    // Commit once every num records (incrementing before the check
                    // avoids the spurious commit on the very first record).
                    if (count % num == 0) {
                        consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                            @Override
                            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                                if (e != null) {
                                    logger.error("commit failed for offset {}", offsets, e);
                                }
                            }
                        });
                    }
                }
            }
        } catch (Exception e) {
            logger.error("consume failed", e);
        } finally {
            try {
                consumer.commitSync();
            } finally {
                release(consumer);
            }
        }
    }

    @Override
    public void consumerFromDetailPosition(String topic, final long line) {
        final KafkaConsumer<String, String> consumer = ConsumerFactory.getConsumer();
        final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<TopicPartition, OffsetAndMetadata>();
        try {
            consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    // Commit what has been processed before the partitions are taken away.
                    consumer.commitSync(currentOffsets);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    // consumer.seekToBeginning(collection);
                    // Seek every newly assigned partition to the requested offset.
                    for (TopicPartition topicPartition : collection) {
                        consumer.seek(topicPartition, line);
                    }
                }
            });
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(300);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("partition = %d , offset = %d, key = %s, value = %s",
                            record.partition(), record.offset(), record.key(), record.value()));
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, "no metadata"));
                }
                consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                        if (e != null) {
                            logger.error("commit failed for offset {}", offsets, e);
                        }
                    }
                });
            }
        } catch (Exception e) {
            logger.error("consume failed", e);
        } finally {
            try {
                consumer.commitSync();
            } finally {
                release(consumer);
            }
        }
    }
    /**
     * Release the consumer.
     *
     * @param consumer the consumer to close
     */
    private void release(final KafkaConsumer<String, String> consumer) {
        // Close the consumer via a JVM shutdown hook.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                super.run();
                if (consumer != null) {
                    consumer.close();
                }
            }
        });
    }
}
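To tie the three pieces together, here is a minimal sketch of a driver class; the class name, topic, batch size, and offset below are placeholders, not values from the original:

package com.ky.kafka.consumer;

public class ConsumerMain {

    public static void main(String[] args) {
        ConsumerInterface consumer = new Consumer();
        // Plain consumption; blocks in the poll loop until the process exits.
        consumer.consumer("test-topic");
        // Alternatively, commit once every 100 records:
        // consumer.consumerBatch("test-topic", 100);
        // Or start reading from offset 42 on every assigned partition:
        // consumer.consumerFromDetailPosition("test-topic", 42L);
    }
}

Each method blocks in its own poll loop, so a process runs exactly one of the three calls; the commented lines show the alternatives.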