阅读了kafka官网说明,总结了一下,和demo的个性实现;用幕布软件写的,直接贴过来了
- 核心概念
- 概念
- Kafka Streams是一个客户端库,用于处理和分析存储在Kafka中的数据
- Kafka Streams的入门门槛低,横向扩展只需要增加集群,类似consumer group增加consumer成员
- 亮点
- 轻量,一个简单的kafka streams库
- 利用kafka的分区,和消费group可以做到横向扩展
- 支持"exactly-once"精准一次语义
- 自带窗口,连接等功能
- 流处理拓扑(Topology)
- SourceProcessor 。。。(map、filter、reduce、aggregation)。。。SinkProcessor
- 时间概念
- 类型
- 事件时间(Event Time),时间本身的时间,发生时间
- 处理时间(Processing Time),应用开始处理的时间
- 摄取时间(Ingestion time),存储到kafka的时间
- TimeStampExtractor,流数据的时间根据接口的实现来
- 新时间戳生成
- 处理记录生成新记录时,继承时间戳,比如context.forward, process()等
- 周期性函数(Punctuator#punctuate())发送记录时,是当前事件context.timestamp()
- 对于聚合操作,生成的更新记录时,是最后一天到达触发记录的时间
- 可以在processor在明确指定时间戳
- 聚合,计数和求和这种操作等
- 窗口,根据时间窗口进行操作,允许晚到的记录,根据retention time
- 流和表,KStream和KTable
- 状态,可存储和查询
- 类型
- 概念
- 架构(图:http://kafka.apache.org/21/images/streams-architecture-overview.jpg)
- Stream Partition and Tasks,多个application拥有相同application.id的应用相当于一个consumer group,每个application消费多个topic的多个分区,协同工作,各自为Task,(图:http://kafka.apache.org/21/images/streams-architecture-tasks.jpg),这多个Application可以在一个线程中启动,也可以分别启动
- Local States Store,每个task可以开启一个或多个本地存储,这些存储也是有容错和恢复功能的,有名为***changelog的topic来存储states store的变更记录,出现问题的时候可以进行重放恢复
- 编写Kafka Streams 程序
- 两种方式,使用DSL(StreamBuilder)或者Processor API(Topology)
- 设置java.lang.Thread.UncaughtExceptionHandler来捕获stream的异常退出
- 尽量使用优雅停机,调用close()方法
- 配置项说明
- application.id 流处理应用程序的标识符
- bootstrap.servers 用于建立与Kafka群集的初始连接的主机/端口对列表
- application.server 定义当前应用定义的地址和端口,可用于在单个Kafka Streams应用程序中发现状态存储的位置
- buffered.records.per.partition 每个分区缓冲的最大记录数,默认1000
- cache.max.bytes.buffering 所有线程中用于记录缓存的最大内存字节数。默认10485760字节,10M
- client.id 发出请求时传递给服务器的ID字符串。(此设置将传递给Kafka Streams内部使用的使用者/生产者客户端。)默认空串
- commit.interval.ms 提交任务位置的频率,默认30000,30秒;如果是精准一次语义,。。。
- default.deserialization.exception.handler 实现DeserializationExceptionHandler接口的异常处理类
- default.production.exception.handler 实现ProductionExceptionHandler接口的异常处理类
- key.serde 记录键的默认序列化器/反序列化器类实现Serde接口(另请参见value.serde)
- metric.reporters 用作度量标准记录器的类列表
- metrics.num.samples 为计算指标而维护的样本数 默认2
- metrics.recording.level 指标的最高记录级别 默认INFO
- metrics.sample.window.ms 计算度量样本的时间窗口。 默认30秒
- num.standby.replicas 每个任务的备用副本数 默认0
- num.stream.threads 执行流处理的线程数 默认1
- processing.guarantee 处理方式。可以是"at_least_once"(默认)或"exactly_once"。
- poll.ms 阻止等待输入的时间量(以毫秒为单位) 默认100ms
- replication.factor 应用程序创建的更改日志主题和重新分区主题的复制因子 默认1
- retries 重试,默认0
- retry.backoff.ms 重试请求之前的等待一段时间量(以毫秒为单位)。如果retries参数配置为大于0,则适用;默认100ms
- state.cleanup.delay.ms 分区迁移时,等待多少时间删除之前的State(以毫秒为单位)。默认10个小时
- state.dir States Store存储的位置,默认/tmp/kafka-streams
- timestamp.extractor 实现TimestampExtractor接口的时间戳提取器类。
- value.serde 记录值的默认序列化器/反序列化器类实现Serde接口(另请参阅key.serde)。
- windowstore.changelog.additional.retention.ms延长多少时间不删除window state store,默认1天
- 还有一些consumer和producer的配置,也可以配置进去,参考http://kafka.apache.org/21/documentation/streams/developer-guide/config-streams
- Stream DSL
- KStream、流数据记录
- KTable、统计table,同键值覆盖
- GlobalKTable 全局KTable,跨应用实例应用统计存储
- 输入流
- topic -> KStream
- topic -> KTable 更新流(Upsert) ,需要提供一个State Store
- topic -> GlobalKTable ,需要提供一个State Store
- 流转换
- 无状态
- KStream -> KStream[],stream.branch...
- KStream -> KStream,KStream -> KTable,String.filter...
- KStream -> KStream,KTable -> KTable,String.filterNot...
- KStream -> KStream,stream.flatMap...,stream.flapMapValues...,stream.Map,stream.MapValues...
- KStream ->void,KTable->void,stream.foreach...
- KStream -> KGroupStream,groupByKey,GroupBy
- KTable -> KGroupTable, GroupBy
- stream.merge,stream.peek,stream.print,stream.selectByKey,table.toStream
- 有状态
- 聚合、Joining、Windowing、其他自定义Processor
- 注意KTable的groupBy,然后aggregate,参看http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api 的(alice, E)INSERT alice。。。
- 窗口(参看http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api)
- Tumbling time window;翻滚时间窗口,连续自然时间窗口,特殊的hopping time,根据自然时间;like [0;5000),[5000;10000),... not[1000;6000),[6000;11000),...
- Hopping time window;跳跃时间窗口,根据自然事件,[0;5000),[3000;8000),... not [1000;6000),[4000;9000),...
- Sliding time windows;滑动窗口
- Session Windows;
- 无状态
- Processor API
- 主要方法
- init方法,可以进行初始化操作,可以拿到ProcessorContext和State Store的一些信息
- process方法,入参为单条记录
- punctuate方法,迭代本地状态聚合等,然后发送给下游,可以在init中结合contxext.schedule定时发送
- State Store
- 有基于RocksDB的,和基于内存的
- withLoggingDisabled,默认存储changelog,做到容错;可以关闭
- 可以自定义
- 连接Processor和State Store
- 主要方法
- 数据类型和序列化
- 每个设计到输入和输出的时候都可以指定键值类型的Serdes,否则会使用默认的配置,内置有一些默认的基本类型;可以自定义,业务场景下可能用使用json方式解析和封装比较多;
- 比如Produced.with(stringSerde, longSerde),Consumed.with(stringSerde, longSerde)
下面是自己对kafka streams的一些应用和封装,用其做一些时间窗口内的数据统计
入口类
public class OtherApplication {
static KafkaStreams streams;
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "other-application2");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MySerdes.eventSerde().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> kStream = builder.stream("inputTopic",
Consumed.
with(Serdes.String(), MySerdes.eventSerde()).
withTimestampExtractor(
new MyEventTimeExtractor()
)
);
CountProcessor countProcessor =
new CountProcessor(new BasicMetrics(new BasicMetrics.WindowFragment(60 * 1000L, 0, 1), "money", "account", "countStore"));
countProcessor.streamIn(kStream);
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
次数统计类
public class CountProcessor extends StreamInWindowProcessor<Long> {
public CountProcessor(BasicMetrics basicMetrics) {
super(basicMetrics);
}
@Override
public void streamIn(TimeWindowedKStream<String, Event> kStream) {
kStream
.count(
Materialized
.as(
Stores.persistentWindowStore(
basicMetrics.storeName, basicMetrics.windowFrag.windowSizeMs * 100, 2, basicMetrics.windowFrag.windowSizeMs, false
)
)
)
.toStream()
.process(() -> this, basicMetrics.storeName);
}
@Override
public void process(WindowStoreIterator<Long> iterator) {
long total = 0L;
while (iterator.hasNext()) {
KeyValue<Long, Long> next = iterator.next();
total += next.value;
}
//todo total
}
}
窗口统计父类
public abstract class StreamInWindowProcessor<T> implements Processor<Windowed<String>, T> {
ReadOnlyWindowStore<String, T> store;
public BasicMetrics basicMetrics;
public Object key;
private static Map<Object, Object> resultMap = new ConcurrentHashMap<>();
public static Object getData(Object key) {
return resultMap.get(key);
}
public static void setData(Object key, Object data) {
resultMap.put(key, data);
}
public StreamInWindowProcessor(BasicMetrics basicMetrics) {
this.basicMetrics = basicMetrics;
}
public void streamIn(KStream<String, Event> stream) {
try {
TimeWindowedKStream<String, Event> kStream = stream
.groupBy((key, value) -> basicMetrics.groupByField + "$$" + value.get(basicMetrics.groupByField))
.windowedBy(TimeWindows.of(basicMetrics.windowFrag.windowSizeMs));
streamIn(kStream);
} catch (Throwable t) {
t.printStackTrace();
}
}
public abstract void streamIn(TimeWindowedKStream<String, Event> kStream);
@Override
public void init(ProcessorContext context) {
if (!StringUtils.isEmpty(basicMetrics.storeName)) {
store = (ReadOnlyWindowStore<String, T>) context.getStateStore(basicMetrics.storeName);
}
}
@Override
public void process(Windowed<String> key, T value) {
long currentStart = key.window().start();
this.key = key.key();
WindowStoreIterator<T> fetch = store.fetch(key.key(), basicMetrics.windowFrag.from(currentStart), basicMetrics.windowFrag.to(currentStart));
process(fetch);
}
public abstract void process(WindowStoreIterator<T> iterator);
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
辅助类
public class BasicMetrics {
public String storeName;
public BasicMetrics(WindowFragment windowFrag, String secondField, String groupByField, String storeName) {
this.windowFrag = windowFrag;
this.secondField = secondField;
this.groupByField = groupByField;
this.storeName = storeName;
}
public WindowFragment windowFrag;
public String groupByField;
public String secondField;
public static class WindowFragment {
public WindowFragment(Long windowSizeMs, Integer preFrom, Integer preEnd) {
this.windowSizeMs = windowSizeMs;
this.preFrom = preFrom;
this.preEnd = preEnd;
}
public Long from(long currentEventStart) {
return currentEventStart - preEnd * windowSizeMs;
}
public Long to(long currentEventStart) {
return currentEventStart - preFrom * windowSizeMs;
}
public Long windowSizeMs;
public Integer preFrom = 0;//e.g:当前窗口
public Integer preEnd = 1;//e.g:上一窗口,windowSizeMs为1分钟和preFrom:0和preEnd:1表示"最近两分钟"
}
}