Flink JobManager 详解

JobManager 详解

JobMaster 在实现中,也依赖了很多的服务,其中最重要的是 SchedulerNGSlotPool,JobMaster 对外提供的接口实现中大都是使用前面这两个服务的方法。

// JobMaster.java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
    // LegacyScheduler: 用于调度作业的 ExecutionGraph
    private SchedulerNG schedulerNG;
    // SlotPoolImpl: 从名字也能看出它主要处理 slot 相关的内容,在 JM 这边的一个抽象
    private final SlotPool slotPool;
    // HA 服务,这里主要用于监控 RM leader,如果 RM Leader 有变化,这里会与新的 leader 建立连接
    private final HighAvailabilityServices highAvailabilityServices;

    /**
    * 下面这些都是创建上面 SchedulerNG(即 LegacyScheduler)需要使用到的服务
    */
    // 用于将数据上传到 BlobServer,这里上传的主要是 JobInformation 和 TaskInformation
    private final BlobWriter blobWriter;
    // 作业的 JobGraph 信息
    private final JobGraph jobGraph;
    // SchedulerImpl: 它也是一个调度器,将 slot 分配给对应的 task,它会调用 SlotPool 的相关接口(它里面有一个 slotSelectionStrategy 对象,用来决定一个 slot 分配的最佳算法)
    private final Scheduler scheduler;
    // 用于注册 Intermediate result partition,在作业调度的时候会用到
    private final ShuffleMaster<?> shuffleMaster;
    // 用于追踪 Intermediate result partition 的服务
    private final PartitionTracker partitionTracker;
    // --------- BackPressure --------
    private final BackPressureStatsTracker backPressureStatsTracker;
}

JobMaster 中涉及到重要组件如下图所示:

JobMaster 中的组件组成

JobMaster 主要有两个服务:

  1. LegacyScheduler: ExecutionGraph 相关的调度都是在这里实现的,它类似更深层的抽象,封装了 ExecutionGraph 和 BackPressureStatsTracker,JobMaster 不直接去调用 ExecutionGraph 和 BackPressureStatsTracker 的相关方法,都是通过 LegacyScheduler 间接去调用;
  2. SlotPool: 它是 JobMaster 管理其 slot 的服务,它负责向 RM 申请/释放 slot 资源,并维护其相应的 slot 信息。

从前面的图中可以看出,如果 LegacyScheduler 想调用 CheckpointCoordinator 的方法,比如 LegacySchedulertriggerSavepoint() 方法,它是需要先通过 executionGraphgetCheckpointCoordinator() 方法拿到 CheckpointCoordinator,然后再调用 CheckpointCoordinatortriggerSavepoint() 方法来触发这个作业的 savepoint。

JobMaster 的 API 概述

目前 JobMaster 对外提供的 API 列表如下(主要还是 JobMasterGateway 接口对应的实现):

  1. cancel(): 取消当前正在执行的作业,如果作业还在调度,会执行停止,如果作业正在运行的话,它会向对应的 TM 发送取消 task 的请求(cancelTask() 请求);
  2. updateTaskExecutionState(): 更新某个 task 的状态信息,这个是 TM 主动向 JM 发送的更新请求;
  3. requestNextInputSplit(): Source ExecutionJobVertex 请求 next InputSlipt,这个一般是针对批处理读取而言,有兴趣的可以看下 FLIP-27: Refactor Source Interface,这里是社区计划对 Source 做的改进,未来会将批和流统一到一起;
  4. requestPartitionState(): 获取指定 Result Partition 对应生产者 JobVertex 的执行状态;
  5. scheduleOrUpdateConsumers(): TM 通知 JM 对应的 Result Partition 的数据已经可用,每个 ExecutionVertex 的每个 ResultPartition 都会调用一次这个方法(可能是在第一次生产数据时调用或者所有数据已经就绪时调用);
  6. disconnectTaskManager(): TM 心跳超时或者作业取消时,会调用这个方法,JM 会释放这个 TM 上的所有 slot 资源;
  7. acknowledgeCheckpoint(): 当一个 Task 做完 snapshot 后,通过这个接口通知 JM,JM 再做相应的处理,如果这个 checkpoint 所有的 task 都已经 ack 了,那就意味着这个 checkpoint 完成了;
  8. declineCheckpoint(): TM 向 JM 发送这个消息,告诉 JM 的 Checkpoint Coordinator 这个 checkpoint request 没有响应,比如:TM 触发 checkpoint 失败,然后 Checkpoint Coordinator 就会知道这个 checkpoint 处理失败了,再做相应的处理;
  9. requestKvStateLocation(): 请求某个注册过 registrationName 对应的 KvState 的位置信息;
  10. notifyKvStateRegistered(): 当注册一个 KvState 的时候,会调用这个方法,一些 operator 在初始化的时候会调用这个方法注册一个 KvState;
  11. notifyKvStateUnregistered(): 取消一个 KVState 的注册,这里是在 operator 关闭 state backend 时调用的(比如:operator 的生命周期结束了,就会调用这个方法);
  12. offerSlots(): TM 通知 JM 其上分配到的 slot 列表;
  13. failSlot(): 如果 TM 分配 slot 失败(情况可能很多,比如:slot 分配时状态转移失败等),将会通过这个接口告知 JM;
  14. registerTaskManager(): 向这个 JM 注册 TM,JM 会将 TM 注册到 SlotPool 中(只有注册过的 TM 的 Slot 才被认为是有效的,才可以做相应的分配),并且会通过心跳监控对应的 TM;
  15. disconnectResourceManager(): 与 ResourceManager 断开连接,这个是有三种情况会触发,JM 与 ResourceManager 心跳超时、作业取消、重连 RM 时会断开连接(比如:RM leader 切换、RM 的心跳超时);
  16. heartbeatFromTaskManager(): TM 向 JM 发送心跳信息;
  17. heartbeatFromResourceManager(): JM 向 ResourceManager 发送一个心跳信息,ResourceManager 只会监听 JM 是否超时;
  18. requestJobDetails(): 请求这个作业的 JobDetails(作业的概况信息,比如:作业执行了多长时间、作业状态等);
  19. requestJobStatus(): 请求这个作业的执行状态 JobStatus
  20. requestJob(): 请求这个作业的 ArchivedExecutionGraph(它是 ExecutionGraph 序列化之后的结果);
  21. triggerSavepoint(): 对这个作业触发一次 savepoint;
  22. stopWithSavepoint(): 停止作业前触发一次 savepoint(触发情况是:用户手动停止作业时指定一个 savepoint 路径,这样的话,会在停止前做一次 savepoint);
  23. requestOperatorBackPressureStats(): 汇报某个 operator 反压的情况;
  24. notifyAllocationFailure(): 如果 RM 分配 slot 失败的话,将会通过这个接口通知 JM;

这里可以看到有部分接口的方法是在跟 RM 通信使用的,所以在 RM 的接口中也可以看到对应的方法。另外,JobMaster 上面这些方法在实现时基本都是在调用 LegacySchedulerSlotPool 的具体实现方法来实现的。

SlotPool

SlotPool 是为当前作业的 slot 请求而服务的,它会向 ResourceManager 请求 slot 资源;SlotPool 会维护请求到的 slot 列表信息(即使 ResourceManager 挂掉了,SlotPool 也可以使用当前作业空闲的 slot 资源进行分配),而如果一个 slot 不再使用的话,即使作业在运行,也是可以释放掉的(所有的 slot 都是通过 AllocationID 来区分的)。

