摘要:本文整理自 SelectDB 资深大数据研发专家王磊,在 FFA 2022 实时湖仓专场的分享。本篇内容主要分为四个部分:
实时数仓需求和挑战
基于 Apache Doris 和 Apache Flink 构建实时数仓
用户案例与最佳实践分享
新版本特性
一、实时数仓需求和挑战
在数据流的角度上,分析一下传统的数据架构。从图中可以看到,数据分为实时数据流和离线数据流。
在实时数据部分,通过 Binlog 的方式,将业务数据库中的数据变更,采集到实时数仓。同时,通过 Flume-Kafka-Sink 对进行实时采集。当数据源上不同来源的数据都采集到实时数仓后,便可以构建实时数仓。在实时数仓构建的内部,我们仍然会遵守数仓分层理论,将数据分为 ODS 层、DWD 层、DWS 层、ADS 层。
在离线数据流部分,通过 DataX 定时同步的方式,采集业务库 RDS 中的一些数据。当不同来源的数据,进入到离线数仓后,便可以在离线数仓内部,依赖 Spark SQL 或 Hive SQL,对数据进行定时处理。
然后,分离出不同维度(ODS、DWD、ADS 等)的数据,并将这些数据存在一个存储介质上。我们一般会存在 HDFS 或 S3 的对象存储上。通过这样的方式,我们的离线数仓便构建起来了。与此同时,为了保障数据的一致性。我们需要一个数据清洗任务。使用离线数据对实时数据进行清洗,保障数据最终的一致性。
如上图所示,我们在技术架构的角度,对传统数据架构进行分析。从图中可以看到,不同应用采取不同技术栈。比如在湖仓部分是 Hive、Iceberg、Hudi 等组件。在湖仓之上的 Ad-hoc,我们选择 Impala 或 Kudu。对于 OLAP 的高并发,我们会使用 Druid 或 Kylin 组件。
除此之外,业务还有半结构化的需求。我们会使用 ES 对日志进行检索和分析;使用 HBase 构建高效的点查服务。在部分情况下,业务需要对外提供统一的数据服务。这时,部分业务会使用 Presto 或 Trino 的查询网关,统一对用户提供服务。
这个架构有什么问题呢?首先,架构组件多,维护复杂。其次,它的计算、存储和研发成本都比较高。主要因为我们同时维护两套数据和两套计算,即实时数据流和实时计算任务。最后,我们无法保障实时流的数据和离线数据的一致性。此时,只能通过离线数据定时清洗,保证数据的一致性。
基于以上痛点,一个“统一的实时”架构呼之欲出。“统一”是指数据结构的统一,我们希望半结构化和结构化数据能够统一存储在一起。同时也指的是在一个平台中能完成多种分析和计算场合(实时查询,交互式分析,批量处理等)“实时”指的是我们希望数据的接入和数据分析都能实时的进行。
二、基于 Apache Doris 和 Apache Flink 构建实时数仓
基于以上需求,如何通过 Apache Doris 和 Apache Flink 构建实时数仓?满足用户对实时应用和统一架构的需求。
首先,介绍一下什么是 Apache Doris?Apache Doris 是基于 MPP 架构的高性能的实时分析型数据库。它以极速易用的特点被人们所熟知,Doris 只需亚秒就能完成海量数据(TB 级别或者百亿级别数据)的查询。不仅支持高并发(点查、ad-hoc)的查场景,也支持高吞吐量(ETL 任务)的查询场景。
因此,Apache Doris 能较好的满足高并发报表、即席查询、统一数仓服务、以及湖仓和联邦查询等功能。用户在此之上,可以构建用户行为分析、AB 实验、日志检索分析、用户画像分析等应用。
从下图中我们可以看到,数据源经过各种数据集成和加工后,通常会入库到实时数仓 Doris 之中。同时,部分数据会入到湖仓架构的 Hive 或 Iceberg 中。然后,基于 Doris 构建一个统一的数仓,对外提供服务。
接下来,看一下如何基于 Apache Doris 构建极速易用的实时数仓的架构?因为 Doris 既可以承载数据仓库的服务,也可以承载 OLAP 等多种应用场景。
因此,它的实时数仓架构变得非常简单。我们只需要通过 Flink CDC 将 RDS 的数据,实时同步到 Doris。通过 routine load 将 Kafka 等消息系统中的数据,实时同步到 Doris。在 Doris 内部,基于 Doris 不同的表模型、Rollup、以及物化视图的能力,构建实时数仓。
Doris 的表模型分为明细模型、主键模型和统计模型。在 Doris 内部构建实时数仓时,ODS 层通常会使用明细模型构建。DWD 层通过 SQL 调度任务,对 ODS 数据抽取并获取。DWS 和 ADS 层则可以通过 Rollup 和物化视图的技术手段构建。
除此之外,Doris 还支持基于 Iceberg、Delta Lake 和 Hudi 的数据湖服务,提供一些联邦分析和湖仓加速的能力。这样我们便完成了基于 Doris 构建一个实时数仓。在实时数仓之上,我们可以构建 BI 服务、Adhoc 查询、多维分析等应用。
架构明确之后,我们看一下在实践过程中会有哪些挑战?首先,大家最关心的问题就是数据的一致性保证。
数据一致性一般分为“最多一次”、“至少一次”和“精确一次”。
- 最多一次是指,发送方仅发送消息,而不期待任何回复。在这种模型中,数据的生产和消费过程,都可能出现数据丢失。
- 至少一次是指,发送方不断重试,直到对方收到为止。在这个模型中,生产和消费过程都可能出现数据重复。
- 精确一次能够保证消息,只被严格发送一次,并且只被严格处理一次。这种数据模型,能够严格保证数据生产和消费过程中的准确一致性。Doris 基于两阶段提交,实现准确的数据一致性。
Flink CDC 通过 Flink Checkpoint 机制结合 Doris 两阶段提交,实现端到端的数据写入一致性。具体过程分为四步。
第一步,事务开启(Flink Job 启动和 Doris 事务开启):当 Flink 任务启动后,Doris 的 sink 会发起一个 precommit 请求,开启一个写入事务。
第二步,数据传输(Flink Job 的运行和数据的传输):在 Flink Job 运行过程中,Doris sink 会不断从上游算子获取数据,并通过 httpchunked 的方式,持续将数据传输到 Doris。
第三步,事务预提交:当 Flink 开始进行 Checkpoint 时,Flink 会发起一个 Checkpoint 请求。此时,Flink 的各个算子会进行 Barrier 对齐和快照保存。Doris Sink 会发出停止 Stream Load 写入的请求,并发起一个事务提交请求到 Doris。
此时,这批数据已经完全写入 Doris BE 中,BE 还没有进行数据发布。因此,写入 BE 的数据对用户来说是不可见的。
第四步,事务提交。当 Flink 的 Checkpoint 完成之后,它会通知各个算子。此时,Doris 会发起一次事务提交到 Doris BE。BE 会对此次写入的数据,进行发布。最终完成数据流的写入。
综上所述,是 Flink CDC 结合 Doris 两阶段事务,完成数据一致性提交的过程。这里有一个问题是,当预提交成功,但 Flink Checkpoint 失败时,该怎么办?这时 Doris 并没有收到事务最终的提交请求,Doris 内部会对写入数据进行回滚(rollback),从而保证数据最终的一致性。
下面来看一下数据增量和全量的同步。如何基于 Flink CDC 实现全量和增量的数据同步?
这个原理很简单,因为 Flink CDC 实现了基于 Snapshot 的全量数据同步,以及基于 BinLog 实时增量的数据同步。与此同时全量数据同步和增量数据同步可以自动切换。因此。在数据迁移的过程中,用户只需要配置好同步的表即可。当 Flink 任务启动时,首先会进行历史表的数据同步。同步完成之后,它会自动切换成实时同步。
完成实时数据同步之后,用户又产生了 RDS Schema 的变更需求。因为随着业务的发展,RDS 表结构会产生变更,用户希望 Flink CDC 不但能够将数据变化同步到 Doris,也希望将 RDS 表结构的变更也同步到 Doris。这样的话,用户不用担心 RDS 的表结构和 Doris 表结构不一致的问题。
要满足这种 DDL 同步的需求,前提是 Doris 能够快速支持这种 Schema 的变更。当很多 Schema Change 请求到来后 Doris 能够快速处理。由于 Doris 处理 Schema Change 时,相对比较耗时,所以我们引入了 Light Schema Change。
Light Schema Change 的实现原理非常简单,我们只需要在加减列时,对 FE 中的元数据进行修改,将修改后的数据持久化。Schema Change 只需要更新 FE 中的元数据即可,因此 Schema Change 的效率就非常高。
与此同时,这个响应过程是同步的过程。由于 Light Schema Change 只修改了 FE 的元数据,并没有同步给 BE。因此,会产生 BE 和 FE Schema 不一致的问题。
为了解决这种问题,我们对 BE 的写出流程进行了修改,具体包含三个方面。
- 数据写入:对于数据写入而言,FE 会将 Schema 持久化到元数据中。当 FE 发起导入任务时,会把最新的 Schema 一起发给 Doris BE,BE 会根据最新的 Schema 对数据进行写入,并与 RowSet 进行绑定。将该 Schema 持久化到 RowSet 的元数据中,实现了数据的各自解析。解决了写入过程中 Schema 不一致的问题。
- 数据读取:对于数据读取而言,FE 生成查询计划时,会把最新的 Schema 附在其中,并一起发送给 BE。BE 会拿到最新的 Schema,对数据进行读取,来解决读取过程中 Schema 发生不一致的问题。
- 数据 Compaction:对于数据的 Compaction 而言,当数据进行 Compaction 时,我们选取需要进行 Compaction 的 RowSet 中最新的 Schema,作为之后 RowSet 对应的 Schema。以此解决不同 Schema 上,RowSet 的合并问题。
经过 Light Schema Change 优化之后,Doris Schema Change 的性能,从之前一个 Schema 变化 310 毫秒,降低到了 7 毫秒。整体性能有近百倍的提升。彻底的解决了,海量数据的 Schema Change 变化难的问题。
有了 Light Schema Change 的保证,Flink CDC 能够同时支持 DML 和 DDL 的数据同步。那么在 Flink CDC 中,我们是如何实现的呢?
- 首先,我们要在 Flink CDC 的 MySQL Source 侧开启同步 MySQL DDL 的变更配置。然后,我们需要在 Doris 侧识别 DDL 的数据变更,并对其进行解析。
- 当 Doris Sink 发现 DDL 语句后,Doris Sink 会对这种表结构进行验证,验证表结构是否支持 Light Schema Change。当表结构验证通过后,Doris Sink 会发起 Schema Change 请求到 Doris,从而完成此次 Schema Change 的变化。
- 在具体使用过程中,Doris 如何开启 Light Schema Change 呢?我们只需要在表里添加,"light_schema_change" = "true"即可。当我们解决了基于 Flink 和 Doris 数据同步过程中,源数据的一致性、全量数据和增量数据的同步、以及 DDL 数据的变更后,一个完整的数据同步方案就完成了。
站在数据模型的角度,看一下如何基于 Flink 和 Doris,构建不同的数据模型。
第一种数据模型,也是最简单的数据模型。我们将一个 RDS 的表,同步到一个 Doris 的表中。通过 MySQL-Source 加 Doris-Sink,实现表结构和数据的变更。
第二种数据模型,我们可以将 MySQL 中两个表的数据同步到 Flink 后,在 Flink 内部进行多流 Join,完成数据打宽,生成一个大宽表数据,同步到 Doris 之上。
第三种数据模型,我们可以对上游的 Kafka 数据进行清洗,清洗后通过 Doris-Sink 写入 Doris 表中。
第四种数据模型,我们还可以将 MySQL 数据和 Kafka 数据,进行多流的 Join,然后写入 Doris。
第五种数据模型,我们在 Doris 侧建一个基于 Unique Key 模型的宽表。然后,将上游 RDS 中的数据,根据 Key 使用 Doris 的多列更新模式,将多列数据分布写入到 Doris 的大宽表中。
例如,我们首先将 MySQL 第一个表中的 status,同步到 Doris 中。其次,我们将 MySQL 第二个表中的,amount 字段同步到 Doris 的同一个大宽表模型中。
除此之外,我们还可以将多源的 RDS(MySQL、Oracle 等)数据同步到 Doris 后,在 Doris 内部使用宽表抽取的方式,构建一个大宽表。
对于大数据来说,高并发写入并不难,难点在于高并发更新。如何在上亿数据中快速找到需要更新的数据,并对其进行更新?这一直是大数据领域比较难的问题。
为了解决这个问题,Doris 通过 MVCC 多版本并发机制来实现。特别是在 Unique Key 模型中,当我们写入一个数据时,如果数据库中不存在数据,会写入一个版本。当我们再次对该数据进行更新时,会再次写入一个版本。此时,数据变更在 Doris 以多个版本的形式存在。当用户在查询时,Doris 会将最新版本对应的数据返回给用户。并在 compaction 时,对历史变更的数据进行清理。这种设计解决了海量数据的更新问题。同时,Doris 支持 Merge On Read 和 Merge On Write 两种方式。
首先,讲一讲 Merge On Read,它的特点是写入速度非常快。无论是 insert 语句,还是 update 语句,对于 Doris 来说,都是以多版本的方式写入 Doris。因此,它的写入性能很高。其次,它的查询性能不是很高。因为在查询过程中,需要对 Key 进行聚合去重,然后再执行查询。因此,它的性能不是很高。
举例:
- 我们在数据里面写入三条订单数据,数据是以 append 的形式写入 Doris 表中。
- 然后,我们对订单 1 的 cost 进行更新,写入一个新的版本的数据。当我们将订单 1 的 cost 修改为 30 时,数据通过 append 的形式,以新版本写入 Doris。
- 当我们对订单 2 的数据进行删除时,仍然通过 append 方式,将数据多版本的写入 Doris 并将_DORIS_DELETE_SIGN字段变为 1,此时,表示这条数据被删除了。
当 Doris 读取数据时,发现最新版本的数据被标记删除,就会将该数据从查询结果中进行过滤。除此之外,Doris 还有一个DORIS_SEQUENCE_COL列,保证在高并发更新的情况下,更新数据的一致性和顺序性。
下面看一下查询效果。当用户进行查询时,会进行两次聚合操作。第一次,使用多路归并排序,将重复的数据排列在一起,并使用高版本数据覆盖低版本数据的方式,对数据进行合并。
第二次,按照查询的聚合条件,对数据进行聚合。这样带来两个比较严重的查询性问题。第一,在多路归并的时候,它的代价较高,对 Key 进行比较,非常消耗 CPU。第二,在数据读取的过程中,我们无法进行有效的数据裁剪,会引入大量的 IO 操作。
下面来看一下 Merge On Write 如何实现高并发写入以及查询的性能问题。Merge On Write 兼容查询性能和写入性能。
Merge On Write 在写入的过程中,引入了 Delete Bitmap 数据结构,Doris 使用 Delete Bitmap 标记 RowSet 中某一行是否被删除。这里使用了兼顾性能和存储空间的 Row Bitmap,将 Bitmap 中的 Mem Table 一起存储在 BE 中,每个 segment 会对应一个 Bitmap。
为了保持 Unique Key 原有的语义,Delete Bitmap 也支持多版本。每次导入会产生该版本增量的 Bitmap。在查询时需要合并此前所有版本的 Delete Bitmap。
接下来,看一下基于 Delete Bitmap 的 Merge On Write 的写入流程。首先,DeltaWriter 会先将数据 flush 到磁盘。第二步,我们会批量检查所有 Key。在点查过程中,会经过一个区间树,查找到对应的 RowSet。然后,在 RowSet 内部通过 BloomFilter 和 index 高效查询。当查询到 Key 对应的 RowSet 后,便会覆盖 RowSet Key 对应的 Bitmap。然后在 publish 阶段更新 Bitmap,从而保证批量点查 Key 和更新 Bitmap 期间不会有新的可见 RowSet,保证 Bitmap 在更新过程中数据的正确性。除此之外,如果某个 segment 没有被修改,则不会有对应版本的 Bitmap 记录。
下面看一下 Merge On Write 的查询流程。首先,当我们查询某一版本数据时,Doris 会从 LRU Cache Delete Bitmap 中,查找该版本对应的缓存。如果缓存不存在,我们再去 RowSet 中读取对应的 Bitmap。最后,我们使用 Delete Bitmap,对 RowSet 中的数据进行过滤,将结果返回。
同时在这个阶段我们还可以通过 Bloomfilter 和 Bitmap 的二级索引来提高查询的效率,查询过程中 Doris 会进行有效的谓词下推。因此,在 Merge On Write 中,大幅提升了数据写入的查询的效率。
Doris 针对不同场景,提供了不同的数据模型,分别有明细数据模型、主键数据模型、聚合数据模型。其中,明细数据模型主要用来存储日志,数据来一条就存一条。主键数据模型指相同的 Key 会进行覆盖,比如根据订单 ID,对订单状态进行更新。
在统计模型方面,我们会将相同 Key 的 Value 列进行统计合并计算。比较典型的应用场景是报表统计和指标计算。比如根据门店 ID 和门店时间,对销售额进行统计计算。
对于统计模型来说,数据会边入库边统计。对于用户查询来说,直接查询统计后的数据,让查询过程更高效。
物化视图的概念。物化视图指,根据预定义的 SQL 分析语句执行预计算,并将计算结果物化,从而加速查询的一种手段。
对于 Doris 来说,物化视图主要用于聚合场景。除此之外,还可以用于聚合数据和明细数据的同时查询;以及匹配不同的前缀索引场景。
在物化视图使用方面,首先建立一个表,然后再建立一个物化视图。基于一张 Base 表,可以构建不同的物化视图基于不同的维度进行统计。当用户查询时,如果能够命中化识图,都会走物化视图;如果没有命中物化视图,会走 Base 表,从而完成高效查询。
对于数据更新来说,首先会更新 Base 表,然后更新物化视图。从而让 Doris 保证物化视图和 Base 表数据的完全一致性。
接下来,看一下物化视图的智能路由选择。Doris 通过物化视图加速查询,并且在查询过程中,自动进行路由选择。在查询数据的过程中,如果数据在物化视图中存在,会直接走物化视图;如果在物化视图中不存在,才会走 Base 表。
与此同时,Doris 的智能存储和现代化计算能力,也能快速完成数据查询。物化视图在智能路由的过程中,遵循最小匹配原则。只有查询的数据集比物化视图集合小时,才可能走物化视图。
它的智能选择过程包括最优选择和查询改写两个部分。首先,我们来看一下最优选择。最优选择包括两个部分,即过滤候选集和选择最优。
在过滤候选集过程中,当一个 SQL 语句过来,通过 Where 条件进行判断。Where 条件里查询的是 advertiser=1。由此可见,物化视图和 Base 表都有这个字段,这时的候选集是物化视图和 Base 表。
我们会判断 Group By 计算。Group By 字段是 advertiser 和 channel。这两个字段同时在物化视图和表中 Base。这时过滤的候选集仍然是物化视图和 Base 表。接下来,我们会过滤计算函数。比如在这里会执行 count(distinct user_id),然后对数据进行计算。由于 Count Distinct 的字段 user_id 在物化视图和 Base 表中都存在。因此过滤结果仍是物化视图加 Base 表。最后,进行最优选择。通过一系列计算,我们发现查询条件无论是 Where、Group By 还是 Agg function 关联的字段,在 Base 表和物化视图都存在。这时,我们需要进行最优选择。然后,Doris 经过计算发现 Base 表的数据远大于物化视图,即物化视图的数据更小。
由此可见,如果查询走物化视图,它的效率更高。最优的查询计划是物化视图。当我们找到最优的查询计划,会进行子查询改写,将我们的 Count Distinct 改写成 Bitmap。从而完成了物化视图的智能路由。完成智能路由之后,我们会将 Doris 生成的查询 SQL 发送到 BE 进行分布式查询计算。
Doris 数据分为两级存储。第一个是分区,第二个是分桶。我们可以按照天对数据在 Partition 级别进行分区。
接下来,我们按照 set 级别进行分桶。将一个分区的数据,分到不同的桶里。当我们在查询 set ID 时,就可以通过分区裁剪,快速定位数据,实现高并发的查询。
除此之外,Doris 还有很多优化查询的手段。比如一些索引能够加速查询,前 36 位会走前缀索引。Zone Map 索引、Bloom Filter 索引都能能快速完成数据的过滤计算。最后,Doris 内部在查询过程中,还做了各种各样的优化。
比如智能算子下推,选择更合适的数据模型。与此同时,Doris 在 Join 方面也有很大的优势,比如 Broadcast Join、Bucket Shuffle Join、Colocate、Shuffle Join,能够尽量减少 Join 过程中的数据 Shuffle。
三、用户案例与最佳实践分享
Doris 的应用场景主要应用在即时分析/交互分析、多维报表分析,实时数仓、湖仓加速&联邦分析的四大场景。在这四大场景上,我们可以构建一些指标监控、高频发报表、自助 BI、行为分析、AB Test 等各丰富的数据应用。
这是一个典型的基于 Doris 构建实时数仓的案例。下层的数据源是 RDS 业务库,文件系统数据、埋点数据。当数据实时接入 Doris 后,它基于 Doris 构建了一个实时数仓。
在数据接入过程中通过 DataX 进行离线数据同步;通过 Flink CDC 进行实时数据同步。然后,在 Doris 内部构建不同的数据分层,比如 ODS 层、DWS 层、DWD 层的数据。最后,在上层构建不同的数据应用,比如自助报表、自助数据抽取、数据大屏。
除此之外,它结合了自己的应用平台,构建了数据开发与治理平台,完成了源数据管理、数据分析等操作。用户在使用 Doris 后,又带来什么好处呢?
首先,它的业务计算从之前的两小时,减少到三分钟。全链路的更新报表从周级别,更新到十分钟级别。同时,Doris 基于高度兼容 MySQL,它的学习成本非常低。因此,报表签约工作是非常顺利的。
由于 Doris 的开发非常简单。开发周期从从周下降至天级。业务人员能快速满足业务需求的变化。
某运营商的服务的架构是通过 Flink CDC 将 RDS 的数据同步到 Doris 中。然后,通过 routine load 将上层日志中的数据接入到 Kafka 之后,将 Kafka 数据同步到 Doris 之中。然后,在 Doris 内部构建实时数仓。在数据调度时,它通过开源 Doris,完成数据调度。使用 Prometheus+Grafana 进行数据监控。
它采用 Flink+Doris 架构体系后,带来的好处是组件减少,解决了多架构下的数据的冗余存储,服务器资源节省了 30%,数据存储磁盘占用节省了 60%,运营成本大幅降低。该案例每天在用户的业务场景上,支持数万次的用户的在线查询和分析。
如上图所示,之前的架构是 Hadoop 数仓架构。因此它的组件比较丰富,有 RDS、HBase、Hive。除此之外,还有 Kafka 等技术栈。由此可见,它的技术栈非常复杂。
在使用 Doris 之后,它将 RDS 里的数据通过 Flink CDC,实时同步到 Doris 里,服务器资源成本得到了很大的降低。同时,也将数据的查询时间从 Spark 的 2~5 小时,缩短到十分钟,查询效率也大大提升。
在数据的同步过程中,它使用了 Flink CDC+MySQL,全量加增量的数据同步方式。与此同时,它还利用 Doris 的 Light Schema Change 特性,实时同步 Binlog 里的 DDL 表结构变更到 Doris,实现数据接入数仓零开发成本。
四、新版本特性
1.2 版本第一个新特性是增加了 Multi Catalog,用户可以无缝对接多种数据源。Doris 会自动进行 Schema 同步,它的性能是 Trino 的三倍。
除此之外,我们在主键模型可实时更新,实时更新场景下查询性能 10 倍以上提升。Light Schema Change 能将 DDL 数据更新,并将数据表结构的变更,拉近毫秒级别。在外表上,我们开始支持 JDBC 外表,以及 Array 类型数据类型,New Decimal 等各种情况。
重点介绍一下,之前我们支持 native UDF,大家的使用成本较高。现在,我们会支持 Java UDF,用户可以快速写一个 Java 类,就可以完成 Java UDF 的使用。
新版本相比在 1.1 之前的版本,有 3~5 倍的性能提升。同时,我们的性能也领先某业界标杆竞品两倍以上。上图是我们在 SSB-FLAT 的性能测试结果。