上一篇已经讲述了分布式定时任务组件的基本原理,这次咱们直接动手实现。
话不多说,直接上代码:
编译工具用的是gradle,和maven的功能差不多,下面是依赖:
compileOnly 'org.projectlombok:lombok'
compile 'org.quartz-scheduler:quartz:2.3.0'
compile 'org.apache.commons:commons-lang3:3.9'
compile 'com.google.guava:guava:27.1-jre'
实现定时任务的框架用的是quartz,不过没有直接使用quartz的分布式调度策略,而是自定义的分布式调度策略
下面是用来对job添加,修改和删除的一个管理器
import com.github.dynamicschedule.support.JobDefinition;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.Objects;
/**
 * Static facade over one Quartz {@link Scheduler} that registers, updates and removes the
 * cron-driven jobs described by {@link JobDefinition} objects.
 *
 * <p>All failures are logged and swallowed, so callers never see a scheduler exception.
 */
@Slf4j
public class DefaultJobManager {
  /** Shared scheduler; remains {@code null} when start-up failed in the static initializer. */
  private static Scheduler scheduler;

  static {
    try {
      scheduler = new StdSchedulerFactory().getScheduler();
      scheduler.start();
    } catch (SchedulerException e) {
      log.error("scheduler start fail", e);
    }
  }

  /** Key under which the {@link JobDefinition} is stored in the job's data map. */
  public static final String DATA_KEY = "task";

  /** Group used when a definition does not name one. */
  private static final String DEFAULT_GROUP = "default_group";

  /**
   * Schedules the given task, or updates it when a trigger with the same name and group already
   * exists. Errors are logged, not thrown.
   *
   * @param task definition providing job name, group, cron expression and concurrency flag
   */
  public static void addJob(JobDefinition task) {
    try {
      String jobName = task.getJobName();
      String jobGroup = resolveGroup(task.getJobGroup());
      JobKey jobKey = new JobKey(jobName, jobGroup);
      TriggerKey triggerKey = new TriggerKey(jobName, jobGroup);

      // Use the shared scheduler; the original referenced an undeclared factory variable here.
      CronTrigger existingTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
      CronScheduleBuilder scheduleBuilder =
          CronScheduleBuilder.cronSchedule(task.getCronExpression());

      if (Objects.isNull(existingTrigger)) {
        // No trigger yet: create the job together with a fresh cron trigger.
        JobDetail jobDetail = JobBuilder.newJob(jobClassFor(task)).withIdentity(jobKey).build();
        jobDetail.getJobDataMap().put(DATA_KEY, task);
        CronTrigger newTrigger =
            TriggerBuilder.newTrigger()
                .withSchedule(scheduleBuilder)
                .withIdentity(triggerKey)
                .build();
        scheduler.scheduleJob(jobDetail, newTrigger);
        return;
      }

      // Update path. getJobDetail hands back a copy, so writing into its data map does not reach
      // the job store; rebuild the detail and store it back with replace=true instead. Rebuilding
      // also lets a changed concurrency flag take effect.
      JobDetail updatedDetail =
          scheduler.getJobDetail(jobKey).getJobBuilder().ofType(jobClassFor(task)).build();
      updatedDetail.getJobDataMap().put(DATA_KEY, task);
      // Third argument: the job is non-durable but already has a trigger, so storing is allowed.
      scheduler.addJob(updatedDetail, true, true);

      CronTrigger updatedTrigger =
          existingTrigger
              .getTriggerBuilder()
              .withIdentity(triggerKey)
              .withSchedule(scheduleBuilder)
              .build();
      scheduler.rescheduleJob(triggerKey, updatedTrigger);
    } catch (Exception e) {
      log.error("addJob失败", e);
    }
  }

  /**
   * Removes the job identified by the task's name and group. Errors are logged, not thrown.
   *
   * @param task definition whose job name and group identify the job to delete
   */
  public static void deleteJob(JobDefinition task) {
    try {
      JobKey jobKey = new JobKey(task.getJobName(), resolveGroup(task.getJobGroup()));
      scheduler.deleteJob(jobKey);
    } catch (Exception e) {
      log.error("deleteJob失败", e);
    }
  }