目前 SlotPool 提供的 API 列表如下:

  1. connectToResourceManager(): SlotPool 与 ResourceManager 建立连接,之后 SlotPool 就可以向 ResourceManager 请求 slot 资源了;
  2. disconnectResourceManage(): SlotPool 与 ResourceManager 断开连接,这个方法被调用后,SlotPool 就不能从 ResourceManager 请求 slot 资源了,并且所有正在排队等待的 Slot Request 都被取消;
  3. allocateAvailableSlot(): 将指定的 Slot Request 分配到指定的 slot 上,这里只是记录其对应关系(哪个 slot 对应哪个 slot 请求);
  4. releaseSlot(): 释放一个 slot;
  5. requestNewAllocatedSlot(): 从 RM 请求一个新的 slot 资源分配,申请到的 slot 之后也会添加到 SlotPool 中;
  6. requestNewAllocatedBatchSlot(): 上面的方法是 Stream 类型,这里是 batch 类型,但向 RM 申请的时候,这里并没有区别,只是为了做相应的标识;
  7. getAvailableSlotsInformation(): 获取当前可用的 slot 列表;
  8. failAllocation(): 分配失败,并释放相应的 slot,可能是因为请求超时由 JM 触发或者 TM 分配失败;
  9. registerTaskManager(): 注册 TM,这里会记录一下注册过来的 TM,只能向注册过来的 TM 分配 slot;
  10. releaseTaskManager(): 注销 TM,这个 TM 相关的 slot 都会被释放,task 将会被取消,SlotPool 会通知相应的 TM 释放其 slot;
  11. createAllocatedSlotReport(): 汇报指定 TM 上的 slot 分配情况;

通过上面 SlotPool 对外提供的 API 列表,可以看到其相关方法都是跟 Slot 相关的,整体可以分为下面几部分:

  1. 与 ResourceManager 建立/取消 连接;
  2. 注册/注销 TM,这里只是记录注册过 TM 列表,只有是注册过的 TM 才允许使用其上面的 slot 资源;
  3. 向 ResourceManager 请求 slot 资源;
  4. 分配/释放 slot,这里只是更新其状态信息,并不做实质的操作。

SlotPool 这里,更多只是维护一个状态信息,以及与 ResourceManager(请求 slot 资源)和 TM(释放对应的 slot)做一些交互工作,它对这些功能做了相应的封装,方便 JobMaster 来调用。

LegacyScheduler

如前面所述,LegacyScheduler 其实是对 ExecutionGraphBackPressureStatsTracker 方法的一个抽象,它还负责为作业创建对应的 ExecutionGraph 以及对这个作业进行调度。关于 LegacyScheduler 提供的 API 这里就不再展开,有兴趣的可以直接看下源码,它提供的大部分 API 都是在 JobMaster 的 API 列表中,因为 JobMaster 的很多方法实现本身就是调用 LegacyScheduler 对应的方法。

作业调度的详细流程

有了前面的讲述,这里看下一个新提交的作业,JobMaster 是如何调度起来的。当 JobMaster 调用 LegacyScheduler 的 startScheduling() 方法后,就会开始对这个作业进行相应的调度,申请对应的 slot,并部署 task,其实现如下:

// LegacyScheduler.java
//note: ExecutionGraph 开始调度
@Override
public void startScheduling() {
    //note: 启动这个线程
    mainThreadExecutor.assertRunningInMainThread();

    try {
        //note: 调度这个 graph
        executionGraph.scheduleForExecution();
    }
    catch (Throwable t) {
        executionGraph.failGlobal(t);
    }
}

一个作业开始调度后详细流程如下图所示(其中比较核心方法已经标成黄颜色):

一个作业调度的详细流程

ExecutionGraph 通过 scheduleForExecution() 方法对这个作业调度执行,其方法实现如下:

/note: 把 CREATED 状态转换为 RUNNING 状态,并做相应的调度,如果有异常这里会抛出
public void scheduleForExecution() throws JobException {

    assertRunningInJobMasterMainThread();

    final long currentGlobalModVersion = globalModVersion;

    //note: 先将作业状态转移为 RUNNING
    if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

        //note: 这里会真正调度相应的 Execution Graph
        final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
            scheduleMode,
            getAllExecutionVertices(),
            this);

        if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
            schedulingFuture = newSchedulingFuture;
            //note: 前面调度完成后,如果最后的结果有异常,这里会做相应的处理
            newSchedulingFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    if (throwable != null) {
                        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                        if (!(strippedThrowable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(strippedThrowable);
                        }
                    }
                });
        } else {
            newSchedulingFuture.cancel(false);
        }
    }
    else {
        throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
    }
}

