Flink Table API&SQL编程指南之时间属性(3)

Flink总共有三种时间语义:Processing time(处理时间)、Event time(事件时间)以及Ingestion time(摄入时间)。关于这些时间语义的具体解释,可以参考另一篇文章Flink的时间与watermarks详解。本文主要讲解Flink Table API & SQL中基于时间的算子如何定义时间语义。通过本文你可以了解到:

  • 时间属性的简介
  • 处理时间
  • 事件时间

时间属性简介

Flink TableAPI&SQL中的基于时间的操作(如window),需要指定时间语义,表可以根据指定的时间戳提供一个逻辑时间属性。

时间属性是表schama的一部分,当使用DDL创建表时、DataStream转为表时或者使用TableSource时,会定义时间属性。一旦时间属性被定义完成,该时间属性可以看做是一个字段的引用,从而在基于时间的操作中使用该字段。

时间属性像一个时间戳,可以被访问并参与计算,如果一个时间属性参与计算,那么该时间属性会被雾化成一个常规的时间戳,常规的时间戳不能与Flink的时间与水位线兼容,不能被基于时间的操作所使用。

Flink TableAPI & SQL所需要的时间属性可以通过Datastream程序中指定,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认

// 可以选择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

处理时间

基于本地的机器时间,是一种最简单的时间语义,但是不能保证结果一致性,使用该时间语义不需要提取时间戳和生成水位线。总共有三种方式定义处理时间属性,具体如下

DDL语句创建表时定义处理时间

处理时间的属性可以在DDL语句中被定义为一个计算列,需要使用PROCTIME()函数,如下所示:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外字段,作为处理时间属性
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗口

DataStream转为Table的过程中定义处理时间

在将DataStream转为表时,在schema定义中可以通过.proctime属性指定时间属性,并将其放在其他schema字段的最后面,具体如下:

DataStream<Tuple2<String, String>> stream = ...;
// 声明一个额外逻辑字段作为处理时间属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

自定义TableSource并实现DefinedProctimeAttribute 接口,如下:

// 定义个带有处理时间属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"user_name" , "data"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // 创建stream
        DataStream<Row> stream = ...;
        return stream;
    }

    @Override
    public String getProctimeAttribute() {
        // 该字段会追加到schema中,作为第三个字段
        return "user_action_time";
    }
}

// 注册table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .from("user_actions")
    .window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

事件时间

基于记录的具体时间戳,即便是存在乱序或者迟到数据也会保证结果的一致性。总共有三种方式定义处理时间属性,具体如下

DDL语句创建表时定事件时间

事件时间属性可以通过 WATERMARK语句进行定义,如下:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 声明user_action_time作为事件时间属性,并允许5S的延迟  
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream转为Table的过程中定义事件时间

当定义Schema时通过.rowtime属性指定事件时间属性,必须在DataStream中指定时间戳与水位线。例如在数据集中,事件时间属性为event_time,此时Table中的事件时间字段中可以通过’event_time. rowtime‘来指定。

目前Flink支持两种方式定义EventTime字段,如下:

// 方式1:
// 提取timestamp并分配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 声明一个额外逻辑字段作为事件时间属性
// 在table schema的末尾使用user_action_time.rowtime定义事件时间属性
// 系统会在TableEnvironment中获取事件时间属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");

// 方式2:

// 从第一个字段提取timestamp并分配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 第一个字段已经用来提取时间戳,可以直接使用对应的字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");

// 使用:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

另外也可以在创建TableSource的时候,实现DefinedRowtimeAttributes接口来定义EventTime字段,在接口中需要实现getRowtimeAttributeDescriptors方法,创建基于EventTime的时间属性信息。

// 定义带有rowtime属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"user_name", "data", "user_action_time"};
        TypeInformation[] types =
            new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {

        // 创建流,基于user_action_time属性分配水位线
        DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
        return stream;
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // 标记user_action_time字段作为事件时间属性
        // 创建user_action_time描述符,用来标识时间属性字段
        RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
            "user_action_time",
            new ExistingField("user_action_time"),
            new AscendingTimestamps());
        List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
        return listRowtimeAttrDescr;
    }
}

// register表
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .from("user_actions")
    .window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

小结

本文主要介绍了如何在Flink Table API和SQL中使用时间语义,可以使用两种时间语义:处理时间和事件时间。分别对每种的时间语义的使用方式进行了详细解释。

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335