5、quartz使用总结

在此梳理一下项目中用到的关于quartz的知识:
1、Spring提供的类
(1)SchedulerFactoryBean
(2)Job相关的类:Job执行任务的逻辑需要自己写,既然用了spring,自然要使用spring提供的Job相关的类。有两个:MethodInvokingJobDetailFactoryBean和QuartzJobBean。其中MethodInvokingJobDetailFactoryBean不支持存储到数据库,会报java.io.NotSerializableException,遂放弃。

2、并发控制
官方文档提供了一种并发控制方法:@DisallowConcurrentExecution
该限制仅针对于JobDetail,同一时刻仅允许执行一个JobDetail,但可以并发执行多个Job类的不同实例。也就是如果用Job构建了多个JobDetail,如JobDetail1,JobDetail2,JobDetail3,那么这3个JobDetail还是并发执行的。
根据org.quartz.threadPool.threadCount配置的线程个数 和 org.quartz.threadPool.class配置的线程类执行自己写的逻辑。

3、数据持久化
quartz提供两种持久化类型:RAMJobStore和JDBC JobStore
RAMJobStore持久化到内存,重启应用后任务丢失。
JDBC JobStore可以持久化到数据库,重启后任务依然存在。
下载官网提供的quartz-2.2.3-distribution.tar.gz包,quartz\quartz-2.2.3\docs\dbTables提供了各种数据库的脚本,建表,quartz.properties文件中配置jobStore类型,代理类和数据源。同时在配置文件中指定quartz.properties文件的位置。

4、动态管理任务
(1)增加:

scheduler.scheduleJob(jobDetail, trigger);

(2)删除:

scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));

5、执行的状态
执行状态存放在qrtz_triggers表的trigger_state字段,源码中完整的状态有:WAITING,ACQUIRED,EXECUTING,COMPLETE,BLOCKED,ERROR,PAUSED,PAUSED_BLOCKED。配置文件中配置的JobStore是JobStoreTX,但是状态变化的相关代码都在JobStoreSupport类中,JobStoreSupport调用配置的Delegate拼接sql语句,完成状态变化。
从源码中可以看出,acquired状态表示已经获得的,在job自定义逻辑之前执行。
其他网友整理的状态变化图:


image.png

6、自定义Job类中使用spring管理的service
Job继承spring提供的类QuartzJobBean,竟然不能直接注入自己写的service。原因是Quartz初始化是自己的JobContext,不同于Spring的ApplicationContext,所以无法直接注入。后来找到一种解决办法,在构建SchedulerFactoryBean的时候存放到map中。Job中使用时再取出来。

@Bean(name = "schedulerFactory")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setQuartzProperties(quartzProperties());
    // 把用到的job类中用到的service,dao等传给他,用@Autowired注解无法注入
    Map<String, Object> springBeanMap = new HashMap<String, Object>();
    springBeanMap.put("testngService", testngService);
    springBeanMap.put("quartzService", quartzService);
    springBeanMap.put("triggerDao", triggerDao);
    factory.setSchedulerContextAsMap(springBeanMap);
    factory.setWaitForJobsToCompleteOnShutdown(true);
    return factory;
}
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
    JobDataMap dataMap = context.getJobDetail().getJobDataMap();
    try {
        testngService = (TestngService)context.getScheduler().getContext().get("testngService");
    } catch (SchedulerException e) {
        e.printStackTrace();
        logger.error(e.getMessage());
    }
}

7、监听器
quartz提供了TriggerListeners、JobListeners和SchedulerListeners,使用方法在quartz\quartz-2.2.3\examples中有,很详细。
注意:经过测试,监听器在运行过程中动态注册,第一次注册可用,重启后失效。

8、总结:这次学习从0开始到应用到项目中,帮助最大的是官方提供的example代码、源代码和说明文档,在理解这些的基础上,学习一些优秀的博客,总结如下:
中文说明文档:https://www.w3cschool.cn/quartz_doc/quartz_doc-lwuv2d2a.html
增删改查:http://snailxr.iteye.com/blog/2076903#comments
并发:http://blog.csdn.net/will_awoke/article/details/38921273
   https://www.cnblogs.com/Rozdy/p/4220186.html
   http://www.blogjava.net/stevenjohn/archive/2015/07/26/426425.html
集群:http://www.importnew.com/22896.html
   http://soulshard.iteye.com/blog/337886
   https://tech.meituan.com/mt-crm-quartz.html