配合前面图中的流程,接下来,看下这个作业在 SchedulingUtils 中是如何调度的:

// SchedulingUtils.java
public static CompletableFuture<Void> schedule(
        ScheduleMode scheduleMode,
        final Iterable<ExecutionVertex> vertices,
        final ExecutionGraph executionGraph) {

    switch (scheduleMode) {
        // LAZY 的意思是:是有上游数据就绪后,下游的 task 才能调度,这个主要是批场景会用到,流不能走这个模式
        case LAZY_FROM_SOURCES:
        case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
            return scheduleLazy(vertices, executionGraph);

        // 流默认的是这个调度模式
        case EAGER:
            return scheduleEager(vertices, executionGraph);

        default:
            throw new IllegalStateException(String.format("Schedule mode %s is invalid.", scheduleMode));
    }
}

/**
 * Schedule vertices eagerly. That means all vertices will be scheduled at once.
 * note: 所有的节点会被同时调度
 *
 * @param vertices Topologically sorted vertices to schedule.
 * @param executionGraph The graph the given vertices belong to.
 */
public static CompletableFuture<Void> scheduleEager(
        final Iterable<ExecutionVertex> vertices,
        final ExecutionGraph executionGraph) {

    executionGraph.assertRunningInJobMasterMainThread();

    checkState(executionGraph.getState() == JobStatus.RUNNING, "job is not running currently");

    // Important: reserve all the space we need up front.
    // that way we do not have any operation that can fail between allocating the slots
    // and adding them to the list. If we had a failure in between there, that would
    // cause the slots to get lost

    // collecting all the slots may resize and fail in that operation without slots getting lost
    final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>();

    final SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
    final Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(
        computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider()));

    // allocate the slots (obtain all their futures)
    for (ExecutionVertex ev : vertices) {
        // these calls are not blocking, they only return futures
        //note: 给每个 Execution 分配相应的资源
        CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(
            slotProviderStrategy,
            LocationPreferenceConstraint.ALL,
            allPreviousAllocationIds);

        allAllocationFutures.add(allocationFuture);
    }

    // this future is complete once all slot futures are complete.
    // the future fails once one slot future fails.
    final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

    return allAllocationsFuture.thenAccept(
        (Collection<Execution> executionsToDeploy) -> {
            for (Execution execution : executionsToDeploy) {
                try {
                    //note: 部署每个 Execution
                    execution.deploy();
                } catch (Throwable t) {
                    throw new CompletionException(
                        new FlinkException(
                            String.format("Could not deploy execution %s.", execution),
                            t));
                }
            }
        })
        // Generate a more specific failure message for the eager scheduling
        .exceptionally(
            //...
        );
}

由于对于流作业来说,它默认的调度模式(ScheduleMode)是 ScheduleMode.EAGER,也就是说,所有 task 会同时调度起来,上面的代码里也可以看到调度的时候有两个主要方法:

  1. allocateResourcesForExecution(): 它的作用是给这个 Execution 分配资源,获取要分配的 slot(它还会向 ShuffleMaster 注册 produced partition,这个 shuffle 部分内容后面文章再讲述,这里就不展开了);
  2. deploy(): 这个方法会直接向 TM 提交这个 task 任务;

这里,主要展开一下 allocateResourcesForExecution() 方法的实现,deploy() 的实现将会在后面 TaskManager 这篇文章中讲述。

如何给 ExecutionVertex 分配 slot

通过前面的代码,我们知道,allocateResourcesForExecution() 方法会给每一个 ExecutionVertex 分配一个 slot,而它具体是如何分配的,这个流程是在 Execution 的 allocateAndAssignSlotForExecution() 方法中实现的,代码如下如下:

