Requirement: a Spark Streaming job consumes from Kafka, maintains its offsets in a Kafka topic, and has checkpointing enabled.
Environment: Java, Spark 2.3, spark-streaming-kafka-0-10_2.11
With checkpointing enabled, the program hit serialization exceptions. The fixes are as follows:
① The program uses a custom ConsumerStrategy, which must be made serializable (a sketch follows this list).
② When committing offsets to Kafka asynchronously, the callback passed to commitAsync cannot be a lambda expression; implement the OffsetCommitCallback interface in a class that is also serializable (see the sketch after the full listing).
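The custom strategy class is not shown in the original listing. Below is a minimal sketch of what KfkConsumerStrategy could look like: the class name, package, and constructor match the call site in the listing, but the body (subscribing a plain KafkaConsumer and seeking to any offsets Spark passes in) is an assumption, modeled on the built-in Subscribe strategy.

package com.lazyge.sprk.stream.util;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

// Sketch: a custom ConsumerStrategy that is explicitly Serializable, so it
// survives being captured in the checkpointed DStream graph (fix ①).
public class KfkConsumerStrategy extends ConsumerStrategy<String, String> implements Serializable {
    private final Map<String, Object> kafkaParams;
    private final Collection<String> topics;

    public KfkConsumerStrategy(Map<String, Object> kafkaParams, Collection<String> topics) {
        this.kafkaParams = kafkaParams;
        this.topics = topics;
    }

    // Kafka params handed to the executors for their cached consumers
    @Override
    public Map<String, Object> executorKafkaParams() {
        return kafkaParams;
    }

    // Called on the driver to build the consumer that tracks offsets
    @Override
    public Consumer<String, String> onStart(Map<TopicPartition, Long> currentOffsets) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaParams);
        consumer.subscribe(topics);
        if (currentOffsets != null && !currentOffsets.isEmpty()) {
            consumer.poll(0);  // force partition assignment before seeking
            for (Map.Entry<TopicPartition, Long> entry : currentOffsets.entrySet()) {
                consumer.seek(entry.getKey(), entry.getValue());
            }
        }
        return consumer;
    }
}

The full driver program follows: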
import com.lazyge.sprk.stream.util.KfkConsumerStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
public class CkptTestSpark {

    public static void main(String[] args) throws InterruptedException {
        new CkptTestSpark().start();
    }

    public void start() throws InterruptedException {
        JavaStreamingContext ssc = getContext();
        JavaInputDStream<ConsumerRecord<String, String>> stream = getStream(ssc);
        AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
        JavaDStream<String> wordDstream = stream.transform(
                (Function2<JavaRDD<ConsumerRecord<String, String>>, Time, JavaRDD<String>>) (rdd, time) -> {
                    if (rdd.rdd() instanceof HasOffsetRanges) {
                        offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
                    }
                    JavaRDD<String> lines = rdd.map(
                            (Function<ConsumerRecord<String, String>, String>) record -> record.value());
                    JavaRDD<String> words = lines.flatMap(
                            (FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
                    return words;
                });
        wordDstream.foreachRDD((VoidFunction2<JavaRDD<String>, Time>) (rdd, time) -> {
            rdd.collect().forEach(System.out::println);
            /*
             * With checkpointing enabled, the OffsetCommitCallback must be
             * serializable; otherwise the job fails with NotSerializableException.
             */
            if (stream.dstream() instanceof CanCommitOffsets) {
                ((CanCommitOffsets) stream.dstream()).commitAsync(offsetRanges.get()/*, (OffsetCommitCallback) (ofs, ex) -> {
                    System.err.println("a lambda callback is not serializable when checkpointing is enabled");
                }*/);
            }
        });
        ssc.start();
        System.out.println("streaming context started");
        ssc.awaitTermination();
    }
    private JavaStreamingContext getContext() {
        SparkConf conf = new SparkConf()
                .setIfMissing("spark.master", "local[*]")
                .setIfMissing("spark.app.name", "ckptTestSpark");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.checkpoint("E:\\tmp\\ckptspark");
        return ssc;
    }
    private JavaInputDStream<ConsumerRecord<String, String>> getStream(JavaStreamingContext ssc) {
        String brokers = "192.168.99.128:9092,192.168.99.128:9093,192.168.99.128:9094";
        String groupId = "testCkpt";
        String topics = "first";
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        /*
         * With checkpointing enabled, the ConsumerStrategy must be serializable;
         * otherwise the job fails with NotSerializableException.
         */
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                new KfkConsumerStrategy(kafkaParams, topicsSet));
        /* Built-in alternative:
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
        */
        return stream;
    }
}
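For fix ②, here is a minimal sketch of a commit callback that implements both OffsetCommitCallback and Serializable. The class name SerializableCommitCallback is made up for illustration; the onComplete signature is the one defined by the Kafka consumer API.

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import java.io.Serializable;
import java.util.Map;

// Sketch: a named, serializable callback instead of a lambda, so the
// checkpointed closure serializes without NotSerializableException (fix ②).
public class SerializableCommitCallback implements OffsetCommitCallback, Serializable {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            System.err.println("async offset commit failed: " + exception);
        } else {
            System.out.println("async offset commit succeeded: " + offsets);
        }
    }
}

With this class on the classpath, the commented-out callback in foreachRDD can be replaced with:
((CanCommitOffsets) stream.dstream()).commitAsync(offsetRanges.get(), new SerializableCommitCallback());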
Takeaway: doing this in Scala would avoid most of these serialization headaches.