Flink源码阅读之Flinksql执行流程

最近工作中需要开发一些新的Flink sql 的connector,所以先开始研究研究Flink sql的执行流程。

基本结构

Planner接口
负责sql解析、转换成Transformation
Executor接口
负责将planner转换的Transformation生成streamGraph并执行

public interface Planner {

    /**
     * Retrieves a {@link Parser} that provides methods for parsing a SQL string.
     *
     * @return initialized {@link Parser}
     */
    Parser getParser();

    /**
     * Converts a relational tree of {@link ModifyOperation}s into a set of runnable
     * {@link Transformation}s.
     *
     * <p>This method accepts a list of {@link ModifyOperation}s to allow reusing common
     * subtrees of multiple relational queries. Each query's top node should be a {@link ModifyOperation}
     * in order to pass the expected properties of the output {@link Transformation} such as
     * output mode (append, retract, upsert) or the expected output type.
     *
     * @param modifyOperations list of relational operations to plan, optimize and convert in a
     * single run.
     * @return list of corresponding {@link Transformation}s.
     */
    List<Transformation<?>> translate(List<ModifyOperation> modifyOperations);

    /**
     * Returns the AST of the specified Table API and SQL queries and the execution plan
     * to compute the result of the given collection of {@link QueryOperation}s.
     *
     * @param operations The collection of relational queries for which the AST
     * and execution plan will be returned.
     * @param extended if the plan should contain additional properties such as
     * e.g. estimated cost, traits
     */
    String explain(List<Operation> operations, boolean extended);

    /**
     * Returns completion hints for the given statement at the given cursor position.
     * The completion happens case insensitively.
     *
     * @param statement Partial or slightly incorrect SQL statement
     * @param position cursor position
     * @return completion hints that fit at the current cursor position
     */
    String[] getCompletionHints(String statement, int position);
}

Sql解析

Parser接口
负责sql解析
有两个实现一个是old planner,另一个是blink planner
flink对sql的解析依赖于calcite

具体实现

@Override
    public List<Operation> parse(String statement) {
        CalciteParser parser = calciteParserSupplier.get();
        FlinkPlannerImpl planner = validatorSupplier.get();
        // parse the sql query
        //依赖calcite将sql语句解析为sqlNode
        SqlNode parsed = parser.parse(statement);

        //将sqlnode转换为Operation
        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
            .orElseThrow(() -> new TableException("Unsupported query: " + statement));
        return Collections.singletonList(operation);
    }
    
    /**
     * This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s, different
     * SqlNode will have it's implementation in the #convert(type) method whose 'type' argument
     * is subclass of {@code SqlNode}.
     *
     * @param flinkPlanner FlinkPlannerImpl to convertCreateTable sql node to rel node
     * @param catalogManager CatalogManager to resolve full path for operations
     * @param sqlNode SqlNode to execute on
     */
    public static Optional<Operation> convert(
            FlinkPlannerImpl flinkPlanner,
            CatalogManager catalogManager,
            SqlNode sqlNode) {
        // validate the query
        // 校验sql的合法性
        final SqlNode validated = flinkPlanner.validate(sqlNode);
        SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
        //对不同的ddl/dml进行转换
        if (validated instanceof SqlCreateTable) {
            return Optional.of(converter.convertCreateTable((SqlCreateTable) validated));
        } else if (validated instanceof SqlDropTable) {
            return Optional.of(converter.convertDropTable((SqlDropTable) validated));
        } else if (validated instanceof SqlAlterTable) {
            return Optional.of(converter.convertAlterTable((SqlAlterTable) validated));
        } else if (validated instanceof SqlCreateFunction) {
            return Optional.of(converter.convertCreateFunction((SqlCreateFunction) validated));
        } else if (validated instanceof SqlAlterFunction) {
            return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated));
        } else if (validated instanceof SqlDropFunction) {
            return Optional.of(converter.convertDropFunction((SqlDropFunction) validated));
        } else if (validated instanceof RichSqlInsert) {
            return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));
        } else if (validated instanceof SqlUseCatalog) {
            return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));
        } else if (validated instanceof SqlUseDatabase) {
            return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated));
        } else if (validated instanceof SqlCreateDatabase) {
            return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated));
        } else if (validated instanceof SqlDropDatabase) {
            return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));
        } else if (validated instanceof SqlAlterDatabase) {
            return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));
        } else if (validated instanceof SqlCreateCatalog) {
            return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
        } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
            return Optional.of(converter.convertSqlQuery(validated));
        } else {
            return Optional.empty();
        }
    }

