Our source-code analysis of how the JobMaster gets up and running has reached the JobMaster.start method, which starts the RPC service and begins running the job. Next, let's look at how the job is actually run. This article is based on Flink 1.9.
1. First, the source of the JobMaster.start method
/**
* Start the rpc service and begin to run the job.
*
* @param newJobMasterId The necessary fencing token to run the job
* @return Future acknowledge if the job could be started. Otherwise the future contains an exception
*/
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
// make sure we receive RPC and async calls
start();
return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
- start(): communication between JM and TM goes entirely through RPC; this step starts the JM's RPC server.
- startJobExecution: called asynchronously to begin executing the job. This method is described next.
2. Next, the JobMaster.startJobExecution method
//-- job starting and stopping -----------------------------------------------------------------
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
validateRunsInMainThread();
checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
if (Objects.equals(getFencingToken(), newJobMasterId)) {
log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
return Acknowledge.get();
}
setNewFencingToken(newJobMasterId);
startJobMasterServices();
log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
resetAndStartScheduler();
return Acknowledge.get();
}
Two important methods are called here: startJobMasterServices() and resetAndStartScheduler().
Both involve a fair amount of machinery, so they are covered separately.
2.1 The startJobMasterServices() method
2.1.1 startHeartbeatServices();
- Starts the periodic heartbeat task towards the TaskManagers
- Starts the periodic heartbeat task towards the ResourceManager (both are sketched below)
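For reference, JobMaster#startHeartbeatServices in Flink 1.9 looks roughly like the sketch below (reproduced from memory, so details may differ slightly). The JM is the active side towards the TMs (a heartbeat "sender"), while towards the RM it only answers the RM's heartbeats:
private void startHeartbeatServices() {
    // the JM actively requests heartbeats from all registered TMs
    taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);

    // the RM drives this heartbeat; the JM merely reports back
    resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
        resourceId,
        new ResourceManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
}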
2.1.2 slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
- Background: what does the SlotPool do?
  - The SlotPool serves the slot requests of the current job.
  - It caches a map of available slots (availableSlots). When a slot request comes in, it first checks whether availableSlots can satisfy it; if not, it requests a new slot from the ResourceManager (each slot is uniquely identified by a generated AllocationID). If the ResourceManager is unavailable or the request times out, the SlotPool fails the request.
  - Even if the ResourceManager is down, the SlotPool can still hand out the idle slots it already holds.
  - It releases slots that are no longer in use, e.g. slots still sitting idle after the job is fully up and running.
- What does slotPool.start do?
  - It schedules a periodic check of idle slots and releases slots that have expired.
  - The release procedure for an expired slot:
    - If no task is running on the slot, the TaskSlotState is marked free directly.
    - If the slot is not empty, i.e. it has tasks on it, the TaskSlotState is first marked releasing and the tasks on it are failed. The TM then notifies the RM that the slot can be freed, and the RM's SlotManager marks the slot as free.
  - The slot idle timeout:
    - Default: 50s
    - It can be tuned via the slot.idle.timeout setting, as shown below.
  - It also schedules checkBatchSlotTimeout, which is not examined in detail here.
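For example, to raise the idle timeout to 60 seconds you can set slot.idle.timeout in flink-conf.yaml, or set it programmatically. A minimal sketch using Flink's Configuration API; if I recall correctly the option is JobManagerOptions.SLOT_IDLE_TIMEOUT, whose key is slot.idle.timeout, in milliseconds:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

Configuration config = new Configuration();
// "slot.idle.timeout" is specified in milliseconds; the default is 50000 (50s)
config.setLong(JobManagerOptions.SLOT_IDLE_TIMEOUT, 60_000L);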
2.1.3 scheduler.start(getMainThreadExecutor());
- Background: what does the scheduler do?
  - The Scheduler assigns tasks to slots.
- What does scheduler.start do?
  - It assigns the current JM main-thread executor to the componentMainThreadExecutor field (the method body is shown below).
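The method body is essentially just that assignment; SchedulerImpl#start in Flink 1.9 is roughly:
@Override
public void start(@Nonnull ComponentMainThreadExecutor mainThreadExecutor) {
    // remember the JM main thread executor; later slot allocations must run on it
    this.componentMainThreadExecutor = mainThreadExecutor;
}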
2.1.4 reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
- If an RM connection already exists, it is closed first.
- The JM is then registered with the RM, which corresponds to the ResourceManager.registerJobManager method:
// ------------------------------------------------------------------------
// RPC methods
// ------------------------------------------------------------------------
@Override
public CompletableFuture<RegistrationResponse> registerJobManager(
final JobMasterId jobMasterId,
final ResourceID jobManagerResourceId,
final String jobManagerAddress,
final JobID jobId,
final Time timeout) {
checkNotNull(jobMasterId);
checkNotNull(jobManagerResourceId);
checkNotNull(jobManagerAddress);
checkNotNull(jobId);
if (!jobLeaderIdService.containsJob(jobId)) {
try {
jobLeaderIdService.addJob(jobId);
} catch (Exception e) {
ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
jobId + " to the job id leader service.", e);
onFatalError(exception);
log.error("Could not add job {} to job leader id service.", jobId, e);
return FutureUtils.completedExceptionally(exception);
}
}
log.info("Registering job manager {}@{} for job {}.", jobMasterId, jobManagerAddress, jobId);
CompletableFuture<JobMasterId> jobMasterIdFuture;
try {
jobMasterIdFuture = jobLeaderIdService.getLeaderId(jobId);
} catch (Exception e) {
// we cannot check the job leader id so let's fail
// TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
"job leader id future to verify the correct job leader.", e);
onFatalError(exception);
log.debug("Could not obtain the job leader id future to verify the correct job leader.");
return FutureUtils.completedExceptionally(exception);
}
CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class);
CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
jobMasterIdFuture,
(JobMasterGateway jobMasterGateway, JobMasterId leadingJobMasterId) -> {
if (Objects.equals(leadingJobMasterId, jobMasterId)) {
return registerJobMasterInternal(
jobMasterGateway,
jobId,
jobManagerAddress,
jobManagerResourceId);
} else {
final String declineMessage = String.format(
"The leading JobMaster id %s did not match the received JobMaster id %s. " +
"This indicates that a JobMaster leader change has happened.",
leadingJobMasterId,
jobMasterId);
log.debug(declineMessage);
return new RegistrationResponse.Decline(declineMessage);
}
},
getMainThreadExecutor());
// handle exceptions which might have occurred in one of the futures inputs of combine
return registrationResponseFuture.handleAsync(
(RegistrationResponse registrationResponse, Throwable throwable) -> {
if (throwable != null) {
if (log.isDebugEnabled()) {
log.debug("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress, throwable);
} else {
log.info("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress);
}
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
return registrationResponse;
}
},
getRpcService().getExecutor());
}
- The SlotPool is connected to the RM, so that the SlotPool can request resources from it (sketched below).
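Once the registration succeeds, JobMaster#establishResourceManagerConnection wires these pieces together. An abbreviated from-memory sketch (validation, logging, and the construction of the heartbeat target are elided; resourceManagerHeartbeatTarget here stands in for the elided HeartbeatTarget):
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
    final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
    final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();

    // hand the gateway to the SlotPool so it can send slot requests to the RM
    slotPool.connectToResourceManager(resourceManagerGateway);

    // start heartbeating with the RM (HeartbeatTarget construction elided)
    resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, resourceManagerHeartbeatTarget);
}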
2.1.5 resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
- Starts the RM leader retrieval service, so the JM learns which RM is the current leader (the listener is sketched below).
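The listener simply forwards new leader addresses back into the JM main thread, where the JM (re)connects to the new RM leader. A from-memory sketch of the inner class:
private class ResourceManagerLeaderListener implements LeaderRetrievalListener {

    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        // hop back into the main thread and trigger (re)connection to the new leader
        runAsync(() -> notifyOfNewResourceManagerLeader(
            leaderAddress,
            ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }

    @Override
    public void handleError(final Exception exception) {
        handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception));
    }
}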
2.2 The resetAndStartScheduler() method
This method handles the scheduling of the job, including requesting slots and deploying the Executions. First, the source:
private void resetAndStartScheduler() throws Exception {
validateRunsInMainThread();
final CompletableFuture<Void> schedulerAssignedFuture;
if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
schedulerAssignedFuture = CompletableFuture.completedFuture(null);
schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
} else {
suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup);
schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle(
(ignored, throwable) -> {
newScheduler.setMainThreadExecutor(getMainThreadExecutor());
assignScheduler(newScheduler, newJobManagerJobMetricGroup);
return null;
}
);
}
schedulerAssignedFuture.thenRun(this::startScheduling);
}
Looking at the ExecutionGraph's member fields, you can see that JobStatus defaults to CREATED when the ExecutionGraph is built. So resetAndStartScheduler first takes the if branch and then calls startScheduling. Next, the startScheduling logic:
@Override
public void startScheduling() {
mainThreadExecutor.assertRunningInMainThread();
try {
executionGraph.scheduleForExecution();
}
catch (Throwable t) {
executionGraph.failGlobal(t);
}
}
This calls executionGraph.scheduleForExecution() --> SchedulingUtils.scheduleEager (for eager scheduling).
SchedulingUtils.scheduleEager is the part to focus on; it does two main things:
- Requests a slot for every ExecutionVertex and switches each vertex's Execution from CREATED to SCHEDULED
- Deploys all the Executions
2.2.1 Requesting a slot for every ExecutionVertex
First the source, taken from SchedulingUtils#scheduleEager:
// collecting all the slots may resize and fail in that operation without slots getting lost
final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>();
final SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
final Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(
computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider()));
// allocate the slots (obtain all their futures)
for (ExecutionVertex ev : vertices) {
// these calls are not blocking, they only return futures
CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(
slotProviderStrategy,
LocationPreferenceConstraint.ALL,
allPreviousAllocationIds);
allAllocationFutures.add(allocationFuture);
}
The code that actually requests a slot lives in SchedulerImpl#allocateSingleSlot:
private CompletableFuture<LogicalSlot> allocateSingleSlot(
SlotRequestId slotRequestId,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@Nullable Time allocationTimeout) {
Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile);
if (slotAndLocality.isPresent()) {
// already successful from available
try {
return CompletableFuture.completedFuture(
completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
} catch (FlinkException e) {
return FutureUtils.completedExceptionally(e);
}
} else if (allowQueuedScheduling) {
// we allocate by requesting a new slot
return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout)
.thenApply((PhysicalSlot allocatedSlot) -> {
try {
return completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
} catch (FlinkException e) {
throw new CompletionException(e);
}
});
} else {
// failed to allocate
return FutureUtils.completedExceptionally(
new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.'));
}
}
The slot request flow, summarized (the RM request itself is sketched after this list):
- SlotPoolImpl keeps an availableSlots map; a slot request first checks whether availableSlots can satisfy it.
- If availableSlots has no suitable slot, resources are requested from the RM.
- Since Flink 1.9, resources are requested on demand: if the slots a job needs cannot be satisfied, the YarnResourceManager requests new containers from the YARN cluster's ResourceManager and starts TaskManagers in them.
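The request towards the RM is sent from SlotPoolImpl#requestSlotFromResourceManager. An abbreviated from-memory sketch (bookkeeping and failure handling elided); note the fresh AllocationID, which is the unique slot identifier mentioned in the SlotPool background above:
private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {

    // every slot request towards the RM is identified by a fresh AllocationID
    final AllocationID allocationId = new AllocationID();

    CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
        jobMasterId,
        new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
        rpcTimeout);

    // if the RM declines or the call times out, the pending request is failed (handling elided)
}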
2.2.2 Deploying all the Executions
Once every ExecutionVertex has been assigned a slot, deployment begins. The source, again from SchedulingUtils#scheduleEager:
// this future is complete once all slot futures are complete.
// the future fails once one slot future fails.
final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
return allAllocationsFuture.thenAccept(
(Collection<Execution> executionsToDeploy) -> {
for (Execution execution : executionsToDeploy) {
try {
execution.deploy();
} catch (Throwable t) {
throw new CompletionException(
new FlinkException(
String.format("Could not deploy execution %s.", execution),
t));
}
}
});
The deploy method does the following:
- Switches the state of the vertex's current Execution from SCHEDULED to DEPLOYING; if the transition fails, the slot is released immediately.
- Submits the task to the TM asynchronously:
1. Builds the deployment descriptor for the task, the TaskDeploymentDescriptor
  - input description: InputGateDeploymentDescriptor
  - output description: ResultPartitionDeploymentDescriptor
2. Reloads the JobInformation and TaskInformation from the blob store
3. Constructs the Task object
  - The constructor performs some initialization; most importantly it creates the InputGates from the InputGateDeploymentDescriptors and the ResultPartitions from the ResultPartitionDeploymentDescriptors
  - A Task is one execution of a single parallel subtask on a TM
  - A Task wraps a Flink operator
  - The Task provides all necessary services, such as consuming input data, producing results, and communicating with the JM
  - A Flink operator has only data readers and writers plus certain callback events; the Task connects them to the network stack and actor messages, tracks the execution's state, and handles exceptions
  - A Task does not know how it relates to other tasks; that information lives in the JM. The Task only knows the code it has to execute, its task configuration, and the IDs of the intermediate results it consumes or produces
4. Runs the Task. Task implements Runnable, so this is simply starting the task thread; see Task#doRun (the final reflective invocation is sketched after this list):
  - Switches the Task's state from CREATED to DEPLOYING
  - Registers the job with the blob cache
  - Obtains the user-code classloader, used when downloading the job's jars or classes
  - Deserializes the ExecutionConfig, which carries all operator-related settings such as the ExecutionMode and the parallelism
  - Registers the Task with the network stack: all ResultPartitions and InputGates are registered one by one and get buffer pools assigned; when an InputGate is registered, each of its channels requests its corresponding subpartition
  - Switches the Task's state from DEPLOYING to RUNNING
  - Invokes the task via reflection
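"Invokes the task via reflection" means loading the task's entry class (nameOfInvokableClass, e.g. a StreamTask subclass) through the user-code classloader and instantiating it with the runtime Environment. Abbreviated from memory from Task#doRun / Task#loadAndInstantiateInvokable in Flink 1.9:
// load the entry class of the task with the user-code classloader
final Class<? extends AbstractInvokable> invokableClass = Class
    .forName(nameOfInvokableClass, true, userCodeClassLoader)
    .asSubclass(AbstractInvokable.class);

// instantiate it reflectively; the constructor takes the runtime Environment
final AbstractInvokable invokable = invokableClass
    .getConstructor(Environment.class)
    .newInstance(env);

// hand control to the user code
invokable.invoke();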
At this point, the job is up and running.
Summary
Starting a job from the JobMaster boils down to two steps:
- Start the services the job depends on, e.g. TM/RM heartbeat monitoring, the slotPool, the scheduler, etc.
- Allocate a slot for every ExecutionVertex and deploy it
As for the JM/TM resource request flow, the entry point is the JobMaster#start method:
- First the JobMaster connects to the RM and registers itself with it.
- Then, while JobMaster#resetAndStartScheduler runs, a slot is requested for every ExecutionVertex. If availableSlots can satisfy the request, those slots are used; otherwise the YarnResourceManager requests new containers from the YARN cluster's ResourceManager and starts TaskManagers in them.