  /** Returns the given group, or {@link #DEFAULT_GROUP} when it is blank. */
  private static String resolveGroup(String jobGroup) {
    return StringUtils.isBlank(jobGroup) ? DEFAULT_GROUP : jobGroup;
  }

  /** Picks the executor class matching the task's concurrency flag. */
  private static Class<? extends Job> jobClassFor(JobDefinition task) {
    return task.isConcurrent() ? DefaultExecution.class : DisallowConcurrentExecution.class;
  }
}
自定义的任务描述对象
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* Describes one scheduled task: its identity, its cron schedule and the bean method to call.
* Accessors are generated by Lombok's {@code @Data}.
*/
@Slf4j
@Data
public class JobDefinition {
/** Task ID (must be globally unique, including across the cluster); used as the lock name in cluster mode. */
private String jobId;
/** Task name; together with the group it forms the scheduler's job and trigger keys. */
private String jobName;
/** Task group; a blank value falls back to the manager's default group. */
private String jobGroup;
/** Human-readable description of the task. */
private String description;
/** Cron expression that defines the execution schedule. */
private String cronExpression;
/** Task status. */
private int status;
/**
* Whether a new run may start while the previous run is still executing (when the interval is
* shorter than the run time). Callers in this file use {@code isConcurrent()} / {@code setConcurrent(..)}.
*/
private boolean isConcurrent;
/** Class name. */
private String className;
/** Name of the bean in the Spring container whose method is invoked. */
private String springId;
/** Name of the method to invoke. */
private String methodName;
/** Method argument (must be JSON); passed to the method as a single String when not blank. */
private String methodArg;
/** Bean name of the distributed lock object. */
private String lockObjectName;
/** Creation time. */
private long createTime;
/** Last update time. */
private long updateTime;
}
自定义的job执行器,分别定义两个不同的执行器。
第一个执行器:如果任务执行时间超过间隔时间,允许两个任务都执行。
第二个执行器:如果任务执行时间超过间隔时间,不允许两个任务都执行,必须当前正在执行的任务执行完毕后,才能执行下面一个任务。
import com.github.dynamicschedule.support.JobDefinition;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
/**
 * Quartz job that reads the {@link JobDefinition} stored under {@link DefaultJobManager#DATA_KEY}
 * and hands it to {@link JobActuator}. Runs of this job may overlap.
 */
