Quartz应用和集群原理分析:
使用的环境版本:spring4.x+quartz2.2.x
****1.1 如何在spring中集成quartz集群****
1.1.1 基于maven项目,需要在pom.xml引入的j依赖为:
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
1.1.2 Quartz集群的基本配置信息:命名为quartz.properties
#调度标识名 集群中每一个实例都必须使用相同的名称
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
#远程管理相关的配置,全部关闭
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
ThreadPool 实现的类名
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
#线程数量
org.quartz.threadPool.threadCount: 10
#线程优先级
org.quartz.threadPool.threadPriority: 5
#自创建父线程
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
#容许的最大作业延
org.quartz.jobStore.misfireThreshold: 60000
#ID设置为自动获取 每一个必须不同
org.quartz.scheduler.instanceId: AUTO
#数据保存方式为持久化
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
#加入集群
org.quartz.jobStore.isClustered: true
#调度实例失效的检查时间间隔
org.quartz.jobStore.clusterCheckinInterval: 10000
1.1.3 在项目中加入Quartz的初始化信息: 命名spring-quartz.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<bean id="quartzScheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<!-- 自定义的bean注入类,解决job里面无法注入spring的service的问题 -->
<property name="jobFactory">
<bean class="com.fc.sales.control.statistics.job.SpringBeanJobFactory" />
</property>
<!-- quartz的数据源 -->
<property name="dataSource" ref="quartz" />
<!-- quartz的基本配置信息引入 -->
<property name="configLocation" value="classpath:quartz.properties"/>
<!-- 调度标识名 -->
<property name="schedulerName" value="DefaultQuartzScheduler" />
<!--必须的,QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动 -->
<property name="startupDelay" value="30" />
<!-- 通过applicationContextSchedulerContextKey属性配置spring上下文 -->
<property name="applicationContextSchedulerContextKey" value="applicationContextKey" />
<!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
<property name="overwriteExistingJobs" value="true" />
<!-- 设置自动启动 -->
<property name="autoStartup" value="true" />
<!-- 注册触发器 -->
<property name="triggers">
<list>
<ref bean="orderSyncScannerTrigger" />
</list>
</property>
<!-- 注册jobDetail -->
<property name="jobDetails">
<list>
<ref bean="orderSyncDetail" />
</list>
</property>
</bean>
<!--配置调度具体执行的方法-->
<bean id="orderSyncDetail"
class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="jobClass" value="com.fc.sales.control.statistics.job.OrderSyncJob"/>
<property name="durability" value="true" />
<property name="requestsRecovery" value="true" />
</bean>
<!--配置调度执行的触发的时间-->
<bean id="orderSyncScannerTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="orderSyncDetail" />
<property name="cronExpression">
<!-- 每天上午00:30点执行任务调度 -->
<value>0 30 00 * * ?</value>
</property>
</bean>
- 1.1.4 在web.xml启动项中加入spring-quartz.xml文件*
<servlet>
<servlet-name>dispatcherServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:/spring/spring-quartz.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
- 1.1.5 附上对应的解决无法注入的jobFactory的代码*:
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
/**
* Created by lyndon on 16/9/13.
*/
@Component
public class jobFactory extends AdaptableJobFactory {
//这个对象Spring会帮我们自动注入进来,也属于Spring技术范畴.
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的方法
Object jobInstance = super.createJobInstance(bundle);
//进行注入,这属于Spring的技术,不清楚的可以查看Spring的API.
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
****1.2 quartz框架实现分布式定时任务的原理****;
Quartz集群中每个节点都是一个单独的Quartz应用,它又管理着其他的节点。这个集群需要每个节点单独的启动或停止;和我们的应用服务器集群不同,独立的Quratz节点之间是不需要 通信的。不同节点之间是通过数据库表来感知另一个应用。只有使用持久的JobStore才能完成Quartz集群。
- 1.2.1 既然Quartz分布式集群是利用数据库锁机制来实现集群环境下的并发控制,我们就需要了解Quratz的数据库表:可以去官方现在对于版本的sql文件导入。
1.2.2 Quartz线程模型:
Quartz中有两类线程:Scheduler调度线程和任务执行线程。任务执行线程: Quartz不会在主线程(QuartzSchedulerThread)中处理用户job。Quratz是将线程管理的职责委托给ThreadPool,一般的设置使用SimpleThreadPool,SimpleThreadPool创建一定数量的工作线程(WorkerThread),当然这样就意味所有的线程都是异步操作的,所以我们在工作线程的job里面实现业务的时候是没必要重新去创建一个新的线程的,在Quartz创建工作线程的时候已经完成了异步任务的创建。
Scheduler调度线程:QuartzScheduler被创建的时候会创建一个QuratzSchedulerThread实例。
1.2.3 Quartz源码分析:
QuartzSchedulerThreand包含有决定何时下一个Job将被触发的处理循环,主要的逻辑在其的run()方法中,如下图:
由此可知,QuartzSchedulerThread不断的在获取trigger,触发trigger,释放trigger。
那么具体又是如何获取trigger的呢,可以从上面的源码中可以发现:qsRsrcs.getJobStore()返回对象是JobStore ,具体的集群配置参考1.1.2. org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
JobStoreTx继承自JobStoreSupport,而JobStoreSupport的acquireNextTrigger,triggerFired,releaseAcquiredTrigger方法负责具体trigger相关操作,都必须获得TRIGGER-ACCESS锁。核心逻辑在executeInNonManagedTxLock方法中。
由上代码可知Quartz集群基于数据库锁的同步操作流程如下图所示: