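Preface
ElasticJob is an open-source distributed scheduled-job middleware from Dangdang, and it currently has 6k+ stars on GitHub. It offers two modes, Lite and Cloud. This article only sets out to explore how mainstream distributed job schedulers work, so it analyzes only the simpler Lite mode.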
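Have you ever wondered how a single-machine scheduled task is driven? The JDK's ScheduledExecutorService comes to mind first. It puts tasks into a queue ordered by execution time, and the pool's worker threads wait on the task that is due soonest. Once that task's execution time arrives, a thread takes it out and runs it. Are there other approaches? Tasks could also be kept outside local memory, in a local file, a Redis sorted set (zset) or a database, with threads scanning for due tasks and executing them in the same way. This prevents task data from being lost along with local memory. Alternatively, a better data structure such as a timing wheel can hold the tasks, which narrows the range scanned on each tick and improves efficiency.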
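Here is a minimal sketch of the queue-based approach described above. It is not the JDK's implementation: ScheduledThreadPoolExecutor uses a DelayedWorkQueue, and its workers block until the head task is due. The class name TinyScheduler and the caller-supplied clock value are hypothetical choices that keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical single-machine scheduler: tasks wait in a queue ordered by due time,
// and each tick pops and runs every task whose due time has arrived.
class TinyScheduler {
    private static final class Task {
        final long dueAt;
        final String name;
        final Runnable body;

        Task(long dueAt, String name, Runnable body) {
            this.dueAt = dueAt;
            this.name = name;
            this.body = body;
        }
    }

    // The task with the earliest due time sits at the head of the queue.
    private final PriorityQueue<Task> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.dueAt, b.dueAt));

    void schedule(String name, long dueAt, Runnable body) {
        queue.add(new Task(dueAt, name, body));
    }

    // Runs all tasks due at or before 'now' and returns their names in execution order.
    List<String> tick(long now) {
        List<String> ran = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().dueAt <= now) {
            Task task = queue.poll();
            task.body.run();
            ran.add(task.name);
        }
        return ran;
    }
}
```

A real scheduler would sleep until the head task is due instead of being ticked by the caller, and it would re-enqueue periodic tasks with their next due time.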
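Once the single-machine mechanism is clear, distributed scheduled jobs are easy to understand. The only remaining problems are how to coordinate with the other nodes in the cluster that run the same job, how to obtain the distributed job, and how to split it into shards and "downgrade" each shard into a single-machine task. The focus is no longer the final single-machine execution. It is how the cluster coordinates the work.
ElasticJob coordinates the cluster with ZooKeeper, a mature distributed coordination service, and it runs the single-machine scheduling with Quartz, a mature scheduling framework. ElasticJob's architecture diagram follows.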
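Job initialization
Now for the source code. ScheduleJobBootstrap is ElasticJob's launcher. Its constructor takes three required arguments: a CoordinatorRegistryCenter (the registry center), an ElasticJob (the user-defined job) and a JobConfiguration (the job configuration). It also accepts optional ElasticJobListener varargs (job listeners). All of these are passed straight to the JobScheduler constructor. Calling schedule() checks that a cron expression is present and then calls jobScheduler.getJobScheduleController().scheduleJob to schedule the job.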
```java
public ScheduleJobBootstrap(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
    jobScheduler = new JobScheduler(regCenter, elasticJob, jobConfig, elasticJobListeners);
}

public void schedule() {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(jobScheduler.getJobConfig().getCron()), "Cron can not be empty.");
    jobScheduler.getJobScheduleController().scheduleJob(jobScheduler.getJobConfig().getCron());
}
```
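The JobScheduler constructor does five kinds of work:
- It creates SetUpFacade, SchedulerFacade and LiteJobFacade.
- It creates the ElasticJobExecutor.
- It calls setUpFacade.setUpJobConfiguration to initialize the configuration.
- It calls setGuaranteeServiceForElasticJobListeners to give the listeners a GuaranteeService.
- It finishes with createJobScheduleController.

Each step is analyzed below.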
```java
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final TracingConfiguration<?> tracingConfig,
                    final ElasticJobListener... elasticJobListeners) {
    this.regCenter = regCenter;
    this.elasticJob = elasticJob;
    elasticJobType = null;
    this.elasticJobListeners = Arrays.asList(elasticJobListeners);
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners);
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners, tracingConfig);
    jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners);
    jobScheduleController = createJobScheduleController();
}
```
SetUpFacade, SchedulerFacade and LiteJobFacade all wrap regCenter. Through a set of services, they create, read, update and delete data in ZooKeeper. We won't go into detail here.
```java
public SetUpFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
    configService = new ConfigurationService(regCenter, jobName);
    leaderService = new LeaderService(regCenter, jobName);
    serverService = new ServerService(regCenter, jobName);
    instanceService = new InstanceService(regCenter, jobName);
    shardingService = new ShardingService(regCenter, jobName);
    reconcileService = new ReconcileService(regCenter, jobName);
    listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
}

public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName) {
    this.jobName = jobName;
    leaderService = new LeaderService(regCenter, jobName);
    shardingService = new ShardingService(regCenter, jobName);
    executionService = new ExecutionService(regCenter, jobName);
    reconcileService = new ReconcileService(regCenter, jobName);
}

public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final TracingConfiguration tracingConfig) {
    configService = new ConfigurationService(regCenter, jobName);
    shardingService = new ShardingService(regCenter, jobName);
    executionContextService = new ExecutionContextService(regCenter, jobName);
    executionService = new ExecutionService(regCenter, jobName);
    failoverService = new FailoverService(regCenter, jobName);
    this.elasticJobListeners = elasticJobListeners;
    this.jobEventBus = null == tracingConfig ? new JobEventBus() : new JobEventBus(tracingConfig);
}
```
Creating the ElasticJobExecutor involves these steps:
- It assigns elasticJob, jobConfig and jobFacade.
- It obtains jobItemExecutor through JobItemExecutorFactory.getExecutor.
- It obtains executorService through JobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService.
- It obtains jobErrorHandler through JobErrorHandlerFactory.getHandler.
- It initializes itemErrorMessages.
```java
public ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade) {
    this(elasticJob, jobConfig, jobFacade, JobItemExecutorFactory.getExecutor(elasticJob.getClass()));
}

private ElasticJobExecutor(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final JobItemExecutor jobItemExecutor) {
    this.elasticJob = elasticJob;
    this.jobConfig = jobConfig;
    this.jobFacade = jobFacade;
    this.jobItemExecutor = jobItemExecutor;
    executorService = JobExecutorServiceHandlerFactory.getHandler(jobConfig.getJobExecutorServiceHandlerType()).createExecutorService(jobConfig.getJobName());
    jobErrorHandler = JobErrorHandlerFactory.getHandler(jobConfig.getJobErrorHandlerType());
    itemErrorMessages = new ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
}
```
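setUpFacade.setUpJobConfiguration delegates to ConfigurationService. First, checkConflictJob guards against a different job class already being registered under the same job name. The method then checks whether the config node does not exist yet OR overwrite is enabled. If either condition holds, it writes the config (serialized as YAML) to ZooKeeper at /jobName/config and writes jobClassName to the /jobName node. Otherwise it calls load(false), which reads the configuration directly from the config node in ZooKeeper. Only load(true) tries the local cache first.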
```java
// SetUpFacade
public JobConfiguration setUpJobConfiguration(final String jobClassName, final JobConfiguration jobConfig) {
    return configService.setUpJobConfiguration(jobClassName, jobConfig);
}

// ConfigurationService
public JobConfiguration setUpJobConfiguration(final String jobClassName, final JobConfiguration jobConfig) {
    checkConflictJob(jobClassName, jobConfig);
    if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || jobConfig.isOverwrite()) {
        jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, YamlEngine.marshal(JobConfigurationPOJO.fromJobConfiguration(jobConfig)));
        jobNodeStorage.replaceJobRootNode(jobClassName);
        return jobConfig;
    }
    return load(false);
}

public JobConfiguration load(final boolean fromCache) {
    String result;
    if (fromCache) {
        // Try the local cache first, then fall back to reading ZooKeeper directly.
        result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
        if (null == result) {
            result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
        }
    } else {
        result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
    }
    return YamlEngine.unmarshal(result, JobConfigurationPOJO.class).toJobConfiguration();
}
```
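The write-or-load decision can be illustrated without ZooKeeper. In this hypothetical sketch, a HashMap stands in for the registry nodes, configurations are plain strings, and the conflict check is simplified. It mirrors the decision described above and is not ElasticJob's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the ZooKeeper nodes used by setUpJobConfiguration.
class ConfigSetUpSketch {
    final Map<String, String> nodes = new HashMap<>();

    // Write when the config node is missing OR overwrite is on; otherwise return the stored config.
    String setUp(String jobName, String jobClassName, String localConfig, boolean overwrite) {
        String root = "/" + jobName;
        String configPath = root + "/config";
        String existingClass = nodes.get(root);
        if (existingClass != null && !existingClass.equals(jobClassName)) {
            // Simplified conflict check: another job class already owns this job name.
            throw new IllegalStateException("Conflicting job class for job " + jobName);
        }
        if (!nodes.containsKey(configPath) || overwrite) {
            nodes.put(configPath, localConfig);
            nodes.put(root, jobClassName);
            return localConfig;
        }
        return nodes.get(configPath); // like load(false): read straight from the store
    }
}
```

The key point is the OR condition. Once a config exists in the registry, a node that starts with a different local config keeps the registry's version unless overwrite is set.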
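setGuaranteeServiceForElasticJobListeners creates a GuaranteeService and injects it into every listener that extends AbstractDistributeOnceElasticJobListener. We won't go further here.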
```java
private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) {
    GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig.getJobName());
    for (ElasticJobListener each : elasticJobListeners) {
        if (each instanceof AbstractDistributeOnceElasticJobListener) {
            ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
        }
    }
}
```
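createJobScheduleController is the main event. It performs three steps:
1. It builds a JobScheduleController from the results of createScheduler, createJobDetail and getJobConfig().getJobName().
2. It calls JobRegistry.getInstance().registerJob to register that controller in a local map.
3. It calls registerStartUpInfo to register the startup information.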
```java
private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    registerStartUpInfo();
    return result;
}
```
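Let's start with how the JobScheduleController is created. Its arguments are a Quartz Scheduler, a JobDetail and a triggerIdentity.
- The Scheduler comes from a StdSchedulerFactory that is initialized with some Quartz properties; its getScheduler method returns the Scheduler. A trigger listener is then registered on the Scheduler to handle misfired triggers. Note two of the properties: the thread pool has a single thread, and misfireThreshold is 1 ms.
- The JobDetail is built for LiteJob with jobExecutor stored in its JobDataMap. When the trigger fires, jobExecutor's execute method is therefore called.
- triggerIdentity is simply the jobName.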
```java
private Scheduler createScheduler() {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getQuartzProps());
        result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}

private Properties getQuartzProps() {
    Properties result = new Properties();
    result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
    result.put("org.quartz.threadPool.threadCount", "1");
    result.put("org.quartz.scheduler.instanceName", getJobConfig().getJobName());
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
    result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
    return result;
}

private JobDetail createJobDetail() {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(getJobConfig().getJobName()).build();
    result.getJobDataMap().put(JOB_EXECUTOR_DATA_MAP_KEY, jobExecutor);
    return result;
}
```
Next comes registerStartUpInfo. It calls JobRegistry's registerRegistryCenter, addJobInstance and setCurrentShardingTotalCount, and then finishes with setUpFacade.registerStartUpInfo.
```java
private void registerStartUpInfo() {
    JobRegistry.getInstance().registerRegistryCenter(jobConfig.getJobName(), regCenter);
    JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance());
    JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount());
    setUpFacade.registerStartUpInfo(!jobConfig.isDisabled());
}
```
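JobRegistry's registerRegistryCenter, addJobInstance and setCurrentShardingTotalCount cache regCenter, jobInstance and currentShardingTotalCount in local maps keyed by job name. registerRegistryCenter additionally initializes a local cache of the data under the /jobName path.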
```java
public void registerRegistryCenter(final String jobName, final CoordinatorRegistryCenter regCenter) {
    regCenterMap.put(jobName, regCenter);
    regCenter.addCacheData("/" + jobName);
}

public void addJobInstance(final String jobName, final JobInstance jobInstance) {
    jobInstanceMap.put(jobName, jobInstance);
}

public void setCurrentShardingTotalCount(final String jobName, final int currentShardingTotalCount) {
    currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);
}
```
setUpFacade.registerStartUpInfo is the final initialization method. Every call inside it matters, so let's go through them one by one.
```java
public void registerStartUpInfo(final boolean enabled) {
    listenerManager.startAllListeners();
    leaderService.electLeader();
    serverService.persistOnline(enabled);
    instanceService.persistOnline();
    shardingService.setReshardingFlag();
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
```
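listenerManager.startAllListeners starts every listener that watches data changes on ZooKeeper paths. It also adds a listener for the state of the registry-center connection.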
```java
public void startAllListeners() {
    electionListenerManager.start();
    shardingListenerManager.start();
    failoverListenerManager.start();
    monitorExecutionListenerManager.start();
    shutdownListenerManager.start();
    triggerListenerManager.start();
    rescheduleListenerManager.start();
    guaranteeListenerManager.start();
    jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
```
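electionListenerManager listens for the following events:
1. leaderService.electLeader is called in two cases. The first is an active election: there is no leader, and the local server node changes to a status other than DISABLED. The second is a passive election: the leader instance node is deleted while the local server is available.
2. If the local server is set to DISABLED while it is the leader, it removes its own leader node, which in turn triggers leaderService.electLeader on the other nodes.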
```java
class LeaderElectionJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
            leaderService.electLeader();
        }
    }

    private boolean isActiveElection(final String path, final String data) {
        return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
    }

    private boolean isPassiveElection(final String path, final Type eventType) {
        JobInstance jobInstance = JobRegistry.getInstance().getJobInstance(jobName);
        return !Objects.isNull(jobInstance) && isLeaderCrashed(path, eventType) && serverService.isAvailableServer(jobInstance.getIp());
    }

    private boolean isLeaderCrashed(final String path, final Type eventType) {
        return leaderNode.isLeaderInstancePath(path) && Type.NODE_DELETED == eventType;
    }

    private boolean isLocalServerEnabled(final String path, final String data) {
        return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(data);
    }
}

class LeaderAbdicationJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
            leaderService.removeLeader();
        }
    }

    private boolean isLocalServerDisabled(final String path, final String data) {
        return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
    }
}
```
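The two listeners' conditions reduce to a pair of boolean rules. The sketch below is a hypothetical, framework-free restatement that takes the ZooKeeper facts as plain parameters. The enum NodeEvent and the class ElectionRules are illustrative names, not ElasticJob types, and the real listener additionally skips shut-down jobs and requires a registered job instance.

```java
// Hypothetical stand-in for the Curator event types the listeners inspect.
enum NodeEvent { NODE_ADDED, NODE_CHANGED, NODE_DELETED }

final class ElectionRules {
    private ElectionRules() {}

    // Active election: no leader and the local server node became anything but DISABLED.
    // Passive election: the leader instance node was deleted and this server is available.
    static boolean shouldElect(boolean hasLeader, boolean isLocalServerPath, String serverData,
                               boolean isLeaderInstancePath, NodeEvent event, boolean localServerAvailable) {
        boolean active = !hasLeader && isLocalServerPath && !"DISABLED".equals(serverData);
        boolean passive = isLeaderInstancePath && event == NodeEvent.NODE_DELETED && localServerAvailable;
        return active || passive;
    }

    // Abdication: the current leader steps down when its own server is disabled.
    static boolean shouldAbdicate(boolean isLeader, boolean isLocalServerPath, String serverData) {
        return isLeader && isLocalServerPath && "DISABLED".equals(serverData);
    }
}
```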
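shardingListenerManager listens for the following events:
1. When shardingTotalCount in the config node changes, it calls shardingService.setReshardingFlag, which forces shards to be reassigned before the next execution.
2. When a server node changes, or an instance node is added or removed, it also calls shardingService.setReshardingFlag to force reassignment before the next execution.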
```java
class ShardingTotalCountChangedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            int newShardingTotalCount = YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
            if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                shardingService.setReshardingFlag();
                JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
            }
        }
    }
}

class ListenServersChangedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
            shardingService.setReshardingFlag();
        }
    }

    private boolean isInstanceChange(final Type eventType, final String path) {
        return instanceNode.isInstancePath(path) && Type.NODE_CHANGED != eventType;
    }

    private boolean isServerChange(final String path) {
        return serverNode.isServerPath(path);
    }
}
```
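failoverListenerManager listens for the following events:
1. When another instance's node is deleted (that instance went offline), the failover items it held are flagged as crashed; if it held none, its regular sharding items are flagged instead. Failover to another node is then attempted.
2. When the failover setting in the config node changes to false, the existing failover information is removed.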
```java
class JobCrashedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.NODE_DELETED == eventType && instanceNode.isInstancePath(path)) {
            String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
            // Ignore the deletion of this node's own instance node.
            if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                return;
            }
            List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
            if (!failoverItems.isEmpty()) {
                for (int each : failoverItems) {
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            } else {
                for (int each : shardingService.getShardingItems(jobInstanceId)) {
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            }
        }
    }
}

class FailoverSettingsChangedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().isFailover()) {
            failoverService.removeFailoverInfo();
        }
    }
}
```
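The item selection in JobCrashedJobListener can be isolated as a pure function. In this hypothetical sketch, maps keyed by instance ID stand in for the failover and sharding nodes in ZooKeeper. The real listener also flags each item and calls failoverIfNecessary, which this sketch omits.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of which items get flagged as crashed when an instance node is deleted.
final class CrashedItems {
    private CrashedItems() {}

    static List<Integer> itemsToFailover(String crashedInstanceId, String localInstanceId,
                                         Map<String, List<Integer>> failoverItemsByInstance,
                                         Map<String, List<Integer>> shardingItemsByInstance) {
        if (crashedInstanceId.equals(localInstanceId)) {
            return List.of(); // the deleted node is our own: nothing to take over
        }
        List<Integer> failoverItems = failoverItemsByInstance.getOrDefault(crashedInstanceId, List.of());
        if (!failoverItems.isEmpty()) {
            return failoverItems; // it was running failover items: only those are flagged
        }
        return shardingItemsByInstance.getOrDefault(crashedInstanceId, List.of());
    }
}
```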
monitorExecutionListenerManager listens for the following event:
1. When monitorExecution in the config node changes to false, all existing running-status nodes are cleared.
```java
class MonitorExecutionSettingsChangedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().isMonitorExecution()) {
            executionService.clearAllRunningInfo();
        }
    }
}
```
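shutdownListenerManager listens for the following event:
1. When this node's own instance node is deleted, it calls schedulerFacade.shutdownInstance to shut down this node's scheduled job. This happens only if the job is not already shut down or paused, and only if the instance node has not reappeared (which would indicate a registry reconnection).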
```java
class InstanceShutdownStatusJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().getJobScheduleController(jobName).isPaused()
                && isRemoveInstance(path, eventType) && !isReconnectedRegistryCenter()) {
            schedulerFacade.shutdownInstance();
        }
    }

    private boolean isRemoveInstance(final String path, final Type eventType) {
        return instanceNode.isLocalInstancePath(path) && Type.NODE_DELETED == eventType;
    }

    private boolean isReconnectedRegistryCenter() {
        return instanceService.isLocalJobInstanceExisted();
    }
}
```
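triggerListenerManager listens for the following event:
1. When this node's instance node is set to the TRIGGER operation, the listener clears the trigger flag. Then, if the job is neither shut down nor currently running, it calls triggerJob to run the job once immediately.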
```java
class JobTriggerStatusJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!InstanceOperation.TRIGGER.name().equals(data) || !instanceNode.isLocalInstancePath(path) || Type.NODE_CHANGED != eventType) {
            return;
        }
        instanceService.clearTriggerFlag();
        if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().isJobRunning(jobName)) {
            // TODO At present, it cannot be triggered when the job is running, and it will be changed to a stacked trigger in the future.
            JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob();
        }
    }
}
```
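rescheduleListenerManager listens for the following event:
1. When the config node changes (for example, its cron expression), the listener calls rescheduleJob to reschedule the job with the cron from the new configuration.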
```java
class CronSettingAndJobEventChangedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && Type.NODE_CHANGED == eventType && !JobRegistry.getInstance().isShutdown(jobName)) {
            JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(YamlEngine.unmarshal(data, JobConfigurationPOJO.class).toJobConfiguration().getCron());
        }
    }
}
```
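guaranteeListenerManager listens for the following events:
1. When the guarantee "started" root node is deleted, it calls notifyWaitingTaskStart on each distribute-once listener to signal that the job has started.
2. When the "completed" root node is deleted, it calls notifyWaitingTaskComplete on each distribute-once listener to signal that the job has completed.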
```java
class StartedNodeRemovedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (Type.NODE_DELETED == eventType && guaranteeNode.isStartedRootNode(path)) {
            for (ElasticJobListener each : elasticJobListeners) {
                if (each instanceof AbstractDistributeOnceElasticJobListener) {
                    ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
                }
            }
        }
    }
}

class CompletedNodeRemovedJobListener extends AbstractJobListener {
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (Type.NODE_DELETED == eventType && guaranteeNode.isCompletedRootNode(path)) {
            for (ElasticJobListener each : elasticJobListeners) {
                if (each instanceof AbstractDistributeOnceElasticJobListener) {
                    ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete();
                }
            }
        }
    }
}
```
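regCenterConnectionStateListener watches the ZooKeeper connection state. If the connection becomes SUSPENDED or LOST, it pauses the job. On RECONNECTED, it takes three steps:
1. It rewrites this node's server and instance nodes to ZooKeeper.
2. It clears the running nodes of its local sharding items.
3. It resumes the job.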
```java
@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
    if (JobRegistry.getInstance().isShutdown(jobName)) {
        return;
    }
    JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
    if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
        jobScheduleController.pauseJob();
    } else if (ConnectionState.RECONNECTED == newState) {
        serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
        instanceService.persistOnline();
        executionService.clearRunningInfo(shardingService.getLocalShardingItems());
        jobScheduleController.resumeJob();
    }
}
```
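The listener is essentially a mapping from connection state to action. The sketch below is hypothetical: the enum ConnState mirrors the names of Curator's ConnectionState values, but it is a local stand-in, and the returned strings only label the actions the real listener performs.

```java
// Local stand-in mirroring Curator's ConnectionState value names.
enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

// Hypothetical sketch of how regCenterConnectionStateListener reacts to each state.
final class ConnectionReaction {
    private ConnectionReaction() {}

    static String react(ConnState state, boolean jobShutdown) {
        if (jobShutdown) {
            return "ignore";
        }
        switch (state) {
            case SUSPENDED:
            case LOST:
                return "pause"; // stop firing while the session is uncertain
            case RECONNECTED:
                // re-persist server/instance nodes, clear local running info, then resume
                return "re-register-and-resume";
            default:
                return "ignore";
        }
    }
}
```

Pausing on SUSPENDED, rather than waiting for LOST, is the conservative choice. A node that might have lost its session stops firing before another node can take its shards over.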
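Once all listeners are initialized, leaderService.electLeader starts the election. It relies on Curator's LeaderLatch, a distributed-lock recipe built on ZooKeeper. Every node creates an ephemeral sequential node under the same latch path. Only the node holding the smallest sequence number acquires leadership and runs LeaderElectionExecutionCallback's execute method. That method writes the node's own instance ID into the ephemeral leader instance node, which declares the election won. When the winner closes its latch, the next node acquires leadership and runs the same callback. That node finds the leader node already holds data, does nothing, and ends the election as a follower.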
```java
// JobNodeStorage
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    // try-with-resources closes the latch after the callback, handing leadership to the next node.
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}

// LeaderService
public void electLeader() {
    log.debug("Elect a new leader now.");
    jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}

@RequiredArgsConstructor
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
    @Override
    public void execute() {
        if (!hasLeader()) {
            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        }
    }
}
```
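The serial "lowest sequence first, each node runs the callback once" behavior can be simulated in memory. This hypothetical sketch replaces ZooKeeper with a TreeMap of sequence numbers and does not use Curator. It only shows why the first latch holder becomes leader and everyone after it becomes a follower.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory simulation of LeaderLatch + LeaderElectionExecutionCallback.
class LatchElectionSimulation {
    private final TreeMap<Integer, String> latchNodes = new TreeMap<>(); // sequence -> instance ID
    private int nextSequence = 0;
    private String leaderInstanceNode; // stands in for the ephemeral leader instance node
    final List<String> log = new ArrayList<>();

    // Like latch.start(): register an ephemeral sequential node.
    void join(String instanceId) {
        latchNodes.put(nextSequence++, instanceId);
    }

    String runElection() {
        while (!latchNodes.isEmpty()) {
            // The lowest sequence holds the latch; polling it also models closing the latch.
            String holder = latchNodes.pollFirstEntry().getValue();
            if (leaderInstanceNode == null) {
                leaderInstanceNode = holder; // the callback sees no leader and writes itself
                log.add(holder + ":leader");
            } else {
                log.add(holder + ":follower"); // the leader node already holds data
            }
        }
        return leaderInstanceNode;
    }
}
```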
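As described above, the listeners mostly revolve around server and instance nodes. The next two calls, serverService.persistOnline and instanceService.persistOnline, create this node's server node and instance node. The server node is persistent and holds ENABLED or DISABLED; the instance node is ephemeral.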
```java
// ServerService
public void persistOnline(final boolean enabled) {
    if (!JobRegistry.getInstance().isShutdown(jobName)) {
        jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
    }
}

// InstanceService
public void persistOnline() {
    jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
}
```
Because this is initialization, shardingService.setReshardingFlag is also called, which forces sharding before the job's first execution.
```java
public void setReshardingFlag() {
    jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}
```
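Finally, reconcileService.startAsync starts a background service. The service runs once a minute with a fixed delay, but it only does real work at most once every reconcileIntervalMinutes. When it does, it calls isLeaderUntilBlock, which waits for a leader to exist and triggers an election if needed. If this node is the leader, no resharding is pending, and offline servers still hold sharding information, it sets the resharding flag. This keeps the core logic from depending entirely on listeners and gives the system a self-check and self-repair capability.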
```java
@Override
protected void runOneIteration() {
    int reconcileIntervalMinutes = configService.load(true).getReconcileIntervalMinutes();
    if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) {
        lastReconcileTime = System.currentTimeMillis();
        if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) {
            log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
            shardingService.setReshardingFlag();
        }
    }
}

@Override
protected Scheduler scheduler() {
    return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}
```
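The gating in runOneIteration is easy to misread, so here it is as a hypothetical standalone class. The caller passes the time and the cluster facts. The sketch assumes lastReconcileTime starts at 0, and it uses long arithmetic for the interval, whereas the original multiplies ints.

```java
// Hypothetical sketch of ReconcileService.runOneIteration's gating. It is invoked every minute,
// but real reconciliation happens at most once per reconcileIntervalMinutes.
class ReconcileSketch {
    private long lastReconcileTime; // assumption of this sketch: starts at 0
    boolean reshardingFlagSet;

    void runOneIteration(long nowMillis, int reconcileIntervalMinutes, boolean isLeader,
                         boolean needSharding, boolean offlineServersHoldShards) {
        if (reconcileIntervalMinutes <= 0
                || nowMillis - lastReconcileTime < reconcileIntervalMinutes * 60_000L) {
            return; // disabled, or the interval has not elapsed yet
        }
        lastReconcileTime = nowMillis;
        // Only the leader repairs, only when no resharding is pending,
        // and only when stale sharding info still points at offline servers.
        if (isLeader && !needSharding && offlineServersHoldShards) {
            reshardingFlagSet = true;
        }
    }
}
```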
Job execution
Now for the job execution flow. When the scheduled job fires, ElasticJobExecutor's execute method is called. Let's walk through it step by step.
```java
public void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(),
                shardingContexts.getShardingItemParameters().keySet()));
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
    //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
    //CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary();
    try {
        jobFacade.afterJobExecuted(shardingContexts);
    //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
    //CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
}
```
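jobFacade.checkJobExecutionEnvironment eventually calls ConfigurationService's checkMaxTimeDiffSecondsTolerable method. This method writes a node to ZooKeeper, reads back that node's write time, and compares it with the local time. If the difference exceeds the maximum tolerable difference (maxTimeDiffSeconds, where -1 disables the check), it throws a JobExecutionEnvironmentException. Note that execute above only passes this exception to jobErrorHandler and then carries on. Whether the job is actually skipped therefore depends on the configured error handler: one that rethrows aborts execute, while one that only logs lets the job run.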
```java
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
    int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
    if (-1 == maxTimeDiffSeconds) {
        return;
    }
    long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
    if (timeDiff > maxTimeDiffSeconds * 1000L) {
        throw new JobExecutionEnvironmentException(
                "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
    }
}
```
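Stripped of the registry, the clock-skew check is simple arithmetic. In this hypothetical standalone version, registryCenterMillis stands in for the write time read back from ZooKeeper, and IllegalStateException replaces ElasticJob's own exception type.

```java
// Hypothetical standalone version of the clock-skew check.
final class ClockSkewCheck {
    private ClockSkewCheck() {}

    static void check(long localMillis, long registryCenterMillis, int maxTimeDiffSeconds) {
        if (maxTimeDiffSeconds == -1) {
            return; // -1 disables the check
        }
        long timeDiff = Math.abs(localMillis - registryCenterMillis);
        if (timeDiff > maxTimeDiffSeconds * 1000L) { // compare in milliseconds
            throw new IllegalStateException(String.format(
                    "Time difference %d s exceeds the tolerated %d s.", timeDiff / 1000, maxTimeDiffSeconds));
        }
    }
}
```

For example, with maxTimeDiffSeconds = 60, a 5-second skew passes and a 100-second skew throws.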
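jobFacade.getShardingContexts builds the ShardingContexts. It determines the concrete sharding items and combines them with jobConfig. If failover is enabled and this node holds failover items, it returns those items. Otherwise it proceeds in three steps:
1. It calls shardingService.shardingIfNecessary to reshard if needed.
2. It takes the items assigned to this node.
3. It removes the items this node has given up to failover (when failover is enabled) and the disabled items, then returns the result.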
```java
@Override
public ShardingContexts getShardingContexts() {
    boolean isFailover = configService.load(true).isFailover();
    if (isFailover) {
        List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
        if (!failoverShardingItems.isEmpty()) {
            return executionContextService.getJobShardingContext(failoverShardingItems);
        }
    }
    shardingService.shardingIfNecessary();
    List<Integer> shardingItems = shardingService.getLocalShardingItems();
    if (isFailover) {
        shardingItems.removeAll(failoverService.getLocalTakeOffItems());
    }
    shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
    return executionContextService.getJobShardingContext(shardingItems);
}

public ShardingContexts getJobShardingContext(final List<Integer> shardingItems) {
    JobConfiguration jobConfig = configService.load(false);
    removeRunningIfMonitorExecution(jobConfig.isMonitorExecution(), shardingItems);
    if (shardingItems.isEmpty()) {
        return new ShardingContexts(buildTaskId(jobConfig, shardingItems), jobConfig.getJobName(), jobConfig.getShardingTotalCount(),
                jobConfig.getJobParameter(), Collections.emptyMap());
    }
    Map<Integer, String> shardingItemParameterMap = new ShardingItemParameters(jobConfig.getShardingItemParameters()).getMap();
    return new ShardingContexts(buildTaskId(jobConfig, shardingItems), jobConfig.getJobName(), jobConfig.getShardingTotalCount(),
            jobConfig.getJobParameter(), getAssignedShardingItemParameterMap(shardingItems, shardingItemParameterMap));
}
```
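The selection order in getShardingContexts can be restated with plain lists. In this hypothetical sketch, the parameters stand in for the corresponding ZooKeeper reads. It leaves out the resharding step and getJobShardingContext's removal of still-running items under monitorExecution.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the item-selection order in LiteJobFacade.getShardingContexts.
final class ItemSelection {
    private ItemSelection() {}

    static List<Integer> select(boolean failoverEnabled, List<Integer> localFailoverItems,
                                List<Integer> localShardingItems, List<Integer> takeOffItems,
                                List<Integer> disabledItems) {
        if (failoverEnabled && !localFailoverItems.isEmpty()) {
            return localFailoverItems; // items taken over from a crashed node win outright
        }
        List<Integer> result = new ArrayList<>(localShardingItems);
        if (failoverEnabled) {
            result.removeAll(takeOffItems); // items this node has handed to failover
        }
        result.removeAll(disabledItems);
        return result;
    }
}
```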
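shardingService.shardingIfNecessary performs resharding. As mentioned above, when resharding is needed, a necessary node is created under the sharding path in ZooKeeper. Once resharding is detected as necessary, followers block until it finishes, and the leader does the work:
1. It waits for the sharding items that are currently running to complete.
2. It marks resharding as in progress with an ephemeral processing node.
3. It calls resetShardingInfo to adjust the shard nodes to the correct count (scaling out or in).
4. It calls jobShardingStrategy's sharding method to compute the assignment.
5. It writes the result to ZooKeeper in a single transaction via jobNodeStorage.executeInTransaction.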
```java
public void shardingIfNecessary() {
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    // Followers wait for the leader to finish sharding.
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    waitingOtherShardingItemCompleted();
    JobConfiguration jobConfig = configService.load(false);
    int shardingTotalCount = jobConfig.getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(shardingTotalCount);
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}
```
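The sharding strategy is what turns "N instances, M shards" into concrete assignments. Below is a sketch of average allocation, the behavior ElasticJob documents for its default AverageAllocationJobShardingStrategy. Each instance gets total / n consecutive items, and the leftover items go one apiece to the first instances. This sketch is written from that description, and instances are plain strings rather than JobInstance objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of average-allocation sharding over a list of instance IDs.
final class AverageAllocation {
    private AverageAllocation() {}

    static Map<String, List<Integer>> sharding(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int perInstance = shardingTotalCount / instances.size();
        // Each instance gets a consecutive block of perInstance items.
        for (int i = 0; i < instances.size(); i++) {
            List<Integer> items = new ArrayList<>();
            for (int item = i * perInstance; item < (i + 1) * perInstance; item++) {
                items.add(item);
            }
            result.put(instances.get(i), items);
        }
        // The remaining items go one by one to the first instances.
        int remainder = shardingTotalCount % instances.size();
        for (int j = 0; j < remainder; j++) {
            result.get(instances.get(j)).add(perInstance * instances.size() + j);
        }
        return result;
    }
}
```

With instances A, B, C and 8 shards, this yields A=[0, 1, 6], B=[2, 3, 7] and C=[4, 5]. The instance order decides who receives the extra items, so a strategy like this needs a stable ordering of instances to avoid needless reshuffling.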
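Back to the main flow. After obtaining the ShardingContexts, the executor calls execute(shardingContexts, executionSource). Ignoring the before/after hooks, it simply runs the job once per sharding item. With a single item, the job runs on the calling thread. Otherwise each item is submitted to the thread pool, and a CountDownLatch waits for all of them to finish.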
```java
private void execute(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    if (shardingContexts.getShardingItemParameters().isEmpty()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
        return;
    }
    jobFacade.registerJobBegin(shardingContexts);
    String taskId = shardingContexts.getTaskId();
    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    try {
        process(shardingContexts, executionSource);
    } finally {
        // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
        jobFacade.registerJobCompleted(shardingContexts);
        if (itemErrorMessages.isEmpty()) {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
        } else {
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
        }
    }
}

private void process(final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    // A single item runs on the calling thread.
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
        process(shardingContexts, item, jobExecutionEvent);
        return;
    }
    // Several items run on the pool; the latch waits for all of them.
    CountDownLatch latch = new CountDownLatch(items.size());
    for (int each : items) {
        JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(() -> {
            try {
                process(shardingContexts, each, jobExecutionEvent);
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
    jobFacade.postJobExecutionEvent(startEvent);
    log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
    JobExecutionEvent completeEvent;
    try {
        jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item));
        completeEvent = startEvent.executionSuccess();
        log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item);
        jobFacade.postJobExecutionEvent(completeEvent);
    // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
    // CHECKSTYLE:ON
        completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
        jobFacade.postJobExecutionEvent(completeEvent);
        itemErrorMessages.put(item, ExceptionUtils.transform(cause));
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
}
```
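The fan-out pattern above can be reduced to a few lines. This hypothetical sketch keeps only the shape: one item runs inline, and several items go to an ExecutorService with a CountDownLatch. An IntConsumer stands in for the per-item job, and all event posting and error bookkeeping is dropped.

```java
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.IntConsumer;

// Hypothetical reduction of ElasticJobExecutor.process.
final class ShardFanOut {
    private ShardFanOut() {}

    static void process(Collection<Integer> items, ExecutorService pool, IntConsumer job) {
        if (items.size() == 1) {
            job.accept(items.iterator().next()); // single item: run on the calling thread
            return;
        }
        CountDownLatch latch = new CountDownLatch(items.size());
        for (int each : items) {
            pool.submit(() -> {
                try {
                    job.accept(each);
                } finally {
                    latch.countDown(); // count down even if the item throws
                }
            });
        }
        try {
            latch.await(); // block until every item has finished
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the flag, as the original does
        }
    }
}
```

Counting down in finally matters: without it, one failing item would leave the caller blocked forever.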
Next, let's analyze ElasticJob's three job types.
SimpleJobExecutor simply calls the execute method of the user-defined elasticJob.
```java
@Override
public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
    elasticJob.execute(shardingContext);
}
```
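ScriptJobExecutor reads the script command line from the job props and appends the sharding context, serialized as JSON, as a command-line argument. It then runs the command with Apache Commons Exec's DefaultExecutor.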
```java
@Override
public void process(final ElasticJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
    CommandLine commandLine = CommandLine.parse(getScriptCommandLine(jobConfig.getProps()));
    // Pass the sharding context to the script as a single JSON argument.
    commandLine.addArgument(GsonFactory.getGson().toJson(shardingContext), false);
    try {
        new DefaultExecutor().execute(commandLine);
    } catch (final IOException ex) {
        throw new JobSystemException("Execute script failure.", ex);
    }
}

private String getScriptCommandLine(final Properties props) {
    String result = props.getProperty(ScriptJobProperties.SCRIPT_KEY);
    if (Strings.isNullOrEmpty(result)) {
        throw new JobConfigurationException("Cannot find script command line, job is not executed.");
    }
    return result;
}
```
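DataflowJobExecutor chooses a processing mode based on its configuration. In oneOffExecute, it calls fetchData once to pull data and processData once to process it, then finishes. In streamingExecute, it loops over fetching and processing until a fetch returns no data, resharding becomes necessary, or streaming is switched off.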
```java
@Override
public void process(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
    if (Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobProperties.STREAM_PROCESS_KEY, false).toString())) {
        streamingExecute(elasticJob, jobConfig, jobFacade, shardingContext);
    } else {
        oneOffExecute(elasticJob, shardingContext);
    }
}

private void streamingExecute(final DataflowJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
    List<Object> data = fetchData(elasticJob, shardingContext);
    while (null != data && !data.isEmpty()) {
        processData(elasticJob, shardingContext, data);
        if (!isEligibleForJobRunning(jobConfig, jobFacade)) {
            break;
        }
        data = fetchData(elasticJob, shardingContext);
    }
}

private boolean isEligibleForJobRunning(final JobConfiguration jobConfig, final JobFacade jobFacade) {
    return !jobFacade.isNeedSharding() && Boolean.parseBoolean(jobConfig.getProps().getOrDefault(DataflowJobProperties.STREAM_PROCESS_KEY, false).toString());
}

private void oneOffExecute(final DataflowJob elasticJob, final ShardingContext shardingContext) {
    List<Object> data = fetchData(elasticJob, shardingContext);
    if (null != data && !data.isEmpty()) {
        processData(elasticJob, shardingContext, data);
    }
}

@SuppressWarnings("unchecked")
private List<Object> fetchData(final DataflowJob elasticJob, final ShardingContext shardingContext) {
    return elasticJob.fetchData(shardingContext);
}

@SuppressWarnings("unchecked")
private void processData(final DataflowJob elasticJob, final ShardingContext shardingContext, final List<Object> data) {
    elasticJob.processData(shardingContext, data);
}
```
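The two dataflow modes differ only in whether the fetch/process step repeats. In this hypothetical sketch, the Dataflow interface is a simplified stand-in for ElasticJob's DataflowJob; the real interface also receives a ShardingContext. A BooleanSupplier plays the role of isEligibleForJobRunning.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical simplified stand-in for DataflowJob: fetch a batch, then process it.
interface Dataflow<T> {
    List<T> fetchData();

    void processData(List<T> data);
}

final class DataflowModes {
    private DataflowModes() {}

    // One-off: fetch once, and process once if anything came back.
    static <T> void oneOff(Dataflow<T> job) {
        List<T> data = job.fetchData();
        if (data != null && !data.isEmpty()) {
            job.processData(data);
        }
    }

    // Streaming: keep going until a fetch is empty or the job is no longer eligible to run.
    static <T> void streaming(Dataflow<T> job, BooleanSupplier eligible) {
        List<T> data = job.fetchData();
        while (data != null && !data.isEmpty()) {
            job.processData(data);
            if (!eligible.getAsBoolean()) {
                break;
            }
            data = job.fetchData();
        }
    }
}
```

The eligibility check between batches is what lets a long-running streaming job yield when resharding is pending, instead of holding on to shards that are about to move.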
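Continuing the main flow, misfired sharding items are handled next. As analyzed above, the local Quartz scheduler has a trigger listener for misfires. When it detects a misfire, misfire nodes are registered in ZooKeeper; misfireIfRunning in execute also sets them when the previous run is still in progress. At this point, while misfire nodes exist for the items, the executor clears them and runs the items again.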
```java
execute(shardingContexts, ExecutionSource.NORMAL_TRIGGER);
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
    jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
    execute(shardingContexts, ExecutionSource.MISFIRE);
}
```
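The misfire loop drains triggers that arrived while a run was in progress. This hypothetical sketch uses an AtomicBoolean in place of the misfire nodes and a Runnable in place of one execution. The Runnable may set the flag again, which simulates a trigger that fires during the run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the misfire loop in ElasticJobExecutor.execute().
final class MisfireLoop {
    private MisfireLoop() {}

    // Returns the execution sources in order, e.g. [NORMAL_TRIGGER, MISFIRE, ...].
    static List<String> fire(AtomicBoolean misfired, Runnable oneExecution) {
        List<String> sources = new ArrayList<>();
        sources.add("NORMAL_TRIGGER");
        oneExecution.run();
        while (misfired.get()) {      // like jobFacade.isExecuteMisfired(items)
            misfired.set(false);      // like jobFacade.clearMisfire(items)
            sources.add("MISFIRE");   // like execute(shardingContexts, ExecutionSource.MISFIRE)
            oneExecution.run();
        }
        return sources;
    }
}
```

Because the flag is cleared before each re-run, any number of triggers that pile up during one run collapse into a single extra execution.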
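The last step is jobFacade.failoverIfNecessary. If failover is enabled, it makes one more attempt to claim a crashed item and run it. Under the failover leader latch, the node takes the first crashed item and records itself as that item's failover executor. It then removes the item from the pending failover list and calls triggerJob to run the job immediately.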
```java
// LiteJobFacade
@Override
public void failoverIfNecessary() {
    if (configService.load(true).isFailover()) {
        failoverService.failoverIfNecessary();
    }
}

// FailoverService
public void failoverIfNecessary() {
    if (needFailover()) {
        jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    }
}

private boolean needFailover() {
    return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
            && !JobRegistry.getInstance().isJobRunning(jobName);
}

// FailoverLeaderExecutionCallback
@Override
public void execute() {
    if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
        return;
    }
    int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
    log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
    jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
    // TODO Instead of using triggerJob, use executor for unified scheduling
    JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
    if (null != jobScheduleController) {
        jobScheduleController.triggerJob();
    }
}
```
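Finally, the claim step of failover can be sketched in memory. In this hypothetical class, a list stands in for the children of the failover items node, taking index 0 like get(0) above, and a map stands in for the per-item execution failover nodes. In ElasticJob the claim runs under the failover leader latch, so only one node claims at a time; this sketch has no concurrency at all.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of FailoverLeaderExecutionCallback.execute().
class FailoverClaim {
    final List<Integer> pendingCrashedItems = new ArrayList<>(); // failover items node children
    final Map<Integer, String> failoverOwner = new HashMap<>();  // execution failover nodes
    int triggerCount;                                            // stands in for triggerJob()

    boolean claim(String localInstanceId, boolean jobRunning) {
        if (pendingCrashedItems.isEmpty() || jobRunning) {
            return false; // needFailover() would be false
        }
        int crashedItem = pendingCrashedItems.remove(0); // take the first crashed item
        failoverOwner.put(crashedItem, localInstanceId); // record who now runs this item
        triggerCount++;                                  // run the job immediately
        return true;
    }
}
```

Each successful claim moves exactly one item, so a node that takes over several crashed items does so over several triggered runs.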