举个栗子:

CREATE TABLE user_behavior (
                    user_id BIGINT,
                    item_id BIGINT,
                    category_id BIGINT,
                    behavior STRING,
                    ts TIMESTAMP(3),
                    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列
                    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
                ) WITH (
                    'connector.type' = 'kafka',  -- 使用 kafka connector
                    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
                    'connector.topic' = 'user_behavior',  -- kafka topic
                    'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
                    --'connector.properties.group.id' = '',
                    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 地址
                    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 地址
                    'format.type' = 'json'  -- 数据源格式为 json
                )

上面的sql经过SqlNode parsed = parser.parse(statement);解析之后如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GnFJR75e-1593768699547)(/Users/admin/Library/Containers/com.tencent.WeWorkMac/Data/Library/Application Support/WXWork/Temp/ScreenCapture/企业微信截图_13cee561-c9a0-4318-acd6-bd48a418549d.png)]

对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table。。。)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。
不同的sql经过convert处理后返回不同的Operation,最后会根据不同的Operation有不同的处理行为。
Operation主要分为Create/Alter/Drop/Modify/Query/Use 几大类,每个下面又细分如CreateOperation


在这里插入图片描述

看下createTable的处理,返回的就是CreateTableOperation

    /**
     * Convert the {@link SqlCreateTable} node.
     */
    private Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
        // primary key and unique keys are not supported
        if ((sqlCreateTable.getPrimaryKeyList().size() > 0)
            || (sqlCreateTable.getUniqueKeysList().size() > 0)) {
            throw new SqlConversionException("Primary key and unique key are not supported yet.");
        }

        // set with properties
    //将with参数放到一个map中
        Map<String, String> properties = new HashMap<>();
        sqlCreateTable.getPropertyList().getList().forEach(p ->
            properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
        //schema
        TableSchema tableSchema = createTableSchema(sqlCreateTable);
        String tableComment = sqlCreateTable.getComment().map(comment ->
            comment.getNlsString().getValue()).orElse(null);
        // set partition key
        List<String> partitionKeys = sqlCreateTable.getPartitionKeyList()
            .getList()
            .stream()
            .map(p -> ((SqlIdentifier) p).getSimple())
            .collect(Collectors.toList());

        CatalogTable catalogTable = new CatalogTableImpl(tableSchema,
            partitionKeys,
            properties,
            tableComment);

        UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

        return new CreateTableOperation(
            identifier,
            catalogTable,
            sqlCreateTable.isIfNotExists());
    }
else if (operation instanceof CreateTableOperation) {
            CreateTableOperation createTableOperation = (CreateTableOperation) operation;
            catalogManager.createTable(
                createTableOperation.getCatalogTable(),
                createTableOperation.getTableIdentifier(),
                createTableOperation.isIgnoreIfExists());
        } 

对表的DDL操作(比如对table、function、database的操作)入口为tableEnv.sqlUpdate(sql),会调用到catalogManager对表的DDL进行维护,如果是query语句入口tableEnv.sqlQuery(sql),则会走

else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
            return Optional.of(converter.convertSqlQuery(validated));
        }

private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
        // transform to a relational tree
        RelRoot relational = planner.rel(validated);
        return new PlannerQueryOperation(relational.project());
    }

将SqlNode转换为RelNode,