public class DefaultExecution implements Job {
  /**
   * Looks up the task definition in the merged job data map and invokes it.
   *
   * @param context execution context supplied by Quartz
   */
  @Override
  public void execute(JobExecutionContext context) {
    Object stored = context.getMergedJobDataMap().get(DefaultJobManager.DATA_KEY);
    JobActuator.invoke((JobDefinition) stored);
  }
}
/**
* Variant of {@link DefaultExecution} carrying Quartz's {@code @DisallowConcurrentExecution}
* marker; selected by the job manager when a task's concurrency flag is false, so the next run
* waits until the current one has finished.
*/
@org.quartz.DisallowConcurrentExecution
public class DisallowConcurrentExecution extends DefaultExecution {}
通过反射的原理调用JobDefinition中指定的对象方法
import com.github.dynamicschedule.lock.DistributedLock;
import com.github.dynamicschedule.support.JobDefinition;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Objects;
@Slf4j
public class JobActuator {
public static void invoke(JobDefinition jobDefinition) {
// 是否开启集群模式
boolean enableCluster =
Objects.equals(
DefaultJobDefinitionLoader.getProperty("dynamicschedule.cluster.enabled"),
"true");
// 分布式锁的名字
String lockObjectName = jobDefinition.getLockObjectName();
// 获取锁对象
DistributedLock distributedLock = null;
if (enableCluster && Objects.isNull(distributedLock = getLockObject(lockObjectName))) {
return;
}
String jobId = jobDefinition.getJobId();
String springId = jobDefinition.getSpringId();
String methodName = jobDefinition.getMethodName();
String methodArgs = jobDefinition.getMethodArg();
/** 集群 */
if (enableCluster) {
// 获取锁就开始执行,没获取锁就不执行
if (distributedLock.tryLock(jobId)) {
try {
invoke(springId, methodName, methodArgs);
} catch (NoSuchMethodException e) {
log.error("无效的methodName:" + methodName, e);
} catch (Exception e) {
log.error("执行定时任务失败", e);
log.error("id:{}, springId:{}, methodName:{}", jobId, springId, methodName);
} finally {
// 解锁
distributedLock.tryUnLock(jobId);
}
}
} else {
// 单机
try {
invoke(springId, methodName, methodArgs);
} catch (NoSuchMethodException e) {
log.error("无效的methodName:" + methodName, e);
} catch (Exception e) {
log.error("执行定时任务失败", e);
log.error("id:{}, springId:{}, methodName:{}", jobId, springId, methodName);
}
}
}
/**
* 获取分布式锁对象
*
* @param lockObjectName
* @return
*/
private static DistributedLock getLockObject(String lockObjectName) {
DistributedLock distributedLock =
(DistributedLock) DefaultJobDefinitionLoader.getBean(lockObjectName);
if (Objects.isNull(distributedLock)) {
log.error("没有指定的分布式锁对象:");
return null;
}
return distributedLock;
}
/**
* 执行定时任务
*
* @param springId
* @param methodName
* @param methodArgs
* @throws InvocationTargetException
* @throws IllegalAccessException
* @throws NoSuchMethodException
*/
private static void invoke(String springId, String methodName, String methodArgs)
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
Object object = DefaultJobDefinitionLoader.getBean(springId);
if (Objects.isNull(object)) {
log.error("无效的springId:" + springId);
return;
}
Class<?> clazz = object.getClass();
Method method;
if (StringUtils.isBlank(methodArgs)) {
method = clazz.getDeclaredMethod(methodName);
} else {
method = clazz.getDeclaredMethod(methodName, new Class[] {String.class});
}
method.setAccessible(true);
if (StringUtils.isBlank(methodArgs)) {
method.invoke(object);
} else {
method.invoke(object, methodArgs);
}
}
}
利用spring的载入顺序,实现ApplicationContextAware接口获取上下文对象,并且执行数据载入操作
import com.github.dynamicschedule.support.JobDefinition;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * Captures the Spring {@link ApplicationContext} for static lookups and, once the context is
 * injected, loads all job definitions from the repository and registers them with the job manager.
 */
public class DefaultJobDefinitionLoader implements ApplicationContextAware {
  /** Context shared with the static lookup helpers; set in {@link #setApplicationContext}. */
  private static ApplicationContext appContext;

  private final JobDefinitionRepository jobDefinitionReader;

  /**
   * @param jobDefinitionReader repository the job definitions are loaded from
   */
  public DefaultJobDefinitionLoader(JobDefinitionRepository jobDefinitionReader) {
    this.jobDefinitionReader = jobDefinitionReader;
  }

  /** Loads every job definition from the repository and schedules it. */
  private void loadJobDefinition() {
    List<JobDefinition> jobDefinitions = jobDefinitionReader.load();
    if (CollectionUtils.isEmpty(jobDefinitions)) {
      return;
    }
    jobDefinitions.forEach(DefaultJobManager::addJob);
  }

  /**
   * Looks up a bean by name.
   *
   * @param className bean name in the application context
   * @return the bean, or {@code null} when the name is {@code null} or no such bean exists, so
   *     that callers' null checks actually work instead of an exception escaping
   */
  public static Object getBean(String className) {
    if (className == null || !appContext.containsBean(className)) {
      return null;
    }
    return appContext.getBean(className);
  }

