Flink(1.13) 的window机制(一)

窗口概述

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而Window窗口是一种切割无限数据为有限块进行处理的手段。

在Flink中, 窗口(window)是处理无界流的核心. 窗口把流切割成有限大小的多个"存储桶"(bucket), 我们在这些桶上进行计算.

image.png

窗口的分类

窗口分类

窗口分为2类:

  1. 基于时间的窗口(时间驱动)
  2. 基于元素个数的(数据驱动)

WindowAssigner

WindowAssigner 是一个抽象类,是所有窗口策略父类。

public abstract class WindowAssigner<T, W extends Window> implements Serializable {}

WindowAssigner下所有的子类

SlidingProcessingTimeWindows (org.apache.flink.streaming.api.windowing.assigners)
按照处理时间进行窗口滑动
of 的重载方法

  • of(Time size, Time slide)
    size:窗口长度
    slide:滑动步长
  • of(Time size, Time slide, Time offset)
    offset:时区

BaseAlignedWindowAssigner (org.apache.flink.streaming.api.windowing.assigners)

TumblingEventTimeWindows (org.apache.flink.streaming.api.windowing.assigners)

    TumblingTimeWindows (org.apache.flink.streaming.api.windowing.assigners)

MergingWindowAssigner (org.apache.flink.streaming.api.windowing.assigners)

    DynamicProcessingTimeSessionWindows(org.apache.flink.streaming.api.windowing.assigners)

    ProcessingTimeSessionWindows(org.apache.flink.streaming.api.windowing.assigners)
    按照处理时间划分session,该方式为静态gap。
    withGap :窗口间隔时长,gap时间内的会划为一个窗口,这是一个合并的操作,这也说明为啥是MergingWindowAssigner 子类的原因。

    DynamicEventTimeSessionWindows(org.apache.flink.streaming.api.windowing.assigners)

    EventTimeSessionWindows(org.apache.flink.streaming.api.windowing.assigners)

TumblingProcessingTimeWindows (org.apache.flink.streaming.api.windowing.assigners)
按照处理时间进行窗口滚动
of 的重载方法

  • of(Time size)
    size:窗口长度
  • of(Time size, Time offset)
    offset:配置时区
  • of(Time size, Time offset, WindowStagger windowStagger)
    windowStagger:窗口交错

SlidingEventTimeWindows(org.apache.flink.streaming.api.windowing.assigners)

    SlidingTimeWindows (org.apache.flink.streaming.api.windowing.assigners)

GlobalWindows (org.apache.flink.streaming.api.windowing.assigners)

Time

    public static Time of(long size, TimeUnit unit) {
        return new Time(size, unit);
    }

    /** Creates a new {@link Time} that represents the given number of milliseconds. */
    public static Time milliseconds(long milliseconds) {
        return of(milliseconds, TimeUnit.MILLISECONDS);
    }

    /** Creates a new {@link Time} that represents the given number of seconds. */
    public static Time seconds(long seconds) {
        return of(seconds, TimeUnit.SECONDS);
    }

    /** Creates a new {@link Time} that represents the given number of minutes. */
    public static Time minutes(long minutes) {
        return of(minutes, TimeUnit.MINUTES);
    }

    /** Creates a new {@link Time} that represents the given number of hours. */
    public static Time hours(long hours) {
        return of(hours, TimeUnit.HOURS);
    }

    /** Creates a new {@link Time} that represents the given number of days. */
    public static Time days(long days) {
        return of(days, TimeUnit.DAYS);
    }

WindowStagger

public enum WindowStagger {
    /** Default mode, all panes fire at the same time across all partitions. */
    ALIGNED {
        @Override
        public long getStaggerOffset(final long currentProcessingTime, final long size) {
            return 0L;
        }
    },

    /**
     * Stagger offset is sampled from uniform distribution U(0, WindowSize) when first event
     * ingested in the partitioned operator.
     */
    RANDOM {
        @Override
        public long getStaggerOffset(final long currentProcessingTime, final long size) {
            return (long) (ThreadLocalRandom.current().nextDouble() * size);
        }
    },

    /**
     * When the first event is received in the window operator, take the difference between the
     * start of the window and current procesing time as the offset. This way, windows are staggered
     * based on when each parallel operator receives the first event.
     */
    NATURAL {
        @Override
        public long getStaggerOffset(final long currentProcessingTime, final long size) {
            final long currentProcessingWindowStart =
                    TimeWindow.getWindowStartWithOffset(currentProcessingTime, 0, size);
            return Math.max(0, currentProcessingTime - currentProcessingWindowStart);
        }
    };

    public abstract long getStaggerOffset(final long currentProcessingTime, final long size);
}

基于时间的窗口

