前言
- 数据字典 DataDictionary
- 数据血缘 DataLineage
- 元数据触发器 MetaTrigger
一. 血缘分析的导推形式
- sql
- kafka streams
- spark rdd
- flink datastream
二. 血缘分析的技术方案分析。
- 通过调度器反向推导血缘关系。
- 通过计算引擎系统提供的血缘分析接口进行收集。
- 通过计算引擎系统的解析过程源码进行提取
- 通用的sql解析器工具
三. queryparser内功心法.
- 语法解析过程
- 信息传播过程
- 特定的计算模型
- 如何新增新的数据库方言。
前言
最近在元数据的项目建设中,主要涉及了三方面的基础工作分析。
-
数据字典 DataDictionary
数据字典算是元数据的最基础功能,相当于元数据模型里面的实体。可以进行信息的查询及搜索,第三方API接口使用。目前最核心的实体包括三块:用户(User),数据集(Dataset),
数据管道(Datapipe)。[用户User实体] 可以通过ldap集成,可以用于关联元数据实体及后期资产管理,甚至集成后期的数据安全建设。
[数据集Dataset实体] 则是不同数据源的模式,可以额外包含很多细节的选项,包括样例数据,存储开销,健康度(字段描述完整性,后期集成数据加载质量检测)。
[数据导管Datapipe实体] 数据导管主要关注数据流通过程,分为几类。最常用的是sql,其实可以是kafka streams, spark rdd之类的代码。这里的数据导管属于静态资源。
-
数据血缘 DataLineage
前面定义了数据实体,接着就要定义数据关系。[User实体]关系由于重要度不太高,这里不做过多关注,留于后续考虑,甚至需要其它关系建立之后会有更多意义。
[Dataset实体]与[Datapipe实体]之间的关系主要由血缘分析这个过程完成。
我们定义这个数据关系为有向图导管DagConduit。
如果大家熟悉haskell conduit库的话,就会很自然想到conduit由pipe而来。DagConduit由Datapipe 推导而来,而Datapipe是依赖于Dataset。
这样每个DagConduit形成了一个局部关系图:
[Input Dataset] -> Datapipe -> [Output Dataset]
每个InputDataset又依赖于Datapipe,这样每个DagConduit输出自己的关系,在图数据库存储里面进行关联就会形成完整的dag关系图了。这里的DagConduit就是我们今天的主角,下一节讲会重点讲解。
元数据触发器 MetaTrigger
这里的主要意图是完成元数据驱动编程的思想。
当前主要考虑的点则是,通过血缘分析生成dag信息去触发airflow调度器系统。
当然元数据驱动的使用范围非常广泛,也是非常核心的内容。
后续会做更多的探索及进一步分享与交流。
一. 血缘分析的导推形式
血缘分析在这里主要是我们前文所说的DagConduit的推导过程。这个推导过程很大依赖于Datapipe。
因为元数据是个通用平台,可推导的范围还是非常多的,我们这里仅从实用角度考虑并介绍。
常见的etl过程主要分为sql, kafka streams, spark rdd, flink datastream,这几个。
sql是最常用的Datapipe,但是其变种也是最多的。
kafka streams, spark rdd, flink datastream则是另一种形式,会生成相应的topology相关的lineage,由于灵活度不高,可以使用对应的库进行提取。
这里我们重点关注sql血缘分析,因为sql这种dsl是最有表达力,最符合元数据模型的。
甚至kafka, spark, flink也绝大多数会提供sql接口。
由于本人大多时间关注于sql推导模型,未对其它模型做过多探索,将于后续交流及使用后补充相关内容。
二. 血缘分析的技术方案分析。
我们常见的血缘分析实现方式主要分为四种:
-
- 通过调度器反向推导血缘关系。
因为调度器跟DagConduit有着很密切的关系。我们很容易从调度器的每个DAG里面提取出对应的Datapipe以及它们的关系。
这种方案的可行性非常高,成本非常廉。但缺点也是很显而易见的,这种方案不能支持字段级别的血缘,而字段关系对于元数据非常重要,甚至是后续扩展数据质量及数据安全的重要思路之一。并且对于业务方面的影响分析也是非常有价值意义的。
-
- 通过计算引擎系统提供的血缘分析接口进行收集。
由于数据治理的重要性逐渐为人所认识,血缘分析功能也成为了etl工具箱的重要一环。所以hive就直接提供了血缘分析接口。
这种方式的优点是显而易成的,即支持了字段级别血缘分析,成本也是非常低的,对于大部分公司其实是够用的。但是缺点也是非常明显的。
因为计算引擎与血缘分析在一起,如果血缘分析的功能需要增加,计算引擎势必也要受到牵连。毕竟字段血缘分析在很多情况下,很多场景下并不是标准统一的。比如select a from t where b = 'c'。这里的a字段到底需不需要依赖于b的值呢?这里不同的需求可能找到的答案并不会一致。如果需要更一步进行统计信息获取后进行相关的血缘扩展分析,也是非常困难的,因为接口是非常固定有限的。
另外血缘分析在运行时执行,这样血缘分析已经与系统运行时有了直接的绑定关系。所以无法进行血缘分析驱动,将一个系统换成另一个系统。如果是sql解析出来的血缘分析,我们可以通过元数据触发器触发到各种各样的平台,是非常灵活的。
当然,这些弱点对于实际使用来说并不是特别致命。毕竟成本低,且简单可用是非常有吸引力的,系统耦合之类的事情虽然比较烂,但先用起来再说,无可厚非,所以atlas也是这么干的。所以我坚信atlas可以解决问题,但绝不是未来。并且这里面还隐藏着另一个大的隐患,并不是所有etl工具箱都会给你提供血缘接口,原因很明显。对于成熟的产品,绝不会让血缘分析时与计算引擎绑定在一起受波折。
所以,sql血缘解析才是未来。
-
- 通过计算引擎系统的解析过程源码进行提取
这个应该是最常用的做法了,成本虽然高一点,但是整体可控,毕竟基础代码完成了很大一部分功能了,还可以按需定制,对于个人成长及公司的定制化需求都是非常令人满意的。毕竟互联网公司大部分使用开源产品,开源的世界任你折腾。
这里面困惑其实也不少。如果是商业产品,如果商家没有提供解析器,基本上就非常无奈了。对于每套计算引擎各自的规则可能不一样,致使门槛非常高了,甚至出现了专用的商业公司去统一解决这个问题。甚至对于字段级血缘提取,代码的功力也是非常有要求的。并且开源软件为了自己的一些内部功能,在代码里面渗杂了过多的东西,致使学习成本可以吓退不少人了。
-
通用的sql解析器工具
由于前面的问题,所以大家都想要一套通用的sql解析工具去专门解决sql分析的问题。但是这个里面涉及一个非常严重的问题,就是成熟度。由于sql解析过程有非常大的工作量,需要对语法规则非常熟练,写起来繁杂的工作量及系统多样性很容易让人退缩。所以市面上一直找不到成熟的通用的sql解析器。所以这里面就回归到了我们的主角,queryparser由此而生。queryparser是uber公司开发的,成熟的多年应用到了生产系统。目前支持hive, queryparser, vertica, teradata。对于pg系统,有由专用的gpu数据库公司SQream员工开发的解析器hssqlppp。这两套系统采用通用的解析器内核parsec。
那么在这里放出地址给各位:
- hive解析器
https://github.com/uber/queryparser/tree/master/dialects/hive - presto解析器
https://github.com/uber/queryparser/tree/master/dialects/presto - vertica解析器
https://github.com/uber/queryparser/tree/master/dialects/vertica - teradata解析器
https://github.com/LeapYear/queryparser/tree/leapyear-master/dialects/teradata - pg解析器
https://github.com/JakeWheat/hssqlppp
这个queryparser强大在哪里呢?
一看吓死人,核心代码只要7000多行。而sql结构定义就占了3000多行,也就是说逻辑只有4000行左右,擅抖吧,奥利给! - hive解析器
[nix-shell:~/my-repo/queryparser/src]$ cloc .
20 text files.
20 unique files.
0 files ignored.
github.com/AlDanial/cloc v 1.84 T=0.08 s (251.9 files/s, 121135.2 lines/s)
-------------------------------------------------------------------------------
Language files blank comment code
-------------------------------------------------------------------------------
Haskell 20 1724 562 7332
-------------------------------------------------------------------------------
SUM: 20 1724 562 7332
-------------------------------------------------------------------------------
[nix-shell:~/my-repo/queryparser/src]$ find . -name "*.hs" | xargs -I {} wc -l {} | sort -nr
2334 ./Database/Sql/Type/Query.hs
1125 ./Database/Sql/Type.hs
836 ./Database/Sql/Util/Scope.hs
647 ./Database/Sql/Type/Names.hs
641 ./Database/Sql/Type/Schema.hs
505 ./Database/Sql/Util/Columns.hs
438 ./Database/Sql/Util/Tables.hs
403 ./Database/Sql/Util/Eval.hs
389 ./Database/Sql/Info.hs
383 ./Database/Sql/Util/Joins.hs
372 ./Database/Sql/Util/Lineage/ColumnPlus.hs
372 ./Database/Sql/Type/Scope.hs
361 ./Database/Sql/Util/Schema.hs
216 ./Database/Sql/Util/Eval/Concrete.hs
159 ./Database/Sql/Util/Lineage/Table.hs
147 ./Database/Sql/Type/TableProps.hs
109 ./Database/Sql/Position.hs
66 ./Database/Sql/Pretty.hs
64 ./Database/Sql/Helpers.hs
51 ./Database/Sql/Type/Unused.hs
三. queryparser内功心法.
平复一下内心的平静,让我们来了解一上queryparser到底是怎么实现的。
queryparser包含了核心逻辑的实现(大部分就是标准sql逻辑),每个方言有各自特写的扩展,我们这里以queryparser-hive为例。
queryparser-hive将一些特定sql的功能处理掉,核心功能交给 queryparser来处理。由于我们接触的大部分是标准sql,我们在这里分析标准的sql语句,在后续源码篇里面我们会进一步介绍详细介绍queryparser-hive与queryparser的整体过程。
整个解析过程分为三个步骤:
- parseManyAll 语法解析过程
将文本打碎(词法分析)并组装成结构化(语法分析) - resolveHiveStatement 信息传播过程
遍历数据结构,进行外部的catalog关联,以及相应信息的传递,属于通用的信息加工逻辑阶段。 - getColumnLineage 特定的计算模型
计算模型分为很多种,有各自的需求定义。在提供的接口中,用户实现特定的逻辑完成所需的功能。
queryparser提供了两种模型,一种是字段血缘模型,一种是数据计算模型。当然我们可以随意实现接口定制自己的模型。
比如通过实现不同的sql逻辑处理方法将sql语句转换成对应的http restful请求。
一个类似的功能可参照:
SELECT status, content_type, content::json->>'data' AS data
FROM http_patch('http://httpbin.org/patch', '{"this":"that"}', 'application/json');
https://github.com/pramsey/pgsql-http
1. 语法解析过程
语法解析常用的实现分为两种:
一种是parser generator,就是你写好规则,自动帮你生成解析代码,比如常见的antlr就是。这种方式的特写是性能高,效率高。缺点则是定制化扩展弱,异常信息非常难读懂。
另一种是parser combinator,这种就是你自己手写语法规则,非常灵活,效率会低一点,可控性比较强。
queryparser就是使用了parser combinator方式进行解析,所以我们可以自由定制。
语法解析的目地是生成AST,简单来说就是一个递归的数据结构,有点类似json。在语法解析过程中要做的工作就是,定义数据结构,然后把信息塞进去。其实难度并不大,但是由于sql规则的复杂度,整个工作量并不轻松。
语法规则的过程分为两部分:词法解析及语法解析。
为什么需要词法解析呢?因为sql跟平常我们所说的csv文件并不一样,支持可以使用分隔符来定义结构。有些东西在不同的环境下有不同的意义。比如任何事物在注释里面它就没有效果了,里面可能还有变量声明,关键字信息等。
所以词法分析器就是按sql的基本单元进行拆分,然后语法分析器将其组装起来形成递归的结构体。为了便于理解,我们给出实际代码里面的词法结构及逻辑片段。
https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Token.hs
data Token = TokWord !Bool !Text
| TokString !ByteString
| TokNumber !Text
| TokSymbol !Text
| TokVariable !Text VariableName -- the Text is for namespace, the
-- Token is the param name which
-- may be another TokVariable!
| TokError !String
deriving (Show, Eq)
可以看出,hive的基本单元分为了TokWord(通用名称), TokString(字符), TokNumber(数字), TokSymbol(符号), TokVariable(变量)
这个解析的过程叫做scanner,就是常用的字符处理生成Token流。
https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Scanner.hs
有了Token流,我们进行组合生成AST。
那么我们可以看HIVE AST的结构定义:
https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Type.hs
data HiveStatement r a = HiveStandardSqlStatement (Statement Hive r a)
| HiveUseStmt (Use a)
| HiveAnalyzeStmt (Analyze r a)
| HiveInsertDirectoryStmt (InsertDirectory r a)
| HiveTruncatePartitionStmt (TruncatePartition r a)
| HiveAlterTableSetLocationStmt (AlterTableSetLocation r a)
| HiveAlterPartitionSetLocationStmt (AlterPartitionSetLocation r a)
| HiveSetPropertyStmt (SetProperty a)
| HiveUnhandledStatement a
然后hive的HiveStandardSqlStatement包包含的Statement定义在queryparser核心结构里面,由于sql结构定义query部分比较多,单独拎出来形成了两个文件:
https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type.hs
data Statement
d -- sql dialect
r -- resolution level (raw or resolved)
a -- per-node parameters - typically Range or ()
= QueryStmt (Query r a)
| InsertStmt (Insert r a)
| UpdateStmt (Update r a)
| DeleteStmt (Delete r a)
| TruncateStmt (Truncate r a)
| CreateTableStmt (CreateTable d r a)
| AlterTableStmt (AlterTable r a)
| DropTableStmt (DropTable r a)
| CreateViewStmt (CreateView r a)
| DropViewStmt (DropView r a)
| CreateSchemaStmt (CreateSchema r a)
| GrantStmt (Grant a)
| RevokeStmt (Revoke a)
| BeginStmt a
| CommitStmt a
| RollbackStmt a
| ExplainStmt a (Statement d r a)
| EmptyStmt a
https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type/Query.hs
data Query r a
= QuerySelect a (Select r a)
| QueryExcept a (ComposedQueryColumns r a) (Query r a) (Query r a)
| QueryUnion a Distinct (ComposedQueryColumns r a) (Query r a) (Query r a)
| QueryIntersect a (ComposedQueryColumns r a) (Query r a) (Query r a)
| QueryWith a [CTE r a] (Query r a)
| QueryOrder a [Order r a] (Query r a)
| QueryLimit a (Limit a) (Query r a)
| QueryOffset a (Offset a) (Query r a)
接下来就是把token组装起来。
由于每个方言组装逻辑语法规则不一样,所以各自有自己的实现,核心接口并未提供。
https://github.com/uber/queryparser/blob/master/dialects/hive/src/Database/Sql/Hive/Parser.hs
statementParser :: Parser (HiveStatement RawNames Range)
statementParser = do
maybeStmt <- optionMaybe $ choice
[ HiveUseStmt <$> useP
, HiveAnalyzeStmt <$> analyzeP
, do
let options =
-- this list is hive-specific statement types that may be
-- preceded by an optional `WITH` and an optional inverted
-- `FROM`
[ (void insertDirectoryPrefixP, fmap HiveInsertDirectoryStmt . insertDirectoryP)
]
prefixes = map fst options
baseParsers = map snd options
_ <- try $ P.lookAhead $ optional withP >> invertedFromP >> choice prefixes
with <- option id withP
invertedFrom <- invertedFromP
let parsers = map ($ (with, invertedFrom)) baseParsers
choice $ parsers
, try $ HiveTruncatePartitionStmt <$> truncatePartitionStatementP
, HiveUnhandledStatement <$> describeP
, HiveUnhandledStatement <$> showP
, do
_ <- try $ P.lookAhead createFunctionPrefixP
HiveUnhandledStatement <$> createFunctionP
, do
_ <- try $ P.lookAhead dropFunctionPrefixP
HiveUnhandledStatement <$> dropFunctionP
, HiveStandardSqlStatement <$> statementP
, try $ HiveAlterTableSetLocationStmt <$> alterTableSetLocationP
, try $ HiveUnhandledStatement <$> alterTableSetTblPropertiesP
, alterPartitionP
, HiveSetPropertyStmt <$> setP
, HiveUnhandledStatement <$> reloadFunctionP
]
case maybeStmt of
Just stmt -> terminator >> return stmt
Nothing -> HiveStandardSqlStatement <$> emptyStatementP
where
terminator = (Tok.semicolonP <|> eof) -- normal statements may be terminated by `;` or eof
emptyStatementP = EmptyStmt <$> Tok.semicolonP -- but we don't allow eof here. `;` is the
-- only way to write the empty statement, i.e. `` (empty string) is not allowed.
解析过程并不复杂,就是按结构体深度递归,跟json解析差不了多少。
整个过程就是,至上而下解析,解析一项,如果失败了则尝试其它项,深度递归下去。
体力活比较重,因为规则多,每一项都要写规则尝试匹配。
就这样,整个结构体就组装起来了。
2. 信息传播过程
其实queryparser的信息传播过程非常简单,就是单纯从catalog里面去查询表信息,关联到表生成的字段,然后将以前的表名转换成表信息,生成引用信息。
https://github.com/uber/queryparser/blob/master/src/Database/Sql/Type/Scope.hs
具体内容我们可以看前后结果对比
data RawNames
deriving instance Data RawNames
instance Resolution RawNames where
type TableRef RawNames = OQTableName
type TableName RawNames = OQTableName
type CreateTableName RawNames = OQTableName
type DropTableName RawNames = OQTableName
type SchemaName RawNames = OQSchemaName
type CreateSchemaName RawNames = OQSchemaName
type ColumnRef RawNames = OQColumnName
type NaturalColumns RawNames = Unused
type UsingColumn RawNames = UQColumnName
type StarReferents RawNames = Unused
type PositionExpr RawNames = Unused
type ComposedQueryColumns RawNames = Unused
instance Resolution ResolvedNames where
type TableRef ResolvedNames = RTableRef
type TableName ResolvedNames = RTableName
type CreateTableName ResolvedNames = RCreateTableName
type DropTableName ResolvedNames = RDropTableName
type SchemaName ResolvedNames = FQSchemaName
type CreateSchemaName ResolvedNames = RCreateSchemaName
type ColumnRef ResolvedNames = RColumnRef
type NaturalColumns ResolvedNames = RNaturalColumns
type UsingColumn ResolvedNames = RUsingColumn
type StarReferents ResolvedNames = StarColumnNames
type PositionExpr ResolvedNames = Expr ResolvedNames
type ComposedQueryColumns ResolvedNames = ColumnAliasList
data RTableName a = RTableName (FQTableName a) SchemaMember
deriving (Generic, Data, Eq, Ord, Show, Functor, Foldable, Traversable)
data SchemaMember = SchemaMember
{ tableType :: TableType
, persistence :: Persistence ()
, columnsList :: [UQColumnName ()]
, viewQuery :: Maybe (Query ResolvedNames ()) -- this will always be Nothing for tables
} deriving (Generic, Data, Eq, Ord, Show)
可以看到,sql解析之前表名,列名信息就是简单的字符OQTableName, UQColumnName。
解析之后就映射成了实际的各种形式。resolve出的RTableName就包含了从catalog里面查询出来的表的所有列员。
这个信息可以用来解析insert into时不带字段名,以及select *时的一些模糊逻辑。
当然本人觉得如果sql够标准的话,表的字段信息是完成可以从传播过程中自动推导的,而不用依赖于外部提供的catalog模式信息,这个也是本人正在尝试加强信息传播过程中优化的方向之一。
整个resolve过程代码:
https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Scope.hs
由于是概述篇,就不展开讲述,后续将在源码篇里详细介绍。
3. 特定的计算模型
有了前面两部分过程之后,我们的结构体经过遍历关联处理有了更详细的信息。
最后一步则是通过这个结构体去做对应的计算。
因为整个遍历过程跟处理过程,相似度比较高。
所以queryparser提供了一个标准计算模型,应用则可以定制化特定部分进行逻辑处理, 只需要编写对应的函数,不需要自己去编写整个过程。
当然如果你有兴趣,自己定义重写或者扩展一套计算模型也是非常方便的。
我们可以看一下这个计算逻辑的定义:
class (Monad (EvalRow e), Monad (EvalMonad e), Traversable (EvalRow e)) => Evaluation e where
type EvalValue e :: *
type EvalRow e :: * -> *
type EvalMonad e :: * -> *
addItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
removeItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
unionItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
intersectItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e] -> EvalT e 'TableContext (EvalMonad e) (EvalRow e [EvalValue e])
distinctItems :: Proxy e -> EvalRow e [EvalValue e] -> EvalRow e [EvalValue e]
offsetItems :: Proxy e -> Int -> RecordSet e -> RecordSet e
limitItems :: Proxy e -> Int -> RecordSet e -> RecordSet e
filterBy :: Expr ResolvedNames Range -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
inList :: EvalValue e -> [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
inSubquery :: EvalValue e -> EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
existsSubquery :: EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
atTimeZone :: EvalValue e -> EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
handleConstant :: Proxy e -> Constant a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
handleCases :: Proxy e -> [(Expr ResolvedNames Range, Expr ResolvedNames Range)] -> Maybe (Expr ResolvedNames Range) -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
handleFunction :: Proxy e -> FunctionName Range -> Distinct -> [Expr ResolvedNames Range] -> [(ParamName Range, Expr ResolvedNames Range)] -> Maybe (Filter ResolvedNames Range) -> Maybe (OverSubExpr ResolvedNames Range) -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
handleGroups :: [RColumnRef ()] -> EvalRow e ([EvalValue e], EvalRow e [EvalValue e]) -> EvalRow e (RecordSet e)
handleLike :: Proxy e -> Operator a -> Maybe (Escape ResolvedNames Range) -> Pattern ResolvedNames Range -> Expr ResolvedNames Range -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
handleOrder :: Proxy e -> [Order ResolvedNames Range] -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
handleSubquery :: EvalRow e [EvalValue e] -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
handleJoin :: Proxy e -> JoinType a -> JoinCondition ResolvedNames Range -> RecordSet e -> RecordSet e -> EvalT e 'TableContext (EvalMonad e) (RecordSet e)
handleStructField :: Expr ResolvedNames Range -> StructFieldName a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
handleTypeCast :: CastFailureAction -> Expr ResolvedNames Range -> DataType a -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
binop :: Proxy e -> TL.Text -> Maybe (EvalValue e -> EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e))
unop :: Proxy e -> TL.Text -> Maybe (EvalValue e -> EvalT e 'ExprContext (EvalMonad e) (EvalValue e))
queryparser定义了两种计算模型:
- 数据计算模型
也就是说可以真实通过在数据上运行sql上跑出结果。当然这个里面的逻辑并不是非常优化的,没有做rbo或者cbo相关的优化,并且逻辑实现比较简单。所以不适合海量数据运算,则提供基础实现版本。
- 数据计算模型
https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Eval/Concrete.hs
这里面的细节由于跟本文关联不大,就不一一介绍了,如有兴趣,可以源码篇作更进一步介绍。
字段血缘模型就是用得比较多的了,简单来讲呢,依旧是深度遍历到列级别,然后组装起来。
我们可以简单看看计算模型的组装过程分为哪些。
https://github.com/uber/queryparser/blob/master/src/Database/Sql/Util/Eval.hs
最基础的内容是常量与(列)表达式,表结构
instance Evaluation e => Evaluate e (Constant a) where
type EvalResult e (Constant a) = EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
eval p constant = handleConstant p constant
instance Evaluation e => Evaluate e (Expr ResolvedNames Range) where
type EvalResult e (Expr ResolvedNames Range) = EvalT e 'ExprContext (EvalMonad e) (EvalValue e)
...
eval _ (ColumnExpr _ col) = do
row <- asks evalRow
case M.lookup (void col) row of
Just x -> pure x
Nothing -> throwError $ "failure looking up column: " ++ show (void col) ++ " in " ++ show (M.keys row)
...
instance Evaluation e => Evaluate e (Tablish ResolvedNames Range) where
type EvalResult e (Tablish ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
...
eval _ (TablishTable _ _ (RTableRef tableName table)) = asks evalFromTable <*> pure (RTableName tableName table) >>= \case
Nothing -> throwError $ "missing table: " ++ show (void tableName)
Just result -> pure result
eval _ (TablishTable _ _ (RTableAlias (TableAlias _ aliasName alias))) = asks (M.lookup alias . evalAliasMap) >>= \case
Nothing -> throwError $ "missing table alias: " ++ show aliasName
Just result -> pure result
...
表结构向上传播到TablishJoin , TablishLateralView , SelectFrom
列表达式通过表达式一层层往上计算。
然后到达SelectWhere, JoinCondition, SelectTimeseries , SelectGroup, SelectHaving
然后组合形成了基本的select语句
instance Evaluation e => Evaluate e (Select ResolvedNames Range) where
type EvalResult e (Select ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
eval p Select{..} = do
-- nb. if we handle named windows at resolution time (T637160)
-- then we shouldn't need to do anything with them here
unfiltered <- maybe (pure $ emptyRecordSet p) (eval p) selectFrom
filtered <- maybe pure (eval p) selectWhere unfiltered
interpolated <- maybe pure (eval p) selectTimeseries filtered
groups <- maybe (const $ pure . pure) (eval p) selectGroup selectCols interpolated
having <- maybe pure (eval p) selectHaving groups
records <- mapM (eval p selectCols) having
let rows = recordSetItems =<< records
labels = map void $ selectionNames =<< selectColumnsList selectCols
indistinct = makeRecordSet p labels rows
pure $ case selectDistinct of
Distinct True -> indistinct { recordSetItems = distinctItems p $ recordSetItems indistinct }
Distinct False -> indistinct
然后加上CTE功能就形成了完整的Query,然后在SelectFrom里面可以递归到这个Query进行递归组合。
instance Evaluation e => Evaluate e (Query ResolvedNames Range) where
type EvalResult e (Query ResolvedNames Range) = EvalT e 'TableContext (EvalMonad e) (RecordSet e)
eval p (QuerySelect _ select) = eval p select
eval p (QueryExcept _ (ColumnAliasList cs) lhs rhs) = do
exclude <- recordSetItems <$> eval p rhs
RecordSet{recordSetItems = unfiltered, ..} <- eval p lhs
let labels = map (RColumnAlias . void) cs
makeRecordSet p labels <$> removeItems p exclude unfiltered
eval p (QueryUnion _ (Distinct False) (ColumnAliasList cs) lhs rhs) = do
RecordSet{recordSetItems = lhsRows, ..} <- eval p lhs
RecordSet{recordSetItems = rhsRows} <- eval p rhs
let labels = map (RColumnAlias . void) cs
makeRecordSet p labels <$> unionItems p lhsRows rhsRows
eval p (QueryUnion info (Distinct True) cs lhs rhs) = do
result@RecordSet{recordSetItems} <- eval p (QueryUnion info (Distinct False) cs lhs rhs)
pure $ result{recordSetItems = distinctItems p recordSetItems}
eval p (QueryIntersect _ (ColumnAliasList cs) lhs rhs) = do
RecordSet{recordSetItems = litems, ..} <- eval p lhs
ritems <- recordSetItems <$> eval p rhs
let labels = map (RColumnAlias . void) cs
makeRecordSet p labels <$> intersectItems p litems ritems
eval p (QueryWith _ [] query) = eval p query
eval p (QueryWith info (CTE{..}:ctes) query) = do
RecordSet{..} <- eval p cteQuery
columns <- override cteColumns recordSetLabels
let result = makeRecordSet p columns recordSetItems
introduceAlias p (void cteAlias) result $ eval p $ QueryWith info ctes query
where
override [] ys = pure ys
override (alias:xs) (_:ys) = do
ys' <- override xs ys
pure $ (RColumnAlias $ void alias) : ys'
override _ [] = throwError "more aliases than columns in CTE"
eval p (QueryLimit _ limit query) = eval p limit <$> eval p query
eval p (QueryOffset _ offset query) = eval p offset <$> eval p query
eval p (QueryOrder _ orders query) = eval p query >>= handleOrder p orders
当然这个过程我们会在源码篇详细讲解,对于普通读者,可以了解一下大致的计算过程。
整个过程有个EvalContext,
evalAliasMap里面会记录表别名对应的信息,
evalFromTable里面会记录表对应的信息,
evalRow里面会对应字段对应的信息
data EvalContext e = EvalContext
{ evalAliasMap :: Map TableAliasId (RecordSet e)
, evalFromTable :: RTableName Range -> Maybe (RecordSet e)
, evalRow :: Map (RColumnRef ()) (EvalValue e)
}
data RecordSet e = RecordSet
{ recordSetLabels :: [RColumnRef ()]
, recordSetItems :: EvalRow e [EvalValue e]
}
这个信息在字段血缘分析的过程中表体表现为ColumnPlusSet:
instance Evaluation ColumnLineage where
type EvalValue ColumnLineage = ColumnPlusSet
type EvalRow ColumnLineage = Writer ColumnPlusSet
type EvalMonad ColumnLineage = Identity
data ColumnPlusSet = ColumnPlusSet
{ columnPlusColumns :: Map FQCN (Map FieldChain (Set Range))
, columnPlusTables :: Map FQTN (Set Range)
} deriving (Eq, Show)
ColumnPlusSet的主要信息就是相关的列信息与表信息。
type ColumnLineagePlus = Map (Either FQTN FQCN) ColumnPlusSet
最终解析出来的结果有两种形式:
对于每个表FQTN, 有对应的ColumnPlusSet(表依赖集及字段依赖集)
对于表的每个字段FQCN,有对应的ColumnPlusSet(表依赖集及字段依赖集)
有人可能奇怪了,在什么时候,字段会依赖于表?
这个在select count(1) as cnt from b的时候cnt就是依赖于表的。当然其它情况,我会进一步整理后补充。
4. 如何新增新的数据库方言。
我们前面讲过,整个血缘分析分为三部分。
第一部分sql解析的这个词法解析及语法解析比较依赖于sql的语法规则,所以需要手写,难度并不大,只是工作量比较大。比如支持存储过程之类的也是语法规则问题。
第二部分信息传播,基本上是通用的。对于除标准sql以外的信息,我们大部分不需要处理,特定的需要处理的工作量也非常少,可以简单抄抄改改。比如支持存储过程之类的大部分也是sql逻辑处理后组合起来。
第三部分计算模型,也基本上是通用的,由于对于血缘分析来说,所涉及到关注面会更少,所需要改动是最小的。。。
5. SQL解析潮流从此开始
所以我们可以看到,queryparser仅仅就干好了一件事情,就几千行代码,并且干得简单。扩展性也很强。
当然我们也注意到了,它的整个列信息推理过程比较原始,直接查询了catalog,但是也是非常方便后续扩展的。
它的计算模型对比其它解析器是非常大的一个亮点,你可以基于各种方言写支持各种生态的扩展。只需要将对应方言规则的sql解析后,实现对应计算模型里的方法即可,给了我们相当大的想象力。
你可以配置sql去做http restful递归请求,配置sql去做etl数据加载。。。