/**
 * Allocates and assigns a slot obtained from the slot provider to the execution.
 * note: 从 slot provider 获取一个 slot,将任务分配到这个 slot 上
 *
 * @param slotProviderStrategy to obtain a new slot from
 * @param locationPreferenceConstraint constraint for the location preferences
 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
 *                                                 Can be empty if the allocation ids are not required for scheduling.
 * @return Future which is completed with the allocated slot once it has been assigned
 *          or with an exception if an error occurred.
 */
private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
        SlotProviderStrategy slotProviderStrategy,
        LocationPreferenceConstraint locationPreferenceConstraint,
        @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {

    checkNotNull(slotProviderStrategy);

    assertRunningInJobMasterMainThread();

    //note: 获取这个 vertex 的相关信息
    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    // sanity check
    //note: 做相应的检查
    if (locationConstraint != null && sharingGroup == null) {
        throw new IllegalStateException(
                "Trying to schedule with co-location constraint but without slot sharing allowed.");
    }

    // this method only works if the execution is in the state 'CREATED'
    //note: 这个只会在 CREATED 下工作
    if (transitionState(CREATED, SCHEDULED)) {

        final SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;

        //note: 创建一个 ScheduledUnit 对象(跟 sharingGroup/locationConstraint 都有关系)
        ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, slotSharingGroupId) :
                new ScheduledUnit(this, slotSharingGroupId, locationConstraint);

        // try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
        //note: 如果能找到之前调度的 AllocationID,会尽量先重新调度在同一个 slot 上
        ExecutionVertex executionVertex = getVertex();
        AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();

        Collection<AllocationID> previousAllocationIDs =
            lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();

        // calculate the preferred locations
        //note: 这里先根据 state 和上游数据的输入节点获取这个 Task Execution 的最佳 TM location
        final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
            calculatePreferredLocations(locationPreferenceConstraint);

        final SlotRequestId slotRequestId = new SlotRequestId();

        //note: 根据指定的需求分配这个 slot
        final CompletableFuture<LogicalSlot> logicalSlotFuture =
            preferredLocationsFuture.thenCompose(
                (Collection<TaskManagerLocation> preferredLocations) ->
                    slotProviderStrategy.allocateSlot(
                        slotRequestId,
                        toSchedule,
                        new SlotProfile(
                            vertex.getResourceProfile(),
                            preferredLocations,
                            previousAllocationIDs,
                            allPreviousExecutionGraphAllocationIds)));

        // register call back to cancel slot request in case that the execution gets canceled
        releaseFuture.whenComplete(
            (Object ignored, Throwable throwable) -> {
                if (logicalSlotFuture.cancel(false)) {
                    slotProviderStrategy.cancelSlotRequest(
                        slotRequestId,
                        slotSharingGroupId,
                        new FlinkException("Execution " + this + " was released."));
                }
            });

        // This forces calls to the slot pool back into the main thread, for normal and exceptional completion
        //note: 返回 LogicalSlot
        return logicalSlotFuture.handle(
            (LogicalSlot logicalSlot, Throwable failure) -> {

                if (failure != null) {
                    throw new CompletionException(failure);
                }

                if (tryAssignResource(logicalSlot)) {
                    return logicalSlot;
                } else {
                    // release the slot
                    logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
                    throw new CompletionException(
                        new FlinkException(
                            "Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
                }
            });
    } else {
        // call race, already deployed, or already done
        throw new IllegalExecutionStateException(this, CREATED, state);
    }
}

这里,简单总结一下上面这个方法的流程:

  1. 状态转换,将这个 Execution 的状态(ExecutionState)从 CREATED 转为 SCHEDULED 状态;
  2. 根据是否是一个有状态的 operator 以及它上游输入节点位置,来计算一个最佳的 TM 位置列表(TaskManagerLocation)列表;
  3. 如果这个 Execution 之前有调度记录,也就是说,这次由 failover 导致的重启,这里会拿到上次调度的 TM 位置信息;
  4. 根据 2、3 拿到 TM 位置信息,去调用 SlotProviderStrategy 的 allocateSlot() 获取要分配的 slot。

在 SchedulerImpl 去分配 slot 的时候,其实是会分两种情况的:

  1. allocateSingleSlot(): 如果对应的 task 节点没有设置 SlotSharingGroup,会直接走这个方法,就不会考虑 share group 的情况,直接给这个 task 分配对应的 slot;
  2. allocateSharedSlot(): 如果对应的 task 节点有设置 SlotSharingGroup,就会走到这个方法,在分配 slot 的时候,考虑的因素就会多一些。

分配时如何选择最优的 TM 列表

这里,我们先来看下如何给这个 slot 选择一个最佳的 TM 列表,具体的方法实现是在 Execution 中的 calculatePreferredLocations() 方法中实现的,其具体的实现如下:

// Execution.java
/**
 * Calculates the preferred locations based on the location preference constraint.
 * note: 根据 LocationPreferenceConstraint 策略计算前置输入节点的 TaskManagerLocation
 *
 * @param locationPreferenceConstraint constraint for the location preference
 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
 *      have been a resource assigned.
 */
@VisibleForTesting
public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    //note: 获取一个最佳分配的 TM location 集合
    final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocations();
    final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;

    switch(locationPreferenceConstraint) {
        case ALL:
            //note: 默认是 ALL,就是前面拿到的列表,这里都可以使用
            preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
            break;
        case ANY:
            //note: 遍历所有 input,先获取已经完成 assign 的 input 列表
            final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size());

            for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
                if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) {
                    //note: 在这个 future 完成(没有异常的情况下),这里会使用这个 taskManagerLocation 对象
                    final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);

                    if (taskManagerLocation == null) {
                        throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
                    }

                    completedTaskManagerLocations.add(taskManagerLocation);
                }
            }

            preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
            break;
        default:
            throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.');
    }

    return preferredLocationsFuture;
}

