下面我们来实际的使用一下Kafka,看一下是怎么进行消息的生产的。
我们先新建一个类Producer,在构造方法中初始化KafkaProducer对象
//定义一个produce的参数
private final KafkaProducer<Integer, String> producer;
//定义topic
public final static String topic = "goods";
public Producer() {
//给producer属性赋值
Properties props = new Properties();
//序列化 防止在转换的时候抛出异常
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//配置kafka broker list的地址
props.put("bootstrap.servers", "192.168.61.158:9092"); //配置kafka broker list的地址
props.put("zk.connect", "192.168.61.151:2181,192.168.61.152:2181,192.168.61.153:2181");
//将producer 的属性值赋值进去
producer = new KafkaProducer(props);
}
KafkaProducer是通过Properties设置的配置参数,具体的参数比较简单,就是指定Kafka的broker地址,key的序列化器,value的序列化器,还有Zookeeper的地址。Kafka预置了StringSerializer这个类,可以序列化String类型的消息。
然后我们编写发送消息的方法
//发消息的
public void produce(String message) throws InterruptedException {
//发送消息
producer.send(new ProducerRecord<Integer, String>(topic, message));
System.out.println("send OK");
}
很简单,就是构造一个ProducerRecord对象并发送。
接下来编写测试方法进行测试
try {
Producer producer = new Producer();
producer.produce("hello");
}catch (Exception e){
LOG.error("produce message error",e);
}
Kafka的Producer编写其实很简单,就是在初始化KafkaProducer的时候把属性设置进去,然后调用send方法进行发送就行了。
Kafka的Producer开发就介绍到这里了。