在此梳理一下项目中用到的关于quartz的知识:
1、Spring提供的类
(1)SchedulerFactoryBean
(2)Job相关的类:Job执行任务的逻辑需要自己写,既然用了spring,自然要使用spring提供的Job相关的类。有两个:MethodInvokingJobDetailFactoryBean和QuartzJobBean。其中MethodInvokingJobDetailFactoryBean不支持存储到数据库,会报java.io.NotSerializableException,遂放弃。
2、并发控制
官方文档提供了一种并发控制方法:@DisallowConcurrentExecution
该限制仅针对于JobDetail,同一时刻仅允许执行一个JobDetail,但可以并发执行多个Job类的不同实例。也就是如果用Job构建了多个JobDetail,如JobDetail1,JobDetail2,JobDetail3,那么这3个JobDetail还是并发执行的。
根据org.quartz.threadPool.threadCount配置的线程个数 和 org.quartz.threadPool.class配置的线程类执行自己写的逻辑。
3、数据持久化
quartz提供两种持久化类型:RAMJobStore和JDBC JobStore
RAMJobStore持久化到内存,重启应用后任务丢失。
JDBC JobStore可以持久化到数据库,重启后任务依然存在。
下载官网提供的quartz-2.2.3-distribution.tar.gz包,quartz\quartz-2.2.3\docs\dbTables提供了各种数据库的脚本,建表,quartz.properties文件中配置jobStore类型,代理类和数据源。同时在配置文件中指定quartz.properties文件的位置。
4、动态管理任务
(1)增加:
scheduler.scheduleJob(jobDetail, trigger);
(2)删除:
scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
5、执行的状态
执行状态存放在qrtz_triggers表的trigger_state字段,源码中完整的状态有:WAITING,ACQUIRED,EXECUTING,COMPLETE,BLOCKED,ERROR,PAUSED,PAUSED_BLOCKED。配置文件中配置的JobStore是JobStoreTX,但是状态变化的相关代码都在JobStoreSupport类中,JobStoreSupport调用配置的Delegate拼接sql语句,完成状态变化。
从源码中可以看出,acquired状态表示已经获得的,在job自定义逻辑之前执行。
其他网友整理的状态变化图:
6、自定义Job类中使用spring管理的service
Job继承spring提供的类QuartzJobBean,竟然不能直接注入自己写的service。原因是Quartz初始化是自己的JobContext,不同于Spring的ApplicationContext,所以无法直接注入。后来找到一种解决办法,在构建SchedulerFactoryBean的时候存放到map中。Job中使用时再取出来。
@Bean(name = "schedulerFactory")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setQuartzProperties(quartzProperties());
// 把用到的job类中用到的service,dao等传给他,用@Autowired注解无法注入
Map<String, Object> springBeanMap = new HashMap<String, Object>();
springBeanMap.put("testngService", testngService);
springBeanMap.put("quartzService", quartzService);
springBeanMap.put("triggerDao", triggerDao);
factory.setSchedulerContextAsMap(springBeanMap);
factory.setWaitForJobsToCompleteOnShutdown(true);
return factory;
}
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
try {
testngService = (TestngService)context.getScheduler().getContext().get("testngService");
} catch (SchedulerException e) {
e.printStackTrace();
logger.error(e.getMessage());
}
}
7、监听器
quartz提供了TriggerListeners、JobListeners和SchedulerListeners,使用方法在quartz\quartz-2.2.3\examples中有,很详细。
注意:经过测试,监听器在运行过程中动态注册,第一次注册可用,重启后失效。
8、总结:这次学习从0开始到应用到项目中,帮助最大的是官方提供的example代码、源代码和说明文档,在理解这些的基础上,学习一些优秀的博客,总结如下:
中文说明文档:https://www.w3cschool.cn/quartz_doc/quartz_doc-lwuv2d2a.html
增删改查:http://snailxr.iteye.com/blog/2076903#comments
并发:http://blog.csdn.net/will_awoke/article/details/38921273
https://www.cnblogs.com/Rozdy/p/4220186.html
http://www.blogjava.net/stevenjohn/archive/2015/07/26/426425.html
集群:http://www.importnew.com/22896.html
http://soulshard.iteye.com/blog/337886
https://tech.meituan.com/mt-crm-quartz.html
核心概念:http://blog.csdn.net/guolong1983811/article/details/51501346
http://blog.csdn.net/beliefer/article/details/51578546
https://www.cnblogs.com/pzy4447/p/5201674.html
问题:http://blog.csdn.net/jackylovesjava/article/details/50044271
9、以下是一些代码
9.1、完整的配置 quartz.properites
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#线程池相关配置
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#错过执行时间设置
#org.quartz.jobStore.misfireThreshold: 60000
#quartz信息持久化到oracle数据库
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.dataSource: myDS
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.isClustered: false
#数据库连接参数
org.quartz.dataSource.myDS.driver: oracle.jdbc.driver.OracleDriver
org.quartz.dataSource.myDS.URL: jdbc:oracle:thin:@10.10.52.14:1521:wxkfdb
org.quartz.dataSource.myDS.user: autotesting
org.quartz.dataSource.myDS.password: test
org.quartz.dataSource.myDS.maxConnections: 5
9.2、quartz整合spring boot的配置类
@Configuration
public class QuartzCofig {
@Autowired
private TestngService testngService;
@Autowired
private TriggerDao triggerDao;
@Autowired
private QuartzService quartzService;
@Bean(name = "schedulerFactory")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setQuartzProperties(quartzProperties());
// 把用到的job类中用到的service,dao等传给他,用@Autowired注解无法注入
Map<String, Object> springBeanMap = new HashMap<String, Object>();
springBeanMap.put("testngService", testngService);
springBeanMap.put("quartzService", quartzService);
springBeanMap.put("triggerDao", triggerDao);
factory.setSchedulerContextAsMap(springBeanMap);
factory.setWaitForJobsToCompleteOnShutdown(true);
return factory;
}
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
// 指定quart.properties文件位置
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
//在quartz.properties中的属性被读取并注入后再初始化对象
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
/*
* 通过SchedulerFactoryBean获取Scheduler的实例
* name不能设置为scheduler,否则QuartzService里注入的不是此处定义的scheduler
*/
@Bean(name="myScheduler")
public Scheduler scheduler() throws IOException {
System.out.println("schedulerFactoryBean().getScheduler():" + schedulerFactoryBean().getScheduler());
return schedulerFactoryBean().getScheduler();
}
}
9.3、自定义的Job逻辑
@Configuration
@Component
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ScheduleJob extends QuartzJobBean {
private Logger logger = LoggerFactory.getLogger(ScheduleJob.class);
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
TestSuite testSuite = (TestSuite)dataMap.get("testSuite");
Project project = (Project) dataMap.get("project");
TestngService testngService = null;
try {
testngService = (TestngService)context.getScheduler().getContext().get("testngService");
} catch (SchedulerException e) {
e.printStackTrace();
logger.error(e.getMessage());
}
logger.info("---" + context.getJobDetail().getKey() + "想要执行---");
testngService.run(testSuite, project);
}
}
9.4、增加,删除service
@Service("quartzService")
public class QuartzService {
@Resource(name = "myScheduler")
private Scheduler scheduler;
@Autowired
private TriggerDao triggerDao;
@Autowired
private ProcessDao processDao;
@Autowired
private ApplicationContext applicationContext;
private Logger logger = LoggerFactory.getLogger(QuartzService.class);
/**
* 增加或修改一个job
* @param testSuite
* @param project
*/
public void addJob(TestSuite testSuite, Project project) {
String runTime = testSuite.getRuntime();
if(!StringUtils.isEmpty(testSuite.getRuntime())) {
// 生成一个triggerKey
String testSuiteName = testSuite.getName();
String projectName = project.getName();
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("testSuite", testSuite);
jobDataMap.put("project", project);
JobDetail jobDetail = JobBuilder.newJob(ScheduleJob.class)
.withIdentity(testSuiteName, projectName)
.usingJobData(jobDataMap)
.build();
// 向Job传值
// jobDetail.getJobDataMap().put("testSuite", testSuite);
// jobDetail.getJobDataMap().put("project", project);
TestSuite testsuite = (TestSuite) jobDetail.getJobDataMap().get("testSuite");
// misfire处理:上一个job执行结束,立即执行这个
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(runTime);
// .withMisfireHandlingInstructionFireAndProceed();
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(testSuiteName, projectName)
.withSchedule(scheduleBuilder)
.build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
}
Trigger triggerInsert = new Trigger(project.getName() + "." + testSuite.getName(),
TriggerStateConstant.WAITING, null, project.getProjectid());
triggerDao.insertOne(triggerInsert);
}
}
/**
* 删除job
* @param testSuite
* @param project
*/
public void deleteJob(TestSuite testSuite, Project project) {
try {
scheduler.pauseTrigger(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.unscheduleJob(TriggerKey.triggerKey(testSuite.getName(), project.getName()));
scheduler.deleteJob(JobKey.jobKey(testSuite.getName(), project.getName()));
// 删除sttrigger表的记录
triggerDao.deleteByTriggerId(project.getName() + "." + testSuite.getName());
} catch (SchedulerException e) {
e.printStackTrace();
logger.error(e.getMessage());
}
}
/**
* 判断该project下是否有trigger触发,如果有返回true
* @param projectId
* @return
*/
public boolean hasTriggerFired(String projectId) {
List<Trigger> triggerList = triggerDao.findByProjectId(projectId);
if(CollectionUtils.isEmpty(triggerList)) {
return true;
} else {
return false;
}
}
/**
* 获取可以执行的process
* @param projectId
* @return
*/
public Process getAvaliableProcess(String projectId) {
// 获取该project下所有process
List<String> processIdList = processDao.findIdByProjectId(projectId);
// 获取正在执行中的testsuite对应的processid
List<String> executingProcssIdList = triggerDao.findExecutingProcess(projectId);
List<String> differentList = new ArrayList<>();
if(!CollectionUtils.isEmpty(executingProcssIdList)) {
Set<String> processIdSet = new HashSet<>();
processIdSet.addAll(processIdList);
Set<String> executingProcssIdSet = new HashSet<String>();
executingProcssIdSet.addAll(executingProcssIdList);
// 取差集
Set<String> differentSet = CommonUtil.getDifferentSet(processIdSet, executingProcssIdSet);
differentList.addAll(differentSet);
} else {
differentList = processIdList;
}
// 随机获取一个Process
if(differentList.size() != 0) {
String randomProcessId = CommonUtil.getRandom(differentList);
return processDao.findByProcessId(randomProcessId);
} else {
return null;
}
}
}
9.5、资源调度的单例类。
(1)用单例模式的原因:要保证每个Job执行的过程中获得的ProcessResource类的对象是同一个对象,map 是同一个map,否则有多个map的话,使用的就不是同一份资源了。
public class ProcessResource {
private ProcessDao processDao = (ProcessDao) SpringUtil.getBean("processDao");
private Map<String, LinkedList<Process>> map = new HashMap<String, LinkedList<Process>>();
private static ProcessResource instance = null;
private Object lock = new Object();
private Logger logger = LoggerFactory.getLogger(ProcessResource.class);
private ProcessResource() {
if (instance != null) {
return;
}
}
public static ProcessResource getInstance() {
if (instance == null) {
synchronized (ProcessResource.class) {
if (instance == null) {
instance = new ProcessResource();
instance.init();
}
}
}
return instance;
}
public void init() {
List<String> projectIdList = processDao.findProjectId();
for(String projectId : projectIdList) {
LinkedList<Process> list = processDao.findByProjectId(projectId);
map.put(projectId, list);
}
instance.setMap(map);
}
public Process getProcess(String projectId) {
synchronized (lock) {
LinkedList<Process> list = instance.getMap().get(projectId);
// 判断list中是否有元素,如果有,返回, 如果没有,打印信息
if(!CollectionUtils.isEmpty(list)) {
return list.removeFirst();
} else {
logger.info("ProcessResource中没有可用的Process了....");
return null;
}
}
}
/**
* 释放资源
* @param process
* @param projectId
*/
public void releaseProcess(Process process, String projectId) {
LinkedList<Process> list = instance.getMap().get(projectId);
// 判断list中是否有元素,如果有,返回, 如果没有,打印信息
list.addLast(process);
}
public void setMap(Map<String, LinkedList<Process>> map) {
this.map = map;
}
public Map<String, LinkedList<Process>> getMap() {
return map;
}
}