Flink作业提交(三)--- Job运行

源码分析JobMaster如何run起来 介绍到了JobMaster.start方法,这个方法主要是启动rpc服务,并且运行job,接下来看下怎么run job?本文内容是基于Flink 1.9来讲解。

1. 首先看下JobMaster.start方法源码

    /**
     * Start the rpc service and begin to run the job.
     *
     * @param newJobMasterId The necessary fencing token to run the job
     * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
     */
    public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
        // make sure we receive RPC and async calls
        start();

        return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }
  • start():JM/TM之间的通信都是通过rpc进行的,这一步是启动JM rpc server服务
  • startJobExecution:异步调用该方法,开始执行job。下面介绍该方法

2. 接下来看JobMaster.startJobExecution方法

    //-- job starting and stopping  -----------------------------------------------------------------

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);

        startJobMasterServices();

        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

        resetAndStartScheduler();

        return Acknowledge.get();
    }

这里有两个重要的方法 startJobMasterServices(); 和 resetAndStartScheduler();  
这两个方法里涉及到的东西都比较多,分开来介绍。

2.1 开始分析 startJobMasterServices() 方法

2.1.1 startHeartbeatServices();

  • 开启TM心跳定时任务
  • 开启RM心跳定时任务

2.1.2 slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

  1. 背景知识:SlotPool是干什么的?
  • SlotPool是为当前作业的slot请求服务的。
  • 它会缓存一个可用slots map(availableSlots),如果有slot请求,首先会先去check availableSlots是否可以满足需求,如果不可以,会向ResourceManager申请新slot(slot使用生成的AllocationID唯一标识)。如果说ResourceManager不可用,或者请求超时,SlotPool会把请求置为fail。
  • 如果ResourceManager挂掉,SlotPool也可以提供空闲的slot进行分配。
  • 释放不再使用的slot,比如:作业已经完全run起来了还有一些空闲slot。
  1. slotPool.start做了哪些工作?
  • 检查空闲的slot,释放过期的slot。
      - 释放过期slot的过程:
         - 如果slot上没有task,直接把TaskSlotState标记为free。
         - 如果slot不为空,也就是有task,那先把TaskSlotState标记为releasing,并且fail上面的tasks。然后TM通知RM该slot可以置为free,RM的slotManager对象会把slot状态置为free。
      - slot过期时间问题
         - 默认值:50s
         - 可以通过设置slot.idle.timeout来自己调整
  • checkBatchSlotTimeout,这块东西没有细看

2.1.3 scheduler.start(getMainThreadExecutor());

  1. 背景知识:scheduler是干什么的?
  • Scheduler会把task分配到slot
  1. scheduler.start做了哪些工作?
  • 把当前JM main thread executor赋值给componentMainThreadExecutor对象。