主要包含3个步骤:

  1. 推断table类型

  2. 推断计算列

  3. 推断watermark分配

    val tableSourceTable = new TableSourceTable[T](
        relOptSchema,
        schemaTable.getTableIdentifier,
        erasedRowType,
        statistic,
        tableSource,
        schemaTable.isStreamingMode,
        catalogTable)
    
      // 1. push table scan
    
      // Get row type of physical fields.
      val physicalFields = getRowType
        .getFieldList
        .filter(f => !columnExprs.contains(f.getName))
        .map(f => f.getIndex)
        .toArray
      // Copy this table with physical scan row type.
      val newRelTable = tableSourceTable.copy(tableSource, physicalFields)
      val scan = LogicalTableScan.create(cluster, newRelTable)
      val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
      relBuilder.push(scan)
    
      val toRexFactory = cluster
          .getPlanner
          .getContext
          .unwrap(classOf[FlinkContext])
          .getSqlExprToRexConverterFactory
    
      // 2. push computed column project
      val fieldNames = erasedRowType.getFieldNames.asScala
      if (columnExprs.nonEmpty) {
        val fieldExprs = fieldNames
            .map { name =>
              if (columnExprs.contains(name)) {
                columnExprs(name)
              } else {
                s"`$name`"
              }
            }.toArray
        val rexNodes = toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
        relBuilder.projectNamed(rexNodes.toList, fieldNames, true)
      }
    
      // 3. push watermark assigner
      val watermarkSpec = catalogTable
        .getSchema
        // we only support single watermark currently
        .getWatermarkSpecs.asScala.headOption
      if (schemaTable.isStreamingMode && watermarkSpec.nonEmpty) {
        if (TableSourceValidation.hasRowtimeAttribute(tableSource)) {
          throw new TableException(
            "If watermark is specified in DDL, the underlying TableSource of connector" +
                " shouldn't return an non-empty list of RowtimeAttributeDescriptor" +
                " via DefinedRowtimeAttributes interface.")
        }
        val rowtime = watermarkSpec.get.getRowtimeAttribute
        if (rowtime.contains(".")) {
          throw new TableException(
            s"Nested field '$rowtime' as rowtime attribute is not supported right now.")
        }
        val rowtimeIndex = fieldNames.indexOf(rowtime)
        val watermarkRexNode = toRexFactory
            .create(erasedRowType)
            .convertToRexNode(watermarkSpec.get.getWatermarkExpr)
        relBuilder.watermark(rowtimeIndex, watermarkRexNode)
      }
    
      // 4. returns the final RelNode
      relBuilder.build()
    }
    

对Source table进行推断

见CatalogSourceTable.scala

  lazy val tableSource: TableSource[T] = findAndCreateTableSource().asInstanceOf[TableSource[T]]

TableFactoryUtil会根据DDL中的with参数进行推断

private static <T> TableSource<T> findAndCreateTableSource(Map<String, String> properties) {
   try {
      return TableFactoryService
         .find(TableSourceFactory.class, properties)
         .createTableSource(properties);
   } catch (Throwable t) {
      throw new TableException("findAndCreateTableSource failed.", t);
   }
}

主要逻辑在TableFactoryService#filter方法中

private static <T extends TableFactory> List<T> filter(
      List<TableFactory> foundFactories,
      Class<T> factoryClass,
      Map<String, String> properties) {

   Preconditions.checkNotNull(factoryClass);
   Preconditions.checkNotNull(properties);
     //先根据TableSourceFactory.class 过滤出source的factory
   List<T> classFactories = filterByFactoryClass(
      factoryClass,
      properties,
      foundFactories);
     //再根据with参数中的connect.type过滤出满足条件的SourceFactory
   List<T> contextFactories = filterByContext(
      factoryClass,
      properties,
      classFactories);
    //最终判断一下所有properties是否都支持
   return filterBySupportedProperties(
      factoryClass,
      properties,
      classFactories,
      contextFactories);
}

大致调用栈如下


在这里插入图片描述

后续的优化部分其实都是直接基于 RelNode来完成的。

SQL 转换及优化

转换的流程主要分为四个部分,即 1)将 Operation 转换为 RelNode,2)优化 RelNode,3)转换成 ExecNode,4)转换为底层的 Transformation 算子。

如果是DML操作进过SQL转化后会变为ModifyOperation,就会调用Planner的translate方法。

具体实现在PlannerBase.scala

override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  // prepare the execEnv before translating
  getExecEnv.configure(
    getTableConfig.getConfiguration,
    Thread.currentThread().getContextClassLoader)
  overrideEnvParallelism()
    //转化为relNode
  val relNodes = modifyOperations.map(translateToRel)
  //SQL优化
  val optimizedRelNodes = optimize(relNodes)
  //转化为execNode
  val execNodes = translateToExecNodePlan(optimizedRelNodes)
  //转化为底层的transfomation
  translateToPlan(execNodes)
}

优化比较复杂,Blink主要是基于Calcite自己的优化,并自定义了一些优化逻辑,有兴趣的读者可以自行研究。

SQL执行

在得到Transformation后的转化逻辑就和streaming模式一致了,可以参考我之前的博客Flink源码阅读之基于Flink1.10的任务提交流程和[Flink源码阅读之基于Flink1.10的任务执行流程](

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342