时间窗口包含一个开始时间戳(包括)和结束时间戳(不包括), 这两个时间戳一起限制了窗口的尺寸.
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口. 这个类提供了key查询开始时间戳和结束时间戳的方法, 还提供了针对给定的窗口获取它允许的最大时间戳的方法(maxTimestamp())。

时间窗口可以理解成一个桶,装有各种各样的元素,
相同的元素会存放到同一个窗口(如下图),统计窗口05的数据,其实每个元素都有05的窗口。
注意:(a,4),表示有 4个a,而不是a的个数为4。

时间窗口又分4种:

1.滚动窗口(Tumbling Windows)

滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙.比如,如果指定一个长度为5分钟的滚动窗口, 当前窗口开始计算, 每5分钟启动一个新的窗口。
滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。

滚动窗口

  • 没有间隙
  • 没有重叠
  • 窗口长度
    public static void main(String[] args) throws Exception {

        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            Arrays.stream(value.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1L)));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 聚合
        KeyedStream<Tuple2<String, Long>, String> keyBy = flatMap.keyBy(s -> s.f0);

        // 设置窗口为5秒
        WindowedStream<Tuple2<String, Long>, String, TimeWindow> window = keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5l)));

        // 聚合
        window.sum(1).print();

        env.execute();

    }

输入

java python java scala

输出:并不会立即输出,需要等待窗口关闭之后才能进行输出

5> (python,2)
1> (scala,1)
3> (java,3)

2.滑动窗口(Sliding Windows)

与滚动窗口一样, 滑动窗口也是有固定的长度. 另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率.

所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠. 这种情况下, 一个元素可能会被分配到多个窗口中

例如, 滑动窗口长度10分钟, 滑动步长5分钟, 则, 每5分钟会得到一个包含最近10分钟的数据.


滑动窗口
  • 固定长度
  • 滑动步长
  • 滑动步长<窗口长度,会造成数据重复
  • 滑动步长>窗口长度,会造成数据丢失
    public static void main(String[] args) throws Exception {

        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            Arrays.stream(value.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1L)));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 聚合
        KeyedStream<Tuple2<String, Long>, String> keyBy = flatMap.keyBy(s -> s.f0);

        //窗口长度为5,没3秒统计一次。
        SlidingProcessingTimeWindows windows = SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3));

        WindowedStream<Tuple2<String, Long>, String, TimeWindow> window = keyBy.window(windows);

        // 聚合
        window.sum(1).print();

        env.execute();

    }

输出

12> (,1)
5> (python,9)
3> (java,9)
5> (python,2)
3> (java,2)

3.会话窗口(Session Windows)

会话窗口分配器会根据活动的元素进行分组. 会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间.

如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的gap(间隔)

我们可以配置静态的gap, 也可以通过一个gap extractor 函数来定义gap的长度. 当时间超过了这个gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口

会话窗口
  • 按照key进行分组(划分一个新的窗口)
  • 不会造成数据重叠
  • 没有固定开启和关闭时间
  • 若一段时间内没有数据,窗口自动关闭
  • 可以设置静态gap和动态gap

静态 gap

ProcessingTimeSessionWindows windows = ProcessingTimeSessionWindows.withGap(Time.seconds(3));
    @Test
    public void test1() throws Exception {
        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            Arrays.stream(value.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1L)));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 聚合
        KeyedStream<Tuple2<String, Long>, String> keyBy = flatMap.keyBy(s -> s.f0);

        // 设置session时间为3秒
        ProcessingTimeSessionWindows windows = ProcessingTimeSessionWindows.withGap(Time.seconds(3));

        WindowedStream<Tuple2<String, Long>, String, TimeWindow> window = keyBy.window(windows);


        // 聚合
        window.sum(1).print();


        env.execute();
    }

动态 gap

DynamicProcessingTimeSessionWindows<Object> windows = 
ProcessingTimeSessionWindows.withDynamicGap(e-> new Random().nextInt(3000));
@Test
public void test2() throws Exception {
        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = 
source.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            Arrays.stream(value.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1L)));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 聚合
        KeyedStream<Tuple2<String, Long>, String> keyBy = flatMap.keyBy(s -> s.f0);

        // 设置session时间为3秒
        DynamicProcessingTimeSessionWindows<Object> windows = 
ProcessingTimeSessionWindows.withDynamicGap(e-> new Random().nextInt(3000));

        WindowedStream<Tuple2<String, Long>, String, TimeWindow> window = keyBy.window(windows);

        // 聚合
        window.sum(1).print();

        env.execute();
}

session会将gap时间范围内的窗口合并成一个窗口。
例如:
a1,a2,a3,a4,b1,b2,b3,b4

首先取最后元素(a1)的时间,然后与下一个元素(a2)进行匹配,若两个元素之间的时间进行比较是否超过gap,若没有则将他们划分为一个窗口,然后继续与下一个元素(a3)进行比较,以此类推。直到与某个元素的时间超过了gap就划分为另一个窗口。

4.全局窗口(Global Windows)

全局窗口分配器会分配相同key的所有元素进入同一个 Global window. 这种窗口机制只有指定自定义的触发器时才有用. 否则, 不会做任何计算, 因为这种窗口没有能够处理聚集在一起元素的结束点.

全局窗口
  • 需要指定触发器

创建全局窗口

 // 创建 全局窗口
WindowedStream<Tuple2<String, Long>, String, GlobalWindow> window =
keyBy.window(org.apache.flink.streaming.api.windowing.assigners.GlobalWindows.create());
    @Test
    public void  test1() throws Exception {
        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            Arrays.stream(value.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1L)));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 聚合
        KeyedStream<Tuple2<String, Long>, String> keyBy = flatMap.keyBy(s -> s.f0);


        // 创建 全局窗口
        WindowedStream<Tuple2<String, Long>, String, GlobalWindow> window = keyBy.window(org.apache.flink.streaming.api.windowing.assigners.GlobalWindows.create());


        window.process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, GlobalWindow>() {
            /**
             *
             * @param key 聚合元素
             * @param context 上下文
             * @param elements 窗口所有的元素
             * @param out 收集器
             * @throws Exception
             */
            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {

                ArrayList<Tuple2<String, Long>> list =new ArrayList<>();

                elements.forEach(list::add);

                String msg=String.format("key=%s,window=%s ,data=%s",key, context.window(),list);

                out.collect(msg);
            }
        }).print();

        env.execute();

    }

输入:

s
s
s
ss
s
s
s
s
s
s
s

输出:此时无论输入了多个元素,都不会触发计算。

所以需要设置触发器,由我们来设置触发条件。

Trigger

public abstract class Trigger<T, W extends Window> implements Serializable {

Trigger 下所有的子类
ProcessingTimeoutTrigger (org.apache.flink.streaming.api.windowing.triggers)
基于处理时间超时的触发器

CountTrigger (org.apache.flink.streaming.api.windowing.triggers)
基于元素个数的触发器

EventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
基于事件时间的触发器

DeltaTrigger (org.apache.flink.streaming.api.windowing.triggers)
此触发器计算上次触发的数据点与当前到达的数据点之间的增量。如果 delta 高于指定的阈值,它就会触发。

NeverTrigger in GlobalWindows (org.apache.flink.streaming.api.windowing.assigners)
永远不会触发的触发器,作为 GlobalWindows 的默认触发器。

ContinuousEventTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
根据给定的时间间隔连续触发,需要指定水位(Watermarks)

PurgingTrigger (org.apache.flink.streaming.api.windowing.triggers)
当嵌套触发器触发时,这将返回一个TriggerResult

ContinuousProcessingTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
根据运行作业的机器的时钟测量的给定时间间隔连续触发。

ProcessingTimeTrigger (org.apache.flink.streaming.api.windowing.triggers)
一旦当前系统时间超过窗格所属窗口的末尾,就会触发。


上面的都不会使用,自定义count触发器

        // 添加 触发器
        window.trigger(new Trigger<Tuple2<String, Long>, GlobalWindow>() {

            /**
             * 来一个元素触发一次
             * @param element
             * @param timestamp
             * @param window
             * @param ctx
             * @return
             * @throws Exception
             */
            @Override
            public TriggerResult onElement(Tuple2<String, Long> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
                return null;
            }

            /**
             * 基于处理时间
             * @param time
             * @param window
             * @param ctx
             * @return
             * @throws Exception
             */
            @Override
            public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                return null;
            }

            /**
             * 基于事件时间
             * @param time
             * @param window
             * @param ctx
             * @return
             * @throws Exception
             */
            @Override
            public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                return null;
            }

            /**
             * 触发器执行后,清空窗口元素
             * @param window
             * @param ctx
             * @throws Exception
             */
            @Override
            public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {

            }
        });

TriggerResult

public enum TriggerResult {

    /** 没有对窗口采取任何操作。*/
    CONTINUE(false, false),

    /** {@code FIRE_AND_PURGE} 计算window函数并发出window结果。*/
    FIRE_AND_PURGE(true, true),

    /**
     * On {@code FIRE}, 窗口被评估并发出结果。窗户没有清洗,
     *但是,所有的元素都被保留了。
     */
    FIRE(true, false),

    /**
     * 属性的值将被清除并丢弃窗口中的所有元素
     * 窗口函数或发出任何元素。
     */
    PURGE(false, true);
}

自定义触发器