  /**
   * Looks up a bean by type.
   *
   * @param clazz required bean type
   * @param <T> bean type
   * @return the bean returned by the application context for that type
   */
  public static <T> T getBean(Class<T> clazz) {
    return appContext.getBean(clazz);
  }

  /**
   * Returns the type of the named bean.
   *
   * @param className bean name in the application context
   * @return the type reported by the application context
   */
  public static Class<?> getType(String className) {
    return appContext.getType(className);
  }

  /**
   * Reads a property from the context's environment.
   *
   * @param name property key
   * @return the property value as returned by the environment
   */
  public static String getProperty(String name) {
    return appContext.getEnvironment().getProperty(name);
  }

  /**
   * Stores the context for the static helpers, then loads and schedules all job definitions.
   *
   * @param applicationContext context injected by Spring
   * @throws BeansException declared by the interface
   */
  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    appContext = applicationContext;
    this.loadJobDefinition();
  }
}
自定义存储数据操作接口,对外开放
import com.github.dynamicschedule.support.JobDefinition;
import com.github.dynamicschedule.support.PageJobDefinition;
import java.util.List;
/**
* Storage operations for {@link JobDefinition}s. Implementations are supplied by the application
* and may use any backing store.
*/
public interface JobDefinitionRepository {
/**
* Loads the job definitions from the backing store.
*
* @return the stored definitions; the loader treats a null or empty list as nothing to schedule
*/
List<JobDefinition> load();
/**
* Returns one page of job definitions.
*
* @param search search criterion (its semantics are up to the implementation)
* @param page page to fetch (TODO confirm whether zero- or one-based)
* @param size page size
* @return the requested page
*/
PageJobDefinition find4Page(String search, int page, int size);
/**
* Adds a job definition.
*
* @param jobDefinition definition to store
* @return the stored definition
*/
JobDefinition add(JobDefinition jobDefinition);
/**
* Deletes a job definition.
*
* @param jobId ID of the definition to delete
* @return the deleted definition; the service uses it to unschedule the job
*/
JobDefinition delete(String jobId);
/**
* Updates a job definition.
*
* @param jobDefinition definition carrying the new values
* @return the updated definition
*/
JobDefinition update(JobDefinition jobDefinition);
}
自定义分布式锁,接口对外开放
/** Distributed lock used in cluster mode so that a task runs only on the node holding the lock. */
public interface DistributedLock {
/**
* Tries to acquire the lock.
*
* @param lockName name of the lock; the job actuator passes the job ID
* @return true when the lock was acquired; the task is skipped otherwise
*/
boolean tryLock(String lockName);
/**
* Releases the lock.
*
* @param lockName name of the lock to release
* @return whether the release succeeded (the job actuator ignores this value)
*/
boolean tryUnLock(String lockName);
}
自定义消息通知对象,接口对外开放
import com.github.dynamicschedule.support.JobDefinition;
import javax.annotation.PostConstruct;
/**
 * Base class for the notification channel that propagates job changes. Subclasses provide the
 * transport by implementing {@link #registe(EventConsumer)} and {@link #send}.
 */
public abstract class EventHandler {
  /** Creates the default consumer and registers it once the bean has been constructed. */
  @PostConstruct
  public void init() {
    EventConsumer consumer = new EventConsumer();
    registe(consumer);
  }

  /**
   * Registers the consumer that handles incoming events.
   *
   * @param consumer consumer to register
   */
  public abstract void registe(EventConsumer consumer);