从上面的实现可以看出,这里是先通过 ExecutionVertexgetPreferredLocations() 方法获取一个 TaskManagerLocation 列表,然后再根据 LocationPreferenceConstraint 的模式做过滤,如果是 ALL,那么前面拿到的所有列表都会直接返回,而如果是 ANY,只会把那些已经分配好的 input 节点的 TaskManagerLocation 返回。

这里,看下 ExecutionVertexgetPreferredLocations() 方法的实现逻辑:

// ExecutionVertex.java
/**
 * Gets the overall preferred execution location for this vertex's current execution.
 * The preference is determined as follows:
 *
 * <ol>
 *     <li>If the task execution has state to load (from a checkpoint), then the location preference
 *         is the location of the previous execution (if there is a previous execution attempt).
 *     <li>If the task execution has no state or no previous location, then the location preference
 *         is based on the task's inputs.
 * </ol>
 * note: 如果这个 task Execution 是从 checkpoint 加载的状态,那么这个 location preference 就是之前执行的状态;
 * note: 如果这个 task Execution 没有状态信息或之前的 location 记录,这个 location preference 依赖于 task 的输入;
 *
 * <p>These rules should result in the following behavior:
 *
 * note: 1. 无状态 task 总是基于与输入共享的方式调度;
 * note: 2. 有状态 task 基于与输入共享的方式来初始化他们最开始的调度;
 * note: 3. 有状态 task 的重复执行会尽量与他们的 state 共享执行;
 * <ul>
 *     <li>Stateless tasks are always scheduled based on co-location with inputs.
 *     <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
 *     <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
 * </ul>
 */
public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
    Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState();
    return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
}


/**
 * Gets the preferred location to execute the current task execution attempt, based on the state that the execution attempt will resume.
 * note: 根据这个 Execution 试图恢复的状态来获取当前 task execution 的首选位置
 */
public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() {
    TaskManagerLocation priorLocation;
    if (currentExecution.getTaskRestore() != null && (priorLocation = getLatestPriorLocation()) != null) {
        return Collections.singleton(CompletableFuture.completedFuture(priorLocation));
    }
    else {
        return null;
    }
}