2.1.4 reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

  • 如果存在RM连接,都close掉
  • 把JM注册到RM,对应ResourceManager.registerJobManager方法
    // ------------------------------------------------------------------------
    //  RPC methods
    // ------------------------------------------------------------------------

    @Override
    public CompletableFuture<RegistrationResponse> registerJobManager(
            final JobMasterId jobMasterId,
            final ResourceID jobManagerResourceId,
            final String jobManagerAddress,
            final JobID jobId,
            final Time timeout) {

        checkNotNull(jobMasterId);
        checkNotNull(jobManagerResourceId);
        checkNotNull(jobManagerAddress);
        checkNotNull(jobId);

        if (!jobLeaderIdService.containsJob(jobId)) {
            try {
                jobLeaderIdService.addJob(jobId);
            } catch (Exception e) {
                ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
                    jobId + " to the job id leader service.", e);

                    onFatalError(exception);

                log.error("Could not add job {} to job leader id service.", jobId, e);
                return FutureUtils.completedExceptionally(exception);
            }
        }

        log.info("Registering job manager {}@{} for job {}.", jobMasterId, jobManagerAddress, jobId);

        CompletableFuture<JobMasterId> jobMasterIdFuture;

        try {
            jobMasterIdFuture = jobLeaderIdService.getLeaderId(jobId);
        } catch (Exception e) {
            // we cannot check the job leader id so let's fail
            // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
            ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
                "job leader id future to verify the correct job leader.", e);

                onFatalError(exception);

            log.debug("Could not obtain the job leader id future to verify the correct job leader.");
            return FutureUtils.completedExceptionally(exception);
        }

        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class);

        CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
            jobMasterIdFuture,
            (JobMasterGateway jobMasterGateway, JobMasterId leadingJobMasterId) -> {
                if (Objects.equals(leadingJobMasterId, jobMasterId)) {
                    return registerJobMasterInternal(
                        jobMasterGateway,
                        jobId,
                        jobManagerAddress,
                        jobManagerResourceId);
                } else {
                    final String declineMessage = String.format(
                        "The leading JobMaster id %s did not match the received JobMaster id %s. " +
                        "This indicates that a JobMaster leader change has happened.",
                        leadingJobMasterId,
                        jobMasterId);
                    log.debug(declineMessage);
                    return new RegistrationResponse.Decline(declineMessage);
                }
            },
            getMainThreadExecutor());

        // handle exceptions which might have occurred in one of the futures inputs of combine
        return registrationResponseFuture.handleAsync(
            (RegistrationResponse registrationResponse, Throwable throwable) -> {
                if (throwable != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress, throwable);
                    } else {
                        log.info("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress);
                    }

                    return new RegistrationResponse.Decline(throwable.getMessage());
                } else {
                    return registrationResponse;
                }
            },
            getRpcService().getExecutor());
    }
  • SlotPool与RM建立连接,这样SlotPool就可以向RM申请资源了

2.1.5 resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

  • 启动RM leader选举服务

2.2 开始分析 resetAndStartScheduler() 方法

该方法主要处理作业调度相关的工作,包括申请slot以及对Execution进行deploy。首先看下该方法源码

    private void resetAndStartScheduler() throws Exception {
        validateRunsInMainThread();

        final CompletableFuture<Void> schedulerAssignedFuture;

        if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
            schedulerAssignedFuture = CompletableFuture.completedFuture(null);
            schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
        } else {
            suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
            final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup);

            schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle(
                (ignored, throwable) -> {
                    newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                    assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                    return null;
                }
            );
        }

        schedulerAssignedFuture.thenRun(this::startScheduling);
    }

ExecutionGraph在构建的时候,通过ExecutionGraph成员变量列表可以看到,JobStatus默认是CREATED状态。因此resetAndStartScheduler方法首先走了if逻辑,然后是调用startScheduling,接下来看startScheduling方法逻辑。

    @Override
    public void startScheduling() {
        mainThreadExecutor.assertRunningInMainThread();

        try {
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            executionGraph.failGlobal(t);
        }
    }

会调用executionGraph.scheduleForExecution() --> SchedulingUtils.scheduleEager
重点看下SchedulingUtils.scheduleEager,这个方法主要做了两件事情

  • 为每个 ExecutionVertex 申请slot,并把ExecutionVertex的Execution状态从CREATED变为SCHEDULED
  • deploy 所有的 Execution

2.2.1 为每个 ExecutionVertex 申请slot
首先上源码,该源码在SchedulingUtils#scheduleEager方法中

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>();

        final SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
        final Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(
            computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider()));

        // allocate the slots (obtain all their futures)
        for (ExecutionVertex ev : vertices) {
            // these calls are not blocking, they only return futures
            CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(
                slotProviderStrategy,
                LocationPreferenceConstraint.ALL,
                allPreviousAllocationIds);

            allAllocationFutures.add(allocationFuture);
        }

底层真正申请slot的源码在SchedulerImpl#allocateSingleSlot方法中

    private CompletableFuture<LogicalSlot> allocateSingleSlot(
            SlotRequestId slotRequestId,
            SlotProfile slotProfile,
            boolean allowQueuedScheduling,
            @Nullable Time allocationTimeout) {

        Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile);

        if (slotAndLocality.isPresent()) {
            // already successful from available
            try {
                return CompletableFuture.completedFuture(
                    completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
            } catch (FlinkException e) {
                return FutureUtils.completedExceptionally(e);
            }
        } else if (allowQueuedScheduling) {
            // we allocate by requesting a new slot
            return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout)
                .thenApply((PhysicalSlot allocatedSlot) -> {
                    try {
                        return completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
                    } catch (FlinkException e) {
                        throw new CompletionException(e);
                    }
                });
        } else {
            // failed to allocate
            return FutureUtils.completedExceptionally(
                new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.'));
        }
    }

