摘要:本文整理自 StarRocks 社区技术布道师谢寅,在 Flink Forward Asia 2022 实时湖仓的分享。本篇内容主要分为五个部分:
- 极速数据分析
- 实时数据更新
- StarRocks Connector For Apache Flink
- 客户实践案例
- 未来规划
一、极速数据分析
统一 OLAP 分析的趋势,以及 StarRocks 极速查询分析的核心能力。计算机科学里所有难题,都能通过加中间层的方式来解决,但是不能加的东西太多。回想 Hadoop 生态演变的过程,先有了分布式存储,解决了海量数据如何用廉价的设备,来存储的问题。又有 MapReduce 帮助我们慢悠悠的解决了,分布式处理的问题。
为了让只会写 SQL 的分析师,能够专注于业务,不用担心 Java 编程的问题,又有了 Hive 帮助我们解决,SQL 到 MR 的自动解析。当人们觉得 Shuffle 磁盘太慢,我们研究了基于内存的弹性分布式数据集 RDD,让数据在内存里分布式的高效计算。
由于内存里微批的计算仍不能描述所有的实时语义,就有了为实时而生的分布式计算引擎 Flink。
早期,人们对数据的依赖还没那么深的时候,数据不管怎么进来的,最终能看到就行。随着时代的变迁,除了管理层需要看数,基层小伙伴也需要用数。于是 OLAP 分析产品,就像雨后春笋似的,有能直接聚合指标的,有能 Adhoc 探查的,有单表无敌的,有能支持数据更新的。
由于组件太多,数据在各个引擎里来回传递,时效性低,口径不一致,硬件资源,人力成本,都非常浪费。所以人们期待一种极致性能的分析型数据库,能够收敛 OLAP 分析层,开启实时数据分析新范式。
StarRocks 的愿景是希望帮助客户,能够实现极速统一 OLAP 分析的技术架构。首先,经过 2 年的打造,极致的性能已经深入人心。全面支持了向量化引擎,CBO 技术,智能物化视图等等一揽子技术。使得 StarRocks 可以完成亚秒级极速 OLAP 分析,保证数据分析应用最后一公里的极速响应。
同时,现代化 MPP 的架构,可以让查询服务充分利用多机多核的资源。保证业务随着硬件,可以 scale-up 和 scale-out。简洁的 fe+be 的架构,可以实现极简运维,优秀的实时摄入能力,让实时数据分析变得轻松简单。
在高并发点查的场景,资源合理规划,可以做到上万 QPS。在与云的整合上,我们已经在各大云商的半托管服务上,能够快速部署社区版 StarRocks。周边我们也在努力整合更丰富的生态方案。开放活跃的社区也逐渐有伙伴,帮我们贡献更多的关键性 Feature。
有这样极致的分析性能,StarRocks 到底能帮我们做什么呢?这里归纳总结了 4 个比较核心的场景。
BI 报表类业务。这个是 StarRocks 的看家本领,不管你要加速固定报表,还是要自助式 BI,拖拉拽来探索式分析,都可以用 StarRocks 来支撑。
实时类的业务。比如实时大屏,Flink+StarRocks 的方案已经非常成熟了。尤其是增量聚合类的指标,StarRocks 的聚合表模型,可以直接生成 DWS 层的 sum,min,max 的聚合指标。
有一些用户在打造客户数据平台时,做用户分群、行为分析、用户画像等场景也会用 StarRocks 去做。之前很多场景是离线的,StarRocks 实时的链路也可以秒级摄入,这样离线和实时的数据可以联合分析,让数据的新鲜度更靠前。
统一分析。除了刚刚讲的实时数据和离线数据的统一,StarRocks 还支持 Iceberge/Hudi/Hive 外表查询,可以实现湖和仓的联邦分析。也有客户用 StarRocks 真真切切的解决了它们的分析和服务割裂的问题,以及尝试业财一体化分析等等。
如上图所示,展示了 StarRocks 的核心能力,其中第一点,就是 StarRocks 全面支持了 SIMD 指令,充分去利用单颗 CPU 的处理能力,让它一次指令能够处理更多的数据。
StarRocks 支持非常多的分布式 Join 策略,针对不同的场景,CBO 优化器可以自动选择合适的分布式 Join 策略。从若干个查询规划候选中,选择最优的规划,让查询体验最好。
有了 CBO 优化器,可以基于统计信息,自动改写左右表的关系,智能选择最优的查询规划。此外,比如用低基数全局字典让 String 映射为 int;延迟物化来降低无效的 Scan;runtimeFilter 让右表的过滤可以推到左表提前 Scan 等等,还有一揽子极致的优化,整体保证 StarRocks 能够应对非常复杂的分析查询。
以最大的限度,让使用者关注在业务逻辑本身。从各种参数调优、分布式 Join 策略的选择等等手动优化的工作中解脱出来,把这些事情交给 StarRocks 自动完成。
StarRock 支持非常丰富的数据摄入能力。有配套的手段可以从传统关系型数据同步存量数据,也可以结合 Primary Key 模型和 Flink-CDC,整合做实时 Upsert、Delete 的数据同步。
此外,对于消息队列的数据,StarRocks 的 routine load 可以直接消费 Kafka 的消息,也可以用 Connector 和 Flink 整合。核心的组件 FE,负责元数据管理和 SQL 解析,执行规划的生成等。BE 承载了向量化执行引擎和列式存储。在外层,支持非常丰富的外表查询能力,可以整合湖和仓的数据一体分析。StarRocks 做的 BlockCache 的 Feature,可以让湖的查询能力不弱于仓的性能。
另外,StarRocks 部署非常简单,不管你是在云上还是私有化部署,都可以实现极简运维。对外通过 MySQL JDBC 就能轻松连入,去应对 BI 分析、报表、实时看板等场合。
二、实时数据更新
接下来,重点看看 StarRocks 在有更新的实时链路里,怎么提供高效的分析查询服务。首先,谈到实时数仓,每个企业每个客户的理解都不尽相同,技术路线的选择也会有所不同。
有的场景处理逻辑非常复杂,借助 Flink 强大的计算能力和丰富的时间语义,客户可以在 Flink 里完成建模。然后,把加工后的结果持久化到消息总线。StarRocks 可以去订阅对应的 Kafka 里的分层数据,再把结果同步过来。对于固定报表类的场景,往往聚焦在 ADS/DWS 层的聚合指标查询,要求查询有极高的性能。这种在 Flink 里计算,在 StarRocks 负责极速查询分析的方案就比较适合。有些场景,数据量不大,利用离线数仓跑批的思路,用调度系统在 StarRocks 里一层一层的做上去,也能实现数仓分层的建设。我们通常说的 OLAP 多维分析,一般会聚焦在 DWD 宽表和 DWS 轻度汇聚层,更灵活的 Adhoc 查询,可能还会对 ODS 原始数据进行查看。
前面聊的一些实时数仓建设的思路,大部分是建立在 append 流的基础上的。假定我们的数据只有追加,没有 Upsert/Delete 操作。在有更新的场景下,不管是增量构建,还是微批调度,都很难保证上层的聚合指标,下钻下来,还能跟明细层对应上。
已经有客户尝试在一些场景下,用 Primary Key 模型做 ODS,保证实时的数据 Upsert/Delete。然后,上面的分层用逻辑视图,保证聚合指标和明细的完全同步吻合。
另外,我们跟 Flink 去结合,如果只支持 append 流是远远不够的。那么 StarRocks 能不能解这个难题呢?答案是肯定的。
生活中我们总说覆水难收,比喻事情已成定局,难以挽回。但是强大的 Flink,就有回撤流这种功能,这里提供了一个词频统计的简单 SQL。
可以看到,在新的数据“StarRocks, 1”进来后,如果没有数据回撤,来标记上一轮 Sink 出去的数据失效的情况下,再叠加新进来的数据,就会造成结果的错误。反之,有了 Flink Retract,可以收回上一批次的结论,然后吐出正确的指标。
这种情况下,Flink 端能搞定回撤的问题了,但是 OLAP 端怎么办呢?如果没有高效稳定的 Upsert/Delete 能力,非常容易造成数据的重复和结果的错误。
在一年前,我们在 1.9 版本中发布了新的存储引擎 Primary Key 表模型,在支持实时更新的同时,还能保持查询的高性能。它内置了 OP 字段,以 0 或 1 的形式来标记数据的 Upsert/Delete,恰恰吻合了 Flink 回撤流的数据特征。结合我们提供的 Flink Connector,可以直接将 Flink 的回撤流,对接进 Primary Key 模型。
它基于 Delete+Insert 的方式或者叫 merge-on-write 的方式,实现更新。相比原来 merge-on-read 的 unique 模型,在导入性能几乎不受影响的前提下,查询性能提升了 3-10 倍。
它非常适合 TP->AP 实时同步数据,并加速查询的场景。通过 Flink-CDC 工具,将 TP 业务系统,比如 MySQL 直接同步到 StarRocks,极大的简化了实时分析数据流,简单易用。目前,己经有多个用户在线上系统中采用,是实时数据分析的典型范式。
2022 年,我们对 pk 模型做了持久化主键索引的功能,来降低主键模型的内存开销。原来的主键索引是基于全内存哈希表的,新的持久化索引同样使用了基于 Hash 的设计,并且使用了类似 LSM 的多层设计。
第一层 Hash 为内存 Hash 表,第二层是基于磁盘的 Hash 表结构。为了节约存储空间,使用了类似原全内存 Hash 表的 Shard by length 设计。测试结果显示,内存占用一般只有原来 1/10。由于查询索引本质上,是大量的随机 IO 操作,如果需要持久化索引,推荐使用固态硬盘。
这个是我们导入测试时的内存对比,左边是 BE 进程的内存总占用,右边是索引的内存占用。在全内存索引模式下,随着数据持续导入,总内存最高到 120G,索引内存最高到 60G 左右。
在持久化索引模式下,随着数据持续导入,总内存最高到 70-80G,索引内存最高到 3-4G 左右,内存使用下降非常明显。
另一个 Feature 是,部分列更新的支持。在去年的 FFA 峰会上,我分享了基于聚合模型的 replace_if_not_null,来实现部分列更新的方法。使用这个方法有一定的开发成本,开发者需要把宽表的下标凑齐,没有数据的位置需要显式的去补 null 值。
今天谈的 PK 模型的部分列更新功能,开发成本会更低,数据接入时只需要指定该数据流的相关列名即可。虽然 SR 在多表查询方面性能非常好,但是在一些场景下,用户还是期望大宽表带来的极速性能。
目前,如果想要实现这个效果,有几个常见方案。
-
在上游数据流中插入一个 Join 模块或者算子,通常使用 Flink 等流式计算平台。用多流 Join,拼成整行数据。
如果上游多个数据流的数据到达时间不一致,很难设计合适的 window 去在计算引擎里打宽数据,启用 mapState 之类的状态计算又过于定制,迭代效率又是个问题。
用 TP 系统建宽表。上游模块以部分列更新方式写入 TP 系统,再通过 TP 系统,同步给 AP 系统。这样需要额外搭一套 TP 模块和同步模块。
先分模块导入 AP 系统,AP 系统中通过 DML 定期做 Join,后置的定期去刷新大宽表,这样会牺牲一定实时性。
这三种方式都有一定的复杂度,如果 SR 能够直接支持部分列更新,将带来全新的思路,能很好的解决这个问题,简化多流 Join 的链路。
从 2.3 版本开始支持了部分列更新功能,实现方式还是以现有的 Insert+Delete 模式为基础,流程可以参考图中的例子。我要把第一列为 3 的那行最后那个值列 c 改为 y。需要先找到 3 所在的行,然后把跟本次更新无关的列带出来,标记这行为 Delete,然后再追加更新后新的行进去。
剩下的操作就和原来的 Full Row Upsert 类似了。由于采用 Delete+Insert 的方式,实现部份列更新,读写放大问题其实对这种用法造成了一定的限制,特别是对大宽表仅更新很少一部分列的情况。
比如有个表有 10000 列,我们只更新其中的一列。需要先读取其余的 9000 多列,再写入全部 10000 列。所以,我们目前推荐部份列更新,仅在列不是特别多的场景下使用(比如小于 500 列的情况下),并且尽量在固态盘上使用这个功能。为了部份解决这个问题,我们后面打算引入行存,这样能够解决一部分读放大的问题。
另一个 Feature 是,条件更新。在导入时添加了条件的功能,只有条件满足时,才进行更新。常用的场景比如导入的数据有乱序,或者由于并发导致的数据乱序。
为了防止乱序数据覆盖正确的数据,一般会设计一个时间戳字段。这样在更新时可以指定一个条件,当时间戳大于当前时间,才进行更新操作,否则忽略该行。
在 Flink-CDC 同步时,如果任务并发非常高,导致事务数量较多的话,我们新增加了基于 Stream Load 的事务导入接口,可以将多个导入任务合并成一个事务。在定期 Sink 开始前,开启事务。然后,并行写入数据。最后,全部 Task 完成数据传输后,整体提交事务。
这样上面的例子中,总事务数就从 4 个减少到了 1。高频导入的瓶颈本质上是事务数量高的问题,降低事务数量,就可以提升实时导入的能力。
三、StarRocks Connector For Apache Flink
接下来,我们看下 StarRocks 怎么和 Flink 通过 Connector 来整合。上图描绘了 Flink Connector 的整体情况,StarRocks 提供了 Source Connector。用户可以把 StarRocks 的表作为数据源,用 Flink 分布式的提取 StarRocks 的数据。可以用于跨机房的数据迁移,或者基于 Flink 做进一步复杂的分布式处理。
Sink Connector 主要是把 Flink 内存里的数据,走 StarRocks 的向量化导入接口,将实时的流数据高效的导入到 StarRocks。
之前客户为了实现 Flink 读 StarRocks 表,需要自己定制 Source,以 MySQL JDBC 的形式读取数据,BE 的数据最终需要单点抽上来,效率较差。
StarRocks 提供的 Source Connector,进行了分布式设计。先在 FE 找到对应的分片元数据信息,然后分布式的直接从存储层提取数据,整体的吞吐能力大大提升。
Sink Connector 的使用会比 Source 更多,借助 Flink 强大的流批一体处理能力,可以处理流式消息,也可以抽取 TP 数据库的数据,乃至于 Hive 数仓的数据。经过 Flink 的加工之后,通过 Sink Connector,走 Stream Load 接口,同步到 StarRocks。
这里举个部分列更新的例子,原来有“101,Tom,80”的记录。现在需要追加一些新的数据,并做数据更新。目标是要把 101 的 Tom 改为 Lily。我们看到,对于接口侧,只需要指定主键 id 列和需要更新的 name 列,按照正常数据导入的形式导入就行。
在 Flink-Connector 配置也非常简单,和 Stream Load 接口用法一致,只需要启用 partial_update,然后指定数据的列名就可以了。
四、客户实践案例
接下来,分享一个实践案例。主要是关于京东物流,用 Flink 和 StarRocks 的 Primary Key 模型,来解决分析和服务一体化的问题。
首先,看下京东物流的数据特色,以及面临的主要挑战。大家知道在京东 APP 下单之后,订单由商城域进入物流域,从仓储进分拣,从配送到妥投,最后到消费者手里。整体的物流业务很复杂,流程也很长。随着实体包裹在物流位置上的运输,数据经过了很多环节,还有物流批次的一些行业概念,又多了一些时间维度。整体的数据环节复杂,呈现出多维立体分布的特征。
除此之外,流程复杂,业务系统也很多。因为既有架构演进的历史原因,仍然存在多种数据源。所以京东物流的数据架构上,比较依赖联邦查询的能力。
在业务层面,有很多宏观的数据汇总指标,来指导生产。需要把聚合后的结果,进行关联出统一数据指标。所以聚合查询,和多表关联也是一个特色。
我们可以看下京东物流的早期架构,整体分为数据服务链路和数据分析链路,这两条线基本上是割裂的。
数据服务的链路,由 Flink 消费消息队列的数据,送入到一些数据库产品当中。主要有 ClickHouse,ES 还有 MySQL,为了接口能够提供稳定高效的查询服务。而灵活分析的场景,没有固定的模型能够应对所有查询,可能会基于 Hive 或者 Presto,以 Adhoc 的形式完成比较复杂的 SQL。
总体分析,早期架构有这样一些问题。
- 数据源多样,维护成本比较高。
- 性能不足,写入延迟大,大促的场景会有数据积压,交互式查询体验较差。
- 各个数据源割裂,无法关联查询,形成众多数据孤岛。然后从开发的角度,每个引擎都需要投入相应的学习开发成本,程序复杂度比较高。
京东物流经历了大数据产业化的整个历程,数据应用多种多样,使用数据的方式也各不相同。于是,着力打造了 UData 平台,作为数据资产和数据应用之间的桥梁,以数据接入或者外表关联的形式,形成统一数据收口。StarRocks 作为 UData 平台的核心底座,支撑了两个主要场景:
以接口的形式,提供稳定可靠的数据服务模块。
给业务人员提供极速的灵活分析模块。让业务可以在数据指标地图上,在线查找自己需要的数据指标。
并且做了指标配置化开发、指标积木式编排、可视化 SQL 编辑等平台侧赋能,解决数据使用的最后一公里问题。
今年,京东物流用 StarRocks 在实时链路里,替换掉了 ClickHouse,解决查询并发和多表关联的瓶颈。然后,在实时服务链路和数据分析链路前面,将 StarRocks 作为数据的统一查询入口,实现分析和服务的一体化。
与此同时,京东物流也积极参与 StarRocks 社区的技术共建工作。基于既有架构实现的外表聚合下推和排序下推功能。可以在一些场合,有效降低网络带宽的传输,并且完善了 RPC 和 Http 接口的外表支持。
在部分场景开启聚合下推的功能,聚合查询性能有大幅提升。这里举例了一个 6 表关联的复杂 SQL,从之前的 30 秒优化到 6s,性能提升数倍。从右侧的监控也能看到,中间测试开启了一段时间的下推功能,QPS 有明显的提升。
在今年(2022 年)双 11,StarRocks 承载了运单实时数据更新和自助式分析查询的场景。这个场景的 QPS 不高,但是查询非常灵活,会基于数百列的表做任意的筛选和关联查询。另外一个特点是,运单的时间语义字段有好几个。比如下单时间、发货时间、妥投时间等等,而且时间数据会被修改,查询的时候不一定是用哪个字段去过滤。这就没有办法从中选出一个合理的分区字段用于分区,在查询时无法进行分区裁剪。
从左侧监控的示意图可以看到,中间有个非常大的 latancy 尖峰,就是因为这种 SQL 会有大量的 Scan,从而造成查询的高延迟。
为了解决这个问题,我们和京东物流的同学一起分析讨论了这种特殊的场景。一种解法是,把这些时间语义的字段,做一些归类梳理。然后,按不同的时间语义,拆成多个表使用。这种方法会大大的提升分区裁剪的效果,但是开发成本会比较大,需要上下游系统的联动。对于双 11 这样的时间紧任务重的大促活动,很难再做出比较大的结构性调整。
于是,我们尝试了在日期字段上加 Bitmap 索引的方案。如图所示,在加上索引后,Scan 有效得到降低,问题得到了解决。
利用 StarRocks 成功应对了双 11 的大促挑战之后,京东物流对未来技术架构做了进一步的规划。在计算层,打算把离线链路的 Spark 计算引擎优化掉,采用 Flink 实现流和批的一体化处理。在分析层,将逐渐优化掉既有的一系列分析组件,把 StarRocks 作为统一的底座,让数据分析和数据服务,收敛到 StarRocks 上来。
五、未来规划
接下来,分享下 StarRocks 在实时数据分析方面,会进一步做出哪些工作。
一方面从易用性角度,支持主键与排序键分离。在目前的主键模型中,可以认为 Sort Key 和 Primary Key 是统一在一起的。例如右边的例子,主键是 id。如果数据按照 id 排序,对 City 过滤的查询,就需要扫描全表,或者需要加其它二级索引,想办法加速过滤。
如果能把 Sort Key 和 Primary 拆分开,建表时可以让 Sort Key 和 Primary 用不同的字段。比如 id 作为主键,负责更新的唯一约束;City 作为 Sort Key,负责数据的存储顺序。这样就能加速一些常见的查询。
其实可以进一步思考,目前 StarRocks 的好几种表模型,duplicate/aggr/unique 面对不同场合,客户需要稍作选择。这些都可以统一到一种语法表达,统一之后,对用户来说,只有一种表更容易理解,可以进一步提升易用性。
另一个重要的方向是。多表物化视图。目前,主键模型并不支持 ROLLUP 和物化视图,而 ROLLUP 功能也比较有限。StarRocks 新一代的多表物化视图架构会支持更加高级的功能,包括透明的查询加速,离线的全量构建和实时的增量构建;可以写比较复杂的表达式,多表关联,子查询嵌套等等;以类似 Insert into overwrtie 的语义异步或者同步的自动完成物化。
增量物化视图的构建,需要能够提供表的增量修改数据。我们会在内部实现类似 binlog 的机制,来辅助实时增量多表物化的任务。
有了增量多表物化视图,更易用,性能又更好的 Primary Key 模型,乃至于部分列更新,LocalCache 加速等 Feature 的加持。未来实时数据分析或将迎来新的范式。人们不必再设计非常复杂的 ETL 架构,维护那么多的外围程序,做了很多无效的数据搬运工作,耗费了不少人力和物力,交付节奏总跟不上新需求迸发的速度。
未来,你有可能通过 Flink,将实时数据从消息队列或者 TP 的 CDC 秒级的持续稳定的摄入进 StarRocks。在 StarRocks 里,直接去面向分析,开发我们的指标和模型。外表的形式也好,逻辑视图也好,CTE 也好,以最敏捷的形式快速迭代开发。开发完 SQL 逻辑之后,结合场景,哪些指标是需要高并发低延迟的服务的?哪些层是需要反复被其它下游频繁调用的?不管是宽表,还是多表关联的复杂 SQL,我们都可以按需的去上卷或构建物化。直面分析,按需加速,让数据分析的链路更经济,效率更好。
实时即未来,StarRocks 在逐渐实现这样的能力,StarRocks 和 Flink 结合去构建实时数据分析体系的联合解决方案,将在一定程度上颠覆既有的一些禁锢,形成实时数据分析新范式。