Hudi Source Code: Data Write Logic

Background

This article uses Flink writing to Hudi tables as the example and analyzes the detailed execution flow of upsert and insert operations on COW and MOR tables.

StreamWriteFunction

In Hudi's Flink integration, StreamWriteFunction is responsible for writing the data stream into the Hudi table, so we start the analysis from its data-processing method processElement.

processElement buffers incoming records into buckets; the buckets are flushed periodically according to certain rules.

@Override
public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
    bufferRecord((HoodieRecord<?>) value);
}

DataBucket is the write buffer: it caches a batch of records destined for a particular partition path and fileID.

Data does not accumulate in a bucket without bound. Either of the following two conditions triggers a bucket flush:

  1. If a DataBucket has accumulated FlinkOptions#WRITE_BATCH_SIZE worth of data, that bucket is flushed.

  2. If the total size of buffered data exceeds FlinkOptions#WRITE_TASK_MAX_SIZE, the bucket with the most buffered data is flushed.

This logic lives in the bufferRecord method:

protected void bufferRecord(HoodieRecord<?> value) {
    // build the bucketID from the HoodieRecord's partitionPath and fileId
    final String bucketID = getBucketID(value);

    // wrap the data into a DataBucket and put it into buckets
    DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
                                                     k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
    // convert the HoodieRecord into a DataItem
    final DataItem item = DataItem.fromHoodieRecord(value);
    // store it in the bucket
    bucket.records.add(item);

    // check whether the total size of this bucket exceeds the batch size limit
    // for performance, the record size is not measured precisely every time: the first record is measured
    // and that size is reused for subsequent records; a sampling function re-measures the current record
    // with roughly a one-percent probability and the new value is reused from then on
    boolean flushBucket = bucket.detector.detect(item);
    // check whether the total buffered size exceeds the write task max size
    boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
    if (flushBucket) {
        // if this bucket needs flushing, call flushBucket
        if (flushBucket(bucket)) {
            // then subtract this bucket's size from the total buffer size
            this.tracer.countDown(bucket.detector.totalSize);
            // reset this bucket
            bucket.reset();
        }
    } else if (flushBuffer) {
        // find the max size bucket and flush it out
        // when the whole buffer must be flushed, pick the bucket holding the most data
        // i.e. the DataBucket with the largest totalSize
        DataBucket bucketToFlush = this.buckets.values().stream()
            .max(Comparator.comparingLong(b -> b.detector.totalSize))
            .orElseThrow(NoSuchElementException::new);
        // then flush that bucket
        if (flushBucket(bucketToFlush)) {
            this.tracer.countDown(bucketToFlush.detector.totalSize);
            bucketToFlush.reset();
        } else {
            LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
        }
    }
}
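
The sampling trick described in the comments above can be sketched as follows. This is a simplified, hypothetical illustration (the class and member names are invented); the real detector sits inside StreamWriteFunction and uses its own size-measurement utility, abstracted away here as a function parameter.

import java.util.Random;
import java.util.function.ToLongFunction;

// Hypothetical sketch of sampling-based size accounting, not Hudi's actual detector class.
class SampledSizeEstimator {
  private static final Random RANDOM = new Random();
  private final ToLongFunction<Object> sizeOf;  // the (expensive) exact size measurement
  private final long batchSizeBytes;            // e.g. write.batch.size converted to bytes
  private long lastRecordSize = -1L;
  private long totalSize = 0L;

  SampledSizeEstimator(ToLongFunction<Object> sizeOf, long batchSizeBytes) {
    this.sizeOf = sizeOf;
    this.batchSizeBytes = batchSizeBytes;
  }

  // Accumulates an estimated size for the record; returns true when the bucket should be flushed.
  boolean detect(Object record) {
    if (lastRecordSize == -1L || sampling()) {
      // measure precisely only for the first record and then roughly once per hundred records
      lastRecordSize = sizeOf.applyAsLong(record);
    }
    totalSize += lastRecordSize;
    return totalSize > batchSizeBytes;
  }

  // Roughly a one-percent chance of re-measuring the current record.
  private boolean sampling() {
    return RANDOM.nextInt(100) == 1;
  }
}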

Next is the flushBucket method:

private boolean flushBucket(DataBucket bucket) {
    // get the instant to write (derived from the last successful checkpoint)
    String instant = instantToWrite(true);

    if (instant == null) {
        // in case there are empty checkpoints that has no input data
        LOG.info("No inflight instant when flushing data, skip.");
        return false;
    }

    // get the buffered records
    List<HoodieRecord> records = bucket.writeBuffer();
    // the number of buffered records must be greater than 0
    ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
    // if duplicates should be removed before insert/upsert
    if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
        Properties props = new Properties();
        config.addAllToProperties(props);
        // call deduplicateRecords to drop duplicates
        records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
            .deduplicateRecords(records, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema(), props, recordMerger);
    }
    // run the pre-write logic before flushing: assign the correct partition path and fileID to the first record
    bucket.preWrite(records);
    // invoke the writeFunction
    // how the writeFunction is created is analyzed below
    final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
    // clear the buffer
    records.clear();
    // build the write-metadata event
    final WriteMetadataEvent event = WriteMetadataEvent.builder()
        .taskID(taskID)
        .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
        .writeStatus(writeStatus)
        .lastBatch(false)
        .endInput(false)
        .build();

    // send the event to the coordinator
    this.eventGateway.sendEventToCoordinator(event);
    // add the write statuses to the writeStatuses collection
    writeStatuses.addAll(writeStatus);
    // the flush succeeded
    return true;
}

StreamWriteFunction also calls the flushRemaining method during snapshotState, making sure all buffered data is sent out when a snapshot is taken. Its logic is essentially the same as flushBucket, so it is not repeated here.
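
Based on that description alone, the relationship can be sketched roughly as below (a minimal sketch, not the full implementation: the real function also manages the instant and operator state, and the boolean flag is an assumption):

@Override
public void snapshotState() {
  // flush every buffered bucket so the checkpointed instant covers all records received so far
  flushRemaining(false); // assumption: "false" marks a regular checkpoint rather than end of input
}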

To summarize, there are three occasions on which Flink Hudi flushes data to disk:

  1. A bucket reaches the configured maximum bucket size: flush that bucket
  2. The total size of all buckets reaches the configured limit: flush the bucket holding the most data (see the configuration sketch after this list)
  3. On a Flink checkpoint: flush all buffered data
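
For reference, the first two triggers map to the Flink options already mentioned; a minimal configuration sketch (the constants live in org.apache.hudi.configuration.FlinkOptions, the numbers are arbitrary examples):

Configuration conf = new Configuration();          // org.apache.flink.configuration.Configuration
conf.set(FlinkOptions.WRITE_BATCH_SIZE, 64D);      // per-bucket flush threshold, in MB
conf.set(FlinkOptions.WRITE_TASK_MAX_SIZE, 512D);  // total buffer budget of one write task, in MB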

The writeFunction mentioned above is created by the initWriteFunction method, which builds a different writeFunction depending on the operation type. The code is as follows:

private void initWriteFunction() {
    final String writeOperation = this.config.get(FlinkOptions.OPERATION);
    switch (WriteOperationType.fromValue(writeOperation)) {
        case INSERT:
            this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
            break;
        case UPSERT:
            this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
            break;
        case INSERT_OVERWRITE:
            this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
            break;
        case INSERT_OVERWRITE_TABLE:
            this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
            break;
        default:
            throw new RuntimeException("Unsupported write operation : " + writeOperation);
    }
}
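
For completeness, the operation this switch dispatches on comes from the write.operation option; a minimal, hypothetical sketch of setting it:

Configuration conf = new Configuration();                             // org.apache.flink.configuration.Configuration
conf.set(FlinkOptions.OPERATION, WriteOperationType.UPSERT.value());  // "upsert"; the other values map to the cases above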

The writeClient here is HoodieFlinkWriteClient. There are four kinds of logic:

  • insert
  • upsert
  • insertOverwrite
  • insertOverwriteTable

Below we take the upsert operation as the example and analyze writeClient.upsert.

COW Upsert Logic

HoodieFlinkWriteClient

Continuing from the end of the previous section, we analyze HoodieFlinkWriteClient's upsert method. The code is as follows:

@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
    // create the HoodieTable
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
        initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
    // validate that the schema is compatible with the data
    table.validateUpsertSchema();
    // set the operationType to UPSERT
    preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
    HoodieWriteMetadata<List<WriteStatus>> result;
    // create the HoodieWriteHandle matching the operation type; it encapsulates the write logic for each operation
    // AutoCloseableWriteHandle is a wrapper that calls the handle's closeGracefully when the try block ends
    try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
        // run the upsert via the table's upsert method; the concrete write behavior comes from closeableHandle.getWriteHandle()
        result = ((HoodieFlinkTable<T>) table).upsert(context, closeableHandle.getWriteHandle(), instantTime, records);
    }
    // update metrics
    if (result.getIndexLookupDuration().isPresent()) {
        metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
    }
    // run the post-write logic and update metrics
    return postWrite(result, instantTime, table);
}

This method delegates the upsert to HoodieFlinkTable, while the logic that decides how data is actually written lives in closeableHandle.getWriteHandle(). Next we look at how the WriteHandle is created and how HoodieFlinkTable executes the write.

WriteHandle Creation

Before analyzing the upsert method we need to figure out which writeHandle is used under which conditions. This part analyzes how the writeHandle is created.

Hudi provides several handle types, each corresponding to a different way of writing data.

The writeHandle is created through AutoCloseableWriteHandle. Its constructor, referenced in the previous section, looks like this:

AutoCloseableWriteHandle(
    List<HoodieRecord<T>> records,
    String instantTime,
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table
) {
    this.writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), instantTime, table, records.listIterator());
}

This in turn calls getOrCreateWriteHandle:

private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
    HoodieRecord<T> record,
    HoodieWriteConfig config,
    String instantTime,
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
    Iterator<HoodieRecord<T>> recordItr) {
    // caution: it's not a good practice to modify the handles internal.
    FlinkWriteHandleFactory.Factory<T,
    List<HoodieRecord<T>>,
    List<HoodieKey>,
    List<WriteStatus>> writeHandleFactory = FlinkWriteHandleFactory.getFactory(table.getMetaClient().getTableConfig(), config);
    return writeHandleFactory.create(this.bucketToHandles, record, config, instantTime, table, recordItr);
}

A writeHandleFactory is used to create the writeHandle. FlinkWriteHandleFactory picks a factory that matches the table configuration and write configuration; its getFactory method is:

public static <T, I, K, O> Factory<T, I, K, O> getFactory(
    HoodieTableConfig tableConfig,
    HoodieWriteConfig writeConfig) {
    // if duplicate keys are allowed on insert
    if (writeConfig.allowDuplicateInserts()) {
        return ClusterWriteHandleFactory.getInstance();
    }
    // if the table is MERGE_ON_READ
    if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
        return DeltaCommitWriteHandleFactory.getInstance();
    } else if (tableConfig.isCDCEnabled()) {
        // if CDC is enabled
        return CdcWriteHandleFactory.getInstance();
    } else {
        // otherwise it is a COW table
        return CommitWriteHandleFactory.getInstance();
    }
}

For a COW table the factory returned is CommitWriteHandleFactory (every write to a COW table is a commit, whereas a MOR write is a delta commit and only compaction, which produces parquet files, is a commit). We continue with the factory's create method, located in the parent class BaseCommitWriteHandleFactory:

@Override
public HoodieWriteHandle<?, ?, ?, ?> create(
    Map<String, Path> bucketToHandles,
    HoodieRecord<T> record,
    HoodieWriteConfig config,
    String instantTime,
    HoodieTable<T, I, K, O> table,
    Iterator<HoodieRecord<T>> recordItr) {
    final HoodieRecordLocation loc = record.getCurrentLocation();
    final String fileID = loc.getFileId();
    final String partitionPath = record.getPartitionPath();

    Path writePath = bucketToHandles.get(fileID);
    // the file for this record already exists: create a replace handle
    if (writePath != null) {
        HoodieWriteHandle<?, ?, ?, ?> writeHandle =
            createReplaceHandle(config, instantTime, table, recordItr, partitionPath, fileID, writePath);
        bucketToHandles.put(fileID, ((MiniBatchHandle) writeHandle).getWritePath()); // override with new replace handle
        return writeHandle;
    }

    final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
    // new data: create a FlinkCreateHandle
    if (loc.getInstantTime().equals("I")) {
        writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
                                              fileID, table.getTaskContextSupplier());
    } else {
        // otherwise create a merge handle
        writeHandle = createMergeHandle(config, instantTime, table, recordItr, partitionPath, fileID);
    }
    bucketToHandles.put(fileID, ((MiniBatchHandle) writeHandle).getWritePath());
    return writeHandle;
}

For CommitWriteHandleFactory, createReplaceHandle returns a FlinkMergeAndReplaceHandle and createMergeHandle returns a FlinkMergeHandle.

That concludes the handle creation process. Next we analyze HoodieFlinkCopyOnWriteTable.

HoodieFlinkCopyOnWriteTable

HoodieFlinkWriteClient's upsert method calls HoodieFlinkTable's upsert. As the name suggests, HoodieFlinkTable represents a Hudi table managed under Flink. It has two subclasses:

  • HoodieFlinkCopyOnWriteTable: for COW tables
  • HoodieFlinkMergeOnReadTable: for MOR tables

Since this chapter focuses on COW tables, we look at HoodieFlinkCopyOnWriteTable's upsert method:

public HoodieWriteMetadata<List<WriteStatus>> upsert(
    HoodieEngineContext context,
    HoodieWriteHandle<?, ?, ?, ?> writeHandle,
    String instantTime,
    List<HoodieRecord<T>> records) {
    return new FlinkUpsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
}

FlinkUpsertCommitActionExecutor is responsible for executing the upsert action. Its execute method in turn calls FlinkWriteHelper's write method:

@Override
public HoodieWriteMetadata execute() {
    return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
                                                config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType);
}

The logic now moves into FlinkWriteHelper, which we analyze next.

FlinkWriteHelper

Because every record has already been tagged with a bucket ID (partition path and fileID), the data is handed straight to the executor (BaseFlinkCommitActionExecutor) to perform the write. The bucket ID tagging happens in BucketAssignFunction, analyzed later.

@Override
public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, List<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
                                                    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean shouldCombine, int configuredShuffleParallelism,
                                                    BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor, WriteOperationType operationType) {
    try {
        Instant lookupBegin = Instant.now();
        Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
        // perform the write
        HoodieWriteMetadata<List<WriteStatus>> result = executor.execute(inputRecords);
        result.setIndexLookupDuration(indexLookupDuration);
        return result;
    } catch (Throwable e) {
        if (e instanceof HoodieUpsertException) {
            throw (HoodieUpsertException) e;
        }
        throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
    }
}

Following the code, execute is located in BaseFlinkCommitActionExecutor. We continue there.

BaseFlinkCommitActionExecutor

Since records are tagged with bucket IDs, Flink can distribute them by bucket ID, so the batch handled by the executor all belongs to the same bucket.

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
    HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();

    List<WriteStatus> writeStatuses = new LinkedList<>();
    // get the partition path and file ID of this batch
    // data is distributed by bucket ID, so the whole batch belongs to one bucket and reading the first record is enough
    final HoodieRecord<?> record = inputRecords.get(0);
    final String partitionPath = record.getPartitionPath();
    final String fileId = record.getCurrentLocation().getFileId();
    // recover the bucket type (insert or update) from the instant time
    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
        ? BucketType.INSERT
        : BucketType.UPDATE;
    // handle the partition upsert
    handleUpsertPartition(
        instantTime,
        partitionPath,
        fileId,
        bucketType,
        inputRecords.iterator())
        .forEachRemaining(writeStatuses::addAll);
    // set the write metadata, including write statuses and durations
    setUpWriteMetadata(writeStatuses, result);
    return result;
}

Next, the handleUpsertPartition method:

protected Iterator<List<WriteStatus>> handleUpsertPartition(
    String instantTime,
    String partitionPath,
    String fileIdHint,
    BucketType bucketType,
    Iterator recordItr) {
    try {
        if (this.writeHandle instanceof HoodieCreateHandle) {
            // During one checkpoint interval, an insert record could also be updated,
            // for example, for an operation sequence of a record:
            //    I, U,   | U, U
            // - batch1 - | - batch2 -
            // the first batch(batch1) operation triggers an INSERT bucket,
            // the second batch batch2 tries to reuse the same bucket
            // and append instead of UPDATE.
            return handleInsert(fileIdHint, recordItr);
        } else if (this.writeHandle instanceof HoodieMergeHandle) {
            return handleUpdate(partitionPath, fileIdHint, recordItr);
        } else {
            switch (bucketType) {
                case INSERT:
                    return handleInsert(fileIdHint, recordItr);
                case UPDATE:
                    return handleUpdate(partitionPath, fileIdHint, recordItr);
                default:
                    throw new AssertionError();
            }
        }
    } catch (Throwable t) {
        String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
        LOG.error(msg, t);
        throw new HoodieUpsertException(msg, t);
    }
}

The method above decides between the insert and update paths based on the writeHandle type; if the handle type is not conclusive, the bucketType is used instead.

The insert path goes through handleInsert:

@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
    throws Exception {
    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
    if (!recordItr.hasNext()) {
        LOG.info("Empty partition");
        return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
    }
    return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
                                         taskContextSupplier, new ExplicitWriteHandleFactory<>(writeHandle));
}

Here the record iterator is wrapped in a FlinkLazyInsertIterable, a lazily-writing iterator: the actual write only happens when the iterator is traversed to obtain the write statuses.
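
The deferred-write idea can be illustrated outside Hudi's class hierarchy with a small, hypothetical sketch (not the actual FlinkLazyInsertIterable code): nothing is written until next() is called.

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Simplified illustration of a lazily-writing iterator; names are invented.
class LazyWriteIterator<R, S> implements Iterator<S> {
  private final Iterator<List<R>> batches;
  private final Function<List<R>, S> writer; // "write one batch, return its write status"

  LazyWriteIterator(Iterator<List<R>> batches, Function<List<R>, S> writer) {
    this.batches = batches;
    this.writer = writer;
  }

  @Override
  public boolean hasNext() {
    return batches.hasNext();
  }

  @Override
  public S next() {
    // the write is triggered here, on demand, just like computeNext() described in the next section
    return writer.apply(batches.next());
  }
}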

The update path goes through handleUpdate:

@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
                                                Iterator<HoodieRecord<T>> recordItr)
    throws IOException {
    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
    if (!recordItr.hasNext()) {
        LOG.info("Empty partition with fileId => " + fileId);
        return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
    }
    // these are updates
    HoodieMergeHandle<?, ?, ?, ?> upsertHandle = (HoodieMergeHandle<?, ?, ?, ?>) this.writeHandle;
    return handleUpdateInternal(upsertHandle, fileId);
}

The handleUpdateInternal logic is analyzed later in the update-data section.

Insert Data Logic

FlinkLazyInsertIterable

We now return to the FlinkLazyInsertIterable class mentioned above. FlinkLazyInsertIterable extends LazyIterableIterator, which writes data into the underlying file while being iterated. Looking at the code, LazyIterableIterator's next method calls computeNext.

@Override
public O next() {
    try {
        return computeNext();
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }
}

computeNext is implemented in the subclass FlinkLazyInsertIterable and carries the actual write logic:

@Override
protected List<WriteStatus> computeNext() {
    // Executor service used for launching writer thread.
    HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
    try {
        // get the record schema
        final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
        // create the batching executor
        // the consume (write) logic is provided by getExplicitInsertHandler
        // the pre-consume transform comes from getTransformer, which bundles the record, schema and write config
        bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(),
                                                          getTransformer(schema, hoodieConfig));
        // perform the write
        final List<WriteStatus> result = bufferedIteratorExecutor.execute();
        checkState(result != null && !result.isEmpty());
        return result;
    } catch (Exception e) {
        throw new HoodieException(e);
    } finally {
        if (null != bufferedIteratorExecutor) {
            bufferedIteratorExecutor.shutdownNow();
            bufferedIteratorExecutor.awaitTermination();
        }
    }
}

The write logic is created by getExplicitInsertHandler:

  private ExplicitWriteHandler getExplicitInsertHandler() {
    HoodieWriteHandle handle = ((ExplicitWriteHandleFactory) writeHandleFactory).getWriteHandle();
    return new ExplicitWriteHandler(handle);
  }

bufferedIteratorExecutor processes data through ExplicitWriteHandler's consume method, shown below:

@Override
public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> genResult) {
    final HoodieRecord insertPayload = genResult.getResult();
    handle.write(insertPayload, genResult.schema, new TypedProperties(genResult.props));
}

FlinkCreateHandle

It calls HoodieWriteHandle's write method, which in turn calls the subclass's doWrite. The HoodieWriteHandle implementation here is FlinkCreateHandle, whose doWrite lives in its parent class HoodieCreateHandle:

@Override
protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
    Option<Map<String, String>> recordMetadata = record.getMetadata();
    try {
        // if the record is not a delete
        if (!HoodieOperation.isDelete(record.getOperation()) && !record.isDelete(schema, config.getProps())) {
            // should it be ignored?
            if (record.shouldIgnore(schema, config.getProps())) {
                return;
            }

            // add the metadata fields to the record
            MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
            HoodieRecord populatedRecord =
                record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, config.getProps());

            // decide whether to preserve existing metadata; CreateHandleFactory sets preserveMetadata to false by default
            if (preserveMetadata) {
                fileWriter.write(record.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
            } else {
                fileWriter.writeWithMetadata(record.getKey(), populatedRecord, writeSchemaWithMetaFields);
            }

            // Update the new location of record, so we know where to find it next
            // unseal the record: a sealed record cannot setCurrentLocation or setNewLocation
            record.unseal();
            // set the record's new location, binding it to the fileID it belongs to
            record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
            // seal it again
            record.seal();

            // increment the written-record counter
            recordsWritten++;
            // increment the inserted-record counter
            insertRecordsWritten++;
        } else {
            recordsDeleted++;
        }
        // mark the write as successful
        writeStatus.markSuccess(record, recordMetadata);
        // deflate record payload after recording success. This will help users access payload as a
        // part of marking
        // record successful.
        // deflate the HoodieRecord's payload
        record.deflate();
    } catch (Throwable t) {
        // Not throwing exception from here, since we don't want to fail the entire job
        // for a single record
        writeStatus.markFailure(record, t, recordMetadata);
        LOG.error("Error writing record " + record, t);
    }
}

That is the data write flow. Next we analyze how the fileWriter is created and how it actually writes.

FileWriter Creation

The fileWriter is obtained through HoodieFileWriterFactory's getFileWriter method:

public static <T, I, K, O> HoodieFileWriter getFileWriter(
    String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
    TaskContextSupplier taskContextSupplier, HoodieRecordType recordType) throws IOException {
    final String extension = FSUtils.getFileExtension(path.getName());
    HoodieFileWriterFactory factory = getWriterFactory(recordType);
    return factory.getFileWriterByFormat(extension, instantTime, path, conf, config, schema, taskContextSupplier);
}

The getWriterFactory method chooses a fileWriterFactory based on the record type:

private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
    switch (recordType) {
        case AVRO:
            return new HoodieAvroFileWriterFactory();
        case SPARK:
            try {
                Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
                return (HoodieFileWriterFactory) clazz.newInstance();
            } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
                throw new HoodieException("Unable to create hoodie spark file writer factory", e);
            }
        default:
            throw new UnsupportedOperationException(recordType + " record type not supported yet.");
    }
}

As shown above, the AVRO record type returns a HoodieAvroFileWriterFactory. Once the factory is obtained, the file extension determines which concrete writer is created:

protected <T, I, K, O> HoodieFileWriter getFileWriterByFormat(
    String extension, String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
    TaskContextSupplier taskContextSupplier) throws IOException {
    if (PARQUET.getFileExtension().equals(extension)) {
        // parquet format
        return newParquetFileWriter(instantTime, path, conf, config, schema, taskContextSupplier);
    }
    if (HFILE.getFileExtension().equals(extension)) {
        // hfile format
        return newHFileFileWriter(instantTime, path, conf, config, schema, taskContextSupplier);
    }
    if (ORC.getFileExtension().equals(extension)) {
        // orc format
        return newOrcFileWriter(instantTime, path, conf, config, schema, taskContextSupplier);
    }
    throw new UnsupportedOperationException(extension + " format not supported yet.");
}

Writer Write Flow

When Flink writes a COW table the data format is parquet, so the fileWriter implementation is HoodieAvroParquetWriter. Its write method is in the parent class HoodieAvroFileWriter:

@Override
default void write(String recordKey, HoodieRecord record, Schema schema, Properties props) throws IOException {
    // convert the record to Avro
    IndexedRecord avroPayload = record.toIndexedRecord(schema, props).get().getData();
    writeAvro(recordKey, avroPayload);
}

The writeAvro method:

@Override
public void writeAvro(String key, IndexedRecord object) throws IOException {
    super.write(object);
    if (populateMetaFields) {
        writeSupport.add(key);
    }
}

Here super.write calls HoodieBaseParquetWriter's write method, which in turn delegates to the write method of ParquetWriter, HoodieBaseParquetWriter's parent class.

The metadata-writing variant is writeAvroWithMetadata, shown below:

@Override
public void writeAvroWithMetadata(HoodieKey key, IndexedRecord avroRecord) throws IOException {
    if (populateMetaFields) {
        // populate the metadata fields into avroRecord
        prepRecordWithMetadata(key, avroRecord, instantTime,
                               taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
        super.write(avroRecord);
        writeSupport.add(key.getRecordKey());
    } else {
        super.write(avroRecord);
    }
}
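
To make the bottom of this call chain concrete, here is a minimal sketch that writes Avro records with the plain parquet-avro API, bypassing Hudi's wrapper classes; the path, schema and codec are arbitrary examples.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class PlainParquetWriteDemo {
  public static void write(Schema schema, GenericRecord record) throws Exception {
    try (ParquetWriter<GenericRecord> writer =
             AvroParquetWriter.<GenericRecord>builder(new Path("/tmp/demo.parquet"))
                 .withSchema(schema)
                 .withConf(new Configuration())
                 .withCompressionCodec(CompressionCodecName.SNAPPY)
                 .build()) {
      writer.write(record); // the same ParquetWriter#write the Hudi call chain eventually reaches
    }
  }
}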

Tracing one level further up lands in the write method of org.apache.parquet.hadoop.ParquetWriter. This completes the flow from inserting data to writing the parquet file. Next we analyze the update path.

Update Data Logic

handleUpdateInternal: Updating Data

BaseFlinkCommitActionExecutor's handleUpdateInternal method is called when data is upserted:

protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
    throws IOException {
    if (upsertHandle.getOldFilePath() == null) {
        // for an update the record must already exist in some file; otherwise something is wrong
        throw new HoodieUpsertException(
            "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
    } else {
        // merge the data
        HoodieMergeHelper.newInstance().runMerge(table, upsertHandle);
    }

    // TODO(vc): This needs to be revisited
    if (upsertHandle.getPartitionPath() == null) {
        LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
                 + upsertHandle.writeStatuses());
    }

    return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}

The core of an upsert is merging data. runMerge, which performs the merge, is fairly long:

@Override
public void runMerge(HoodieTable<?, ?, ?, ?> table,
                     HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException {
    // get the write config
    HoodieWriteConfig writeConfig = table.getConfig();
    // get the base file to merge into
    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();

    // get the hadoop configuration
    Configuration hadoopConf = new Configuration(table.getHadoopConf());
    // get the record type: AVRO or SPARK
    HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
    // build the base file reader
    HoodieFileReader baseFileReader = HoodieFileReaderFactory
        .getReaderFactory(recordType)
        .getFileReader(hadoopConf, mergeHandle.getOldFilePath());
    HoodieFileReader bootstrapFileReader = null;

    // get the writer schema, including metadata fields
    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
    // get the base file's schema
    Schema readerSchema = baseFileReader.getSchema();

    // In case Advanced Schema Evolution is enabled we might need to rewrite currently
    // persisted records to adhere to an evolved schema
    // Hudi supports schema evolution; this builds a Function that converts records from the old schema to the new one
    Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =
        composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient());

    // Check whether the writer schema is simply a projection of the file's one, ie
    //   - Its field-set is a proper subset (of the reader schema)
    //   - There's no schema evolution transformation necessary
    // check whether writerSchema is a projection of readerSchema,
    // i.e. the writer columns are a subset of the reader columns and no schema evolution is needed
    boolean isPureProjection = isStrictProjectionOf(readerSchema, writerSchema)
        && !schemaEvolutionTransformerOpt.isPresent();
    // Check whether we will need to rewrite target (already merged) records into the
    // writer's schema
    // decide whether merged records need rewriting into the writer schema
    boolean shouldRewriteInWriterSchema = writeConfig.shouldUseExternalSchemaTransformation()
        || !isPureProjection
        || baseFile.getBootstrapBaseFile().isPresent();

    HoodieExecutor<Void> wrapper = null;

    try {
        Iterator<HoodieRecord> recordIterator;

        // In case writer's schema is simply a projection of the reader's one we can read
        // the records in the projected schema directly
        // get the iterator over the base file's records
        // if writerSchema is a projection of readerSchema, the base file can be read directly with writerSchema
        ClosableIterator<HoodieRecord> baseFileRecordIterator =
            baseFileReader.getRecordIterator(isPureProjection ? writerSchema : readerSchema);
        Schema recordSchema;
        if (baseFile.getBootstrapBaseFile().isPresent()) {
            // the base file has a bootstrap file
            // get its path
            Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
            // get its configuration
            Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
            // read the bootstrap file
            bootstrapFileReader =
                HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath);

            // merging iterator: while iterating, the matching base file and bootstrap file rows are joined into one record
            recordIterator = new MergingIterator<>(
                baseFileRecordIterator,
                bootstrapFileReader.getRecordIterator(),
                (left, right) ->
                left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
            recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
        } else {
            // no bootstrap file: just iterate the base file
            recordIterator = baseFileRecordIterator;
            recordSchema = isPureProjection ? writerSchema : readerSchema;
        }

        // decide whether records are buffered; only BOUNDED_IN_MEMORY and DISRUPTOR executors buffer records,
        // the default SIMPLE executor does not
        boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);

        // new UpdateHandler(mergeHandle) carries the actual merge logic
        wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
            // this is the record transformation applied before merging
            HoodieRecord newRecord;
            if (schemaEvolutionTransformerOpt.isPresent()) {
                // apply the schema-evolution transform when present
                newRecord = schemaEvolutionTransformerOpt.get().apply(record);
            } else if (shouldRewriteInWriterSchema) {
                // rewrite the record into the writer schema
                newRecord = record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema);
            } else {
                // otherwise no transformation is needed
                newRecord = record;
            }

            // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
            //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
            //       it since these records will be put into queue of QueueBasedExecutorFactory.
            // return a copy of the record when records are buffered
            return isBufferingRecords ? newRecord.copy() : newRecord;
        }, table.getPreExecuteRunnable());

        wrapper.execute();
    } catch (Exception e) {
        throw new HoodieException(e);
    } finally {
        // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
        // and executor firstly and then close mergeHandle.
        baseFileReader.close();
        if (bootstrapFileReader != null) {
            bootstrapFileReader.close();
        }
        if (null != wrapper) {
            wrapper.shutdownNow();
            wrapper.awaitTermination();
        }
        mergeHandle.close();
    }
}

During the merge the executor calls UpdateHandler's consume method, which in turn calls upsertHandle's write method; this mirrors the insert-path executor, so it is not repeated. We now turn to the mergeHandle's write method. For Flink the mergeHandle is FlinkMergeHandle, and its write method sits in the parent class HoodieMergeHandle:

/**
 * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
 */
public void write(HoodieRecord<T> oldRecord) {
    // get the old and new schemas; the main difference is whether they carry the metadata fields
    Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields : writeSchema;
    Schema newSchema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : writeSchema;
    // whether to copy the old record over
    boolean copyOldRecord = true;
    // get the record key
    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
    // get the hoodie.payload configuration
    TypedProperties props = config.getPayloadConfig().getProps();
    // there is an incoming (new) record for this key
    if (keyToNewRecords.containsKey(key)) {
        // If we have duplicate records that we are updating, then the hoodie record will be deflated after
        // writing the first record. So make a copy of the record to be merged
        HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
        try {
            // merge the old and new records
            Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
            // schema of the merged record
            Schema combineRecordSchema = mergeResult.map(Pair::getRight).orElse(null);
            // the merged record
            Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
            if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
                // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
                // an IGNORE_RECORD: just keep the old record
                copyOldRecord = true;
                // otherwise write the merged record
            } else if (writeUpdateRecord(newRecord, oldRecord, combinedRecord, combineRecordSchema)) {
                /*
           * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
           * write the combined new value
           *
           * We no longer need to copy the old record over.
           */
                // no need to copy the old record anymore
                copyOldRecord = false;
            }
            // remember that this key has been written
            writtenRecordKeys.add(key);
        } catch (Exception e) {
            throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
                                            + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
        }
    }

    if (copyOldRecord) {
        // the old record should be copied over
        try {
            // NOTE: We're enforcing preservation of the record metadata to keep existing semantic
            // write the old record into the file
            writeToFile(new HoodieKey(key, partitionPath), oldRecord, oldSchema, props, true);
        } catch (IOException | RuntimeException e) {
            String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
                                          key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
            LOG.debug("Old record is " + oldRecord);
            throw new HoodieUpsertException(errMsg, e);
        }
        // increment the written-record counter
        recordsWritten++;
    }
}

The details of the merge live in recordMerger.merge, analyzed in the next section.

HoodieAvroRecordMerger::merge: Merging Records

The merge method combines the old and new records into one according to certain rules; the schema may change during the merge. The analysis follows:

@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
    // since this is HoodieAvroRecordMerger, both records must be of AVRO type
    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO);
    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.AVRO);
    // get the merge mode; the default is COMBINING
    Config.LegacyOperationMode legacyOperatingMode = Config.LegacyOperationMode.valueOf(
        props.getString(Config.LEGACY_OPERATING_MODE.key(), Config.LEGACY_OPERATING_MODE.defaultValue()));

    switch (legacyOperatingMode) {
        // PRE_COMBINING mode
        case PRE_COMBINING:
            HoodieRecord res = preCombine(older, newer, newSchema, props);
            // use the schema of whichever record the result is identical to
            if (res == older) {
                return Option.of(Pair.of(res, oldSchema));
            } else {
                return Option.of(Pair.of(res, newSchema));
            }
        // COMBINING mode
        case COMBINING:
            return combineAndGetUpdateValue(older, newer, newSchema, props)
                .map(r -> Pair.of(r, (((HoodieAvroIndexedRecord) r).getData()).getSchema()));

        default:
            throw new UnsupportedOperationException(String.format("Unsupported legacy operating mode (%s)", legacyOperatingMode));
    }
}

The preCombine method:

private HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) {
    HoodieRecordPayload payload = unsafeCast(((HoodieAvroRecord) newer).getData().preCombine(((HoodieAvroRecord) older).getData(), schema, props));
    return new HoodieAvroRecord(newer.getKey(), payload, newer.getOperation());
}

The combineAndGetUpdateValue method:

private Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
    if (!previousAvroData.isPresent()) {
        return Option.empty();
    }

    return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousAvroData.get(), schema, props)
        .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload));
}

Both methods above indirectly call the preCombine and combineAndGetUpdateValue methods of the data inside HoodieAvroRecord; this data is the Hudi payload. Below we take the two most common payloads, OverwriteWithLatestAvroPayload and PartialUpdateAvroPayload, and analyze their preCombine and combineAndGetUpdateValue methods.

OverwriteWithLatestAvroPayload

The payload type is determined by hoodie.datasource.write.payload.class and defaults to OverwriteWithLatestAvroPayload. Its logic is simple: preCombine uses the ordering field (the precombine field) to decide which record is newer, and combineAndGetUpdateValue simply takes the new record's value. The code is shown below:

@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
    if (oldValue.recordBytes.length == 0) {
        // use natural order for delete record
        return this;
    }
    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
        // pick the payload with greatest ordering value
        return oldValue;
    } else {
        return this;
    }
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    return getInsertValue(schema);
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    if (recordBytes.length == 0 || isDeletedRecord) {
        return Option.empty();
    }

    return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema));
}
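
A minimal, hypothetical demonstration of these semantics; the schema and values are made up, and it assumes the (GenericRecord, Comparable) payload constructor:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;

public class OverwriteWithLatestDemo {
  public static void main(String[] args) throws Exception {
    Schema schema = SchemaBuilder.record("demo").fields()
        .requiredString("id").requiredLong("ts").requiredString("name")
        .endRecord();

    GenericRecord oldRec = new GenericData.Record(schema);
    oldRec.put("id", "1"); oldRec.put("ts", 1L); oldRec.put("name", "name_1");

    GenericRecord newRec = new GenericData.Record(schema);
    newRec.put("id", "1"); newRec.put("ts", 2L); newRec.put("name", "name_2");

    OverwriteWithLatestAvroPayload older = new OverwriteWithLatestAvroPayload(oldRec, 1L);
    OverwriteWithLatestAvroPayload newer = new OverwriteWithLatestAvroPayload(newRec, 2L);

    // preCombine keeps the payload with the greater ordering value, here `newer`
    System.out.println(newer.preCombine(older) == newer); // true

    // combineAndGetUpdateValue ignores the stored value and simply returns the new record
    System.out.println(newer.combineAndGetUpdateValue(oldRec, schema).get());
  }
}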

PartialUpdateAvroPayload

Compared with OverwriteWithLatestAvroPayload, PartialUpdateAvroPayload is considerably more complex. Its preCombine and combineAndGetUpdateValue are quite similar: both merge data (see the comments in the source code for details). In short, if the ordering field is ts, then for records with the same record key the one with the larger ts overrides the one with the smaller ts. There are additional rules: a null field in the new record does not override the old record, and a field in the old record that happens to equal the field's default value is always replaced by the new record. The example from the source code:

/** 
 *  Illustration with simple data.
 *  let's say the order field is 'ts' and schema is :
 *  {
 *    [
 *      {"name":"id","type":"string"},
 *      {"name":"ts","type":"long"},
 *      {"name":"name","type":"string"},
 *      {"name":"price","type":"string"}
 *    ]
 *  }
 *
 *  case 1
 *  Current data:
 *      id      ts      name    price
 *      1       1       name_1  price_1
 *  Insert data:
 *      id      ts      name    price
 *      1       2       null    price_2
 *
 *  Result data after #preCombine or #combineAndGetUpdateValue:
 *      id      ts      name    price
 *      1       2       name_1  price_2
 *
 *  case 2
 *  Current data:
 *      id      ts      name    price
 *      1       2       name_1  null
 *  Insert data:
 *      id      ts      name    price
 *      1       1       null    price_1
 *
 *  Result data after preCombine or combineAndGetUpdateValue:
 *      id      ts      name    price
 *      1       2       name_1  price_1
 */

Looking more closely, the two methods do differ:

  • preCombine compares record freshness using the field configured by precombine.field in FlinkOptions (wired in by PayloadCreation's createPayload method); see the sketch after this list.
  • combineAndGetUpdateValue compares record freshness using the field configured by hoodie.payload.ordering.field.
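
A minimal, hypothetical sketch of the two settings (the property key is the one quoted above; FlinkOptions.PRECOMBINE_FIELD is assumed to back the precombine.field option):

Configuration conf = new Configuration();                  // org.apache.flink.configuration.Configuration
conf.set(FlinkOptions.PRECOMBINE_FIELD, "ts");             // consulted by preCombine via PayloadCreation

Properties props = new Properties();                       // properties handed down to the payload
props.setProperty("hoodie.payload.ordering.field", "ts");  // consulted by combineAndGetUpdateValue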

We first analyze the preCombine call path.

preCombine Logic

@Override
public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) {
    if (oldValue.recordBytes.length == 0) {
        // use natural order for delete record
        // the old payload is empty: use the new one
        return this;
    }
    // pick the payload with greater ordering value as insert record
    // decide whether the old or the new record wins
    // compare using the precombine field, e.g. the timestamp ts
    final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false;
    try {
        // convert the old record to Avro
        GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema);
        // merge the old and new records
        Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord);
        if (mergedRecord.isPresent()) {
            // return the merged payload
            return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
                                                shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal);
        }
    } catch (Exception ex) {
        return this;
    }
    return this;
}

The mergeOldRecord method, which merges the old and new records:

private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
                                             Schema schema,
                                             boolean isOldRecordNewer) throws IOException {
    // get the new record; if the payload is empty or marks a delete, return Option.empty()
    Option<IndexedRecord> recordOption = getInsertValue(schema);
    if (!recordOption.isPresent()) {
        // use natural order for delete record
        return Option.empty();
    }

    if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) {
        // handling disorder, should use the metadata fields of the updating record
        // the old record is newer, i.e. the data arrived out of order,
        // and the schema carries the commit-time metadata field
        return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
    } else if (isOldRecordNewer) {
        // otherwise merge the records; the second argument takes precedence over the third
        return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
    } else {
        return mergeRecords(schema, (GenericRecord) recordOption.get(), (GenericRecord) oldRecord);
    }
}

mergeDisorderRecordsWithMetadata merges out-of-order records when metadata fields are present:

protected Option<IndexedRecord> mergeDisorderRecordsWithMetadata(
    Schema schema,
    GenericRecord oldRecord,
    GenericRecord updatingRecord) {
    // a delete record: return empty
    if (isDeleteRecord(oldRecord)) {
        return Option.empty();
    } else {
        // start building the merged record
        final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        List<Schema.Field> fields = schema.getFields();
        fields.forEach(field -> {
            final GenericRecord baseRecord;
            final GenericRecord mergedRecord;
            if (HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(field.name())) {
                // this is a metadata field
                // reaching this branch means the field is one of the Hudi metadata fields
                // metadata fields take their values from the updating (new) record
                baseRecord = updatingRecord;
                mergedRecord = oldRecord;
            } else {
                // other fields take their values from the old record
                baseRecord = oldRecord;
                mergedRecord = updatingRecord;
            }
            // set the field value
            setField(baseRecord, mergedRecord, builder, field);
        });
        // return the built record
        return Option.of(builder.build());
    }
}

The field-setting logic lives in OverwriteNonDefaultsWithLatestAvroPayload's setField method:

protected void setField(
    GenericRecord baseRecord,
    GenericRecord mergedRecord,
    GenericRecordBuilder builder,
    Schema.Field field) {
    // read the field value from baseRecord
    Object value = baseRecord.get(field.name());
    // for STRING fields, call toString
    value = field.schema().getType().equals(Schema.Type.STRING) && value != null ? value.toString() : value;
    // get the field's default value
    Object defaultValue = field.defaultVal();
    // logic below: if the base record's value equals the field default, take the other record's value; otherwise keep the base record's value
    if (!overwriteField(value, defaultValue)) {
        // no overwrite needed: use baseRecord's value
        builder.set(field, value);
    } else {
        // otherwise use mergedRecord's value
        builder.set(field, mergedRecord.get(field.name()));
    }
}

// overwriteField is defined in OverwriteWithLatestAvroPayload
public Boolean overwriteField(Object value, Object defaultValue) {
    // if the declared default is null, return true when the actual value is also null
    if (JsonProperties.NULL_VALUE.equals(defaultValue)) {
        return value == null;
    }
    // otherwise return true when the actual value equals the default
    return Objects.equals(value, defaultValue);
}
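
A quick, hypothetical illustration of overwriteField (it assumes the Option-based payload constructor; an empty payload is enough because the method only looks at its two arguments):

import org.apache.avro.JsonProperties;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

public class OverwriteFieldDemo {
  public static void main(String[] args) {
    OverwriteWithLatestAvroPayload p = new OverwriteWithLatestAvroPayload(Option.empty());

    // old value is null and the declared default is null -> true: take the other record's value
    System.out.println(p.overwriteField(null, JsonProperties.NULL_VALUE)); // true
    // old value equals the declared default -> true: the default-valued field gets replaced
    System.out.println(p.overwriteField(0L, 0L));                          // true
    // old value differs from the default -> false: keep the base record's value
    System.out.println(p.overwriteField("price_1", null));                 // false
  }
}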

Back in PartialUpdateAvroPayload's mergeOldRecord, we continue with OverwriteNonDefaultsWithLatestAvroPayload's mergeRecords method:

protected Option<IndexedRecord> mergeRecords(Schema schema, GenericRecord baseRecord, GenericRecord mergedRecord) {
    if (isDeleteRecord(baseRecord)) {
        // if the _hoodie_is_deleted field exists and is true, this is a delete: return empty
        return Option.empty();
    } else {
        final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        List<Schema.Field> fields = schema.getFields();
        // call setField for every field of the schema
        fields.forEach(field -> setField(baseRecord, mergedRecord, builder, field));
        return Option.of(builder.build());
    }
}

combineAndGetUpdateValue Logic

Back to PartialUpdateAvroPayload's combineAndGetUpdateValue method:

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
    return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop));
}

mergeOldRecord is the same method analyzed above, so it is not repeated.

isRecordNewer decides whether the stored record is the newer one:

private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) {
    // read the hoodie.payload.ordering.field setting
    String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
    if (!StringUtils.isNullOrEmpty(orderingField)) {
        // the orderingField is configured
        
        // read hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled, default false;
        // it controls whether logical timestamps are converted to a consistent format
        boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty(
            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));

        // read the ordering field's value from the stored record
        Comparable oldOrderingVal =
            (Comparable) HoodieAvroUtils.getNestedFieldVal(
            (GenericRecord) record,
            orderingField,
            true,
            consistentLogicalTimestampEnabled);

        // pick the payload with greater ordering value as insert record
        // compare the ordering field values
        return oldOrderingVal != null
            && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal)
            && oldOrderingVal.compareTo(orderingVal) > 0;
    }
    return false;
}

writeUpdateRecord: Writing the Merged Record

We go straight to the writeUpdateRecord method:

protected boolean writeUpdateRecord(HoodieRecord<T> newRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOpt, Schema writerSchema) throws IOException {
    boolean isDelete = false;
    if (combineRecordOpt.isPresent()) {
        updatedRecordsWritten++;
        if (oldRecord.getData() != combineRecordOpt.get().getData()) {
            // the merged result differs from the old record, i.e. the incoming record was chosen; check whether it is a delete
            // the incoming record is chosen
            isDelete = HoodieOperation.isDelete(newRecord.getOperation());
        } else {
            // the incoming record is dropped
            // otherwise the incoming record is dropped and nothing new is written
            return false;
        }
    }
    // write the record
    return writeRecord(newRecord, combineRecordOpt, writerSchema, config.getPayloadConfig().getProps(), isDelete);
}

Finally, writeRecord writes the record to the file:

private boolean writeRecord(HoodieRecord<T> newRecord, Option<HoodieRecord> combineRecord, Schema schema, Properties prop, boolean isDelete) throws IOException {
    Option recordMetadata = newRecord.getMetadata();
    // the partition path must match
    if (!partitionPath.equals(newRecord.getPartitionPath())) {
        HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
                                                                    + newRecord.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
        writeStatus.markFailure(newRecord, failureEx, recordMetadata);
        return false;
    }
    try {
        if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, config.getProps()) && !isDelete) {
            // not a delete: write the record to the file
            writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, preserveMetadata && useWriterSchemaForCompaction);
            // increment the written-record counter
            recordsWritten++;
        } else {
            // otherwise increment the deleted-record counter
            recordsDeleted++;
        }
        // mark the write as successful
        writeStatus.markSuccess(newRecord, recordMetadata);
        // deflate record payload after recording success. This will help users access payload as a
        // part of marking
        // record successful.
        // deflate the payload
        newRecord.deflate();
        return true;
    } catch (Exception e) {
        LOG.error("Error writing record  " + newRecord, e);
        writeStatus.markFailure(newRecord, e, recordMetadata);
    }
    return false;
}

Flink COW Insert

The insert call chain largely overlaps with upsert; only the implementation classes differ, so we only cover the differences. Overall, a COW insert simply writes new parquet files and skips the merge step, which makes it much simpler.

The insert logic lives in HoodieFlinkCopyOnWriteTable's insert method:

  public HoodieWriteMetadata<List<WriteStatus>> insert(
      HoodieEngineContext context,
      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
      String instantTime,
      List<HoodieRecord<T>> records) {
    return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
  }

FlinkInsertCommitActionExecutor::execute:

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
    return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
                                                config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType);
}

From here on the logic is the same as the COW upsert path, so it is not repeated.

Bucket Assignment

BucketAssignFunction

We start from processElement, where the data flows in.

@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
    if (value instanceof IndexRecord) {
        IndexRecord<?> indexRecord = (IndexRecord<?>) value;
        this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
    } else {
        // not an index record: call processRecord
        processRecord((HoodieRecord<?>) value, out);
    }
}

processRecord tags the record with its new location:

private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
    // 1. put the record into the BucketAssigner;
    // 2. look up the state for location, if the record has a location, just send it out;
    // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
    final HoodieKey hoodieKey = record.getKey();
    // get the record key and partition path
    final String recordKey = hoodieKey.getRecordKey();
    final String partitionPath = hoodieKey.getPartitionPath();
    final HoodieRecordLocation location;

    // Only changing records need looking up the index for the location,
    // append only records are always recognized as INSERT.
    // fetch the record's previous location from state
    HoodieRecordGlobalLocation oldLoc = indexState.value();
    // isChangingRecords is true for upsert, upsert_prepped and delete
    if (isChangingRecords && oldLoc != null) {
        // Set up the instant time as "U" to mark the bucket as an update bucket.
        // the partition path has changed
        // (a change in the record's partition field changes the partition path)
        if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
            // global index enabled: when the old and new partition paths differ, the copy under the old partition path must be retracted
            if (globalIndex) {
                // if partition path changes, emit a delete record for old partition path,
                // then update the index state using location with new partition path.
                // emit a delete record targeting the old partition path
                HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
                                                                      payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));

                deleteRecord.unseal();
                // set the instant time to "U"
                deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
                deleteRecord.seal();

                out.collect((O) deleteRecord);
            }
            // get a location for the new data
            location = getNewRecordLocation(partitionPath);
        } else {
            // the partition path did not change
            location = oldLoc.toLocal("U");
            // create or join an update bucket for this record
            this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
        }
    } else {
        // an insert: create a new location
        location = getNewRecordLocation(partitionPath);
    }
    // always refresh the index
    if (isChangingRecords) {
        // for changing records, refresh the index state right away
        updateIndexState(partitionPath, location);
    }

    // set the record's location
    record.unseal();
    record.setCurrentLocation(location);
    record.seal();

    out.collect((O) record);
}

getNewRecordLocation assigns a location to new data:

private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
    // find or create a bucket for the inserted record
    final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
    final HoodieRecordLocation location;
    switch (bucketInfo.getBucketType()) {
        // build the record's HoodieRecordLocation depending on whether the bucket is an insert or an update bucket
        case INSERT:
            // This is an insert bucket, use HoodieRecordLocation instant time as "I".
            // Downstream operators can then check the instant time to know whether
            // a record belongs to an insert bucket.
            location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
            break;
        case UPDATE:
            location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
            break;
        default:
            throw new AssertionError();
    }
    return location;
}

Both the update and the insert paths end up in BucketAssigner, the core class for location assignment, which we analyze next.

BucketAssigner

BucketAssigner assigns each record to the bucket it belongs to, i.e. the location the data will be written to. The location information is wrapped in the BucketInfo class.

BucketInfo has the following member fields:

  • bucketType: the bucket type, insert or update.
  • fileIdPrefix: the fileId of the backing file.
  • partitionPath: the partition path.

We continue with addUpdate, which assigns a bucket to an update record:

public BucketInfo addUpdate(String partitionPath, String fileIdHint) {
    // build the key by joining the partition path and fileID with an underscore
    final String key = StreamerUtil.generateBucketKey(partitionPath, fileIdHint);
    // bucketInfoMap caches the mapping from key to bucket info
    if (!bucketInfoMap.containsKey(key)) {
        // not cached yet: create an UPDATE bucketInfo
        BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath);
        bucketInfoMap.put(key, bucketInfo);
    }
    // else do nothing because the bucket already exists.
    // if already cached (bucket assigned), return the corresponding bucketInfo
    return bucketInfoMap.get(key);
}

addInsert assigns a bucket to an insert record. Because it also handles small-file assignment, it is considerably more complex than addUpdate.

public BucketInfo addInsert(String partitionPath) {
    // for new inserts, compute buckets depending on how many records we have for each partition
    // try to assign a small file
    SmallFileAssign smallFileAssign = getSmallFileAssign(partitionPath);

    // first try packing this into one of the smallFiles
    if (smallFileAssign != null && smallFileAssign.assign()) {
        // small-file assignment succeeded: build and return a bucketInfo with the small file's file ID
        return new BucketInfo(BucketType.UPDATE, smallFileAssign.getFileId(), partitionPath);
    }

    // if we have anything more, create new insert buckets, like normal
    // no small file, or the small files have no capacity left:
    // assign the record to a new file
    // newFileAssignStates holds the candidate buckets pointing to new files
    if (newFileAssignStates.containsKey(partitionPath)) {
        // 如果候选分配状态中有
        // 获取partitionPath对应的分配状态
        NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
        if (newFileAssignState.canAssign()) {
            // 如果仍有空间,能够分配到这个新文件中
            // 执行分配
            newFileAssignState.assign();
            // 生成key
            final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
            // 查找是否有缓存的bucketInfo
            if (bucketInfoMap.containsKey(key)) {
                // the newFileAssignStates is cleaned asynchronously when received the checkpoint success notification,
                // the records processed within the time range:
                // (start checkpoint, checkpoint success(and instant committed))
                // should still be assigned to the small buckets of last checkpoint instead of new one.

                // the bucketInfoMap is cleaned when checkpoint starts.

                // A promotion: when the HoodieRecord can record whether it is an UPDATE or INSERT,
                // we can always return an UPDATE BucketInfo here, and there is no need to record the
                // UPDATE bucket through calling #addUpdate.
                return bucketInfoMap.get(key);
            }
            return new BucketInfo(BucketType.UPDATE, newFileAssignState.fileId, partitionPath);
        }
    }
    // 如果newFileAssignStates中没有
    // 或者是新文件的空间不足以分配
    // 创建新的bucketInfo。createFileIdOfThisTask创建新的file ID
    BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, createFileIdOfThisTask(), partitionPath);
    // 生成key
    final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
    // 加入到bucket缓存中
    bucketInfoMap.put(key, bucketInfo);
    // 创建新文件分配状态
    NewFileAssignState newFileAssignState = new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket());
    // 分配
    newFileAssignState.assign();
    // 加入到新文件分配缓存中
    newFileAssignStates.put(partitionPath, newFileAssignState);
    return bucketInfo;
}

createFileIdOfThisTask方法为insert record对应的bucket创建对应的fileID。

@VisibleForTesting
public String createFileIdOfThisTask() {
    // 使用UUID
    String newFileIdPfx = FSUtils.createNewFileIdPfx();
    // 使用fileIdOfThisTask计算hash,判断生成的newFileIdPfx是否属于当前task
    // 如果不属于,反复生成直到符合条件为止
    while (!fileIdOfThisTask(newFileIdPfx)) {
        newFileIdPfx = FSUtils.createNewFileIdPfx();
    }
    return newFileIdPfx;
}

fileIdOfThisTask的逻辑类似于对fileId做keyBy(Flink的keyBy算子底层调用的也是KeyGroupRangeAssignment.assignKeyToParallelOperator方法):根据fileId、maxParallelism和numTasks计算出该fileId对应的subtask编号,判断它是否等于当前taskID,也就是判断这个file是否归当前task处理。

private boolean fileIdOfThisTask(String fileId) {
    // the file id can shuffle to this task
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID;
}
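
可以用一小段演示代码直观感受fileId到task的映射关系(依赖flink-runtime;maxParallelism和numTasks取值仅为示例):

import java.util.UUID;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class FileIdTaskMappingDemo {
    public static void main(String[] args) {
        int maxParallelism = 128; // 示例值
        int numTasks = 4;         // 示例值
        for (int i = 0; i < 5; i++) {
            // 用UUID模拟新生成的fileId前缀
            String fileIdPfx = UUID.randomUUID().toString();
            int taskId = KeyGroupRangeAssignment.assignKeyToParallelOperator(fileIdPfx, maxParallelism, numTasks);
            System.out.println(fileIdPfx + " -> subtask " + taskId);
        }
    }
}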

getSmallFileAssign方法查找并分配相同partition path下的小文件。

private synchronized SmallFileAssign getSmallFileAssign(String partitionPath) {
    // 如果这个partition path下有可分配的小文件,获取这个小文件
    if (smallFileAssignMap.containsKey(partitionPath)) {
        return smallFileAssignMap.get(partitionPath);
    }
    // 获取分区下所有的small file,然后在其中查找归当前task负责的小文件
    // 这里只返回当前task负责的小文件,而不是partition path下所有的小文件
    // 获取逻辑后面分析
    List<SmallFile> smallFiles = smallFilesOfThisTask(writeProfile.getSmallFiles(partitionPath));
    if (smallFiles.size() > 0) {
        // 如果存在分配给当前任务的小文件
        LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
        // 将这些小文件分配状态保存为SmallFileAssignState
        SmallFileAssignState[] states = smallFiles.stream()
            .map(smallFile -> new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, writeProfile.getAvgSize()))
            .toArray(SmallFileAssignState[]::new);
        // 保存到smallFileAssignMap中
        SmallFileAssign assign = new SmallFileAssign(states);
        smallFileAssignMap.put(partitionPath, assign);
        return assign;
    }
    // 没有小文件,返回null
    smallFileAssignMap.put(partitionPath, null);
    return null;
}

smallFileAssignMap缓存的是partition path和SmallFileAssign的对应关系。SmallFileAssign保存了某个partition下所有的小文件分配状态(有多少个小文件,每个小文件已分配多少record,还有多少剩余容量等)。

newFileAssignStates缓存的是partition path和NewFileAssignState的对应关系。NewFileAssignState表示某个新文件的分配状态(该文件已分配多少record,还有多少剩余容量等)。

WriteProfile的getSmallFiles方法获取partitionPath下的所有小文件。

public synchronized List<SmallFile> getSmallFiles(String partitionPath) {
    // lookup the cache first
    // 如果缓存的有,返回缓存的内容
    if (smallFilesMap.containsKey(partitionPath)) {
        return smallFilesMap.get(partitionPath);
    }

    List<SmallFile> smallFiles = new ArrayList<>();
    // 获取hoodie.parquet.small.file.limit配置,小于这个大小的都会被视为小文件
    if (config.getParquetSmallFileLimit() <= 0) {
        // 如果该配置小于等于0,没有任何文件被视为小文件
        this.smallFilesMap.put(partitionPath, smallFiles);
        return smallFiles;
    }

    // 查询文件系统,获取所有的小文件
    smallFiles = smallFilesProfile(partitionPath);
    // 加入缓存中
    this.smallFilesMap.put(partitionPath, smallFiles);
    return smallFiles;
}

smallFilesProfile查询文件系统获取partition path下的所有小文件。

protected List<SmallFile> smallFilesProfile(String partitionPath) {
    // smallFiles only for partitionPath
    List<SmallFile> smallFileLocations = new ArrayList<>();

    // 获取已完成的timeline
    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();

    if (!commitTimeline.empty()) { // if we have some commits
        // 如果有commit
        // 获取最近的一次commit
        HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
        // 从文件系统视图读取latestCommitTime及之前的最新base file
        List<HoodieBaseFile> allFiles = fsView
            .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());

        for (HoodieBaseFile file : allFiles) {
            // filter out the corrupted files.
            // 遍历这些文件,找出文件大小大于0并且小于hoodie.parquet.small.file.limit的文件
            // 这是被视为小文件的条件
            if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
                String filename = file.getFileName();
                // 创建SmallFile
                SmallFile sf = new SmallFile();
                sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
                sf.sizeBytes = file.getFileSize();
                smallFileLocations.add(sf);
            }
        }
    }

    return smallFileLocations;
}
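
按照上面的判断条件(文件大小大于0且小于hoodie.parquet.small.file.limit),小文件的筛选可以用下面的示意代码概括(limit和文件大小均为虚构的演示数值):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SmallFileFilterDemo {
    public static void main(String[] args) {
        long smallFileLimit = 100 * 1024 * 1024L; // 假设hoodie.parquet.small.file.limit为100MB
        // 模拟partition下各个base file的大小(字节)
        List<Long> fileSizes = Arrays.asList(0L, 20L * 1024 * 1024, 150L * 1024 * 1024, 99L * 1024 * 1024);
        List<Long> smallFiles = fileSizes.stream()
            // 大小为0的视为损坏文件被过滤掉;超过limit的不算小文件
            .filter(size -> size > 0 && size < smallFileLimit)
            .collect(Collectors.toList());
        System.out.println(smallFiles); // [20971520, 103809024]
    }
}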

smallFilesOfThisTask方法从一系列小文件中找到当前task负责的小文件。逻辑如下:

@VisibleForTesting
public List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
    // computes the small files to write inserts for this task.
    // 使用fileIdOfThisTask逐个判断small file的fileID,从而得知small file是否归当前task处理
    // 符合条件的加入集合中
    return smallFiles.stream()
        .filter(smallFile -> fileIdOfThisTask(smallFile.location.getFileId()))
        .collect(Collectors.toList());
}

SmallFileAssign

上面分析完了小文件的分配流程。小文件分配容量计算和状态维护的逻辑位于SmallFileAssign。本章节重点分析它。

SmallFileAssign用于分配小文件供数据写入,是小文件优化的一部分。SmallFileAssign在前面分析的getSmallFileAssign中创建。从创建过程可以得知,一个SmallFileAssign维护的是当前task负责的、位于同一个partition path下的所有小文件的record分配状态。

SmallFileAssign有如下成员变量:

// 保存各个小文件的分配状态
final SmallFileAssignState[] states;
// 是states数组的下标。有多个小文件可以分配的时候,逐个分配这些小文件
// 当某个小文件容量已满无法继续分配的时候(成为大文件),assignIdx自增1,开始分配下一个文件
int assignIdx = 0;
// 标记这个SmallFileAssign是否还有小文件可分配
boolean noSpace = false;

assign方法尝试分配一个小文件。如果分配成功返回true。逻辑如下:

public boolean assign() {
    if (noSpace) {
        // 如果没有空间了,返回false
        return false;
    }
    // assignIdx是当前分配到第几个小文件
    SmallFileAssignState state = states[assignIdx];
    while (!state.canAssign()) {
        // 如果这个小文件不能分配
        // 指针向后移动
        assignIdx += 1;
        if (assignIdx >= states.length) {
            // 如果所有小文件都分配完了
            // 标记没有空间
            noSpace = true;
            return false;
        }
        // move to next slot if possible
        // 分配下一个小文件
        state = states[assignIdx];
    }
    // 分配到这个state中
    state.assign();
    return true;
}

成功完成assign过程之后,可以使用getFileId方法,获取分配到的小文件的fileId:

public String getFileId() {
    return states[assignIdx].fileId;
}

SmallFileAssign中每一个小文件的分配状态由SmallFileAssignState类维护。它有如下成员变量:

// 已分配多少record
long assigned;
// 这个小文件还能再分配多少条record(未分配容量)
// 受hoodie.parquet.max.file.size配置项影响
// 约等于 (最大file大小 - 小文件当前大小) / 平均每条record的大小
long totalUnassigned;
// 对应的fileId
final String fileId;

接下来是判断可否分配的canAssign方法和assign方法:

public boolean canAssign() {
    // 已分配的数量不能超过总的未分配数量
    return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned;
}

/**
 * Remembers to invoke {@link #canAssign()} first.
 */
public void assign() {
    // 已分配数量自增1
    this.assigned++;
}
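
把SmallFileAssign和SmallFileAssignState的配合方式抽出来,可以得到下面这个简化示意(非Hudi源码,容量计算方式参考上文注释):

public class SmallFileAssignDemo {
    static class State {
        long assigned = 0;
        final long totalUnassigned;
        final String fileId;

        // 剩余可分配条数约等于 (最大文件大小 - 小文件当前大小) / 平均record大小
        State(String fileId, long maxFileSize, long currentSize, long avgRecordSize) {
            this.fileId = fileId;
            this.totalUnassigned = (maxFileSize - currentSize) / avgRecordSize;
        }

        boolean canAssign() {
            return totalUnassigned > 0 && totalUnassigned > assigned;
        }

        void assign() {
            assigned++;
        }
    }

    public static void main(String[] args) {
        // 两个小文件,每个大约还能再写2条record(为了演示取很小的数值)
        State[] states = {
            new State("file-a", 6, 2, 2),
            new State("file-b", 6, 2, 2)
        };
        int assignIdx = 0;
        for (int i = 0; i < 5; i++) {
            // 当前小文件写满后移动到下一个
            while (assignIdx < states.length && !states[assignIdx].canAssign()) {
                assignIdx++;
            }
            if (assignIdx >= states.length) {
                System.out.println("record-" + i + " -> 新文件(小文件没有剩余空间)");
            } else {
                states[assignIdx].assign();
                System.out.println("record-" + i + " -> " + states[assignIdx].fileId);
            }
        }
    }
}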

到这里小文件的分配流程分析完毕。

MOR Upsert

HoodieFlinkMergeOnReadTable

HoodieFlinkMergeOnReadTable代表了Flink管理的MOR类型Hudi表。我们从这里开始分析。

对于MOR类型表,FlinkWriteHandleFactory.getFactory返回的是DeltaCommitWriteHandleFactory。它的create方法内容如下:

@Override
public HoodieWriteHandle<?, ?, ?, ?> create(
    Map<String, Path> bucketToHandles,
    HoodieRecord<T> record,
    HoodieWriteConfig config,
    String instantTime,
    HoodieTable<T, I, K, O> table,
    Iterator<HoodieRecord<T>> recordItr) {
    final String fileID = record.getCurrentLocation().getFileId();
    final String partitionPath = record.getPartitionPath();
    final TaskContextSupplier contextSupplier = table.getTaskContextSupplier();
    return new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, contextSupplier);
}

上面方法创建出的handle为FlinkAppendHandle。它主要负责将数据写入log文件。

HoodieFlinkMergeOnReadTable的upsert方法内容如下:

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(
    HoodieEngineContext context,
    HoodieWriteHandle<?, ?, ?, ?> writeHandle,
    String instantTime,
    List<HoodieRecord<T>> hoodieRecords) {
    // 检查handle必须是FlinkAppendHandle类型
    ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
                                  "MOR write handle should always be a FlinkAppendHandle");
    FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
    // 交给FlinkUpsertDeltaCommitActionExecutor执行
    return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
}

和COW表的BaseFlinkCommitActionExecutor不同,MOR的upsert逻辑由FlinkUpsertDeltaCommitActionExecutor执行。MOR表写入log文件,这一过程称为delta commit,和COW表是不同的。

FlinkUpsertDeltaCommitActionExecutor

我们从execute方法开始。

@Override
public HoodieWriteMetadata execute() {
    return FlinkWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
                                                config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType);
}

后面的逻辑和BaseFlinkCommitActionExecutor基本相同。但是BaseFlinkDeltaCommitActionExecutor是BaseFlinkCommitActionExecutor的子类,覆盖了父类的handleUpdate和handleInsert方法。

我们重点分析其父类BaseFlinkDeltaCommitActionExecutor的handleUpdate和handleInsert方法。

@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
    FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
    // 对于更新的数据,调用doAppend方法
    appendHandle.doAppend();
    // 关闭appendHandle并获取写入状态
    List<WriteStatus> writeStatuses = appendHandle.close();
    return Collections.singletonList(writeStatuses).iterator();
}

@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
    // 创建FlinkLazyInsertIterable
    // 和BaseFlinkCommitActionExecutor的handleInsert方法基本相同
    return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table,
                                         idPfx, taskContextSupplier, new ExplicitWriteHandleFactory(writeHandle));
}

接下来两节分别对更新数据和插入数据分别分析。

更新数据逻辑

FlinkAppendHandle的doAppend方法位于父类HoodieAppendHandle中。

public void doAppend() {
    while (recordItr.hasNext()) {
        // 逐个遍历record
        HoodieRecord record = recordItr.next();
        init(record);
        flushToDiskIfRequired(record, false);
        writeToBuffer(record);
    }
    appendDataAndDeleteBlocks(header, true);
    estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
}

init方法查找record所属的file slice,获取log文件对应的writer,准备在下一步写入此文件。逻辑如下:

private void init(HoodieRecord record) {
    // init只执行一次
    if (doInit) {
        // extract some information from the first record
        // 获取这个hudi表的文件系统结构
        SliceView rtView = hoodieTable.getSliceView();
        // 获取最新的file slice(base file + log files)
        Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
        // Set the base commit time as the current instantTime for new inserts into log files
        String baseInstantTime;
        String baseFile = "";
        List<String> logFiles = new ArrayList<>();
        // 如果找到了file slice
        if (fileSlice.isPresent()) {
            // 获取file slice的instant time(slice什么时候创建的)
            baseInstantTime = fileSlice.get().getBaseInstantTime();
            // 获取baseFile名称
            baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
            // 获取所有的log文件
            logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
        } else {
            // 如果没有找到file slice
            baseInstantTime = instantTime;
            // Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
            // NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute
            // 如果record中存了instant time,且instant time是long类型的timestamp,使用record的instant time
            // instant time在更新和插入数据的时候是I或者U,不是long类型的timestamp,这种情况不会进入if分支
            if (record.getCurrentLocation() != null && HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime())) {
                baseInstantTime = record.getCurrentLocation().getInstantTime();
            }
            // This means there is no base data file, start appending to a new log file
            // 创建新的file slice
            fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
            LOG.info("New AppendHandle for partition :" + partitionPath);
        }

        // Prepare the first write status
        // 创建新的log写入状态
        writeStatus.setStat(new HoodieDeltaWriteStat());
        // 设置file id
        writeStatus.setFileId(fileId);
        // 设置partition path
        writeStatus.setPartitionPath(partitionPath);
        // 估算出每条数据的平均大小
        averageRecordSize = sizeEstimator.sizeEstimate(record);

        // 组装deltaWriteStat,对应的是log写入状态
        HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) writeStatus.getStat();
        deltaWriteStat.setPrevCommit(baseInstantTime);
        deltaWriteStat.setPartitionPath(partitionPath);
        deltaWriteStat.setFileId(fileId);
        deltaWriteStat.setBaseFile(baseFile);
        deltaWriteStat.setLogFiles(logFiles);

        try {
            // Save hoodie partition meta in the partition path
            // 组装partition元数据
            HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
                                                                                    new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
                                                                                    hoodieTable.getPartitionMetafileFormat());
            // 写入partition元数据
            partitionMetadata.trySave(getPartitionId());

            // Since the actual log file written to can be different based on when rollover happens, we use the
            // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
            // writers.
            // https://issues.apache.org/jira/browse/HUDI-1517
            // 创建一个标记文件
            // 标记要写入log file
            createMarkerFile(partitionPath, FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));

            // 创建log writer,用于将数据写入log文件
            this.writer = createLogWriter(fileSlice, baseInstantTime);
        } catch (Exception e) {
            LOG.error("Error in update task at commit " + instantTime, e);
            writeStatus.setGlobalError(e);
            throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
                                            + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + "/" + partitionPath, e);
        }
        // init执行后设置为false,不会反复执行
        doInit = false;
    }
}

flushToDiskIfRequired在缓存的record数量达到预估阈值时,将这些record flush到log文件。方法逻辑如下:

/**
 * Checks if the number of records have reached the set threshold and then flushes the records to disk.
 */
private void flushToDiskIfRequired(HoodieRecord record, boolean appendDeleteBlocks) {
    // 估算已存入的record数量是否超过了预估record数量
    // 预估record数量为最大block size / 平均record size
    // 最大block由hoodie.logfile.data.block.max.size配置项决定
    // 或者记录的数据条数是100的倍数
    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)
        || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0) {
        // 修正估算的平均record大小,原来的大小权重占80%,新record估算出的大小占20%
        averageRecordSize = (long) (averageRecordSize * 0.8 + sizeEstimator.sizeEstimate(record) * 0.2);
    }

    // Append if max number of records reached to achieve block size
    // 重新估算完如果还是超过预估record数量
    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
        // Recompute averageRecordSize before writing a new block and update existing value with
        // avg of new and old
        LOG.info("Flush log block to disk, the current avgRecordSize => " + averageRecordSize);
        // Delete blocks will be appended after appending all the data blocks.
        // flush数据block到磁盘,delete block被忽略(appendDeleteBlocks为false)
        // 这个方法后面分析
        appendDataAndDeleteBlocks(header, appendDeleteBlocks);
        // 更新已写入数据预估大小计数器
        estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
        // 已写入数据置0
        numberOfRecords = 0;
    }
}
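
averageRecordSize的修正是一个简单的加权平均:旧值权重0.8,新样本权重0.2;flush阈值为maxBlockSize / averageRecordSize。下面用一段示意代码模拟这个过程(配置值和record大小均为假设的演示数值,且只保留了每100条修正一次的分支):

public class FlushThresholdDemo {
    public static void main(String[] args) {
        long maxBlockSize = 256 * 1024 * 1024L; // 假设hoodie.logfile.data.block.max.size为256MB
        long averageRecordSize = 1024;          // 初始估算的平均record大小:1KB
        long numberOfRecords = 0;

        for (int i = 1; i <= 1_000_000; i++) {
            long sampleSize = 2048; // 模拟实际record比初始估算大
            // 每100条record修正一次平均大小:旧值权重0.8,新样本权重0.2
            if (i % 100 == 0) {
                averageRecordSize = (long) (averageRecordSize * 0.8 + sampleSize * 0.2);
            }
            numberOfRecords++;
            // 缓存条数达到预估阈值就flush一个log block
            if (numberOfRecords >= maxBlockSize / averageRecordSize) {
                System.out.println("flush block: records=" + numberOfRecords
                    + ", avgRecordSize=" + averageRecordSize);
                numberOfRecords = 0;
            }
        }
    }
}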

writeToBuffer方法将record写入到recordList缓存中,要删除的数据位于recordsToDelete缓存。逻辑分析如下:

private void writeToBuffer(HoodieRecord<T> record) {
    // 检查appendHandle中的partition path和record的partition path必须相同
    if (!partitionPath.equals(record.getPartitionPath())) {
        HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
                                                                    + record.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
        writeStatus.markFailure(record, failureEx, record.getMetadata());
        return;
    }

    // update the new location of the record, so we know where to find it next
    if (needsUpdateLocation()) {
        // 恒为true
        record.unseal();
        // 设置新的record location
        record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
        record.seal();
    }
    // fetch the ordering val first in case the record was deflated.
    // 获取顺序值
    final Comparable<?> orderingVal = record.getOrderingValue(writeSchema, recordProperties);
    // 做些准备工作,后面分析
    Option<HoodieRecord> indexedRecord = prepareRecord(record);
    if (indexedRecord.isPresent()) {
        // Skip the ignored record.
        try {
            if (!indexedRecord.get().shouldIgnore(writeSchema, recordProperties)) {
                // 如果payload不为空,不用忽略
                // 加入buffer中,等待flush到磁盘
                recordList.add(indexedRecord.get());
            }
        } catch (IOException e) {
            writeStatus.markFailure(record, e, record.getMetadata());
            LOG.error("Error writing record  " + indexedRecord.get(), e);
        }
    } else {
        // 否则,添加到需要删除的元素缓存中
        recordsToDelete.add(DeleteRecord.create(record.getKey(), orderingVal));
    }
    // record计数器自增1
    numberOfRecords++;
}

prepareRecord方法判断数据是要写入还是要删除,并为写入的数据补齐metadata字段。

private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
    // 获取record的metadata
    Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
    // 非compaction操作useWriterSchema为false,采用writeSchema
    Schema schema = useWriterSchema ? writeSchemaWithMetaFields : writeSchema;
    try {
        // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
        // Whether it is an update or insert record.
        // 如果record有currentLocation,说明数据已存在,为update类型
        boolean isUpdateRecord = isUpdateRecord(hoodieRecord);
        // If the format can not record the operation field, nullify the DELETE payload manually.
        // 如果不允许记录operation元数据字段,并且数据类型为删除
        // 需要清空数据的payload来表示这条数据是删除的
        boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
        recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));

        Option<HoodieRecord> finalRecordOpt = nullifyPayload ? Option.empty() : Option.of(hoodieRecord);
        // Check for delete
        if (finalRecordOpt.isPresent() && !finalRecordOpt.get().isDelete(schema, recordProperties)) {
            // 如果不是需要删除的数据
            HoodieRecord finalRecord = finalRecordOpt.get();
            // Check if the record should be ignored (special case for [[ExpressionPayload]])
            if (finalRecord.shouldIgnore(schema, recordProperties)) {
                return finalRecordOpt;
            }

            // Prepend meta-fields into the record
            // 生成metadata字段,将其加入到record中
            MetadataValues metadataValues = populateMetadataFields(finalRecord);
            HoodieRecord populatedRecord =
                finalRecord.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, recordProperties);

            // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
            //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
            //       it since these records will be put into the recordList(List).
            // 准备返回数据
            finalRecordOpt = Option.of(populatedRecord.copy());
            if (isUpdateRecord || isLogCompaction) {
                updatedRecordsWritten++;
            } else {
                insertRecordsWritten++;
            }
            recordsWritten++;
        } else {
            // 如果是删除数据
            finalRecordOpt = Option.empty();
            recordsDeleted++;
        }

        writeStatus.markSuccess(hoodieRecord, recordMetadata);
        // deflate record payload after recording success. This will help users access payload as a
        // part of marking
        // record successful.
        hoodieRecord.deflate();
        return finalRecordOpt;
    } catch (Exception e) {
        LOG.error("Error writing record  " + hoodieRecord, e);
        writeStatus.markFailure(hoodieRecord, e, recordMetadata);
    }
    return Option.empty();
}

appendDataAndDeleteBlocks方法将新数据缓存recordList和delete缓存recordsToDelete中的数据写入到log block中。

/**
 * Appends data and delete blocks. When appendDeleteBlocks value is false, only data blocks are appended.
 * This is done so that all the data blocks are created first and then a single delete block is added.
 * Otherwise what can end up happening is creation of multiple small delete blocks get added after each data block.
 */
protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header, boolean appendDeleteBlocks) {
    try {
        // log block header信息添加instant time和schema
        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
        List<HoodieLogBlock> blocks = new ArrayList<>(2);
        if (recordList.size() > 0) {
            // 默认需要添加metadata,否则无法增量查询
            String keyField = config.populateMetaFields()
                // 采用_hoodie_record_key作为key字段
                ? HoodieRecord.RECORD_KEY_METADATA_FIELD
                // 否则使用hoodie.table.recordkey.fields作为key字段
                : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();

            // pickLogDataBlockFormat根据base file格式决定log写入格式。对于parquet采用avro格式
            // 根据base file格式,将数据封装为对应格式的log block
            blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, header, keyField));
        }

        if (appendDeleteBlocks && recordsToDelete.size() > 0) {
            // 增加删除的数据block
            blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), header));
        }

        if (blocks.size() > 0) {
            // 写入数据block
            AppendResult appendResult = writer.appendBlocks(blocks);
            // 处理写入结果
            processAppendResult(appendResult, recordList);
            // 清空record缓存
            recordList.clear();
            if (appendDeleteBlocks) {
                // 清空delete record缓存
                recordsToDelete.clear();
            }
        }
    } catch (Exception e) {
        throw new HoodieAppendException("Failed while appending records to " + writer.getLogFile().getPath(), e);
    }
}

具体写入log block的过程可参考HoodieLogFormatWriter的appendBlocks方法。

@Override
public AppendResult appendBlocks(List<HoodieLogBlock> blocks) throws IOException, InterruptedException {
    // Find current version
    // 获取log格式的版本,为1
    HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
        new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);

    // 获取log文件的outputStream
    FSDataOutputStream originalOutputStream = getOutputStream();
    // 获取写入起始位置
    long startPos = originalOutputStream.getPos();
    // 已写入内容为0
    long sizeWritten = 0;
    // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can be correctly written
    // 确保巨大的block能够正确的写入
    FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(fs.getScheme()), startPos);
    // 遍历需要写入的block
    for (HoodieLogBlock block: blocks) {
        // 已写入数据大小
        long startSize = outputStream.size();

        // 1. Write the magic header for the start of the block
        // 写入magic数据,作为block开头
        // 数据为#HUDI#
        outputStream.write(HoodieLogFormat.MAGIC);

        // bytes for header
        // 获取log block的header数据
        byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
        // content bytes
        // 获取block内容
        byte[] content = block.getContentBytes();
        // bytes for footer
        // 获取block footer
        byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());

        // 2. Write the total size of the block (excluding Magic)
        // 写入block长度
        outputStream.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));

        // 3. Write the version of this log block
        // 写入log format的版本
        outputStream.writeInt(currentLogFormatVersion.getVersion());
        // 4. Write the block type
        // 写入block类型
        outputStream.writeInt(block.getBlockType().ordinal());

        // 5. Write the headers for the log block
        // 写入header
        outputStream.write(headerBytes);
        // 6. Write the size of the content block
        // 写入content的长度
        outputStream.writeLong(content.length);
        // 7. Write the contents of the data block
        // 写入content
        outputStream.write(content);
        // 8. Write the footers for the log block
        // 写入footer
        outputStream.write(footerBytes);
        // 9. Write the total size of the log block (including magic) which is everything written
        // until now (for reverse pointer)
        // Update: this information is now used in determining if a block is corrupt by comparing to the
        //   block size in header. This change assumes that the block size will be the last data written
        //   to a block. Read will break if any data is written past this point for a block.
        // 记录log block总大小
        outputStream.writeLong(outputStream.size() - startSize);

        // Fetch the size again, so it accounts also (9).

        // HUDI-2655. Check the size written to avoid log blocks whose size overflow.
        // 检查避免overflow
        if (outputStream.size() == Integer.MAX_VALUE) {
            throw new HoodieIOException("Blocks appended may overflow. Please decrease log block size or log block amount");
        }
        sizeWritten +=  outputStream.size() - startSize;
    }
    // Flush all blocks to disk
    // flush到磁盘
    flush();

    // 构建写入结果
    AppendResult result = new AppendResult(logFile, startPos, sizeWritten);
    // roll over if size is past the threshold
    // 检查日志文件是否超出大小
    // 由hoodie.logfile.max.size配置项控制
    // 超出大小需要滚动到新的日志文件
    rolloverIfNeeded();
    return result;
}
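
上面的写入顺序决定了log block在文件中的布局:MAGIC、block总长、版本号、block类型、header、content长度、content、footer,最后是到当前位置为止的block总大小。下面的示意代码按同样的顺序把一个假数据block写入内存,仅用于演示布局(header/content/footer的内容是虚构的,长度计算方式以Hudi源码为准):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LogBlockLayoutDemo {
    public static void main(String[] args) throws IOException {
        byte[] magic = "#HUDI#".getBytes(StandardCharsets.UTF_8);
        byte[] header = "header-bytes".getBytes(StandardCharsets.UTF_8);   // 演示数据
        byte[] content = "content-bytes".getBytes(StandardCharsets.UTF_8); // 演示数据
        byte[] footer = "footer-bytes".getBytes(StandardCharsets.UTF_8);   // 演示数据
        int version = 1;          // log format版本
        int blockTypeOrdinal = 2; // block类型的枚举序号,仅为示例

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.write(magic);                             // 1. magic
        // 2. block总长(不含magic):版本号(4) + 类型(4) + header + content长度字段(8) + content + footer + 末尾总大小字段(8)
        out.writeLong(4 + 4 + header.length + 8 + content.length + footer.length + 8);
        out.writeInt(version);                        // 3. 版本号
        out.writeInt(blockTypeOrdinal);               // 4. block类型
        out.write(header);                            // 5. header
        out.writeLong(content.length);                // 6. content长度
        out.write(content);                           // 7. content
        out.write(footer);                            // 8. footer
        out.writeLong(out.size());                    // 9. 到当前位置为止写入的总字节数(含magic)
        out.flush();
        System.out.println("total bytes written: " + bos.size());
    }
}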

到此为止MOR更新数据的逻辑分析完毕。

插入数据逻辑

和前面COW表插入数据使用的FlinkLazyInsertIterable逻辑类似,不同之处在于执行doWrite方法这一步。

FlinkAppendHandle的doWrite方法位于父类HoodieAppendHandle中。doWrite方法内容为:

@Override
protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props) {
    Option<Map<String, String>> recordMetadata = record.getMetadata();
    try {
        init(record);
        flushToDiskIfRequired(record, false);
        writeToBuffer(record);
    } catch (Throwable t) {
        // Not throwing exception from here, since we don't want to fail the entire job
        // for a single record
        writeStatus.markFailure(record, t, recordMetadata);
        LOG.error("Error writing record " + record, t);
    }
}

init、flushToDiskIfRequired和writeToBuffer这三个方法和前面更新数据的逻辑相同,不再赘述。

MOR Insert

HoodieFlinkMergeOnReadTable的insert方法内容如下:

@Override
public HoodieWriteMetadata<List<WriteStatus>> insert(
    HoodieEngineContext context,
    HoodieWriteHandle<?, ?, ?, ?> writeHandle,
    String instantTime,
    List<HoodieRecord<T>> hoodieRecords) {
    if (writeHandle instanceof FlinkAppendHandle) {
        // 如果是FlinkAppendHandle类型handle
        FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
        // 同样调用FlinkUpsertDeltaCommitActionExecutor,和upsert相同
        return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
    } else {
        // 如果不是FlinkAppendHandle,调用父类HoodieFlinkCopyOnWriteTable的insert方法
        return super.insert(context, writeHandle, instantTime, hoodieRecords);
    }
}

insert的写入逻辑和upsert相同,不再赘述。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。
