kafka streams 总结


阅读了kafka官网说明,总结了一下,和demo的个性实现;用幕布软件写的,直接贴过来了

  • 核心概念
    • 概念
      • Kafka Streams是一个客户端库,用于处理和分析存储在Kafka中的数据
      • Kafka Streams的入门门槛低,横向扩展只需要增加集群,类似consumer group增加consumer成员
      • 亮点
        • 轻量,一个简单的kafka streams库
        • 利用kafka的分区,和消费group可以做到横向扩展
        • 支持"exactly-once"精准一次语义
        • 自带窗口,连接等功能
    • 流处理拓扑(Topology)
      • SourceProcessor 。。。(map、filter、reduce、aggregation)。。。SinkProcessor
    • 时间概念
      • 类型
        • 事件时间(Event Time),时间本身的时间,发生时间
        • 处理时间(Processing Time),应用开始处理的时间
        • 摄取时间(Ingestion time),存储到kafka的时间
      • TimeStampExtractor,流数据的时间根据接口的实现来
      • 新时间戳生成
        • 处理记录生成新记录时,继承时间戳,比如context.forward, process()等
        • 周期性函数(Punctuator#punctuate())发送记录时,是当前事件context.timestamp()
        • 对于聚合操作,生成的更新记录时,是最后一天到达触发记录的时间
          • 可以在processor在明确指定时间戳
      • 聚合,计数和求和这种操作等
      • 窗口,根据时间窗口进行操作,允许晚到的记录,根据retention time
      • 流和表,KStream和KTable
      • 状态,可存储和查询
  • 架构(图:http://kafka.apache.org/21/images/streams-architecture-overview.jpg
    • Stream Partition and Tasks,多个application拥有相同application.id的应用相当于一个consumer group,每个application消费多个topic的多个分区,协同工作,各自为Task,(图:http://kafka.apache.org/21/images/streams-architecture-tasks.jpg),这多个Application可以在一个线程中启动,也可以分别启动
    • Local States Store,每个task可以开启一个或多个本地存储,这些存储也是有容错和恢复功能的,有名为***changelog的topic来存储states store的变更记录,出现问题的时候可以进行重放恢复
  • 编写Kafka Streams 程序
    • 两种方式,使用DSL(StreamBuilder)或者Processor API(Topology)
    • 设置java.lang.Thread.UncaughtExceptionHandler来捕获stream的异常退出
    • 尽量使用优雅停机,调用close()方法
  • 配置项说明
    • application.id 流处理应用程序的标识符
    • bootstrap.servers 用于建立与Kafka群集的初始连接的主机/端口对列表
    • application.server 定义当前应用定义的地址和端口,可用于在单个Kafka Streams应用程序中发现状态存储的位置
    • buffered.records.per.partition 每个分区缓冲的最大记录数,默认1000
    • cache.max.bytes.buffering 所有线程中用于记录缓存的最大内存字节数。默认10485760字节,10M
    • client.id 发出请求时传递给服务器的ID字符串。(此设置将传递给Kafka Streams内部使用的使用者/生产者客户端。)默认空串
    • commit.interval.ms 提交任务位置的频率,默认30000,30秒;如果是精准一次语义,。。。
    • default.deserialization.exception.handler 实现DeserializationExceptionHandler接口的异常处理类
    • default.production.exception.handler 实现ProductionExceptionHandler接口的异常处理类
    • key.serde 记录键的默认序列化器/反序列化器类实现Serde接口(另请参见value.serde)
    • metric.reporters 用作度量标准记录器的类列表
    • metrics.num.samples 为计算指标而维护的样本数 默认2
    • metrics.recording.level 指标的最高记录级别 默认INFO
    • metrics.sample.window.ms 计算度量样本的时间窗口。 默认30秒
    • num.standby.replicas 每个任务的备用副本数 默认0
    • num.stream.threads 执行流处理的线程数 默认1
    • processing.guarantee 处理方式。可以是"at_least_once"(默认)或"exactly_once"。
    • poll.ms 阻止等待输入的时间量(以毫秒为单位) 默认100ms
    • replication.factor 应用程序创建的更改日志主题和重新分区主题的复制因子 默认1
    • retries 重试,默认0
    • retry.backoff.ms 重试请求之前的等待一段时间量(以毫秒为单位)。如果retries参数配置为大于0,则适用;默认100ms
    • state.cleanup.delay.ms 分区迁移时,等待多少时间删除之前的State(以毫秒为单位)。默认10个小时
    • state.dir States Store存储的位置,默认/tmp/kafka-streams
    • timestamp.extractor 实现TimestampExtractor接口的时间戳提取器类。
    • value.serde 记录值的默认序列化器/反序列化器类实现Serde接口(另请参阅key.serde)。
    • windowstore.changelog.additional.retention.ms延长多少时间不删除window state store,默认1天
    • 还有一些consumer和producer的配置,也可以配置进去,参考http://kafka.apache.org/21/documentation/streams/developer-guide/config-streams
  • Stream DSL
    • KStream、流数据记录
    • KTable、统计table,同键值覆盖
    • GlobalKTable 全局KTable,跨应用实例应用统计存储
    • 输入流
      • topic -> KStream
      • topic -> KTable 更新流(Upsert) ,需要提供一个State Store
      • topic -> GlobalKTable ,需要提供一个State Store
    • 流转换
      • 无状态
        • KStream -> KStream[],stream.branch...
        • KStream -> KStream,KStream -> KTable,String.filter...
        • KStream -> KStream,KTable -> KTable,String.filterNot...
        • KStream -> KStream,stream.flatMap...,stream.flapMapValues...,stream.Map,stream.MapValues...
        • KStream ->void,KTable->void,stream.foreach...
        • KStream -> KGroupStream,groupByKey,GroupBy
        • KTable -> KGroupTable, GroupBy
        • stream.merge,stream.peek,stream.print,stream.selectByKey,table.toStream
      • 有状态
  • Processor API
    • 主要方法
      • init方法,可以进行初始化操作,可以拿到ProcessorContext和State Store的一些信息
      • process方法,入参为单条记录
      • punctuate方法,迭代本地状态聚合等,然后发送给下游,可以在init中结合contxext.schedule定时发送
    • State Store
      • 有基于RocksDB的,和基于内存的
      • withLoggingDisabled,默认存储changelog,做到容错;可以关闭
      • 可以自定义
      • 连接Processor和State Store
  • 数据类型和序列化
    • 每个设计到输入和输出的时候都可以指定键值类型的Serdes,否则会使用默认的配置,内置有一些默认的基本类型;可以自定义,业务场景下可能用使用json方式解析和封装比较多;
    • 比如Produced.with(stringSerde, longSerde),Consumed.with(stringSerde, longSerde)

下面是自己对kafka streams的一些应用和封装,用其做一些时间窗口内的数据统计

入口类

public class OtherApplication {
    static KafkaStreams streams;
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "other-application2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MySerdes.eventSerde().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Event> kStream = builder.stream("inputTopic",
                Consumed.
                        with(Serdes.String(), MySerdes.eventSerde()).
                        withTimestampExtractor(
                                new MyEventTimeExtractor()
                        )
        );
        CountProcessor countProcessor =
                new CountProcessor(new BasicMetrics(new BasicMetrics.WindowFragment(60 * 1000L, 0, 1), "money", "account", "countStore"));
        countProcessor.streamIn(kStream);


        streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

次数统计类

public class CountProcessor extends StreamInWindowProcessor<Long> {

    public CountProcessor(BasicMetrics basicMetrics) {
        super(basicMetrics);
    }

    @Override
    public void streamIn(TimeWindowedKStream<String, Event> kStream) {
        kStream
                .count(
                        Materialized
                                .as(
                                        Stores.persistentWindowStore(
                                                basicMetrics.storeName, basicMetrics.windowFrag.windowSizeMs * 100, 2, basicMetrics.windowFrag.windowSizeMs, false
                                        )
                                )
                )
                .toStream()
                .process(() -> this, basicMetrics.storeName);
    }

    @Override
    public void process(WindowStoreIterator<Long> iterator) {
        long total = 0L;
        while (iterator.hasNext()) {
            KeyValue<Long, Long> next = iterator.next();
            total += next.value;
        }
//todo total
    }
}

窗口统计父类

public abstract class StreamInWindowProcessor<T> implements Processor<Windowed<String>, T> {
    ReadOnlyWindowStore<String, T> store;
    public BasicMetrics basicMetrics;
    public Object key;
    private static Map<Object, Object> resultMap = new ConcurrentHashMap<>();

    public static Object getData(Object key) {
        return resultMap.get(key);
    }

    public static void setData(Object key, Object data) {
        resultMap.put(key, data);
    }

    public StreamInWindowProcessor(BasicMetrics basicMetrics) {
        this.basicMetrics = basicMetrics;
    }

    public void streamIn(KStream<String, Event> stream) {
        try {
            TimeWindowedKStream<String, Event> kStream = stream
                    .groupBy((key, value) -> basicMetrics.groupByField + "$$" + value.get(basicMetrics.groupByField))
                    .windowedBy(TimeWindows.of(basicMetrics.windowFrag.windowSizeMs));
            streamIn(kStream);
        } catch (Throwable t) {
            t.printStackTrace();
        }

    }

    public abstract void streamIn(TimeWindowedKStream<String, Event> kStream);


    @Override
    public void init(ProcessorContext context) {
        if (!StringUtils.isEmpty(basicMetrics.storeName)) {
            store = (ReadOnlyWindowStore<String, T>) context.getStateStore(basicMetrics.storeName);
        }
    }

    @Override
    public void process(Windowed<String> key, T value) {
        long currentStart = key.window().start();
        this.key = key.key();
        WindowStoreIterator<T> fetch = store.fetch(key.key(), basicMetrics.windowFrag.from(currentStart), basicMetrics.windowFrag.to(currentStart));
        process(fetch);
    }

    public abstract void process(WindowStoreIterator<T> iterator);

    @Override
    public void punctuate(long timestamp) {

    }

    @Override
    public void close() {

    }
}

辅助类

public class BasicMetrics {
    public String storeName;

    public BasicMetrics(WindowFragment windowFrag, String secondField, String groupByField, String storeName) {
        this.windowFrag = windowFrag;
        this.secondField = secondField;
        this.groupByField = groupByField;
        this.storeName = storeName;
    }


    public WindowFragment windowFrag;
    public String groupByField;
    public String secondField;

    public static class WindowFragment {
        public WindowFragment(Long windowSizeMs, Integer preFrom, Integer preEnd) {
            this.windowSizeMs = windowSizeMs;
            this.preFrom = preFrom;
            this.preEnd = preEnd;
        }

        public Long from(long currentEventStart) {
            return currentEventStart - preEnd * windowSizeMs;
        }

        public Long to(long currentEventStart) {
            return currentEventStart - preFrom * windowSizeMs;
        }

        public Long windowSizeMs;
        public Integer preFrom = 0;//e.g:当前窗口
        public Integer preEnd = 1;//e.g:上一窗口,windowSizeMs为1分钟和preFrom:0和preEnd:1表示"最近两分钟"
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342