源码简析XXL-JOB的注册和执行过程

一,前言

XXL-JOB是一个优秀的国产开源分布式任务调度平台,他有着自己的一套调度注册中心,提供了丰富的调度和阻塞策略等,这些都是可视化的操作,使用起来十分方便。

由于是国产的,所以上手还是比较快的,而且他的源码也十分优秀,因为是调试平台所以线程这一块的使用是很频繁的,特别值得学习研究。

XXL-JOB一同分为两个模块,调度中心模块和执行模块。具体解释,我们copy下官网的介绍:

  • 调度模块(调度中心):
    负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
    支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

  • 执行模块(执行器):
    负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
    接收“调度中心”的执行请求、终止请求和日志请求等。

image

XXL-JOB中“调度模块”和“任务模块”完全解耦,调度模块进行任务调度时,将会解析不同的任务参数发起远程调用,调用各自的远程执行器服务。这种调用模型类似RPC调用,调度中心提供调用代理的功能,而执行器提供远程服务的功能。

下面看下springboot环境下的使用方式,首先看下执行器的配置:

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        //调度中心地址
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        //执行器AppName
        xxlJobSpringExecutor.setAppname(appname);
        //执行器注册地址,默认为空即可
        xxlJobSpringExecutor.setAddress(address);
        //执行器IP [选填]:默认为空表示自动获取IP
        xxlJobSpringExecutor.setIp(ip);
        //执行器端口
        xxlJobSpringExecutor.setPort(port);
        //执行器通讯TOKEN
        xxlJobSpringExecutor.setAccessToken(accessToken);
        //执行器运行日志文件存储磁盘路径
        xxlJobSpringExecutor.setLogPath(logPath);
        //执行器日志文件保存天数
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

XXL-JOB提供了多种任务执行方式,我们今天看下最简单的bean执行模式。如下:

    /**
     * 1、简单任务示例(Bean模式)
     */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
        XxlJobHelper.log("XXL-JOB, Hello World.");

        for (int i = 0; i < 5; i++) {
            XxlJobHelper.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        // default success
    }

现在在调度中心稍做配置,我们这段代码就可以按照一定的策略进行调度执行,是不是很神奇?我们先看下官网上的解释:

原理:每个Bean模式任务都是一个Spring的Bean类实例,它被维护在“执行器”项目的Spring容器中。任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。任务类需要继承统一接口“IJobHandler”,任务逻辑在execute方法中开发,因为“执行器”在接收到调度中心的调度请求时,将会调用“IJobHandler”的execute方法,执行任务逻辑。

纸上得来终觉浅,绝知此事要躬行,今天的任务就是跟着这段话,我们大体看一波源码的实现方式。

二,XxlJobSpringExecutor

XxlJobSpringExecutor其实看名字,我们都能想到,这是XXL-JOB为了适应spring模式的应用而开发的模板类,先看下他的实现结构。

image

XxlJobSpringExecutor继承自XxlJobExecutor,同时由于是用在spring环境,所以实现了多个spring内置的接口来配合实现整个执行器模块功能,每个接口的功能就不细说了,相信大家都可以百度查到。

我们看下初始化方法afterSingletonsInstantiated

    // start
    @Override
    public void afterSingletonsInstantiated() {

        //注册每个任务
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

主流程看上去是比较简单的,首先是注册每一个JobHandler,然后进行初始化操作,GlueFactory.refreshInstance(1)是为了另一种调用模式时用到的,主要是用到了groovy,不在这次的分析中,我们就不看了。我们继续看下如何注册JobHandler的。

 private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }
        // 遍历所有beans,取出所有包含有@XxlJob的方法
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);

            Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
            try {
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup<XxlJob>() {
                            @Override
                            public XxlJob inspect(Method method) {
                                return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                            }
                        });
            } catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                continue;
            }
            //遍历@XxlJob方法,取出executeMethod以及注解中对应的initMethod, destroyMethod进行注册
            for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method executeMethod = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                if (xxlJob == null) {
                    continue;
                }

                String name = xxlJob.value();
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                }
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                executeMethod.setAccessible(true);

                // init and destory
                Method initMethod = null;
                Method destroyMethod = null;

                if (xxlJob.init().trim().length() > 0) {
                    try {
                        initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                        initMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }
                if (xxlJob.destroy().trim().length() > 0) {
                    try {
                        destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                        destroyMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }

                // 注册 jobhandler
                registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
            }
        }

    }

