为了批流统一,Flink提供了两种关系型API,Table API和SQL。Table API是一种语言集成的查询API,由多个比如selection,filter,join关系operator组合而成。Flink SQL是基于Calcite来实现的。无论是在streaming还是在batch上,Table API和SQL具有相同的语义并且能够得到相同的结果。Table API、SQL以及DataStream API之间都能无缝集成,可以方便的使用。本篇文章主要介绍下Flink SQL的整体流程。本文内容是基于Flink 1.12来讲解。
1. 整体执行流程介绍
Flink SQL的执行一般分为四个阶段,主要依赖 Calcite 来完成
Parse
语法解析,Calcite使用JavaCC把用户提交的Sql String 转换成一个抽象语法树(AST),对应SqlNode节点。Validate
语法校验,根据元数据信息进行验证,例如查询的表、使用的函数是否存在,会分别对 from / where / group by / having / select / order by 等子句进行validate ,校验之后仍然是 SqlNode 构成的语法树;Optimize
查询计划优化,这里其实包含了两部分,1)首先将 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,2)然后使用优化器基于规则进行等价变换,例如我们比较熟悉的谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;Execute
将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。
2. 源码分析
Flink1.12 默认的Planner是Blink Planner,本篇文章也是基于Blink Planner来分析。
为了方便讲解,咱们给个具体regular join例子来说明,将 Orders 订单表和 Shipments 运输单表依据订单id进行Regular Join,并且以name进行分组,然后对name进行orderby操作,最后取出name以及sum(price)
SELECT name, SUM(price)
FROM Orders o
JOIN Shipments s ON o.id = s.orderId
GROUP BY name
HAVING SUM(o.price) > 10
ORDER BY name
2.1 Parse阶段
源码入口TableEnvironment#sqlQuery方法,会调用实现类TableEnvironmentImpl#sqlQuery方法,看下该方法源码
@Override
public Table sqlQuery(String query) {
List<Operation> operations = parser.parse(query);
if (operations.size() != 1) {
throw new ValidationException(
"Unsupported SQL query! sqlQuery() only accepts a single SQL query.");
}
Operation operation = operations.get(0);
if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {
return createTable((QueryOperation) operation);
} else {
throw new ValidationException(
"Unsupported SQL query! sqlQuery() only accepts a single SQL query of type "
+ "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");
}
}
然后看下parser.parse(query) --> ParserImpl#parse
@Override
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation =
SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
首先是基于Calcite的Parse解析,Calcite涉及的东西比较多,这里就不展开来说了,最终返回一个SqlNode节点。
然后调用convert方法,在这个方法中,会对解析之后的SqlNode进行validate
2.2 Validate阶段
2.2.1 Scope背景知识
-
首先给出SqlValidatorScope的继承关系,该图来自https://www.jianshu.com/p/83e88fdc04ec
SqlValidatorScope是所有Scope的父接口,是做名字转换用的,代表在Parse Tree的位置。当验证如"foo"."bar"这个表达式的时候,会首先使用对应Scope实现的resolve方法来定位"foo",如果成功,会返回描述结果类型的SqlValidatorNamespace对象。
- 这个概念可能有点抽象,咱们看下其中一个具体实现SelectScope的官方解释
/**
* The name-resolution scope of a SELECT clause. The objects visible are those
* in the FROM clause, and objects inherited from the parent scope.
*
*
* <p>This object is both a {@link SqlValidatorScope} and a
* {@link SqlValidatorNamespace}. In the query</p>
*
* <blockquote>
* <pre>SELECT name FROM (
* SELECT *
* FROM emp
* WHERE gender = 'F')</pre></blockquote>
*
* <p>we need to use the {@link SelectScope} as a
* {@link SqlValidatorNamespace} when resolving 'name', and
* as a {@link SqlValidatorScope} when resolving 'gender'.</p>
*
* <h2>Scopes</h2>
*
* <p>In the query</p>
*
* <blockquote>
* <pre>
* SELECT expr1
* FROM t1,
* t2,
* (SELECT expr2 FROM t3) AS q3
* WHERE c1 IN (SELECT expr3 FROM t4)
* ORDER BY expr4</pre>
* </blockquote>
*
* <p>The scopes available at various points of the query are as follows:</p>
*
* <ul>
* <li>expr1 can see t1, t2, q3</li>
* <li>expr2 can see t3</li>
* <li>expr3 can see t4, t1, t2</li>
* <li>expr4 can see t1, t2, q3, plus (depending upon the dialect) any aliases
* defined in the SELECT clause</li>
* </ul>
*
* <h2>Namespaces</h2>
*
* <p>In the above query, there are 4 namespaces:</p>
*
* <ul>
* <li>t1</li>
* <li>t2</li>
* <li>(SELECT expr2 FROM t3) AS q3</li>
* <li>(SELECT expr3 FROM t4)</li>
* </ul>
*
* @see SelectNamespace
*/
- 简单理解一下,scope直接翻译就是范围,表示SQL不同部分可以看到的数据源,类似于Java的作用域。Scope中重要的方法包括 1. 获取Scope的root SqlNode :SqlNode getNode(); 2. 获取Scope中所有的columns: void findAllColumnNames(List<SqlMoniker> result); 3. 获取Scope中所有的table aliases:void findAliases(Collection<SqlMoniker> result); 4. 将field转成fully-qualified identifier的方法: SqlQualified fullyQualify(SqlIdentifier identifier); 等
2.2.2 Namespace背景知识
与Scope类似,SqlValidatorNamespace是所有Namespace的父接口,一个Namespace描述了一个SQL查询的关系。
比如1,对于 SELECT emp.deptno, age FROM emp,dept 这样一个查询,from子句代表了一个Namespace,这个Namespace由emp和dept两个table组成,row type由这两个emp、dept table的column组成。
另外一个例子2,如果一个查询的from子句中,包含一个table和一个sub-query,那该Namespace就包含了该table的columns和sub-query的select中的columns字段。SqlValidatorNamespace也有多种实现,比如table name对应IdentifierNamespace,SELECT queries对应SelectNamespace,UNION / EXCEPT / INTERSECT对应SetopNamespace等
简单理解一下,namespace表示scope中的一个数据源(数据源是一个逻辑概念,可以是table,field或子查询),一个scope中可以有多个namespace。
2.2.3 Validate源码分析
源码入口 ParserImpl#parse --> SqlToOperationConverter#convert --> FlinkPlannerImpl#validate --> SqlValidatorImpl#validate 方法
public SqlNode validate(SqlNode topNode) {
SqlValidatorScope scope = new EmptyScope(this);
scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
final SqlNode topNode2 = validateScopedExpression(topNode, scope);
final RelDataType type = getValidatedNodeType(topNode2);
Util.discard(type);
return topNode2;
}
该方法主要的逻辑都在validateScopedExpression中,处理流程如下:
- performUnconditionalRewrites
- 对表达式进行重写使其更标准化,简化接下来的validate逻辑。
SELECT name, SUM(price)
FROM Orders o
JOIN Shipments s ON o.id = s.orderId
GROUP BY name
HAVING SUM(o.price) > 10
ORDER BY name
下面针对这个例子具体调试跟下源码
-
在经过Calcite parse之后,得到的SqlNode是一个SqlOrderBy实例,并且join --> inner join
然后看下performUnconditionalRewrites方法
上面提到parse之后,SqlNode是一个SqlOrderBy实例,performUnconditionalRewrites方法首先会先拿到SqlOrderBy对应的OperandList,具体都有哪些Operands,可以看下SqlOrderBy#getOperandList,对应四个Operands: query, orderList, offset, fetch。(note:不同SqlNode实例,有不同的getOperandList实现)
1. query: 是个SqlSelect实例
SELECT `name`, SUM(`price`)
FROM `Orders` AS `o`
INNER JOIN `Shipments` AS `s` ON `o`.`id` = `s`.`orderId`
GROUP BY `name`
HAVING SUM(`o`.`price`) > 10
2. orderList:是个SqlNodeList实例,对应 name
3. offset: null
4. fetch: null
然后performUnconditionalRewrites方法会对每个Operand递归的执行performUnconditionalRewrites。
如果operand对应的SqlKind是 VALUES / ORDER_BY / EXPLICIT_TABLE / DELETE / UPDATE / MERGE , performUnconditionalRewrites会对其进行rewrite。
performUnconditionalRewrites方法执行之后,SqlNode从最开始的SqlOrderBy实例,变成了SqlSelect实例
- performUnconditionalRewrites方法,把SqlOrderBy节点的orderList, offset, fetch这三个operand赋值给SqlSelect对象,然后直接把SqlSelect返回。
- registerQuery
- 创建scope以及namespace,namespace和scope对象中都会包含SqlNode。
上面例子执行完registerQuery方法,最终FlinkCalciteSqlValidator对象为
从图中可以看出,测试sql共有两个scopes和4个namespaces
- validateQuery
- 首先根据node和scope来获取namespace
- 对namespace进行validate,validateNamespace(ns, targetRowType);
- 最后通过调用validateSelect来进行SQL验证
调用关系链如下图所示:
validateSelect处理分为8个部分:
- validateFrom
- 在调用validateFrom之前,首先会去校验from后面的names有没有重复table name,如果有重复,直接报错。如果没有from子句,也会直接报错。
/**
* Validates the FROM clause of a query, or (recursively) a child node of
* the FROM clause: AS, OVER, JOIN, VALUES, or sub-query.
*
* @param node Node in FROM clause, typically a table or derived
* table
* @param targetRowType Desired row type of this expression, or
* {@link #unknownType} if not fussy. Must not be null.
* @param scope Scope
*/
protected void validateFrom(
SqlNode node,
RelDataType targetRowType,
SqlValidatorScope scope) {
Objects.requireNonNull(targetRowType);
switch (node.getKind()) {
case AS:
case TABLE_REF:
validateFrom(
((SqlCall) node).operand(0),
targetRowType,
scope);
break;
case VALUES:
validateValues((SqlCall) node, targetRowType, scope);
break;
case JOIN:
validateJoin((SqlJoin) node, scope);
break;
case OVER:
validateOver((SqlCall) node, scope);
break;
case UNNEST:
validateUnnest((SqlCall) node, scope, targetRowType);
break;
default:
validateQuery(node, scope, targetRowType);
break;
}
// Validate the namespace representation of the node, just in case the
// validation did not occur implicitly.
getNamespace(node, scope).validate(targetRowType);
}
对于我们的例子,会先调用validateJoin
- validateJoin方法中,又包括了validateFrom(left, unknownType, joinScope); 和 validateFrom(right, unknownType, joinScope); 这两个方法首先完成的功能是验证,其次会把left节点和right节点的名字补齐,即full qualified name,比如
`Orders` AS `o` --> `default_catalog`.`default_database`.`Orders` AS `o`
- validateJoin会去校验是等值连接还是使用on关键字。如果使用on关键字,那on的条件是否为空,如果为空直接抛异常。
- 验证from中的table是否存在,将sqlNode name扩展为full qualified name并得到数据源的数据类型可以理解该数据源所有field type组成的row type。
validateWhereClause:
- 验证where条件中的字段是否存在,如果不存在就返回;
- 扩展field name为full qualified name;
- 验证where clause中是否存在aggregate function,如果where clause中存在 Windowed aggregate 表达式 或者 Aggregate表达式,就直接报错。
- 验证where clause的类型是否是boolean类型,如果不是,直接报错。validateGroupClause
- 验证group by field是否存在,如果不存在就返回;
- 如果存在就扩展该field name
- 验证group by中是否存在aggregate function,如果group by中存在 Windowed aggregate 表达式 或者 Aggregate表达式,就直接报错。validateHavingClause
- 验证having field是否存在,如果不存在就返回;
- 扩展having中的field name
- 验证having中不在聚合函数中的field是否和group by中的field相同;
- 验证having中的field是否存在;
- 验证having clause的类型是否是boolean类型,如果不是,直接报错。
- 验证having clause中是否有嵌套Aggregate expressions。对于非窗口agg(expr),expr中不能包含嵌套aggregate function,比如 SUM(2 * MAX(x)) 就是非法的;如果是windowed aggregate "agg(expr)",expr可以包含aggregate function,比如下面的例子就是合法的。
SELECT AVG(2 * MAX(x)) OVER (PARTITION BY y)
FROM t
GROUP BY y
validateWindowClause:暂略
handleOffsetFetch: 暂略
validateSelectList
- 验证projection list中的aliases是否是唯一的,对于 * 和 TABLE.* 不做该检查
- 如果select clause中包含 * 或 TABLE.* ,对字段进行补齐
- 扩展field name;并且抽取aliases,使用selectItems(List<SqlNode>)和 aliases(Set<String>)分别存储field的node信息和别名;然后获得field数据类型,组成键值对<alias, type> 放入 List<Map.Entry<String, RelDataType>> fields 对象中。
- 验证projection list中的field是否存在
- 判断projection list中不在聚合函数中的field是否和group by的field相同validateOrderList
- 验证order by field是否存在,如果不存在就返回;
- 如果order by field不在select projection list中,扩展field name,否则不进行扩展;
- 验证order by中field是否是group by的field子集,如果出现不在group by中的field,直接报错。
到这里,validate验证阶段基本就完成了。
2.3 Optimize阶段
2.3.1 Convert转换
convert阶段主要的工作是把 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,也就是生成相应的逻辑计划(Logical Plan),然后返回根结点 RelRoot。
即:语义分析,根据 SqlNode及元信息构建 RelNode 树,也就是最初版本的逻辑计划(Logical Plan),根据这个已经生成的Flink的logical Plan,将它转换成calcite的logicalPlan,这样我们才能用到calcite强大的优化规则。
SqlNode有很多种,既包括MIN、MAX这种表达式型的,也包括SELECT、JOIN这种关系型的,转化过程中,将这两种分离成RexNode表达式型和RelNode关系型。
介绍下 RelRoot 出现的必要性,这里说两个场景,都需要借助 RelRoot 来解决。
场景1:
SELECT name
FROM emp
ORDER BY empno DESC
Calcite知道结果必须sort排序,但不能将其排序顺序表示为排序法,因为empno并不在结果字段中,这种场景我们需要使用RelRoot这么表达
RelRoot: {
rel: Sort($1 DESC)
Project(name, empno)
TableScan(EMP)
fields: [0]
collation: [1 DESC]
}
可以看到,empno字段会出现在结果字段中,只是 fields 标识给consumer暴露什么字段。
场景2:
SELECT name AS n, name AS n2, empno AS n FROM emp
这里重复使用了name字段,并且有多列的别名都叫 n ,这种场景应该使用RelRoot这么表达
RelRoot: {
rel: Project(name, empno)
TableScan(EMP)
fields: [(0, "n"), (0, "n2"), (1, "n")]
collation: []
}
2.3.1.1 源码入口:SqlToRelConverter#convertQuery
public RelRoot convertQuery(
SqlNode query,
final boolean needsValidation,
final boolean top) {
if (needsValidation) {
query = validator.validate(query);
}
RelNode result = convertQueryRecursive(query, top, null).rel;
可以看出在convertQueryRecursive采取了递归遍历的方式来解析query。对于给定的例子,会去调用 convertSelect --> convertSelectImpl
/**
* Implementation of {@link #convertSelect(SqlSelect, boolean)};
* derived class may override.
*/
protected void convertSelectImpl(
final Blackboard bb,
SqlSelect select) {
convertFrom(
bb,
select.getFrom());
convertWhere(
bb,
select.getWhere());
final List<SqlNode> orderExprList = new ArrayList<>();
final List<RelFieldCollation> collationList = new ArrayList<>();
gatherOrderExprs(
bb,
select,
select.getOrderList(),
orderExprList,
collationList);
final RelCollation collation =
cluster.traitSet().canonize(RelCollations.of(collationList));
if (validator.isAggregate(select)) {
convertAgg(
bb,
select,
orderExprList);
} else {
convertSelectList(
bb,
select,
orderExprList);
}
if (select.isDistinct()) {
distinctify(bb, true);
}
convertOrder(
select, bb, collation, orderExprList, select.getOffset(),
select.getFetch());
if (select.hasHints()) {
final List<RelHint> hints = SqlUtil.getRelHint(hintStrategies, select.getHints());
// Attach the hints to the first Hintable node we found from the root node.
bb.setRoot(bb.root
.accept(
new RelShuttleImpl() {
boolean attached = false;
@Override public RelNode visitChild(RelNode parent, int i, RelNode child) {
if (parent instanceof Hintable && !attached) {
attached = true;
return ((Hintable) parent).attachHints(hints);
} else {
return super.visitChild(parent, i, child);
}
}
}), true);
} else {
bb.setRoot(bb.root, true);
}
}
也就是在对select这种SqlNode进行convert的时候,会分别对from,where,aggregate,select,order等进行转换,最终生成的逻辑计划如下:
== Abstract Syntax Tree ==
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
+- LogicalFilter(condition=[>($1, 10)])
+- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+- LogicalProject(name=[$1], price=[$2])
+- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
+- LogicalTableScan(table=[[default_catalog, default_database, Shipments]])
转成一个关系树之后:
- 首先 会生成一个operation,对应代码 SqlToOperationConverter#toQueryOperation。operation包含两个成员变量,分别是RelNode calciteTree(就是刚才convert之后的RelNode)和 TableSchema tableSchema(包含字段名,字段类型 列表,对于本文例子这里是select之后到的VARCHAR(2147483647) name, FLOAT EXPR$1),在tableSchema的build方法中,还会对field name是否重复进行validate,以及验证watermark对应的rowtime属性类型的root type是否是TIMESTAMP_WITHOUT_TIME_ZONE。
- 根据上面生成的operation,生成一个Table。table包含的成员变量主要包括QueryOperation,TableEnvironmentInternal以及OperationTreeBuilder和LookupCallResolver。
2.3.2 Optimize
// TODO,sql优化的东西有点多,这块内容后面再补充
2.4 Execute阶段
2.4.1 转换成ExecNode
入口 PlannerBase#translateToExecNodePlan,完成FlinkPhysicalRel DAG 到 ExecNode DAG的转换,并且尝试reuse重复的sub-plans。
// TODO,源码过程尽量补充下