这里简单介绍一下其处理逻辑:

  1. 如果这个作业是从 Checkpoint 恢复的话,这里会根据它之前的状态信息获取上次的位置信息,直接返回这个位置信息;
  2. 另一种情况是,根据这个 ExecutionVertex 的 inputEdges,获取其上游 ExecutionVertex 的位置信息列表,但是如果这个列表的数目超过阈值(默认是 8),就会直接返回 null(上游过于分散,再根据 input 位置信息去分配就没有太大意义了)。

可以看出,在选取最优的 TaskManagerLocation 列表时,主要是根据 state 和 input 的位置信息来判断,会优先选择 state,也就是上次 checkpoint 中记录的位置。

最优的 slot 分配算法

在上面选择了最优的 TaskManagerLocation 列表后,这里来看下如何给 task 选择具体的 slot,这个是在 SlotSelectionStrategy 中的 selectBestSlotForProfile() 方法中做的,目前 SlotSelectionStrategy 有两个实现类:PreviousAllocationSlotSelectionStrategyLocationPreferenceSlotSelectionStrategy,这个是在 state.backend.local-recovery 参数中配置的,默认是 false,选择的是 PreviousAllocationSlotSelectionStrategy,如果配置为 true,那么就会选择 PreviousAllocationSlotSelectionStrategy,这部分的逻辑如下:

// DefaultSchedulerFactory.java
@Nonnull
private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
    // 根据 state.backend.local-recover 配置选择
    if (configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)) {
        return PreviousAllocationSlotSelectionStrategy.INSTANCE;
    } else {
        return LocationPreferenceSlotSelectionStrategy.INSTANCE;
    }
}

这里分别看下这两个实现类的 selectBestSlotForProfile() 的实现逻辑:

  1. PreviousAllocationSlotSelectionStrategy: 它会根据上次的分配记录,如果这个位置刚好在 SlotPool 的可用列表里,这里就会直接选这个 slot,否则会走到 LocationPreferenceSlotSelectionStrategy 的处理逻辑;
  2. LocationPreferenceSlotSelectionStrategy: 这个是对可用的 slot 列表做打分,选择分数最高的(分数相同的话,会选择第一个),如果 slot 在前面得到的最优 TaskManagerLocation 列表中,分数就会比较高。

allocateSharedSlot VS allocateSingleSlot

在分配 slot 时,这里分为两种情况:

  1. allocateSingleSlot(): 如果没有设置 SlotSharingGroup 将会走到这个方法,直接给这个 SlotRequestId 分配一个 slot,具体选择哪个 slot 就是上面的逻辑;
  2. allocateSharedSlot(): 而如果设置了 SlotSharingGroup 就会走到这里,先根据 SlotSharingGroupId 获取或创建对应的 SlotSharingManager,然后创建(或者根据 SlotSharingGroup 获取)一个的 MultiTaskSlot(每个 SlotSharingGroup 会对应一个 MultiTaskSlot 对象),这里再将这个 task 分配到这个 MultiTaskSlot 上(这个只是简单介绍,后面在调度模型文章中,将会详细讲述)。

小结

到这里,Flink JobManager 的大部分内容已经讲述完了,还有一些小点会在后面的系列文章中再给大家讲述。这里总结一下,JobManager 主要是为一个具体的作业而服务的,它负责这个作业每个 task 的调度、checkpoint/savepoint(后面 checkpoint 的文章中会详述其流程)的触发以及容错恢复,它有两个非常重点的服务组件 —— LegacySchedulerSlotPool,其中:

  1. LegacyScheduler: 它封装了作业的 ExecutionGraph 以及 BackPressureStatsTracker 中的接口,它会负责这个作业具体调度、savepoint 触发等工作;
  2. SlotPool: 它主要负责这个作业 slot 相关的内容,像与 ResourceManager 通信、分配或释放 slot 资源等工作。

文章的后半部分,又总结了一个作业是如何调度起来的,首先是分配 slot,最后是通过 deploy() 接口向 TM 提交这个 task,本文着重关注了 slot 的分配,task 的部署将会在下节的 TaskManager 详解中给大家介绍。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容