任务调度简介
1、什么时候需要任务调度
业务场景:
1)账单日或者还款日上午 9 点,给每个信用卡客户发送账单通知,还款通知,如何判断客户的账单日、还款日,完成通知的发送?
2)银行业务系统,夜间要完成跑批的一系列流程,清理数据,下载文件,解析文件,对账清算、切换结算日期等等,如何触发一系列流程的执行?
3)金融机构跟人民银行二代支付系统对接,人民银行要求低于 5W 的金额(小额支付)半个小时打一次包发送,以缓解并发压力。所以,银行的跨行转账分成了多个流程:
录入、复核、发送。如何把半个小时以内的所有数据一次性发送?
类似于这种 1、基于准确的时刻或者固定的时间间隔触发的任务,或者 2、有批量数据需要处理,或者 3、要实现两个动作解耦的场景,我们都可以用任务调度来实现
2、任务调度需求分析
基本需求:
1)可以定义触发的规则,比如基于时刻、时间间隔、表达式
2)可以定义需要执行的任务。比如执行一个脚本或者一段代码。任务和规则是分开的
3)集中管理配置,持久配置。不用把规则写在代码里面,可以看到所有的任务配置,方便维护,重启之后任务可以再次调度——配置文件或者配置中心
4)支持任务的串行执行,例如执行 A 任务后再执行 B 任务再执行 C 任务
5)支持多个任务并发执行,互不干扰(例如ScheduledThreadPoolExecutor 线程池)
6)有自己的调度器,可以启动、中断、停止任务
7)容易集成到 Spring
3、任务调度工具对比
层次 | 举例 | 特点 |
---|---|---|
操作系统 | Linux Window 计划任务 | 只能执行简单脚本或者命令 |
数据库 | MySQL Oracle | 可以操作数据,不能执行java代码 |
工具 | kettle | 可以操作数据库,执行脚本,没有集中配置 |
开发语言 | JDK Timer、ScheduledThreadPool | Timer:单线程 JDK1.5 之后:ScheduledThreadPool(Cache、Fiexed、Single):没有集中配置,日常管理不够灵活 |
容器 | Spring Task、@Scheduled | 不支持集群 |
分布式框架 | XXL-JOB、Elastic-Job | 支持集群,集中配置,容易管理 |
@Scheduled 也是用 JUC 的 ScheduledExecutorService 实现的
Scheduled(cron = “0/5 * * * * ?”)
1) ScheduledAnnotationBeanPostProcessor
的 postProcessAfterInitialization
方法将@Scheduled
的方法包装为指定的 task添加到 ScheduledTaskRegistrar
中
2) ScheduledAnnotationBeanPostProcessor
会监听 Spring 的容器初始化事件,在 Spring 容器初始化完成后进行TaskScheduler
实现类实例的查找,若发现有 SchedulingConfigurer
的实现类实例,则跳过 3
3) 查找 TaskScheduler
的实现类实例默认是通过类型查找,若有多个实现则会查找名字为"taskScheduler"
的实现 Bean,若没有找到则在 ScheduledTaskRegistrar
调度任务的时候会创建一个newSingleThreadScheduledExecutor
,将TaskScheduler
的实现类实例设置到 ScheduledTaskRegistrar
属性中
4)ScheduledTaskRegistrar
的 scheduleTasks
方法触发任务调度
5)真正调度任务的类是 TaskScheduler
实现类中的 ScheduledExecutorService
,由 J.U.C 提供
Quartz基本介绍
Quatz 是一个特性丰富的,开源的任务调度库,它几乎可以嵌入所有的 Java 程序,从很小的独立应用程序到大型商业系统。Quartz 可以用来创建成百上千的简单的或者复杂的任务,这些任务可以用来执行任何程序可以做的事情。Quartz 拥有很多企业级的特性,包括支持 JTA 事务和集群
Quartz 是一个老牌的任务调度系统,98 年构思,01 年发布到 sourceforge。现在更新比较慢,因为已经非常成熟了。
Quartz 的目的就是让任务调度更加简单,开发人员只需要关注业务即可。他是用 Java 语言编写的(也有.NET 的版本)。Java 代码能做的任何事情,Quartz 都可以调度。
特点:
a)精确到毫秒级别的调度
b)可以独立运行,也可以集成到容器中
c)支持事务(JobStoreCMT )
d)支持集群
e)支持持久化
Quartz Java编程
1、引入依赖
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
2、默认配置文件
org.quartz.core 包下,有一个默认的配置文件,quartz.properties,当我们没有定义一个同名的配置文件的时候,就会使用默认配置文件里面的配置
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
3、创建Job
实现唯一的方法 execute(),方法中的代码就是任务执行的内容
public class MyJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("定时任务测试");
}
}
在测试类方法中,把 Job 进一步包装成 JobDetail
,必须要指定 JobName
和 groupName
,两个合起来是唯一标识符,可以携带 KV 的数据(JobDataMap
),用于扩展属性,在运行的时候可以从 context
获取到
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class)
.withIdentity("job1", "group1")
.usingJobData("vincent","666")
.usingJobData("liao",1314)
.build();
4、创建Trigger
在测试类 main()方法中,基于 SimpleTrigger 定义了一个每 2 秒钟运行一次、不断重复的 Trigger:
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();
5、创建Scheduler
在测试类 main()方法中,通过 Factory 获取调度器的实例,把 JobDetail
和 Trigger
绑定,注册到容器中
Scheduler
启动顺序无所谓,只要有 Trigger 到达触发条件,就会执行任务
SchedulerFactory factory = new StdSchedulerFactory();
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
调度器一定是单例的
6、体系结构总结
1)JobDetail
创建一个实现 Job 接口的类,使用 JobBuilder
包装成 JobDetail
,它可以携带KV
的数据
2)Trigger
定义任务的触发规律:Trigger
,使用 TriggerBuilder 来构建
JobDetail
跟 Trigger
是 1:N 的关系
为什么要解耦?
Trigger
接口在 Quartz
有 4 个继承的子接口:
子接口 | 描述 | 特点 |
---|---|---|
SimpleTrigger | 简单触发器 | 固定时刻或时间间隔,单位是毫秒 |
CalendarIntervalTrigger | 基于日历的触发器 | 比简单触发器更多时间单位,支持非固定时间的触发,例如一年可能 365/366,一个月可能 28/29/30/31 |
DailyTimeIntervalTrigger | 基于日期的触发器 | 每天的某个时间段 |
CronTrigger | 基于Cron表达式的触发器 |
SimpleTrigger
SimpleTrigger 可以定义固定时刻或者固定时间间隔的调度规则(精确到毫秒)
例如:每天 9 点钟运行;每隔 30 分钟运行一次
CalendarIntervalTrigger
可以定义更多时间单位的调度需求,精确到秒
好处是:不需要去计算时间间隔,比如 1 个小时等于多少毫秒
例如每年、每个月、每周、每天、每小时、每分钟、每秒、每年的月数和每个月的天数不是固定的,这种情况也适用
DailyTimeIntervalTrigger
每天的某个时间段内,以一定的时间间隔执行任务
例如:每天早上 10 点到晚上 7 点,每隔半个小时执行一次,并且只在周一到周六执行
CronTrigger
可以定义基于 Cron 表达式的调度规则,是最常用的触发器类型
Cron表达式
位置 | 时间域 | 特殊值 | |
---|---|---|---|
1 | 秒 | 0-59 | , - * / |
2 | 分钟 | 0-59 | , - * / |
3 | 小时 | 0-23 | , - * / |
4 | 日期 | 1-31 | , - * ? / L W C |
5 | 月份 | 1-12 | , - * / |
6 | 星期 | 1-7 | , - * ? / L W C |
7 | 年份 | 1-31 | , - * / |
解析:
星号(*):可用在所有字段中,表示对应时间域的每一个时刻,例如,在分钟字段时,表示“每分钟”;
问号(?):该字符只在日期和星期字段中使用,它通常指定为“无意义的值”,相当于点位符;
减号(-):表达一个范围,如在小时字段中使用“10-12”,则表示从 10 到 12 点,即 10,11,12;
逗号(,):表达一个列表值,如在星期字段中使用“MON,WED,FRI”,则表示星期一,星期三和星期五;
斜杠(/):x/y 表达一个等步长序列,x 为起始值,y 为增量步长值。如在分钟字段中使用 0/15,则表示为 0,15,30 和45 秒,而 5/15 在分钟字段中表示 5,20,35,50,你也可以使用*/y,它等同于 0/y;
L:该字符只在日期和星期字段中使用,代表“Last”的意思,但它在两个字段中意思不同。L 在日期字段中,表示这个月份的最后一天,如一月的 31 号,非闰年二月的 28 号;如果 L 用在星期中,则表示星期六,等同于 7。但是,如果 L 出现在星期字段里,而且在前面有一个数值 X,则表示“这个月的最后 X 天”,例如,6L 表示该月的最后星期五;
W:该字符只能出现在日期字段里,是对前导日期的修饰,表示离该日期最近的工作日。例如 15W 表示离该月 15号最近的工作日,如果该月 15 号是星期六,则匹配 14 号星期五;如果 15 日是星期日,则匹配 16 号星期一;如果 15号是星期二,那结果就是 15 号星期二。但必须注意关联的匹配日期不能够跨月,如你指定 1W,如果 1 号是星期六,结果匹配的是 3 号星期一,而非上个月最后的那天。W 字符串只能指定单一日期,而不能指定日期范围;
LW 组合:在日期字段可以组合使用 LW,它的意思是当月的最后一个工作日;`井号(#)`:该字符只能在星期字段中使用,表示当月某个工作日。如 `6#3` 表示当月的第三个星期五(6 表示星期五,`#3`表示当前的第三个),而 `4#5` 表示当月的第五个星期三,假设当月没有第五个星期三,忽略不触发;
C:该字符只在日期和星期字段中使用,代表“Calendar”的意思。它的意思是计划所关联的日期,如果日期没有被关联,则相当于日历中所有日期。例如 5C 在日期字段中就相当于日历 5 日以后的第一天。1C 在星期字段中相当于星期日后的第一天。Cron 表达式对特殊字符的大小写不敏感,对代表星期的缩写英文大小写也不敏感
3)Scheduler
调度器,是 Quartz
的指挥官,由 StdSchedulerFactory
产生。它是单例的
Quartz 中最重要的 API,默认是实现类是 StdScheduler
,里面包含了一个QuartzScheduler
,QuartzScheduler
里面又包含了一个 QuartzSchedulerThread
Scheduler 中的方法主要分为三大类:
1)操作调度器本身,例如调度器的启动 start()、调度器的关闭 shutdown()
2)操作 Trigger,例如 pauseTriggers()、resumeTrigger()
3)操作 Job,例如 scheduleJob()、unscheduleJob()、rescheduleJob()
4)Listener
需求:在每个任务运行结束之后发送通知给运维管理员,那是不是
要在每个任务的最后添加一行代码呢?这种方式对原来的代码造成了入侵,不利于维护,如果代码不是写在任务代码的最后一行,怎么知道任务执行完了呢?或者说,怎么监测到任务的生命周期呢?
观察者模式:定义对象间一种一对多的依赖关系,使得每当一个对象改变状态,则所有依赖它的对象都会得到通知并自动更新
Quartz 中提供了三种 Listener:
监听 Scheduler 的,监听 Trigger 的,监听 Job 的
只需要创建类实现相应的接口,并在 Scheduler 上注册 Listener,便可实现对核心对象的监听
JobListener
public class MyJobListener implements JobListener {
//返回JobListener名称
public String getName() {
String name = getClass().getSimpleName();
System.out.println( "Method 111111 :"+ "获取到监听器名称:"+name);
return name;
}
//Scheduler在JobDetail将要被执行时调用这个方法
public void jobToBeExecuted(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 222222 :"+ jobName + " ——任务即将执行 ");
}
//Scheduler在JobDetail即将被执行,但又被TriggerListener否决了时调用这个方法
public void jobExecutionVetoed(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 333333 :"+ jobName + " ——任务被否决 ");
}
//Scheduler在JobDetail被执行之后调用这个方法
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
String jobName = context.getJobDetail().getKey().getName();
System.out.println("Method 444444 :"+ jobName + " ——执行完毕 ");
System.out.println("------------------");
}
}
public class MyJobListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 创建并注册一个全局的Job Listener
scheduler.getListenerManager().addJobListener(new MyJobListener(), EverythingMatcher.allJobs());
scheduler.start();
}
}
工具类:ListenerManager,用于添加、获取、移除监听器
工具类:Matcher,主要是基于 groupName 和 keyName 进行匹配。
TriggerListener
public class MyTriggerListener implements TriggerListener {
private String name;
public MyTriggerListener(String name) {
this.name = name;
}
//返回监听器的名称
public String getName() {
return name;
}
// Trigger 被触发,Job 上的 execute() 方法将要被执行时,Scheduler就调用这个方法
public void triggerFired(Trigger trigger, JobExecutionContext context) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 11111 " + triggerName + " was fired");
}
// 在 Trigger 触发后,Job 将要被执行时由 Scheduler 调用这个方法
// TriggerListener 给了一个选择去否决 Job 的执行,如果返回true时,这个任务不会被触发
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 222222 " + triggerName + " was not vetoed");
return false;
}
//Trigger 错过触发时调用
public void triggerMisfired(Trigger trigger) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 333333 " + triggerName + " misfired");
}
//Trigger 被触发并且完成了 Job 的执行时,Scheduler 调用这个方法
public void triggerComplete(Trigger trigger, JobExecutionContext context,
Trigger.CompletedExecutionInstruction triggerInstructionCode) {
String triggerName = trigger.getKey().getName();
System.out.println("Method 444444 " + triggerName + " is complete");
System.out.println("------------");
}
}
public class MyTriggerListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 创建并注册一个全局的Trigger Listener
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener1"), EverythingMatcher.allTriggers());
// 创建并注册一个局部的Trigger Listener
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener2"), KeyMatcher.keyEquals(TriggerKey.triggerKey("trigger1", "gourp1")));
// 创建并注册一个特定组的Trigger Listener
GroupMatcher<TriggerKey> matcher = GroupMatcher.triggerGroupEquals("gourp1");
scheduler.getListenerManager().addTriggerListener(new MyTriggerListener("myListener3"), matcher);
scheduler.start();
}
}
SchedulerListener
public class MySchedulerListener implements SchedulerListener {
public void jobScheduled(Trigger trigger) {
String jobName = trigger.getJobKey().getName();
System.out.println( jobName + " has been scheduled");
}
public void jobUnscheduled(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being unscheduled");
}
public void triggerFinalized(Trigger trigger) {
System.out.println("Trigger is finished for " + trigger.getJobKey().getName());
}
public void triggerPaused(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being paused");
}
public void triggersPaused(String triggerGroup) {
System.out.println("trigger group "+triggerGroup + " is being paused");
}
public void triggerResumed(TriggerKey triggerKey) {
System.out.println(triggerKey + " is being resumed");
}
public void triggersResumed(String triggerGroup) {
System.out.println("trigger group "+triggerGroup + " is being resumed");
}
public void jobAdded(JobDetail jobDetail) {
System.out.println(jobDetail.getKey()+" is added");
}
public void jobDeleted(JobKey jobKey) {
System.out.println(jobKey+" is deleted");
}
public void jobPaused(JobKey jobKey) {
System.out.println(jobKey+" is paused");
}
public void jobsPaused(String jobGroup) {
System.out.println("job group "+jobGroup+" is paused");
}
public void jobResumed(JobKey jobKey) {
System.out.println(jobKey+" is resumed");
}
public void jobsResumed(String jobGroup) {
System.out.println("job group "+jobGroup+" is resumed");
}
public void schedulerError(String msg, SchedulerException cause) {
System.out.println(msg + cause.getUnderlyingException().getStackTrace());
}
public void schedulerInStandbyMode() {
System.out.println("scheduler is in standby mode");
}
public void schedulerStarted() {
System.out.println("scheduler has been started");
}
public void schedulerStarting() {
System.out.println("scheduler is being started");
}
public void schedulerShutdown() {
System.out.println("scheduler has been shutdown");
}
public void schedulerShuttingdown() {
System.out.println("scheduler is being shutdown");
}
public void schedulingDataCleared() {
System.out.println("scheduler has cleared all data");
}
}
public class MySchedulerListenerTest {
public static void main(String[] args) throws SchedulerException {
// JobDetail
JobDetail jobDetail = JobBuilder.newJob(MyJob1.class).withIdentity("job1", "group1").build();
// Trigger
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()).build();
// SchedulerFactory
SchedulerFactory factory = new StdSchedulerFactory();
// Scheduler
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
// 创建Scheduler Listener
scheduler.getListenerManager().addSchedulerListener(new MySchedulerListener());
scheduler.start();
}
}
5)JobStore
最多可以运行多少个任务(磁盘、内存、线程数)?
Jobstore 用来存储任务和触发器相关的信息,例如所有任务的名称、数量、状态等等
Quartz 中有两种存储任务的方式:一种存在内存,一种是存在数据库
RAMJobStore
Quartz 默认的 JobStore 是 RAMJobstore,也就是把任务和触发器信息运行的信息存储在内存中,用到了 HashMap、TreeSet、HashSet
等等数据结构
如果程序崩溃或重启,所有存储在内存中的数据都会丢失,所以我们需要把这些数据持久化到磁盘
JDBCJobStore
JDBCJobStore 可以通过 JDBC 接口,将任务运行数据保存在数据库中
JDBC 的实现方式有两种,JobStoreSupport 类的两个子类:
JobStoreTX:在独立的程序中使用,自己管理事务,不参与外部事务
JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事务时,可以使用他
使用JDBCJobStore时,需要配置数据的信息:
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 使用 quartz.properties,不使用默认配置
org.quartz.jobStore.useProperties:true
#数据库中 quartz 表的表名前缀
org.quartz.jobStore.tablePrefix:QRTZ_
org.quartz.jobStore.dataSource:myDS
#配置数据源
org.quartz.dataSource.myDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL:jdbc:mysql://localhost:3306/vincent?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user:root
org.quartz.dataSource.myDS.password:123456
org.quartz.dataSource.myDS.validationQuery=select 0 from dual
问题来了?需要建什么表?表里面有什么字段?字段类型和长度是什么?
在官网的 Downloads 链接中,提供了 11 张表的建表语句
2.3 的版本在这个路径下:src\org\quartz\impl\jdbcjobstore
表名和作用:
QRTZ_BLOB_TRIGGERS:Trigger 作为 Blob 类型存储
QRTZ_CALENDARS:存储 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS:存储 CronTrigger,包括 Cron 表达式和时区信息
QRTZ_FIRED_TRIGGERS:存储与已触发的 Trigger 相关的状态信息,以及相关 Job 的执行信息
QRTZ_JOB_DETAILS:存储每一个已配置的 Job 的详细信息
QRTZ_LOCKS:存储程序的悲观锁的信息
QRTZ_PAUSED_TRIGGER_GRPS:存储已暂停的 Trigger 组的信息
QRTZ_SCHEDULER_STATE:存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例
QRTZ_SIMPLE_TRIGGERS:存储 SimpleTrigger 的信息,包括重复次数、间隔、以及已触的次数
QRTZ_SIMPROP_TRIGGERS:存储 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 两种类型的触发器
QRTZ_TRIGGERS:存储已配置的 Trigger 的信息
Quartz集成到Spring
Spring 在 spring-context-support.jar
中直接提供了对Quartz
的支持
可以在配置文件中把 JobDetail、Trigger、Scheduler 定义成 Bean,交给Spring去管理
1)定义Job
<bean name="myJob1" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="name" value="my_job_1"/>
<property name="vincent" value="my_group"/>
<property name="jobClass" value="com.vincent.quartz.MyJob1"/>
<property name="durability" value="true"/>
</bean>
2)定义Trigger
<bean name="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
<property name="name" value="my_trigger_1"/>
<property name="group" value="my_group"/>
<property name="jobDetail" ref="myJob1"/>
<property name="startDelay" value="1000"/>
<property name="repeatInterval" value="5000"/>
<property name="repeatCount" value="2"/>
</bean>
3)定义Scheduler
<bean name="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="simpleTrigger"/>
<ref bean="cronTrigger"/>
</list>
</property>
</bean>
既然可以在配置文件配置,当然也可以用@Bean 注解配置。在配置类上加上@Configuration 让 Spring 读取到
public class QuartzConfig {
@Bean
public JobDetail printTimeJobDetail(){
return JobBuilder.newJob(MyJob1.class)
.withIdentity("vincentJob")
.usingJobData("vincent", "只为更好的你")
.storeDurably()
.build();
}
@Bean
public Trigger printTimeJobTrigger() {
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
return TriggerBuilder.newTrigger()
.forJob(printTimeJobDetail())
.withIdentity("quartzTaskService")
.withSchedule(cronScheduleBuilder)
.build();
}
}
public class QuartzTest {
private static Scheduler scheduler;
public static void main(String[] args) throws SchedulerException {
// 获取容器
ApplicationContext ac = new ClassPathXmlApplicationContext("spring_quartz.xml");
// 从容器中获取调度器
scheduler = (StdScheduler) ac.getBean("scheduler");
// 启动调度器
scheduler.start();
}
}
动态调度的实现
1)配置管理
用最简单的数据库的实现
问题 1:建一张什么样的表?参考 JobDetail 的属性
CREATE TABLE `sys_job` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`job_name` varchar(512) NOT NULL COMMENT '任务名称',
`job_group` varchar(512) NOT NULL COMMENT '任务组名',
`job_cron` varchar(512) NOT NULL COMMENT '时间表达式',
`job_class_path` varchar(1024) NOT NULL COMMENT '类路径,全类型',
`job_data_map` varchar(1024) DEFAULT NULL COMMENT '传递 map 参数',
`job_status` int(2) NOT NULL COMMENT '状态:1 启用 0 停用',
`job_describe` varchar(1024) DEFAULT NULL COMMENT '任务功能描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8;
2)数据操作与任务调度
操作数据表非常简单,SSM 增删改查
但是在修改了表的数据之后,怎么让调度器知道呢?
调度器的接口:Scheduler
在我们的需求中,我们需要做的事情:
1、 新增一个任务
2、 删除一个任务
3、 启动、停止一个任务
4、 修改任务的信息(包括调度规律)
因 此 可 以 把 相 关 的 操 作 封 装 到 一 个 工 具 类 中:
public class SchedulerUtil {
private static Logger logger = LoggerFactory.getLogger(SchedulerUtil.class);
/**
* 新增定时任务
* @param jobClassName 类路径
* @param jobName 任务名称
* @param jobGroupName 组别
* @param cronExpression Cron表达式
* @param jobDataMap 需要传递的参数
* @throws Exception
*/
public static void addJob(String jobClassName,String jobName, String jobGroupName, String cronExpression,String jobDataMap) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
// 启动调度器
scheduler.start();
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass())
.withIdentity(jobName, jobGroupName).build();
// JobDataMap用于传递任务运行时的参数,比如定时发送邮件,可以用json形式存储收件人等等信息
if (StringUtils.isNotEmpty(jobDataMap)) {
JSONObject jb = JSONObject.parseObject(jobDataMap);
Map<String, Object> dataMap =(Map<String, Object>) jb.get("data");
for (Map.Entry<String, Object> m:dataMap.entrySet()) {
jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
}
}
// 表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
.withSchedule(scheduleBuilder).startNow().build();
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
logger.info("创建定时任务失败" + e);
throw new Exception("创建定时任务失败");
}
}
/**
* 停用一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public static void jobPause(String jobName, String jobGroupName) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 启用一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public static void jobresume(String jobName, String jobGroupName) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 删除一个定时任务
* @param jobName 任务名称
* @param jobGroupName 组别
* @throws Exception
*/
public static void jobdelete(String jobName, String jobGroupName) throws Exception {
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName));
scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName));
scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
}
/**
* 更新定时任务表达式
* @param jobName 任务名称
* @param jobGroupName 组别
* @param cronExpression Cron表达式
* @throws Exception
*/
public static void jobReschedule(String jobName, String jobGroupName, String cronExpression) throws Exception {
try {
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).startNow().build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
} catch (SchedulerException e) {
System.out.println("更新定时任务失败" + e);
throw new Exception("更新定时任务失败");
}
}
/**
* 检查Job是否存在
* @throws Exception
*/
public static Boolean isResume(String jobName, String jobGroupName) throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
Boolean state = scheduler.checkExists(triggerKey);
return state;
}
/**
* 暂停所有任务
* @throws Exception
*/
public static void pauseAlljob() throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
scheduler.pauseAll();
}
/**
* 唤醒所有任务
* @throws Exception
*/
public static void resumeAlljob() throws Exception {
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
sched.resumeAll();
}
/**
* 获取Job实例
* @param classname
* @return
* @throws Exception
*/
public static BaseJob getClass(String classname) throws Exception {
try {
Class<?> c = Class.forName(classname);
return (BaseJob) c.newInstance();
} catch (Exception e) {
throw new Exception("类["+classname+"]不存在!");
}
}
}
3)容器启动与Service注入
a)容器启动
任务没有定义在 ApplicationContext.xml 中,而是放到了数据库中,SpringBoot 启动时,怎么读取任务信息?怎么在 Spring 启动完成的时候做一些事情?
创建一个类,实现 CommandLineRunner
接口,实现 run
方法
从表中查出状态是 1 的任务,然后构建
@Component
public class InitStartSchedule implements CommandLineRunner {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ISysJobService sysJobService;
@Autowired
private MyJobFactory myJobFactory;
@Override
public void run(String... args) throws Exception {
/**
* 用于程序启动时加载定时任务,并执行已启动的定时任务(只会执行一次,在程序启动完执行)
*/
//查询job状态为启用的
HashMap<String,String> map = new HashMap<String,String>();
map.put("jobStatus", "1");
List<SysJob> jobList= sysJobService.querySysJobList(map);
if( null == jobList || jobList.size() ==0){
logger.info("系统启动,没有需要执行的任务... ...");
}
// 通过SchedulerFactory获取一个调度器实例
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler scheduler = sf.getScheduler();
// 如果不设置JobFactory,Service注入到Job会报空指针
scheduler.setJobFactory(myJobFactory);
// 启动调度器
scheduler.start();
for (SysJob sysJob:jobList) {
String jobClassName=sysJob.getJobName();
String jobGroupName=sysJob.getJobGroup();
//构建job信息
JobDetail jobDetail = JobBuilder.newJob(getClass(sysJob.getJobClassPath()).getClass()).withIdentity(jobClassName, jobGroupName).build();
if (StringUtils.isNotEmpty(sysJob.getJobDataMap())) {
JSONObject jb = JSONObject.parseObject(sysJob.getJobDataMap());
Map<String, Object> dataMap = (Map<String, Object>)jb.get("data");
for (Map.Entry<String, Object> m:dataMap.entrySet()) {
jobDetail.getJobDataMap().put(m.getKey(),m.getValue());
}
}
//表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(sysJob.getJobCron());
//按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobClassName, jobGroupName)
.withSchedule(scheduleBuilder).startNow().build();
// 任务不存在的时候才添加
if( !scheduler.checkExists(jobDetail.getKey()) ){
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
logger.info("\n创建定时任务失败"+e);
throw new Exception("创建定时任务失败");
}
}
}
}
public static BaseJob getClass(String classname) throws Exception
{
Class<?> c= Class.forName(classname);
return (BaseJob)c.newInstance();
}
}
b)Service类注入到Job中
Spring Bean 如何注入到实现了 Job 接口的类中?
如果没有任何配置,注入会报空指针异常
因为定时任务Job
对象的实例化过程是在 Quartz 中进行的,而 Service Bean
是由Spring 容器管理的,Quartz 察觉不到 Service Bean 的存在,所以无法将 Service Bean装配到 Job 对象中
分析:
Quartz 集成到 Spring
中,用到 SchedulerFactoryBean
,其实现了 InitializingBean
方法,在唯一的方法 afterPropertiesSet()
在 Bean 的属性初始化后调用
调度器用 AdaptableJobFactory
对 Job 对象进行实例化,如果我们可以把这个 JobFactory
指定为我们自定义的工厂的话,就可以在 Job 实例化完成之后,把 Job纳入到 Spring 容器中管理
解决:
1)定义一个 AdaptableJobFactory
,实现 JobFactory
接口,实现接口定义的newJob
方法,在这里面返回 Job
实例
public class AdaptableJobFactory implements JobFactory {
@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler arg1) throws SchedulerException {
return newJob(bundle);
}
public Job newJob(TriggerFiredBundle bundle) throws SchedulerException {
try {
// 返回Job实例
Object jobObject = createJobInstance(bundle);
return adaptJob(jobObject);
}
catch (Exception ex) {
throw new SchedulerException("Job instantiation failed", ex);
}
}
// 通过反射的方式创建实例
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Method getJobDetail = bundle.getClass().getMethod("getJobDetail");
Object jobDetail = ReflectionUtils.invokeMethod(getJobDetail, bundle);
Method getJobClass = jobDetail.getClass().getMethod("getJobClass");
Class jobClass = (Class) ReflectionUtils.invokeMethod(getJobClass, jobDetail);
return jobClass.newInstance();
}
protected Job adaptJob(Object jobObject) throws Exception {
if (jobObject instanceof Job) {
return (Job) jobObject;
}
else if (jobObject instanceof Runnable) {
return new DelegatingJob((Runnable) jobObject);
}
else {
throw new IllegalArgumentException("Unable to execute job class [" + jobObject.getClass().getName() +
"]: only [org.quartz.Job] and [java.lang.Runnable] supported.");
}
}
}
2)定义一个MyJobFactory,继承AdaptableJobFactory
@Component
public class MyJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的方法
Object jobInstance = super.createJobInstance(bundle);
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
3)指定Scheduler的JobFactory为自定义的JobFactory
scheduler.setJobFactory(myJobFactory);
Quartz集群部署
1)为什么要集群
a)防止单点故障,减少对业务的影响
b)减少节点的压力,例如在 11 点要触发 5000 个任务,如果有 10 个节点,则每个节点之需要执行 500 个任务
2)集群需要解决的问题
a)任务重跑,因为节点部署的内容是一样的,到 10 点的时候,每个节点都会执行相同的操作,引起数据混乱,比如跑批,绝对不能执行多次
b)任务漏跑,假如任务是平均分配的,本来应该在某个节点上执行的任务,因为节点故障,一直没有得到执行
c)水平集群需要注意时间同步问题
d)Quartz 使用的是随机的负载均衡算法,不能指定节点执行
所以必须要有一种共享数据或者通信的机制,在分布式系统的不同节点中,我们可以采用什么样的方式,实现数据共享?
两两通信,或者基于分布式的服务,实现数据共享
例如:ZK、Redis、DB
在 Quartz 中,提供了一种简单的方式,基于数据库共享任务执行信息。也就是说,一个节点执行任务的时候,会操作数据库,其他的节点查询数据库,便可以感知到了
3)集群配置与验证
quartz.properties 配置。
四个配置:集群实例 ID、集群开关、数据库持久化、数据源信息
注意先清空 quartz 所有表、改端口、两个任务频率改成一样
验证 1:先后启动 2 个节点,任务是否重跑
验证 2:停掉一个节点,任务是否漏跑
Quartz调度原理
带着问题看源码:
Job 没有继承 Thread 和实现 Runnable,是怎么被调用的?通过反射还是什么?
任务是什么时候被调度的?是谁在监视任务还是监视 Trigger?
任务是怎么被调用的?谁执行了任务?
任务本身有状态吗?还是触发器有状态?
源码入口:
Scheduler scheduler = factory.getScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
1)获取调度器实例
a、读取配置文件
public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
// 读取 quartz.properties 配置文件
initialize();
}
// 这个类是一个 HashMap,用来基于调度器的名称保证调度器的唯一性
SchedulerRepository schedRep = SchedulerRepository.getInstance();
Scheduler sched = schedRep.lookup(getSchedulerName());
// 如果调度器已经存在了
if (sched != null) {
// 调度器关闭了,移除
if (sched.isShutdown()) {
schedRep.remove(getSchedulerName());
} else {
// 返回调度器
return sched;
}
}
// 调度器不存在,初始化
sched = instantiate();
return sched;
}
instantiate()方法中做了初始化的所有工作:
// 存储任务信息的 JobStore
JobStore js = null;
// 创建线程池,默认是 SimpleThreadPool
ThreadPool tp = null;
// 创建调度器
QuartzScheduler qs = null;
// 连接数据库的连接管理器
DBConnectionManager dbMgr = null;
// 自动生成 ID
// 创建线程执行器,默认为 DefaultThreadExecutor
ThreadExecutor threadExecutor;
b、创建线程池(包工头)
创建了一个线程池,默认是配置文件中指定的SimpleThreadPool
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
SimpleThreadPool
里面维护了三个 list,分别存放所有的工作线程、空闲的工作线程和忙碌的工作线程,我们可以把 SimpleThreadPool
理解为包工头
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
tp 的 runInThread()方法是线程池运行线程的接口方法。参数 Runnable 是执行的任务内容,取出 WorkerThread 去执行参数里面的 runnable
(JobRunShell)
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
c、WorkerThread(工人)
WorkerThread
是 SimpleThreadPool的内部类 , 用来执行任务 ,我 们 把WorkerThread理解为工人。在WorkerThread的run
方法中,执行传入的参数runnable任务:runnable.run();
d、创建调度线程(项目经理)
创建了调度器 QuartzScheduler:
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
在 QuartzScheduler
的构造函数中,创建了QuartzSchedulerThread
,我们把它理解为项目经理,它会调用包工头的工人资源,给他们安排任务
并且创建了线程执行器 schedThreadExecutor , 执 行 了 这 个QuartzSchedulerThread
,也就是调用了它的 run
方法
// 创建一个线程,resouces 里面有线程名称
this.schedThread = new QuartzSchedulerThread(this, resources);
// 线程执行器
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//执行这个线程,也就是调用了线程的 run 方法
schedThreadExecutor.execute(this.schedThread);
在QuartzSchedulerThread 类,找到 run 方法,这个是 Quartz 任务调度的核心方法:
public void run() {
boolean lastAcquireFailed = false;
// 检查 scheuler 是否为停止状态
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
// 检查是否为暂停状态
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
// 暂停的话会尝试去获得信号锁,并 wait 一会
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
}
if (halted.get()) {
break;
}
}
//从线程池获取可用的线程
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// 获取需要下次执行的 triggers
// idleWaitTime: 默认 30s
// availThreadCount:获取可用(空闲)的工作线程数量,总会大于 1,因为该方法会一直阻塞,直到有工作线程空闲下来。
//maxBatchSize:一次拉取 trigger 的最大数量,默认是 1
//batchTimeWindow:时间窗口调节参数,默认是 0
//misfireThreshold: 超过这个时间还未触发的 trigger,被认为发生了 misfire,默认 60s
//调度线程一次会拉取 NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)个 triggers,默认情况下,会拉取未来 30s、过去 60s 之间还未 fire 的 1 个 trigger
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
//触发 Trigger,把 ACQUIRED 状态改成 EXECUTING
//如果这个 trigger 的 NEXTFIRETIME 为空,也就是未来不再触发,就将其状态改为COMPLETE
//如果trigger不允许并发执行(即Job的实现类标注了@DisallowConcurrentExecution),则将状态变为 BLOCKED,否则就将状态改为 WAITING
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
//循环处理 Trigger
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
JobRunShell shell = null;
try {
//根据 trigger 信息实例化JobRunShell(implements Runnable),同时依据JOB_CLASS_NAME 实例化 Job,随后我们将 JobRunShell 实例丢入工作线。
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// 执行 JobRunShell 的 run 方法
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}
} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
JobRunShell
的作用:
JobRunShell
用来为 Job 提供安全的运行环境的,执行 Job 中所有的作业,捕获运行中的异常,在任务执行完毕的时候更新 Trigger 状态等等
JobRunShell 实例是用 JobRunShellFactory
为 QuartzSchedulerThread
创建的,在调度器决定一个 Job 被触发的时候,它从线程池中取出一个线程来执行任务
e、线程模型总结
SimpleThreadPool:包工头,管理所有 WorkerThread
WorkerThread:工人,把 Job 包装成 JobRunShell
,执行
QuartSchedulerThread:项目经理,获取即将触发的 Trigger,从包工头拿出拿到worker,执行 Trigger 绑定的任务
2)绑定JobDetail和Trigger
// 存储 JobDetail 和 Trigger
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
// 通知相关的 Listener
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
3)启动调度器
// 通知监听器
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
resources.getJobStore().schedulerResumed();
}
// 通知 QuartzSchedulerThread 不再等待,开始干活
schedThread.togglePause(false);
// 通知监听器
notifySchedulerListenersStarted();
4)源码总结
getScheduler
方法创建线程池 ThreadPool
,创建调度器 QuartzScheduler
,创建调度线程 QuartzSchedulerThread
,调度线程初始处于暂停状态
scheduleJob 将任务添加到 JobStore
中
scheduler.start()
方法激活调度器,QuartzSchedulerThread 从 timeTrriger
取出待触发的任务,并包装成 TriggerFiredBundle,然后由 JobRunShellFactory 创建TriggerFiredBundle 的 执 行 线 程 JobRunShell , 调 度 执 行 通 过 线 程 池SimpleThreadPool去执行JobRunShell,而JobRunShell执行的就是任务类的execute方法:job.execute(JobExecutionContext context)
5)集群原理
基于数据库,如何实现任务的不重跑不漏跑?
问题 1:如果任务执行中的资源是“下一个即将触发的任务”,怎么基于数据库实现这个资源的竞争?
问题 2:怎么对数据的行加锁?
QuartzSchedulerThread 获取下一个即将触发的 Trigger:
triggers = qsRsrcs.getJobStore().acquireNextTriggers()
调用 JobStoreSupport 的 acquireNextTriggers()方法
调用 JobStoreSupport.executeInNonManagedTXLock()方法
return executeInNonManagedTXLock(lockName,
尝试获得锁:
transOwner = getLockHandler().obtainLock(conn, lockName);
调用 DBSemaphore 的 obtainLock()方法:
public boolean obtainLock(Connection conn, String lockName)
throws LockException {
if (!isLockOwner(lockName)) {
executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
调用 StdRowLockSemaphore 的 executeSQL()方法
最终用 JDBC 执行 SQL,语句内容是 expandedSQL 和 expandedInsertSQL
ps = conn.prepareStatement(expandedSQL);
问题:expandedSQL 和 expandedInsertSQL 是一条什么 SQL 语句?似乎我们没有赋值?
在 StdRowLockSemaphore 的构造函数中,把定义的两条 SQL 传进去:
public StdRowLockSemaphore() {
super(DEFAULT_TABLE_PREFIX, null, SELECT_FOR_LOCK, INSERT_LOCK);
}
public static final String SELECT_FOR_LOCK = "SELECT * FROM "+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" + SCHED_NAME_SUBST + ", ?)";
它调用了父类 DBSemaphore 的构造函数:
public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {
this.tablePrefix = tablePrefix;
this.schedName = schedName;
setSQL(defaultSQL);
setInsertSQL(defaultInsertSQL);
}
在 setSQL()和 setInsertSQL()中为 expandedSQL 和expandedInsertSQL 赋值
执行的 SQL 语句:
select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update
在执行官方的建表脚本的时候,QRTZ_LOCKS 表,它会为每个调度器创建两行数据,获取 Trigger 和触发 Trigger 是两把锁:
6)任务为什么重复执行
有多个调度器,任务没有重复执行,也就是默认会加锁,什么情况下不会上锁呢?
JobStoreSupport
的 executeInNonManagedTXLock()
方法,如果 lockName
为空,则不上锁
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
transOwner = getLockHandler().obtainLock(conn, lockName);
}
if (conn == null) {
conn = getNonManagedTXConnection();
}
而 上 一 步 JobStoreSupport
的 acquireNextTriggers()
方 法 , 如 果isAcquireTriggersWithinLock()
值是 false 并且 maxCount>1
的话,lockName 赋值为null
,否则赋值为 LOCK_TRIGGER_ACCESS
,这种情况获取 Trigger 下默认不加锁
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
}
acquireTriggersWithinLock 默认是空的:
private boolean acquireTriggersWithinLock = false;
maxCount 来自 QuartzSchedulerThread:
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),qsRsrcs.getBatchTimeWindow());
getMaxBatchSize()
来自 QuartzSchedulerResources
,代表 Scheduler 一次拉取trigger 的最大数量,默认是 1:
private int maxBatchSize = 1;
这个值可以通过参数修改:
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=50
理论上把 batchTriggerAcquisitionMaxCount 的值改掉以后,在获取 Trigger 的时候就不会再上锁了,但是实际上为什么没有出现频繁的重复执行问题?
因为每个调度器的线程持有锁的时间太短了
QuartzSchedulerThread 的 triggersFired()方法:
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
调用了 JobStoreSupport 的 triggersFired()方法,接着又调用了一个 triggerFired,triggerFired(Connection conn, OperableTrigger trigger)
方法:
如果 Trigger 的状态不是 ACQUIRED,也就是说被其他的线程 fire 了,返回空。但是这种乐观锁的检查在高并发下难免会出现 ABA 的问题,比如线程 A 拿到的时候还是ACQUIRED 状态,但是刚准备执行的时候已经变成了 EXECUTING 状态,这个时候就会出现重复执行的问题
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
总结:
如果设置的数量>1,并且使用 JDBC JobStore
(RAMJobStore 不支持分布式,只有 一 个 调 度 器 实 例 , 所 以 不 加 锁 ) , 则 属 性org.quartz.jobStore.acquireTriggersWithinLock
应设置为 true,否则不加锁会导致任务重复执行
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1
org.quartz.jobStore.acquireTriggersWithinLock=true
Quartz-Misfire
什么情况下错过触发?
示例:线程池只有 5 个线程,当有 5 个任务都在执行的时候,第六个任务即将触发,这个时候任务就不能得到执行,在 quartz.properties
有一个属性 misfireThreshold
,用来定义触发器超时的"临界值",也就是超过了这个时间,就算错过触发了
例如,如果 misfireThreshold 是 60000(60 秒),9 点整应该执行的任务,9 点零1 分还没有可用线程执行它,就会超时(misfires)
可能造成 misfired job的原因:
1、 没有可用线程
2、 Trigger 被暂停
3、 系统重启
4、 禁止并发执行的任务在到达触发时间时,上次执行还没有结束
错过触发怎么办?
Misfire 策略设置,每一种 Trigger 都定义了自己的 Misfire 策略,不同的策略通过不同的方法来设置
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withMisfireHandlingInstructionNowWithExistingCount()
.withIntervalInSeconds(1)
.repeatForever()).build();
一般来说有 3 种:
1、 忽略
2、 立即跑一次
3、 下次跑
文章参考:
Quartz Scheduler misfireThreshold属性的意义与触发器超时后的处理策略
怎么避免任务错过触发?
合理地设置线程池数量,以及任务触发间隔