  /**
   * Sends a notification about a job change.
   *
   * @param jobDefinition job the event refers to
   * @param eventType kind of change
   * @return result reported by the implementation
   */
  public abstract boolean send(JobDefinition jobDefinition, EventType eventType);
}
/** Kinds of job change carried by a notification. */
public enum EventType {
// The event consumer schedules the job for ADD and UPDATE, and unschedules it for DELETE.
ADD,
DELETE,
UPDATE
}
import com.github.dynamicschedule.core.DefaultJobManager;
import com.github.dynamicschedule.support.JobDefinition;
import java.util.Objects;
/** Applies received job-change events to the local scheduler. */
public class EventConsumer {
  /**
   * Handles one event: ADD and UPDATE (re)schedule the job, DELETE removes it, and a {@code null}
   * event type is ignored.
   *
   * @param jobDefinition job the event refers to
   * @param eventType kind of change
   */
  public void consume(JobDefinition jobDefinition, EventType eventType) {
    if (eventType == null) {
      return;
    }
    switch (eventType) {
      case ADD:
      case UPDATE:
        DefaultJobManager.addJob(jobDefinition);
        break;
      case DELETE:
        DefaultJobManager.deleteJob(jobDefinition);
        break;
      default:
        break;
    }
  }
}
针对job的增删查改service
import com.github.dynamicschedule.core.DefaultJobManager;
import com.github.dynamicschedule.core.JobDefinitionRepository;
import com.github.dynamicschedule.event.EventHandler;
import com.github.dynamicschedule.event.EventType;
import com.github.dynamicschedule.support.JobDefinition;
import com.github.dynamicschedule.support.PageJobDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestPart;
/**
 * CRUD service for job definitions. Each change is persisted through the repository and then
 * either broadcast through the {@link EventHandler} (cluster mode) or applied directly to the
 * local scheduler (single-node mode).
 */
@Service
public class JobManagerService {
  /** Persistence for job definitions. */
  @Autowired private JobDefinitionRepository jobDefinitionRepository;

  /** Notification channel; optional because it is only needed in cluster mode. */
  @Autowired(required = false)
  private EventHandler eventHandler;

  /** Whether cluster mode is enabled. */
  @Value("${dynamicschedule.cluster.enabled:false}")
  private boolean enableCluster;

  /**
   * Stores a new job definition and schedules it.
   *
   * @param jobDefinition definition to add
   * @return the definition returned by the repository
   * @throws IllegalStateException if cluster mode is enabled but no {@link EventHandler} exists
   */
  public JobDefinition add(JobDefinition jobDefinition) {
    requireEventHandlerInClusterMode();
    JobDefinition result = jobDefinitionRepository.add(jobDefinition);
    if (enableCluster) {
      eventHandler.send(result, EventType.ADD);
    } else {
      DefaultJobManager.addJob(jobDefinition);
    }
    return result;
  }

  /**
   * Deletes a job definition and unschedules it.
   *
   * @param id ID of the definition to delete
   * @return the definition returned by the repository, or {@code null} when it returned none
   * @throws IllegalStateException if cluster mode is enabled but no {@link EventHandler} exists
   */
  public JobDefinition delete(String id) {
    requireEventHandlerInClusterMode();
    JobDefinition result = jobDefinitionRepository.delete(id);
    if (result == null) {
      // Nothing identifies the job to unschedule, so there is nothing to notify or remove.
      return null;
    }
    if (enableCluster) {
      eventHandler.send(result, EventType.DELETE);
    } else {
      DefaultJobManager.deleteJob(result);
    }
    return result;
  }

  /**
   * Returns one page of job definitions.
   *
   * @param search search criterion forwarded to the repository
   * @param page page to fetch
   * @param size page size
   * @return the page returned by the repository
   */
  public PageJobDefinition find4Page(String search, int page, int size) {
    return jobDefinitionRepository.find4Page(search, page, size);
  }

  /**
   * Updates a job definition and reschedules it.
   *
   * @param jobDefinition definition carrying the new values
   * @return the definition returned by the repository
   * @throws IllegalStateException if cluster mode is enabled but no {@link EventHandler} exists
   */
  public JobDefinition update(JobDefinition jobDefinition) {
    requireEventHandlerInClusterMode();
    JobDefinition result = jobDefinitionRepository.update(jobDefinition);
    if (enableCluster) {
      eventHandler.send(result, EventType.UPDATE);
    } else {
      DefaultJobManager.addJob(jobDefinition);
    }
    return result;
  }

  /**
   * Fails fast, before the repository is touched, when cluster mode is on but the optional
   * handler was not injected; otherwise the change would be persisted and then lost to an NPE.
   */
  private void requireEventHandlerInClusterMode() {
    if (enableCluster && eventHandler == null) {
      throw new IllegalStateException(
          "dynamicschedule.cluster.enabled=true requires an EventHandler bean");
    }
  }
}
分页对象
import lombok.Data;
import java.util.List;
/** One page of job definitions; accessors are generated by Lombok's {@code @Data}. */
@Data
public class PageJobDefinition {
/** Job definitions on this page. */
private List<JobDefinition> content;
/** Current page number. */
private int curPage;
/** Total number of pages. */
private int totalPage;
}
自定义的接口,配置文件中开启了web才允许使用(dynamicschedule.web.enabled=true)
import com.github.dynamicschedule.service.JobManagerService;
import com.github.dynamicschedule.support.JobDefinition;
import com.github.dynamicschedule.support.PageJobDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.*;
/**
 * REST endpoints under {@code /job} for managing job definitions. Registered only when
 * {@code dynamicschedule.web.enabled=true}.
 */
@ConditionalOnProperty(prefix = "dynamicschedule.web", name = "enabled", havingValue = "true")
@RequestMapping("/job")
@RestController
public class JobManagerController {
  @Autowired private JobManagerService jobManagerService;

  /**
   * Adds a job definition.
   *
   * @param jobDefinition definition read from the {@code jobDefinition} request part
   * @return the definition returned by the service
   */
  @PostMapping("/add")
  public JobDefinition add(@RequestPart("jobDefinition") JobDefinition jobDefinition) {
    return jobManagerService.add(jobDefinition);
  }

  /**
   * Deletes a job definition.
   *
   * @param id ID of the definition, taken from the path
   * @return the definition returned by the service
   */
  @DeleteMapping("/delete/{id}")
  public JobDefinition delete(@PathVariable("id") String id) {
    return jobManagerService.delete(id);
  }

  /**
   * Returns one page of job definitions.
   *
   * @param search search criterion
   * @param page page to fetch
   * @param size page size
   * @return the page returned by the service
   */
  @GetMapping("/find4Page")
  public PageJobDefinition find4Page(String search, int page, int size) {
    // Read-only query; nothing is written to the store.
    return jobManagerService.find4Page(search, page, size);
  }

  /**
   * Updates a job definition. This handler was previously named {@code delete} (copy-paste
   * slip); the HTTP mapping is unchanged.
   *
   * @param jobDefinition definition read from the {@code jobDefinition} request part
   * @return the definition returned by the service
   */
  @PutMapping("/update")
  public JobDefinition update(@RequestPart("jobDefinition") JobDefinition jobDefinition) {
    return jobManagerService.update(jobDefinition);
  }
}
自定义注解
import org.springframework.context.annotation.Import;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Type-level annotation that enables the dynamic scheduling component by importing
* {@link DynamicSchedulingConfigurationSelector}.
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DynamicSchedulingConfigurationSelector.class)
public @interface EnableDynamicScheduling {
}
import org.springframework.context.annotation.ImportSelector;
import org.springframework.core.type.AnnotationMetadata;
/** Import selector that contributes {@link DynamicSchedulingBeanConfigurer} to the context. */
public class DynamicSchedulingConfigurationSelector implements ImportSelector {
  /**
   * @param importingClassMetadata metadata of the importing class (not used)
   * @return a one-element array holding the configurer's fully qualified class name
   */
  @Override
  public String[] selectImports(AnnotationMetadata importingClassMetadata) {
    String configurerName = DynamicSchedulingBeanConfigurer.class.getName();
    return new String[] {configurerName};
  }
}
import com.github.dynamicschedule.core.DefaultJobDefinitionLoader;
import com.github.dynamicschedule.core.JobDefinitionRepository;
import com.github.dynamicschedule.lock.DistributedLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
/** Declares the beans the dynamic scheduling component needs. */
public class DynamicSchedulingBeanConfigurer {
  /** Optional distributed lock; injected when the application defines one. */
  @Autowired(required = false)
  DistributedLock distributedLock;

