一,前言
XXL-JOB是一个优秀的国产开源分布式任务调度平台,他有着自己的一套调度注册中心,提供了丰富的调度和阻塞策略等,这些都是可视化的操作,使用起来十分方便。
由于是国产的,所以上手还是比较快的,而且他的源码也十分优秀,因为是调试平台所以线程这一块的使用是很频繁的,特别值得学习研究。
XXL-JOB一同分为两个模块,调度中心模块和执行模块。具体解释,我们copy下官网的介绍:
调度模块(调度中心):
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。执行模块(执行器):
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
接收“调度中心”的执行请求、终止请求和日志请求等。
XXL-JOB中“调度模块”和“任务模块”完全解耦,调度模块进行任务调度时,将会解析不同的任务参数发起远程调用,调用各自的远程执行器服务。这种调用模型类似RPC调用,调度中心提供调用代理的功能,而执行器提供远程服务的功能。
下面看下springboot环境下的使用方式,首先看下执行器的配置:
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
//调度中心地址
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
//执行器AppName
xxlJobSpringExecutor.setAppname(appname);
//执行器注册地址,默认为空即可
xxlJobSpringExecutor.setAddress(address);
//执行器IP [选填]:默认为空表示自动获取IP
xxlJobSpringExecutor.setIp(ip);
//执行器端口
xxlJobSpringExecutor.setPort(port);
//执行器通讯TOKEN
xxlJobSpringExecutor.setAccessToken(accessToken);
//执行器运行日志文件存储磁盘路径
xxlJobSpringExecutor.setLogPath(logPath);
//执行器日志文件保存天数
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
XXL-JOB提供了多种任务执行方式,我们今天看下最简单的bean执行模式。如下:
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}
现在在调度中心稍做配置,我们这段代码就可以按照一定的策略进行调度执行,是不是很神奇?我们先看下官网上的解释:
原理:每个Bean模式任务都是一个Spring的Bean类实例,它被维护在“执行器”项目的Spring容器中。任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。任务类需要继承统一接口“IJobHandler”,任务逻辑在execute方法中开发,因为“执行器”在接收到调度中心的调度请求时,将会调用“IJobHandler”的execute方法,执行任务逻辑。
纸上得来终觉浅,绝知此事要躬行,今天的任务就是跟着这段话,我们大体看一波源码的实现方式。
二,XxlJobSpringExecutor
XxlJobSpringExecutor其实看名字,我们都能想到,这是XXL-JOB为了适应spring模式的应用而开发的模板类,先看下他的实现结构。
XxlJobSpringExecutor继承自XxlJobExecutor
,同时由于是用在spring环境,所以实现了多个spring内置的接口来配合实现整个执行器模块功能,每个接口的功能就不细说了,相信大家都可以百度查到。
我们看下初始化方法afterSingletonsInstantiated
:
// start
@Override
public void afterSingletonsInstantiated() {
//注册每个任务
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
主流程看上去是比较简单的,首先是注册每一个JobHandler,然后进行初始化操作,GlueFactory.refreshInstance(1)
是为了另一种调用模式时用到的,主要是用到了groovy
,不在这次的分析中,我们就不看了。我们继续看下如何注册JobHandler的。
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// 遍历所有beans,取出所有包含有@XxlJob的方法
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
//遍历@XxlJob方法,取出executeMethod以及注解中对应的initMethod, destroyMethod进行注册
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
executeMethod.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
}
}
// 注册 jobhandler
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
}
}
XxlJobSpringExecutor由于实现了ApplicationContextAware
,所以通过applicationContext可以获得所有容器中的bean实例,再通过MethodIntrospector来过滤出所有包含@XxlJob注解的方法,最后把对应的executeMethod以及注解中对应的initMethod, destroyMethod进行注册到jobHandlerRepository
中,jobHandlerRepository
是一个线程安全ConcurrentMap,MethodJobHandler实现自IJobHandler
接口的一个模板类,主要作用就是通过反射去执行对应的方法。看到这,之前那句话任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。我们就明白了。
public class MethodJobHandler extends IJobHandler {
....
public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
this.target = target;
this.method = method;
this.initMethod = initMethod;
this.destroyMethod = destroyMethod;
}
@Override
public void execute() throws Exception {
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}
}
三,执行服务器initEmbedServer
看完上面的JobHandler注册,后面紧着就是执行器模块的启动操作了,下面看下start方法:
public void start() throws Exception {
// 初始化日志path
XxlJobFileAppender.initLogPath(logPath);
// 注册adminBizList
initAdminBizList(adminAddresses, accessToken);
// 初始化日志清除线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 初始化回调线程,用来把执行结果回调给调度中心
TriggerCallbackThread.getInstance().start();
// 执行服务器启动
initEmbedServer(address, ip, port, appname, accessToken);
}
前几个操作,我们就不细看了,大家有兴趣的可以自行查看,我们直接进入initEmbedServer方法查看内部服务器如何启动,以及向调试中心注册的。
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
...
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
public void start(final String address, final int port, final String appname, final String accessToken) {
```
// 启动netty服务器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// 执行向调度中心注册
startRegistry(appname, address);
```
}
因为执行器模块本身需要有通讯交互的需求,不然调度中心是无法调用他的,所以内嵌了一个netty服务器进行通信。启动成功后,正式向调试中心执行注册请求。我们直接看注册的代码:
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
//执行注册请求
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
XxlJobRemotingUtil.postBody
就是个符合XXL-JOB规范的restful的http请求处理,里面不止有注册请求,还有下线请求,回调请求等,碍于篇幅,就不一一展示了,调度中心接到对应的请求,会有对应的DB处理:
// services mapping
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
跟到这里,我们就已经大概了解了整个注册的流程。同样当调度中心向我们执行器发送请求,譬如说执行任务调度的请求时,也是同样的http请求发送我们上面分析的执行器中内嵌netty服务进行操作,这边只展示调用方法:
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
这样,我们执行器模块收到请求后会执行我们上面注册中的jobHandle进行对应的方法执行,执行器会将请求存入“异步执行队列”并且立即响应调度中心,异步运行对应方法。这样一套注册和执行的流程就大致走下来了。
四,结尾
当然事实上XXL-JOB的代码还有许多丰富的特性,碍于本人实力不能一一道明,我这也是抛转引玉,只是把最基础的一些地方介绍给大家,有兴趣的话,大家可以自行查阅相关代码,总的来说,毕竟是国产开源的优秀项目,还是值得赞赏的,也希望国内以后有越来越多优秀开源框架。