slot申请流程总结如下:

  1. SlotPoolImpl会保留一个availableSlots map,首先会先去查找availableSlots是否可以满足slot申请条件
  2. 如果availableSlots没有可用slot,那会向RM申请资源
      - flink1.9之后都是按需申请资源,如果作业执行需要的slot没有得到满足,YarnResourceManager 会向 Yarn 集群的 ResourceManager 申请新的 container,并启动 TaskManager

2.2.2 deploy 所有的 Execution
当所有的ExecutionVertex节点申请到slot之后,就开始进行部署,首先看下源码,该源码在SchedulingUtils#scheduleEager方法中

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

        return allAllocationsFuture.thenAccept(
            (Collection<Execution> executionsToDeploy) -> {
                for (Execution execution : executionsToDeploy) {
                    try {
                        execution.deploy();
                    } catch (Throwable t) {
                        throw new CompletionException(
                            new FlinkException(
                                String.format("Could not deploy execution %s.", execution),
                                t));
                    }
                }
            })

deploy方法主要做了下面几件事情

  • 把ExecutionVertex节点这次执行对应的Execution,状态从SCHEDULED切换到DEPLOYING,如果状态切换失败,直接release 这个 slot
  • 往TM上异步提交task
       1. 生成对任务的描述 TaskDeploymentDescription
          - 输入的描述 InputGateDeploymentDescriptor
          - 输出的描述 ResultPartitionDeploymentDescriptor
       2. 重新从 blob store 加载JobInformation和TaskInformation
       3. 构建Task对象
          - 在构造函数中会进行一些初始化工作,比较重要的是根据 InputGateDeploymentDescriptors 创建 InputGates,根据 ResultPartitionDeploymentDescriptors 创建 ResultPartitions
          - Task是在一个TM上单并发subtask的一次执行
          - Task是对Flink operator的封装
          - Task提供了所有的必要服务,比如消费输入数据,输出结果以及和JM进行通信等
          - Flink operator只有数据reader和writers,以及某些callback事件。task会把它们和network栈以及actor message进行连接,还可以追踪execution的状态以及处理exception
          - 某个Task并不知道自己与其他task的关系,这些信息都在JM中;Task只知道自己需要执行的代码,task配置信息,以及消费或者生产的intermediate results 的IDs
       4. 执行Task,task实现了Runnable接口,其实就是启动task线程。源码可以看Task#doRun方法
          - 切换Task的状态,从CREATED变成DEPLOYING
          - 在blob cache中注册job
          - 获取user-code classloader,下载job的jar包或者class的时候会用到
          - 反序列化拿到 ExecutionConfig,ExecutionConfig 中包含所有算子相关的信息,比如ExecutionMode,并发度等
          - 向网络栈注册Task。会逐一注册所有的ResultPartitions和InputGates,并分配bufferPool。在注册InputGate的时候,会为每一个channel都请求对应的子分区。
          - 切换Task的状态,从DEPLOYING变成RUNNING
          - 通过反射调用task

至此,作业已经运行起来了

小结

JobMaster启动作业,主要分成两个步骤

  1. 启动作业运行依赖的服务,比如TM/RM心跳监控,slotPool,scheduler等
  2. 为所有的ExecutionVertex分配slot,并且deploy
  3. JM,TM 申请流程,入口是JobMaster#start方法
       3.1 首先向RM申请JobMaster,并向RM注册
       3.2 然后执行JobMaster#resetAndStartScheduler方法的时候,会先去为每个 ExecutionVertex 申请slot。如果availableSlots可以满足需求,就使用availableSlots;如果availableSlots不能满足需求,YarnResourceManager 会向 Yarn 集群的 ResourceManager 申请新的 container,并启动 TaskManager。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容