  /**
   * Creates the loader that schedules all stored job definitions.
   *
   * @param jobDefinitionReader repository supplied by the application
   * @return the loader bean
   */
  @Bean
  public DefaultJobDefinitionLoader jobDefinitionLoader(
      @Autowired JobDefinitionRepository jobDefinitionReader) {
    DefaultJobDefinitionLoader loader = new DefaultJobDefinitionLoader(jobDefinitionReader);
    return loader;
  }
}
上面的代码是整个组件实现的代码。下面告诉大家怎么使用:
/**
 * Usage example: enables dynamic scheduling and supplies the repository, the distributed lock and
 * the event handler the component expects.
 */
@Configuration
@EnableDynamicScheduling
public class DynamicScheduleApplication {
  /**
   * Job definition source. This sample returns two hard-coded jobs from {@code load()} and stubs
   * every other operation with {@code null}.
   *
   * @return the sample repository
   */
  @Bean
  public JobDefinitionRepository jobDefinitionReader() {
    return new JobDefinitionRepository() {
      @Override
      public List<JobDefinition> load() {
        JobDefinition first = sampleJob("第一个定时任务", "JobName", "test", "");
        JobDefinition second = sampleJob("第二个定时任务", "JobName2", "test2", "第二个定时任务");
        return Lists.newArrayList(first, second);
      }

      @Override
      public PageJobDefinition find4Page(String search, int page, int size) {
        return null;
      }

      @Override
      public JobDefinition add(JobDefinition jobDefinition) {
        return null;
      }

      @Override
      public JobDefinition delete(String jobId) {
        return null;
      }

      @Override
      public JobDefinition update(JobDefinition jobDefinition) {
        return null;
      }
    };
  }

  /**
   * Builds one sample job that targets the {@code myTestService} bean every ten seconds.
   *
   * @param description job description
   * @param jobName job name
   * @param methodName method to call on the bean
   * @param methodArg argument for the method, or an empty string for none
   * @return the populated definition
   */
  private static JobDefinition sampleJob(
      String description, String jobName, String methodName, String methodArg) {
    JobDefinition job = new JobDefinition();
    job.setJobId(UUID.randomUUID().toString());
    job.setJobName(jobName);
    job.setJobGroup("Group1");
    job.setDescription(description);
    job.setCronExpression("0/10 * * * * ? *");
    job.setStatus(0);
    job.setConcurrent(false);
    job.setClassName(MyTestService.class.getName());
    job.setSpringId("myTestService");
    job.setMethodName(methodName);
    job.setMethodArg(methodArg);
    job.setLockObjectName("distributedLock");
    job.setCreateTime(System.currentTimeMillis());
    job.setUpdateTime(System.currentTimeMillis());
    return job;
  }

  /**
   * Distributed lock; only needed when dynamicschedule.cluster.enabled=true. This sample always
   * reports success.
   *
   * @return the sample lock
   */
  @Bean("distributedLock")
  public DistributedLock distributedLock() {
    return new DistributedLock() {
      @Override
      public boolean tryLock(String lockName) {
        return true;
      }

      @Override
      public boolean tryUnLock(String lockName) {
        return true;
      }
    };
  }

  /**
   * Notification channel; only needed when dynamicschedule.cluster.enabled=true. This sample
   * prints on registration and sends nothing.
   *
   * @return the sample event handler
   */
  @Bean
  public EventHandler eventHandler() {
    return new EventHandler() {
      @Override
      public void registe(EventConsumer consumer) {
        System.out.println("注册监听事件");
      }

      @Override
      public boolean send(JobDefinition jobDefinition, EventType eventType) {
        // A real implementation would notify the other nodes of the change here.
        return false;
      }
    };
  }
}
如果需要开启web接口,只需要配置dynamicschedule.web.enabled=true就可以使用。
以上是整个自定义分布式定时任务组件的全部代码和使用示例。