@Test
    public void  test1() throws Exception {
        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            Arrays.stream(value.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1L)));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 聚合
        KeyedStream<Tuple2<String, Long>, String> keyBy = flatMap.keyBy(s -> s.f0);


        // 创建 全局窗口
        WindowedStream<Tuple2<String, Long>, String, GlobalWindow> window = keyBy.window(org.apache.flink.streaming.api.windowing.assigners.GlobalWindows.create());

        // 添加 触发器
        window.trigger(new Trigger<Tuple2<String, Long>, GlobalWindow>() {

             int count=0;

            /**
             * 来一个元素触发一次
             * @param element
             * @param timestamp
             * @param window
             * @param ctx
             * @return
             * @throws Exception
             */
            @Override
            public TriggerResult onElement(Tuple2<String, Long> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

                // 三个元素发送一次
                if (count>=3){
                    count=0;
                    return TriggerResult.FIRE_AND_PURGE;
                }
                count++;

                // 超时不发送
                return TriggerResult.CONTINUE;
            }

            /**
             * 基于处理时间
             * @param time
             * @param window
             * @param ctx
             * @return
             * @throws Exception
             */
            @Override
            public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                return null;
            }

            /**
             * 基于事件时间
             * @param time
             * @param window
             * @param ctx
             * @return
             * @throws Exception
             */
            @Override
            public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                return null;
            }

            /**
             * 触发器执行后,清空窗口元素
             * @param window
             * @param ctx
             * @throws Exception
             */
            @Override
            public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {

            }
        });


        window.process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, GlobalWindow>() {
            /**
             *
             * @param key 聚合元素
             * @param context 上下文
             * @param elements 窗口所有的元素
             * @param out 收集器
             * @throws Exception
             */
            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Object> out) throws Exception {

                ArrayList<Tuple2<String, Long>> list =new ArrayList<>();

                elements.forEach(list::add);

                String msg=String.format("key=%s,window=%s ,data=%s",key, context.window(),list);

                out.collect(msg);
            }
        }).print();

        env.execute();

    }

输入

a
a
a
bb
bb
bb
cc
cc
cc
a
cc
bb

输出

11> key=a,window=GlobalWindow ,data=[(a,1), (a,1), (a,1), (a,1)]
16> key=cc,window=GlobalWindow ,data=[(cc,1), (cc,1), (cc,1), (cc,1)]
9> key=bb,window=GlobalWindow ,data=[(bb,1), (bb,1), (bb,1), (bb,1)]

基于元素个数的窗口

按照指定的数据条数生成一个Window,与时间无关

若有N个元素
a、b、c、b、a、c、a、c、b、a、a、b、c
那么会划为三个窗口,不同元素划分为一个窗口(a窗口,b窗口,c窗口)。

分2类:

1.滚动窗口

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

WindowedStream<Tuple2<String, Long>, String, GlobalWindow> window = keyBy.countWindow(3); //指定元素窗口的大小
    @Test
    public void test1() throws Exception {
        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            Arrays.stream(value.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1L)));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 聚合
        KeyedStream<Tuple2<String, Long>, String> keyBy = flatMap.keyBy(s -> s.f0);

        WindowedStream<Tuple2<String, Long>, String, GlobalWindow> window = keyBy.countWindow(3);


        // 聚合
        window.sum(1).print();


        env.execute();
    }
}

输入

a
b
c
a
b
c
a
b
c

输出

11> (a,3)
3> (b,3)
8> (c,3)

说明:哪个窗口先达到3个元素, 哪个窗口就关闭. 不影响其他的窗口.

2.滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围最多是3个元素。

    WindowedStream<Tuple2<String, Long>, String, GlobalWindow> window = keyBy.countWindow(窗口查长度, 滑动长度);
    @Test
    public void test2() throws Exception {
        Configuration config=new Configuration();
        config.setInteger("rest.port",8081); // 配置固定端口

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);


        DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);

        // 扁平化
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            Arrays.stream(value.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1L)));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 聚合
        KeyedStream<Tuple2<String, Long>, String> keyBy = flatMap.keyBy(s -> s.f0);


        WindowedStream<Tuple2<String, Long>, String, GlobalWindow> window = keyBy.countWindow(5, 3);


        // 聚合
        window.sum(1).print();


        env.execute();
    }

输入(注意这里有6个bb)

bb
bb
bb
bb
bb
bb

输出

9> (bb,3) // 3个输出一次
9> (bb,5) // 5个输出一个

每来3三个元素会关闭窗口,将触发窗口计算。5表示,最多取5个。


窗口位置(Keyed vs Non-Keyed Windows)

其实, 在用window前首先需要确认应该是在keyBy后的流上用, 还是在没有keyBy的流上使用.

在keyed streams上使用窗口, 窗口计算被并行的运用在多个task上, 可以认为每个task都有自己单独窗口. 正如前面的代码所示.

非non-keyed stream上使用窗口, 流的并行度只能是1, 所有的窗口逻辑只能在一个单独的task上执行.

.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))

需要注意的是: 非key分区的流上使用window, 如果把并行度强行设置为>1, 则会抛出异常

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容