前言
Hudi Flink支持配置table service的异步执行。Schedule的时机为checkpoint完成的时候。执行过程在线程池中完成。Flink Hudi 常用的table service有compaction,clustering和clean三种。它们对应的配置项为:
- clustering.async.enabled:是否开启异步的clustering。默认不开启。
- compaction.async.enabled:是否开启异步compaction。默认开启。
- clean.async.enabled:是否开启异步clean。默认开启。
本篇主要分析Flink中Hudi table service的排期和执行时机。至于compaction clustering和clean表服务具体的执行逻辑,参见:
- Hudi Compaction使用和源码分析 - 简书 (jianshu.com)
- Hudi 源码之 Clustering - 简书 (jianshu.com)
- Hudi 源码之 Cleaning service - 简书 (jianshu.com)
Scheduling排期
Hudi Flink table service的排期主要位于如下两个方法中:
- StreamWriteOperatorCoordinator::handleInputEvent: batch模式需要schedule的table service。
- StreamWriteOperatorCoordinator::notifyCheckpointComplete: streaming模式需要schedule的table service
接下来我们分别分析这两个方法。
handleEndInputWvent
方法:
private void handleEndInputEvent(WriteMetadataEvent event) {
addEventToBuffer(event);
// 如果已经接收到所有数据
if (allEventsReceived()) {
// start to commit the instant.
// 提交
// 如果数据成功写入,返回true,表示提交成功
boolean committed = commitInstant(this.instant);
if (committed) {
// The executor thread inherits the classloader of the #handleEventFromOperator
// caller, which is a AppClassLoader. Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// sync Hive synchronously if it is enabled in batch mode.
// 如果开启了hive sync,执行
syncHive();
// schedules the compaction or clustering if it is enabled in batch execution mode
// 表服务排期
scheduleTableServices(true);
}
}
}
notifyCheckpointComplete
方法。在checkpoint执行成功的时候执行回调。
@Override
public void notifyCheckpointComplete(long checkpointId) {
executor.execute(
() -> {
// The executor thread inherits the classloader of the #notifyCheckpointComplete
// caller, which is a AppClassLoader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// for streaming mode, commits the ever received events anyway,
// the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
// 如果数据成功写入,返回true,表示提交成功
final boolean committed = commitInstant(this.instant, checkpointId);
// schedules the compaction or clustering if it is enabled in stream execution mode
// 排期表服务
scheduleTableServices(committed);
if (committed) {
// start new instant.
// 写入instant
startInstant();
// sync Hive if is enabled
// 如果开启了hive sync,执行
syncHiveAsync();
}
}, "commits the instant %s", this.instant
);
}
scheduleTableServices
方法:
private void scheduleTableServices(Boolean committed) {
// if compaction is on, schedule the compaction
// 如果是MOR表,并且开启了compaction.schedule.enabled配置(默认开启)
if (tableState.scheduleCompaction) {
CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed);
}
// if clustering is on, schedule the clustering
// clustering.schedule.enabled如果开启(默认不开启)
// 对于bucket index表,如果配置的是consistent hash(一致性hash),要求写入类型必须是upsert
// 否则(SIMPLE类型)要求写入类型必须是insert
if (tableState.scheduleClustering) {
ClusteringUtil.scheduleClustering(conf, writeClient, committed);
}
}
Executing执行
HoodieTableSink
Hudi Flink创建table service异步任务流位于HoodieTableSink::getSinkRuntimeProvider
。
// ...
// Append mode
// 如果是增量写入模式
if (OptionsResolver.isAppendMode(conf)) {
// close compaction for append mode
// 关闭compaction schedule模式
conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
// append 模式写入数据
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
// 如果需要异步clustering
// write.operation为insert并且启用了异步clustering(clustering.async.enabled为true)
if (OptionsResolver.needsAsyncClustering(conf)) {
// 执行clustering
return Pipelines.cluster(conf, rowType, pipeline);
// 如果hoodie.cleaner.policy.failed.writes配置为lazy
} else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
// add clean function to rollback failed writes for lazy failed writes cleaning policy
// 执行清理
return Pipelines.clean(conf, pipeline);
} else {
// 否则什么也不做
return Pipelines.dummySink(pipeline);
}
}
DataStream<Object> pipeline;
// bootstrap加载索引
final DataStream<HoodieRecord> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
// write pipeline
// 流式写入
pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
// compaction
// 是否需要异步压缩
// compaction.async.enabled是否为true。默认为true
if (OptionsResolver.needsAsyncCompaction(conf)) {
// use synchronous compaction for bounded source.
// 如果是bounded数据源(有头有尾),使用同步压缩
if (context.isBounded()) {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}
// 执行压缩
return Pipelines.compact(conf, pipeline);
} else {
// 如果没有配置压缩,执行清理
return Pipelines.clean(conf, pipeline);
}
// ...
通过上面的分析不难得知,Flink中compact,clean和clustering表服务都在Pipeline
中创建。接下来我们分析Pipeline
的源代码。
Pipelines
Pipeline
创建了一条专用的数据流,这些数据流分别用来周期性创建compaction和clustering的执行计划,以及执行compact,clean和clustering。它们独立于系统的业务数据流。
Pipelines::compact
该方法用来启动周期压缩任务流。
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
// 使用CompactionPlanOperator下发compaction执行计划
DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
// plan生成过程必须是单并行度
.setParallelism(1) // plan generate must be singleton
.setMaxParallelism(1)
// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
// 使用CompactorOperator执行压缩计划
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new CompactOperator(conf))
// 并行度配置为compaction.tasks
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
// CompactionSommitSink检查并提交compaction instant
.addSink(new CompactionCommitSink(conf))
.name("compact_commit")
// 执行commit的并行度必须是1
.setParallelism(1); // compaction commit should be singleton
compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);
return compactionCommitEventDataStream;
}
CompactionPlanOperator::notifyCheckpointComplete。在checkpoint的时候检查是否生成的有requested状态的compaction instant。如果有,生成CompactionPlanEvent
发往下游。
SteamWriteOperatorCoordinator
用来生成requested状态的compaction instant,CompactionPlanOperator
用来获取到这些compaction instant,读取保存的执行计划然后发往下游。
@Override
public void notifyCheckpointComplete(long checkpointId) {
try {
table.getMetaClient().reloadActiveTimeline();
// There is no good way to infer when the compaction task for an instant crushed
// or is still undergoing. So we use a configured timeout threshold to control the rollback: // {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS}, // when the earliest inflight instant has timed out, assumes it has failed // already and just rolls it back.
// comment out: do we really need the timeout rollback ? // CompactionUtil.rollbackEarliestCompaction(table, conf);
scheduleCompaction(table, checkpointId);
} catch (Throwable throwable) {
// make it fail-safe
LOG.error("Error while scheduling compaction plan for checkpoint: " + checkpointId, throwable);
}
}
scheduleCompaction
方法读取第一个状态为reqested状态的compaction instant,获取到它的compaction plan,将该compaction涉及到的compaction plan中的compactionOperation
(即Compaction操作涉及到的file group信息)包装为CompactionPlanEvent
发往下游。
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
// 获取包含所有pending compaction的timeline
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
// the first instant takes the highest priority.
// 找到时间最早的requested compaction instant
Option<HoodieInstant> firstRequested = pendingCompactionTimeline
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
// record metrics
compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
// 如果没有requested状态的compaction instant,说明没有必要schedule
if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No compaction plan for checkpoint " + checkpointId);
return;
}
// 获取这个requested compaction instant对应的时间
String compactionInstantTime = firstRequested.get().getTimestamp();
// generate compaction plan
// should support configurable commit metadata
获取compaction plan
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);
// 如果没有获取到有效的compactio plan
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// do nothing.
LOG.info("Empty compaction plan for instant " + compactionInstantTime);
} else {
// 获取这个requested状态的 compaction instant
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
// Mark instant as compaction inflight
// 将它的状态修改为inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
// 获取plan中所有的compaction operation,封装为compactionPlanEvent发往下游
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
// 删除标记文件
WriteMarkersFactory
.get(table.getConfig().getMarkersType(), table, compactionInstantTime)
.deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
// 每一个operation封装为一个CompactionPlanEvent
// 这样到下游的时候可以将这些压缩任务均分
// 每个并行度处理一部分file group的压缩
for (CompactionOperation operation : operations) {
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
}
}
}
CompactOperator
的processElement
方法接收上面生成的CompactionPlanEvent
,执行压缩任务。
@Override
public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {
final CompactionPlanEvent event = record.getValue();
// 获取inflight compaction instant time
final String instantTime = event.getCompactionInstantTime();
// 获取compaction operation
final CompactionOperation compactionOperation = event.getOperation();
// 如果是异步压缩
// 在线程池中执行压缩,不会影响checkpoint过程
if (asyncCompaction) {
// executes the compaction task asynchronously to not block the checkpoint barrier propagate.
executor.execute(
() -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
(errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
"Execute compaction for instant %s from task %d", instantTime, taskID);
} else {
// executes the compaction task synchronously for batch mode.
// 否则同步执行compaction
LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
}
}
接下来的doCompaction
方法间接调用了HoodieFlinkMergeOnReadTableCompactor
的compact
方法,前面的文章已有分析,这里不再赘述。
Pipelines::cluster
该方法启动周期clustering任务流。
public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
// 使用ClusteringPlanOperator下发clustering执行计划
DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate",
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
// 下发执行计划的并行度必须是1
.setParallelism(1) // plan generate must be singleton
.setMaxParallelism(1) // plan generate must be singleton
.keyBy(plan ->
// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files
// 按照ClusteringPlanEvent的fileId分组
// 针对同一个file slice的clustering操作会分配给相同的线程执行,防止并发修改
plan.getClusteringGroupInfo().getOperations() .stream().map(ClusteringOperation::getFileId.collect(Collectors.joining()))
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
// 通过ClusteringOperator执行clustering
new ClusteringOperator(conf, rowType))
// clustering任务的并行度为clustering.tasks
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
// 如果启用了排序,即clustering.plan.strategy.sort.columns配置项不为空
// 配置该步骤的执行内存,对应配置项为write.sort.memory
if (OptionsResolver.sortClusteringEnabled(conf)) {
ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
// 检查并提交clustering instant
DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.setParallelism(1); // clustering commit should be singleton
clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);
return clusteringCommitEventDataStream;
}
ClusteringPlanOperator
schedule clustering的过程和前面schedule compaction的非常相似。
private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
// 获取request状态的clustering instant
List<HoodieInstant> pendingClusteringInstantTimes =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
// the first instant takes the highest priority.
// 获取时间最早的一个
Option<HoodieInstant> firstRequested = Option.fromJavaOptional(
pendingClusteringInstantTimes.stream()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());
// record metrics
clusteringMetrics.setFirstPendingClusteringInstant(firstRequested);
clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size());
if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No clustering plan for checkpoint " + checkpointId);
return;
}
String clusteringInstantTime = firstRequested.get().getTimestamp();
// generate clustering plan
// should support configurable commit metadata HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
// 拿到之前生成的clustering plan
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), clusteringInstant);
// 如果没有获取到有效的clustering plan,直接返回
if (!clusteringPlanOption.isPresent()) {
// do nothing.
LOG.info("No clustering plan scheduled");
return;
}
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
|| (clusteringPlan.getInputGroups().isEmpty())) {
// do nothing.
LOG.info("Empty clustering plan for instant " + clusteringInstantTime);
} else {
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
// 遍历所有的inputGroup,封装为ClusteringPlanEvent
for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
output.collect(new StreamRecord<>(
new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())
));
}
}
}
Clustering对应一个file slice的操作封装为了HoodieClusteringGroup
(和compatcion的CompactionOperation
对应)。这里将同一个plan中所有的HoodieClusteringGroup
,每一个封装为ClusteringPlanEvent
,目的是为了下游可以并行执行clustering。
ClusteringOperator
的processElement
方法执行clustering计划。和compaction相同,分为同步执行和异步执行两种方式。
@Override
public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
final ClusteringPlanEvent event = element.getValue();
final String instantTime = event.getClusteringInstantTime();
final List<ClusteringOperation> clusteringOperations = event.getClusteringGroupInfo().getOperations();
if (this.asyncClustering) {
// executes the compaction task asynchronously to not block the checkpoint barrier propagate.
executor.execute(
() -> doClustering(instantTime, clusteringOperations),
(errMsg, t) -> collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), taskID)),
"Execute clustering for instant %s from task %d", instantTime, taskID);
} else {
// executes the clustering task synchronously for batch mode.
LOG.info("Execute clustering for instant {} from task {}", instantTime, taskID);
doClustering(instantTime, clusteringOperations);
}
}
doClustering
方法为clustering过程的纯Flink实现。
private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {
clusteringMetrics.startClustering();
// 采用bulk insert的方式写入clustering之后的数据
BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType, true);
Iterator<RowData> iterator;
// 如果clustering操作涉及到log文件,使用readRecordsForGroupWithLogs
// 否则仅使用readRecordsForGroupBaseFiles
// 这两个方法读取file group中的数据,以iterator的形式返回
if (clusteringOperations.stream().anyMatch(operation -> CollectionUtils.nonEmpty(operation.getDeltaFilePaths()))) {
// if there are log files, we read all records into memory for a file group and apply updates.
iterator = readRecordsForGroupWithLogs(clusteringOperations, instantTime);
} else {
// We want to optimize reading records for case there are no log files.
iterator = readRecordsForGroupBaseFiles(clusteringOperations);
}
// 如果配置了clustering.plan.strategy.sort.columns
// 说明需要排序
if (this.sortClusteringEnabled) {
RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
// 使用BinaryexternalSorter来排序
// BinaryExternalSorter根据clustering.plan.strategy.sort.columns
// 生成排序代码
BinaryExternalSorter sorter = initSorter();
while (iterator.hasNext()) {
RowData rowData = iterator.next();
BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
sorter.write(binaryRowData);
}
// 使用bulk insert写入排序之后的数据
BinaryRowData row = binarySerializer.createInstance();
while ((row = sorter.getIterator().next(row)) != null) {
writerHelper.write(row);
}
sorter.close();
} else {
while (iterator.hasNext()) {
writerHelper.write(iterator.next());
}
}
List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
clusteringMetrics.endClustering();
collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID));
writerHelper.close();
}
Pipelines::clean
clean
方法的实现较为简单。Flink为数据流增加了一个CleanFunction
类型的sink,并行度为1。代码如下所示。
public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits");
cleanCommitDataStream.getTransformation().setMaxParallelism(1);
return cleanCommitDataStream;
我们继续分析CleanFunction
。如果启用了异步clean,CleanFunction
在启动的时候(open
)异步执行一次。创建checkpoint的时候snapshotState
启动异步clean服务。在checkpoint完成的时候notifyCheckpointComplete
等待clean错操作执行完毕。相关代码如下所示。
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
String instantTime = writeClient.createNewInstantTime();
LOG.info(String.format("exec clean with instant time %s...", instantTime));
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
executor.execute(() -> {
this.isCleaning = true;
try {
this.writeClient.clean(instantTime);
} finally {
this.isCleaning = false;
}
}, "wait for cleaning finish");
}
}
@Override
public void notifyCheckpointComplete(long l) throws Exception {
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
executor.execute(() -> {
try {
this.writeClient.waitForCleaningFinish();
} finally {
// ensure to switch the isCleaning flag
this.isCleaning = false;
}
}, "wait for cleaning finish");
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
try {
this.writeClient.startAsyncCleaning();
this.isCleaning = true;
} catch (Throwable throwable) {
// catch the exception to not affect the normal checkpointing
LOG.warn("Error while start async cleaning", throwable);
}
}
}
除了Pipeline::clean
方法直接使用CleanFunction
之外,我们还注意到ClusteringCommitSink
和CompactionCommitSink
都继承了CleanFunction
,都没有重写snapshotState
和notifyCheckPointComplete
方法。因此这两个sink的行为和CleanFunction
一致,在checkpoint的时候会触发clean操作。
继续观察这两个sink的doCommit
方法,我们发现最后一段都有如下代码:
// Whether to clean up the old log file when compaction
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
this.writeClient.clean();
}
该段代码表示,如果没有启用异步clean,且当前时刻没有clean没有正在执行,执行同步clean操作。
接下来我们分析this.writeClient.startAsyncCleaning()
调用,一路跟踪下去。跟踪过程中的非关键代码这里不再展示。
- CleanFunction的this.writeClient.startAsyncCleaning()
- HoodieFlinkWriteClient的tableServiceClient.startAsyncCleanerService(this);
- BaseHoodieTableServiceClient的this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
分析到这里,我们确定了clean表服务是在AsyncCleanerService
中封装的。继续分析startAsyncCleaningIfEnabled
方法,该方法首先判断是否开启了异步clean配置,如果开启了,创建一个异步clean服务,代码如下所示:
public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) {
HoodieWriteConfig config = writeClient.getConfig();
// hoodie.clean.automatic和hoodie.clean.async这两个任意一个配置为false,不运行该服务
if (!config.isAutoClean() || !config.isAsyncClean()) {
LOG.info("The HoodieWriteClient is not configured to auto & async clean. Async clean service will not start.");
return null;
}
// 创建并启动AsyncCleanerService
AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient);
asyncCleanerService.start(null);
return asyncCleanerService;
}
我们继续分析asyncCleanerService.start
,它位于HoodieAsyncService
的start
方法中。
其中的startService方法将异步服务逻辑本身和运行异步服务的executor封装为Pair返回。
public void start(Function<Boolean, Boolean> onShutdownCallback) {
if (started) {
LOG.warn("The async service already started.");
return;
}
Pair<CompletableFuture, ExecutorService> res = startService();
future = res.getKey();
executor = res.getValue();
started = true;
shutdownCallback(onShutdownCallback);
}
startService
方法在实现类AsyncCleanerFunction
中,如下所示。
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
String instantTime = writeClient.createNewInstantTime();
LOG.info(String.format("Starting async clean service with instant time %s...", instantTime));
//在线程池中启动异步clean操作
return Pair.of(CompletableFuture.supplyAsync(() -> {
writeClient.clean(instantTime);
return true;
}, executor), executor);
}
调用完该方法之后,异步clean的逻辑封装赋值到future
变量中。
按照上面的分析notifyCheckpointComplete
的时候执行waitForCompletion
方法。如果clean操作还没有结束,这里阻塞等待其执行完毕。
public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {
if (asyncCleanerService != null) {
LOG.info("Waiting for async clean service to finish");
try {
asyncCleanerService.waitForShutdown();
} catch (Exception e) {
throw new HoodieException("Error waiting for async clean service to finish", e);
}
}
}
分析HoodieAsnycService
的waitForshutdown
方法,同步等待clean执行完毕,内容如下:
public void waitForShutdown() throws ExecutionException, InterruptedException {
if (future == null) {
return;
}
try {
future.get();
} catch (ExecutionException ex) {
LOG.error("Service shutdown with error", ex);
throw ex;
}
}
参考文献
Compaction | Apache Hudi
Cleaning | Apache Hudi
Clustering | Apache Hudi