核心概念:http://blog.csdn.net/guolong1983811/article/details/51501346
     http://blog.csdn.net/beliefer/article/details/51578546
     https://www.cnblogs.com/pzy4447/p/5201674.html
问题:http://blog.csdn.net/jackylovesjava/article/details/50044271

9、以下是一些代码
9.1、完整的配置 quartz.properites

# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false

#线程池相关配置
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

#错过执行时间设置
#org.quartz.jobStore.misfireThreshold: 60000

#quartz信息持久化到oracle数据库
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.dataSource: myDS
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.isClustered: false

#数据库连接参数
org.quartz.dataSource.myDS.driver: oracle.jdbc.driver.OracleDriver
org.quartz.dataSource.myDS.URL: jdbc:oracle:thin:@10.10.52.14:1521:wxkfdb
org.quartz.dataSource.myDS.user: autotesting
org.quartz.dataSource.myDS.password: test
org.quartz.dataSource.myDS.maxConnections: 5

9.2、quartz整合spring boot的配置类

@Configuration
public class QuartzCofig {

    @Autowired
    private TestngService testngService;

    @Autowired
    private TriggerDao triggerDao;

    @Autowired
    private QuartzService quartzService;

    @Bean(name = "schedulerFactory")
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(quartzProperties());
        // 把用到的job类中用到的service,dao等传给他,用@Autowired注解无法注入
        Map<String, Object> springBeanMap = new HashMap<String, Object>();
        springBeanMap.put("testngService", testngService);
        springBeanMap.put("quartzService", quartzService);
        springBeanMap.put("triggerDao", triggerDao);
        factory.setSchedulerContextAsMap(springBeanMap);
        factory.setWaitForJobsToCompleteOnShutdown(true);
        return factory;
    }

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        // 指定quart.properties文件位置
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        //在quartz.properties中的属性被读取并注入后再初始化对象
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /*
     * 通过SchedulerFactoryBean获取Scheduler的实例
     * name不能设置为scheduler,否则QuartzService里注入的不是此处定义的scheduler
     */
    @Bean(name="myScheduler")
    public Scheduler scheduler() throws IOException {
        System.out.println("schedulerFactoryBean().getScheduler():" + schedulerFactoryBean().getScheduler());
        return schedulerFactoryBean().getScheduler();
    }
}

9.3、自定义的Job逻辑

@Configuration
@Component
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ScheduleJob extends QuartzJobBean {

    private Logger logger = LoggerFactory.getLogger(ScheduleJob.class);

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();

        TestSuite testSuite = (TestSuite)dataMap.get("testSuite");
        Project project = (Project) dataMap.get("project");

        TestngService testngService = null;
        try {
            testngService = (TestngService)context.getScheduler().getContext().get("testngService");
        } catch (SchedulerException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
        logger.info("---" + context.getJobDetail().getKey() + "想要执行---");
        testngService.run(testSuite, project);
    }
}

9.4、增加,删除service

@Service("quartzService")
public class QuartzService {

    @Resource(name = "myScheduler")
    private Scheduler scheduler;

    @Autowired
    private TriggerDao triggerDao;

    @Autowired
    private ProcessDao processDao;

    @Autowired
    private ApplicationContext applicationContext;

    private Logger logger = LoggerFactory.getLogger(QuartzService.class);

