I recently needed to develop some new Flink SQL connectors at work, so I started by studying how Flink SQL executes a query.
Basic structure
Planner interface
Responsible for parsing SQL and translating it into Transformations.
Executor interface
Responsible for turning the Transformations produced by the planner into a StreamGraph and executing it.
public interface Planner {

    /**
     * Retrieves a {@link Parser} that provides methods for parsing a SQL string.
     *
     * @return initialized {@link Parser}
     */
    Parser getParser();

    /**
     * Converts a relational tree of {@link ModifyOperation}s into a set of runnable
     * {@link Transformation}s.
     *
     * <p>This method accepts a list of {@link ModifyOperation}s to allow reusing common
     * subtrees of multiple relational queries. Each query's top node should be a {@link ModifyOperation}
     * in order to pass the expected properties of the output {@link Transformation} such as
     * output mode (append, retract, upsert) or the expected output type.
     *
     * @param modifyOperations list of relational operations to plan, optimize and convert in a
     *                         single run.
     * @return list of corresponding {@link Transformation}s.
     */
    List<Transformation<?>> translate(List<ModifyOperation> modifyOperations);

    /**
     * Returns the AST of the specified Table API and SQL queries and the execution plan
     * to compute the result of the given collection of {@link QueryOperation}s.
     *
     * @param operations The collection of relational queries for which the AST
     *                   and execution plan will be returned.
     * @param extended if the plan should contain additional properties such as
     *                 e.g. estimated cost, traits
     */
    String explain(List<Operation> operations, boolean extended);

    /**
     * Returns completion hints for the given statement at the given cursor position.
     * The completion happens case insensitively.
     *
     * @param statement Partial or slightly incorrect SQL statement
     * @param position cursor position
     * @return completion hints that fit at the current cursor position
     */
    String[] getCompletionHints(String statement, int position);
}
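For comparison, the Executor side of the contract is much smaller. In the Flink 1.10 code base it looks roughly like the abridged sketch below (from org.apache.flink.table.delegation.Executor; treat the exact signatures as an approximation rather than a verbatim copy):

public interface Executor {

    /**
     * Registers the given transformations with the underlying execution environment,
     * so that a StreamGraph can later be generated from them.
     */
    void apply(List<Transformation<?>> transformations);

    /**
     * Builds a StreamGraph from all applied transformations and submits it as a job.
     */
    JobExecutionResult execute(String jobName) throws Exception;
}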
SQL parsing
Parser interface
Responsible for SQL parsing.
There are two implementations: one for the old planner and one for the blink planner.
Flink delegates the actual SQL parsing to Calcite.
The concrete implementation:
@Override
public List<Operation> parse(String statement) {
    CalciteParser parser = calciteParserSupplier.get();
    FlinkPlannerImpl planner = validatorSupplier.get();
    // parse the sql query
    // Calcite parses the SQL statement into a SqlNode
    SqlNode parsed = parser.parse(statement);
    // convert the SqlNode into an Operation
    Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
        .orElseThrow(() -> new TableException("Unsupported query: " + statement));
    return Collections.singletonList(operation);
}
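The heavy lifting on the parsing side is pure Calcite. To see that layer in isolation, here is a minimal self-contained sketch using vanilla Calcite; note that Flink actually plugs in its own parser factory and conformance settings, so the default configuration below is a simplification:

import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class CalciteParseDemo {
    public static void main(String[] args) throws Exception {
        SqlParser parser = SqlParser.create(
            "SELECT user_id, COUNT(*) FROM user_behavior GROUP BY user_id",
            SqlParser.configBuilder().build());
        // parseStmt returns the root SqlNode of the AST
        SqlNode ast = parser.parseStmt();
        System.out.println(ast.getKind());                           // SELECT
        System.out.println(ast.getKind().belongsTo(SqlKind.QUERY));  // true
        System.out.println(ast);                                     // pretty-printed SQL
    }
}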
/**
 * This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s. Each
 * kind of SqlNode has its implementation in a #convert(type) method whose 'type' argument
 * is a subclass of {@code SqlNode}.
 *
 * @param flinkPlanner FlinkPlannerImpl to convert the CREATE TABLE sql node to a rel node
 * @param catalogManager CatalogManager to resolve full path for operations
 * @param sqlNode SqlNode to execute on
 */
public static Optional<Operation> convert(
        FlinkPlannerImpl flinkPlanner,
        CatalogManager catalogManager,
        SqlNode sqlNode) {
    // validate the query
    // i.e. check that the sql statement is legal
    final SqlNode validated = flinkPlanner.validate(sqlNode);
    SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
    // dispatch the different DDL/DML node types to their converters
    if (validated instanceof SqlCreateTable) {
        return Optional.of(converter.convertCreateTable((SqlCreateTable) validated));
    } else if (validated instanceof SqlDropTable) {
        return Optional.of(converter.convertDropTable((SqlDropTable) validated));
    } else if (validated instanceof SqlAlterTable) {
        return Optional.of(converter.convertAlterTable((SqlAlterTable) validated));
    } else if (validated instanceof SqlCreateFunction) {
        return Optional.of(converter.convertCreateFunction((SqlCreateFunction) validated));
    } else if (validated instanceof SqlAlterFunction) {
        return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated));
    } else if (validated instanceof SqlDropFunction) {
        return Optional.of(converter.convertDropFunction((SqlDropFunction) validated));
    } else if (validated instanceof RichSqlInsert) {
        return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
    } else if (validated instanceof SqlUseCatalog) {
        return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));
    } else if (validated instanceof SqlUseDatabase) {
        return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated));
    } else if (validated instanceof SqlCreateDatabase) {
        return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated));
    } else if (validated instanceof SqlDropDatabase) {
        return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));
    } else if (validated instanceof SqlAlterDatabase) {
        return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));
    } else if (validated instanceof SqlCreateCatalog) {
        return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
    } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
        return Optional.of(converter.convertSqlQuery(validated));
    } else {
        return Optional.empty();
    }
}
Here's an example:
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- computed column that yields a processing-time attribute
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- defines a watermark on ts, making ts the event-time attribute
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- kafka version; universal supports 0.11+
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
    --'connector.properties.group.id' = '',
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format.type' = 'json'  -- the source data is json
)
After the SQL above goes through SqlNode parsed = parser.parse(statement);, the table name, column names, WITH options, primary key, unique keys, partition keys, watermark, table comment and the kind of table operation (CREATE TABLE, ALTER TABLE, DROP TABLE, ...) all land in the corresponding fields of the SqlNode object. The SqlNode is a tree structure, i.e. the AST.
Different statements come out of convert as different Operations, and each kind of Operation triggers different handling downstream.
Operations fall into a few broad categories, Create/Alter/Drop/Modify/Query/Use, each of which is further subdivided, e.g. CreateOperation.
Let's look at the CREATE TABLE case; it returns a CreateTableOperation:
/**
 * Convert the {@link SqlCreateTable} node.
 */
private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
    // primary key and unique keys are not supported
    if ((sqlCreateTable.getPrimaryKeyList().size() > 0)
            || (sqlCreateTable.getUniqueKeysList().size() > 0)) {
        throw new SqlConversionException("Primary key and unique key are not supported yet.");
    }
    // set with properties
    // collect the WITH options into a map
    Map<String, String> properties = new HashMap<>();
    sqlCreateTable.getPropertyList().getList().forEach(p ->
        properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
    // derive the table schema
    TableSchema tableSchema = createTableSchema(sqlCreateTable);
    String tableComment = sqlCreateTable.getComment().map(comment ->
        comment.getNlsString().getValue()).orElse(null);
    // set partition key
    List<String> partitionKeys = sqlCreateTable.getPartitionKeyList()
        .getList()
        .stream()
        .map(p -> ((SqlIdentifier) p).getSimple())
        .collect(Collectors.toList());
    CatalogTable catalogTable = new CatalogTableImpl(tableSchema,
        partitionKeys,
        properties,
        tableComment);
    UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
    ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
    return new CreateTableOperation(
        identifier,
        catalogTable,
        sqlCreateTable.isIfNotExists());
}
The returned CreateTableOperation is then picked up in TableEnvironmentImpl, which hands the table over to the catalogManager:

else if (operation instanceof CreateTableOperation) {
    CreateTableOperation createTableOperation = (CreateTableOperation) operation;
    catalogManager.createTable(
        createTableOperation.getCatalogTable(),
        createTableOperation.getTableIdentifier(),
        createTableOperation.isIgnoreIfExists());
}
The entry point for DDL (operations on tables, functions, databases and so on) is tableEnv.sqlUpdate(sql), which ends up calling into the catalogManager to maintain the metadata. A query goes in through tableEnv.sqlQuery(sql) instead and takes this branch:
else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
    return Optional.of(converter.convertSqlQuery(validated));
}

private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
    // transform to a relational tree
    RelRoot relational = planner.rel(validated);
    return new PlannerQueryOperation(relational.project());
}
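Seen from the user side, the two entry points look like this. A minimal sketch against the Flink 1.10 API (the DDL is a shortened version of the example above):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EntryPointsDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // DDL path: parsed into a CreateTableOperation and registered via the catalogManager
        tableEnv.sqlUpdate(
            "CREATE TABLE user_behavior (" +
            "  user_id BIGINT," +
            "  behavior STRING" +
            ") WITH (" +
            "  'connector.type' = 'kafka'," +
            "  'connector.version' = 'universal'," +
            "  'connector.topic' = 'user_behavior'," +
            "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format.type' = 'json'" +
            ")");

        // Query path: validated and converted into a PlannerQueryOperation
        Table result = tableEnv.sqlQuery(
            "SELECT user_id, COUNT(*) FROM user_behavior GROUP BY user_id");
    }
}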
Converting the SqlNode into a RelNode mainly involves three steps, all visible in the code below:
- inferring the table type
- inferring the computed columns
- inferring the watermark assignment
val tableSourceTable = new TableSourceTable[T](
  relOptSchema,
  schemaTable.getTableIdentifier,
  erasedRowType,
  statistic,
  tableSource,
  schemaTable.isStreamingMode,
  catalogTable)

// 1. push table scan
// Get row type of physical fields.
val physicalFields = getRowType
  .getFieldList
  .filter(f => !columnExprs.contains(f.getName))
  .map(f => f.getIndex)
  .toArray
// Copy this table with physical scan row type.
val newRelTable = tableSourceTable.copy(tableSource, physicalFields)
val scan = LogicalTableScan.create(cluster, newRelTable)
val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
relBuilder.push(scan)

val toRexFactory = cluster
  .getPlanner
  .getContext
  .unwrap(classOf[FlinkContext])
  .getSqlExprToRexConverterFactory

// 2. push computed column project
val fieldNames = erasedRowType.getFieldNames.asScala
if (columnExprs.nonEmpty) {
  val fieldExprs = fieldNames
    .map { name =>
      if (columnExprs.contains(name)) {
        columnExprs(name)
      } else {
        s"`$name`"
      }
    }.toArray
  val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
  relBuilder.projectNamed(rexNodes.toList, fieldNames, true)
}

// 3. push watermark assigner
val watermarkSpec = catalogTable
  .getSchema
  // we only support single watermark currently
  .getWatermarkSpecs.asScala.headOption
if (schemaTable.isStreamingMode && watermarkSpec.nonEmpty) {
  if (TableSourceValidation.hasRowtimeAttribute(tableSource)) {
    throw new TableException(
      "If watermark is specified in DDL, the underlying TableSource of connector" +
        " shouldn't return an non-empty list of RowtimeAttributeDescriptor" +
        " via DefinedRowtimeAttributes interface.")
  }
  val rowtime = watermarkSpec.get.getRowtimeAttribute
  if (rowtime.contains(".")) {
    throw new TableException(
      s"Nested field '$rowtime' as rowtime attribute is not supported right now.")
  }
  val rowtimeIndex = fieldNames.indexOf(rowtime)
  val watermarkRexNode = toRexFactory
    .create(erasedRowType)
    .convertToRexNode(watermarkSpec.get.getWatermarkExpr)
  relBuilder.watermark(rowtimeIndex, watermarkRexNode)
}

// 4. returns the final RelNode
relBuilder.build()
The source table itself is inferred; see CatalogSourceTable.scala:
lazy val tableSource: TableSource[T] = findAndCreateTableSource().asInstanceOf[TableSource[T]]
TableFactoryUtil infers the right factory from the WITH options in the DDL:
private static <T> TableSource<T> findAndCreateTableSource(Map<String, String> properties) {
    try {
        return TableFactoryService
            .find(TableSourceFactory.class, properties)
            .createTableSource(properties);
    } catch (Throwable t) {
        throw new TableException("findAndCreateTableSource failed.", t);
    }
}
The main logic lives in TableFactoryService#filter:
private static <T extends TableFactory> List<T> filter(
        List<TableFactory> foundFactories,
        Class<T> factoryClass,
        Map<String, String> properties) {
    Preconditions.checkNotNull(factoryClass);
    Preconditions.checkNotNull(properties);
    // first narrow the candidates down to factories of the requested class, e.g. TableSourceFactory
    List<T> classFactories = filterByFactoryClass(
        factoryClass,
        properties,
        foundFactories);
    // then match the required context, e.g. connector.type in the WITH options
    List<T> contextFactories = filterByContext(
        factoryClass,
        properties,
        classFactories);
    // finally check that every given property is supported
    return filterBySupportedProperties(
        factoryClass,
        properties,
        classFactories,
        contextFactories);
}
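This matching contract is exactly what a new connector has to satisfy, which is the point of this whole exercise. A hedged sketch of a 1.10-style source factory (the class name and option keys are made up; a real factory must also be registered via a META-INF/services/org.apache.flink.table.factories.TableFactory file so that TableFactoryService can discover it over Java SPI):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class MySourceFactory implements StreamTableSourceFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        // matched by filterByContext against the WITH options
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "my-source");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        // checked by filterBySupportedProperties; an unknown key fails the match
        List<String> properties = new ArrayList<>();
        properties.add("connector.topic");
        properties.add("format.*");
        return properties;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        // a real connector would build and return its TableSource here
        throw new UnsupportedOperationException("sketch only");
    }
}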
That is roughly the call chain for table source discovery. Everything in the optimization phase that follows operates directly on these RelNodes.
SQL translation and optimization
The translation flow has four parts: 1) convert the Operation into a RelNode, 2) optimize the RelNode, 3) translate it into an ExecNode, and 4) translate that into the underlying Transformation operators.
A DML statement, after SQL conversion, becomes a ModifyOperation, which causes the Planner's translate method to be called.
The concrete implementation lives in PlannerBase.scala:
override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  // prepare the execEnv before translating
  getExecEnv.configure(
    getTableConfig.getConfiguration,
    Thread.currentThread().getContextClassLoader)
  overrideEnvParallelism()
  // 1. convert the operations to RelNodes
  val relNodes = modifyOperations.map(translateToRel)
  // 2. optimize the plan
  val optimizedRelNodes = optimize(relNodes)
  // 3. convert to ExecNodes
  val execNodes = translateToExecNodePlan(optimizedRelNodes)
  // 4. convert to the underlying Transformations
  translateToPlan(execNodes)
}
The optimization phase is fairly involved: Blink builds on Calcite's optimizer and adds a number of custom rules and logic of its own; interested readers can dig into it themselves.
SQL execution
Once the Transformations are obtained, the remaining logic is the same as in plain streaming mode; see my earlier posts 《Flink源码阅读之基于Flink1.10的任务提交流程》 and 《Flink源码阅读之基于Flink1.10的任务执行流程》.
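To close the loop with the Executor from the beginning of this post, here is a minimal sketch of how a list of Transformations becomes a StreamGraph. It assumes Flink 1.10's StreamGraphGenerator API and only mirrors what the blink executor does internally; the helper class and job name are made up:

import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;

public final class TransformationsToStreamGraph {

    public static StreamGraph toStreamGraph(List<Transformation<?>> transformations) {
        // the generator walks the Transformation DAG and emits stream nodes and edges
        return new StreamGraphGenerator(
                transformations, new ExecutionConfig(), new CheckpointConfig())
            .setJobName("sql-job")
            .generate();
    }
}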