XxlJobSpringExecutor由于实现了ApplicationContextAware,所以通过applicationContext可以获得所有容器中的bean实例,再通过MethodIntrospector来过滤出所有包含@XxlJob注解的方法,最后把对应的executeMethod以及注解中对应的initMethod, destroyMethod进行注册到jobHandlerRepository中,jobHandlerRepository是一个线程安全ConcurrentMap,MethodJobHandler实现自IJobHandler接口的一个模板类,主要作用就是通过反射去执行对应的方法。看到这,之前那句话任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。我们就明白了。

public class MethodJobHandler extends IJobHandler {
    ....
    public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
        this.target = target;
        this.method = method;

        this.initMethod = initMethod;
        this.destroyMethod = destroyMethod;
    }

    @Override
    public void execute() throws Exception {
        Class<?>[] paramTypes = method.getParameterTypes();
        if (paramTypes.length > 0) {
            method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types
        } else {
            method.invoke(target);
        }
    }

三,执行服务器initEmbedServer

看完上面的JobHandler注册,后面紧着就是执行器模块的启动操作了,下面看下start方法:

    public void start() throws Exception {

        // 初始化日志path
        XxlJobFileAppender.initLogPath(logPath);

        // 注册adminBizList
        initAdminBizList(adminAddresses, accessToken);

        // 初始化日志清除线程
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // 初始化回调线程,用来把执行结果回调给调度中心
        TriggerCallbackThread.getInstance().start();

        // 执行服务器启动
        initEmbedServer(address, ip, port, appname, accessToken);
    }

前几个操作,我们就不细看了,大家有兴趣的可以自行查看,我们直接进入initEmbedServer方法查看内部服务器如何启动,以及向调试中心注册的。

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
        ...
        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }

    public void start(final String address, final int port, final String appname, final String accessToken) {
        ```
        // 启动netty服务器
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                .addLast(new HttpServerCodec())
                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                    }
                })
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        // bind
        ChannelFuture future = bootstrap.bind(port).sync();

        logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

        // 执行向调度中心注册
        startRegistry(appname, address);
        ```
    }

因为执行器模块本身需要有通讯交互的需求,不然调度中心是无法调用他的,所以内嵌了一个netty服务器进行通信。启动成功后,正式向调试中心执行注册请求。我们直接看注册的代码:

    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            //执行注册请求
            ReturnT<String> registryResult = adminBiz.registry(registryParam);
            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                registryResult = ReturnT.SUCCESS;
                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                break;
            } else {
                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
            }
        } catch (Exception e) {
            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
        }
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

XxlJobRemotingUtil.postBody就是个符合XXL-JOB规范的restful的http请求处理,里面不止有注册请求,还有下线请求,回调请求等,碍于篇幅,就不一一展示了,调度中心接到对应的请求,会有对应的DB处理:

        // services mapping
        if ("callback".equals(uri)) {
            List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
            return adminBiz.callback(callbackParamList);
        } else if ("registry".equals(uri)) {
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registry(registryParam);
        } else if ("registryRemove".equals(uri)) {
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registryRemove(registryParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }

跟到这里,我们就已经大概了解了整个注册的流程。同样当调度中心向我们执行器发送请求,譬如说执行任务调度的请求时,也是同样的http请求发送我们上面分析的执行器中内嵌netty服务进行操作,这边只展示调用方法:

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }

这样,我们执行器模块收到请求后会执行我们上面注册中的jobHandle进行对应的方法执行,执行器会将请求存入“异步执行队列”并且立即响应调度中心,异步运行对应方法。这样一套注册和执行的流程就大致走下来了。

四,结尾

当然事实上XXL-JOB的代码还有许多丰富的特性,碍于本人实力不能一一道明,我这也是抛转引玉,只是把最基础的一些地方介绍给大家,有兴趣的话,大家可以自行查阅相关代码,总的来说,毕竟是国产开源的优秀项目,还是值得赞赏的,也希望国内以后有越来越多优秀开源框架。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容