    /**
     * 增加或修改一个job
     * @param testSuite
     * @param project
     */
    public void addJob(TestSuite testSuite, Project project) {
        String runTime = testSuite.getRuntime();
         if(!StringUtils.isEmpty(testSuite.getRuntime())) {
            // 生成一个triggerKey
            String testSuiteName = testSuite.getName();
            String projectName = project.getName();

             JobDataMap jobDataMap = new JobDataMap();
             jobDataMap.put("testSuite", testSuite);
             jobDataMap.put("project", project);

            JobDetail jobDetail = JobBuilder.newJob(ScheduleJob.class)
                    .withIdentity(testSuiteName, projectName)
                    .usingJobData(jobDataMap)
                    .build();
            // 向Job传值

//            jobDetail.getJobDataMap().put("testSuite", testSuite);
//            jobDetail.getJobDataMap().put("project", project);
             TestSuite testsuite = (TestSuite) jobDetail.getJobDataMap().get("testSuite");
            // misfire处理:上一个job执行结束,立即执行这个
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(runTime);
//                        .withMisfireHandlingInstructionFireAndProceed();
            CronTrigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(testSuiteName, projectName)
                    .withSchedule(scheduleBuilder)
                    .build();
            try {
                scheduler.scheduleJob(jobDetail, trigger);
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
            Trigger triggerInsert = new Trigger(project.getName() + "." + testSuite.getName(),
                    TriggerStateConstant.WAITING, null, project.getProjectid());
            triggerDao.insertOne(triggerInsert);
        }
    }

    /**
     * 删除job
     * @param testSuite
     * @param project
     */
    public void deleteJob(TestSuite testSuite, Project project) {
        try {
            scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
            scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
            scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
            // 删除sttrigger表的记录
            triggerDao.deleteByTriggerId(project.getName() + "." + testSuite.getName());
        } catch (SchedulerException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }

    /**
     * 判断该project下是否有trigger触发,如果有返回true
     * @param projectId
     * @return
     */
    public boolean hasTriggerFired(String projectId) {
        List<Trigger> triggerList = triggerDao.findByProjectId(projectId);
        if(CollectionUtils.isEmpty(triggerList)) {
            return true;
        } else {
            return false;
        }
    }

    /**
     * 获取可以执行的process
     * @param projectId
     * @return
     */
    public Process getAvaliableProcess(String projectId) {
        // 获取该project下所有process
        List<String> processIdList = processDao.findIdByProjectId(projectId);
        // 获取正在执行中的testsuite对应的processid
        List<String> executingProcssIdList = triggerDao.findExecutingProcess(projectId);

        List<String> differentList = new ArrayList<>();
        if(!CollectionUtils.isEmpty(executingProcssIdList)) {
            Set<String> processIdSet = new HashSet<>();
            processIdSet.addAll(processIdList);
            Set<String> executingProcssIdSet = new HashSet<String>();
            executingProcssIdSet.addAll(executingProcssIdList);
            // 取差集
            Set<String> differentSet = CommonUtil.getDifferentSet(processIdSet, executingProcssIdSet);
            differentList.addAll(differentSet);
        } else {
            differentList = processIdList;
        }
        // 随机获取一个Process
        if(differentList.size() != 0) {
            String randomProcessId = CommonUtil.getRandom(differentList);
            return processDao.findByProcessId(randomProcessId);
        } else {
            return null;
        }
    }
}

9.5、资源调度的单例类。
(1)用单例模式的原因:要保证每个Job执行的过程中获得的ProcessResource类的对象是同一个对象,map 是同一个map,否则有多个map的话,使用的就不是同一份资源了。

public class ProcessResource {

    private ProcessDao processDao = (ProcessDao) SpringUtil.getBean("processDao");
    private Map<String, LinkedList<Process>> map = new HashMap<String, LinkedList<Process>>();
    private static ProcessResource instance = null;
    private Object lock = new Object();

    private Logger logger = LoggerFactory.getLogger(ProcessResource.class);


    private ProcessResource() {
        if (instance != null) {
            return;
        }
    }

    public static ProcessResource getInstance() {
        if (instance == null) {
            synchronized (ProcessResource.class) {
                if (instance == null) {
                    instance = new ProcessResource();
                    instance.init();
                }
            }
        }
        return instance;
    }


    public void init() {
        List<String> projectIdList = processDao.findProjectId();
        for(String projectId : projectIdList) {
            LinkedList<Process> list = processDao.findByProjectId(projectId);
            map.put(projectId, list);
        }
        instance.setMap(map);
    }

    public Process getProcess(String projectId) {
        synchronized (lock) {
            LinkedList<Process> list = instance.getMap().get(projectId);
            // 判断list中是否有元素,如果有,返回, 如果没有,打印信息
            if(!CollectionUtils.isEmpty(list)) {
                return list.removeFirst();
            } else {
                logger.info("ProcessResource中没有可用的Process了....");
                return null;
            }
        }
    }

    /**
     * 释放资源
     * @param process
     * @param projectId
     */
    public void releaseProcess(Process process, String projectId) {
        LinkedList<Process> list = instance.getMap().get(projectId);
        // 判断list中是否有元素,如果有,返回, 如果没有,打印信息
        list.addLast(process);
    }

    public void setMap(Map<String, LinkedList<Process>> map) {
        this.map = map;
    }

    public Map<String, LinkedList<Process>> getMap() {
        return map;
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容