SpringBoot+Quartz实现动态定时任务

目前常用的几种任务调度

  • Timer,简单无门槛,一般也没人用。
  • spring @Scheduled注解,一般集成于项目中,小任务很方便。
  • 开源工具 Quartz,分布式集群开源工具,以下两个分布式任务应该都是基于Quartz实现的,可以说是中小型公司必选,当然也视自身需求而定。
  • 分布式任务 XXL-JOB,是一个轻量级分布式任务调度框架,支持通过 Web 页面对任务进行 CRUD 操作,支持动态修改任务状态、暂停/恢复任务,以及终止运行中任务,支持在线配置调度任务入参和在线查看调度结果。
  • 分布式任务 Elastic-Job,是一个分布式调度解决方案,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。支持分布式调度协调、弹性扩容缩容、失效转移、错过执行作业重触发、并行调度、自诊。
  • 分布式任务 Saturn,Saturn是唯品会在github开源的一款分布式任务调度产品。它是基于当当elastic-job来开发的,其上完善了一些功能和添加了一些新的feature。目前在github上开源大半年,470个star。Saturn的任务可以用多种语言开发比如python、Go、Shell、Java、Php。其在唯品会内部已经发部署350+个节点,每天任务调度4000多万次。同时,管理和统计也是它的亮点。

Quartz是一个广泛使用的开源任务调度框架,用于在Java应用程序中执行定时任务和周期性任务。它提供了强大的调度功能,允许您计划、管理和执行各种任务,从简单的任务到复杂的任务。

Quartz 核心元素:

Quartz任务调度的核心元素为:Scheduler——任务调度器、Trigger——触发器、Job——任务。其中trigger和job是任务调度的元数据,scheduler是实际执行调度的控制器。

  • Trigger是用于定义调度时间的元素,即按照什么时间规则去执行任务。Quartz中主要提供了四种类型的trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和NthIncludedDayTrigger。这四种trigger可以满足企业应用中的绝大部分需求。

  • Job用于表示被调度的任务。主要有两种类型的job:无状态的(stateless)和有状态的(stateful)。对于同一个trigger来说,有状态的job不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。Job主要有两种属性:volatility和durability,其中volatility表示任务是否被持久化到数据库存储,而durability表示在没有trigger关联的时候任务是否被保留。两者都是在值为true的时候任务被持久化或保留。一个job可以被多个trigger关联,但是一个trigger只能关联一个job。

  • Scheduler由scheduler工厂创建

以下是Quartz的一些关键特点和功能:

  • 灵活的调度器:Quartz提供了一个高度可配置的调度器,允许您根据不同的时间表执行任务,包括固定的时间、每日、每周、每月、每秒等。您可以设置任务执行的时间和频率。
  • 多任务支持:Quartz支持同时管理和执行多个任务。您可以定义多个作业和触发器,并将它们添加到调度器中。
  • 集群和分布式调度:Quartz支持集群模式,可以在多台机器上协调任务的执行。这使得Quartz非常适合大规模和分布式应用,以确保任务的高可用性和负载均衡。
  • 持久化:Quartz可以将任务和调度信息持久化到数据库中,以便在应用程序重启时不会丢失任务信息。这对于可靠性和数据保持非常重要。
  • 错过任务处理:Quartz可以配置在任务错过执行时如何处理,例如,是否立即执行、延迟执行或丢弃任务。
  • 监听器:Quartz提供了各种监听器,可以用来监视任务的执行,以及在任务执行前后执行自定义操作。
  • 多种作业类型:Quartz支持不同类型的作业,包括无状态作业(Stateless Job)和有状态作业(Stateful
    Job)。这允许您选择最适合您需求的作业类型。
  • 插件机制:Quartz具有灵活的插件机制,可以扩展其功能。您可以创建自定义插件,以满足特定需求。
  • 丰富的API:Quartz提供了丰富的Java API,使任务调度的配置和管理非常方便。

背景

各个服务需要改造支持集群,现在的授权、日程使用的是基于内存的spring scheduler定时任务,如果部署多个节点,那么到了时间点,多个节点都会开始执行定时任务从而可能引起业务和性能上的问题。

服务中的定时任务比较轻量,为了避免引入redis、zookeeper、单独的定时任务程序,所以建议选用quartz这种基于数据库的分布式定时任务调度框架,无需引用多余中间件。

简单设计

原则上是尽量与quartz的耦合降至最低,针对我们的业务场景并不需要太多的调度操作(即图上的controller),只需要程序启动的时候初始化好指定的定时任务就行了,所以先这样搞,如果有更好的设计欢迎一起交流。


SpringBoot集成quartz

1.在官网下载quartz

  • 下载之后解压,进入src\org\quartz\impl\jdbcjobstore找到22种数据库11张表的初始化sql文件,根据不同的数据库选择不同的文件,(达梦为Oracle系,需要使用tables_oracle.sql)


  • 11张表的功能说明:

表名 功能
qrtz_job_details 存储每一个已配置的 Job 的详细信息
qrtz_triggers 存储已配置的 Trigger 的信息
qrtz_simple_triggers 存储简单的 Trigger,包括重复次数,间隔,以及已触的次数
qrtz_cron_triggers 存储 Cron Trigger,包括 Cron 表达式和时区信息
qrtz_simprop_triggers 存储简单的 Trigger,包括重复次数,间隔,以及已触的次数
qrtz_blob_triggers Trigger 作为 Blob 类型存储(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候)
qrtz_calendars 以 Blob 类型存储 Quartz 的 Calendar 信息
qrtz_paused_trigger_grps 存储已暂停的 Trigger 组的信息
qrtz_fired_triggers 存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息
qrtz_scheduler_state 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例(假如是用于一个集群中)
qrtz_locks 存储程序的悲观锁的信息(假如使用了悲观锁)

2.引入依赖

<!--quartz依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

3.config配置

  • application.properties
#===================================================================
# quartz基础配置
#===================================================================
# 存储方式,可选值:MEMORY(内存方式,不推荐)、JDBC(持久化存储,推荐)
spring.quartz.jobStoreType=JDBC
# 可选值:ALWAYS(每次都生成、注意只有druid数据库连接池才会自动生成表)、EMBEDDED(仅初始化嵌入式数据源)、NEVER(不初始化数据源)。
spring.quartz.jdbc.initializeSchema=ALWAYS
# quartz自动建表的库类型
spring.quartz.jdbc.platform=mysql_innodb
# quartz自动建表sql的指定
spring.quartz.jdbc.schema=classpath:org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql
#  随着容器启动,启动定时任务(默认值ture)
spring.quartz.autoStartup=true
# 定时任务延时启动的时间 (默认值0s)
spring.quartz.startupDelay=5
# 是否可以覆盖定时任务,true 是 (默认值false)
spring.quartz.overwriteExistingJobs=true
# 在容器关闭时,任务执行后关闭容 (默认值false)
spring.quartz.waitForJobsToCompleteOnShutdown=true
  • quartz.properties
#===================================================================
# 配置JobStore
#===================================================================
# 数据保存方式为数据库持久化
org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
# 数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate 可以满足大部分数据库,建议pg系使用 org.quartz.impl.jdbcjobstore.PostgreSQLDelegate,oracle系使用OracleDelegate
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 表的前缀,默认QRTZ_
org.quartz.jobStore.tablePrefix=QRTZ_
# 是否加入集群
org.quartz.jobStore.isClustered=true
# 信息保存时间 默认值60秒 单位:ms
org.quartz.jobStore.misfireThreshold=25000
# 调度实例失效的检查时间间隔 ms
org.quartz.jobStore.clusterCheckinInterval=5000
# JobDataMaps是否都为String类型,默认false
org.quartz.jobStore.useProperties=true
# 当设置为“true”时,此属性告诉Quartz 在非托管JDBC连接上调用setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)。
org.quartz.jobStore.txIsolationLevelReadCommitted=true
#===================================================================
# Scheduler 调度器属性配置
#===================================================================
# 调度标识名 集群中每一个实例都必须使用相同的名称
org.quartz.scheduler.instanceName=DiServerClusterScheduler
# ID设置为自动获取 每一个必须不同
org.quartz.scheduler.instanceId=AUTO
# 是否开启守护线程
org.quartz.scheduler.makeSchedulerThreadDaemon=true
#===================================================================
# 配置ThreadPool
#===================================================================
# 线程池的实现类(一般使用SimpleThreadPool即可满足需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
# 指定在线程池里面创建的线程是否是守护线程
org.quartz.threadPool.makeThreadsDaemons=true
# 指定线程数,至少为1(无默认值),一般设置为1-100直接的整数,根据系统资源配置
org.quartz.threadPool.threadCount=10
# 设置线程的优先级(最大为java.lang.Thread.MAX_PRIORITY 10,最小为Thread.MIN_PRIORITY 1,默认为5)
org.quartz.threadPool.threadPriority=5

3.config配置

springboot集成quartz核心配置类

  • 初始化quartz:QuartzSchedulerConfig .java
import org.quartz.Scheduler;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;

@Configuration
public class QuartzSchedulerConfig {

    /**
     * JobFactory与schedulerFactoryBean中的JobFactory相互依赖,注意bean的名称
     * 在这里为JobFactory注入了Spring上下文
     *
     * @param applicationContext
     * @return
     */
    @Bean
    public JobFactory jobFactory(ApplicationContext applicationContext) {
        QuartzJobFactory quartzJobFactory = new QuartzJobFactory();
        quartzJobFactory.setApplicationContext(applicationContext);
        return quartzJobFactory;
    }


    /**
     * 从quartz.properties文件中读取Quartz配置属性
     *
     * @return
     * @throws IOException
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("/quartz/quartz.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }


    @Bean
    @DependsOn(value = {"jobFactory"})
    public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) throws IOException {
        // 创建SchedulerFactoryBean
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setQuartzProperties(quartzProperties());
        // 支持在JOB实例中注入其他的业务对象
        factory.setJobFactory(jobFactory);
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");
        // 这样当spring关闭时,会等待所有已经启动的quartz job结束后spring才能完全shutdown。
        factory.setWaitForJobsToCompleteOnShutdown(true);
        // 是否覆盖己存在的Job
        factory.setOverwriteExistingJobs(true);
        // QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动
        factory.setStartupDelay(10);
        // 注入spring维护的DataSource
        factory.setDataSource(dataSource);
        return factory;
    }


    /**
     * 通过SchedulerFactoryBean获取Scheduler的实例
     *
     * @return
     * @throws IOException
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler(JobFactory jobFactory, DataSource dataSource) throws IOException {
        return schedulerFactoryBean(jobFactory, dataSource).getScheduler();
    }
}
  • 注入SpringBean:QuartzJobFactory.java
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;

/**
 * 为JobFactory注入SpringBean,否则Job无法使用Spring创建的bean
 */
public final class QuartzJobFactory extends SpringBeanJobFactory
        implements ApplicationContextAware {

    private transient AutowireCapableBeanFactory beanFactory;

    @Override
    protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
        // 调用父类的方法
        final Object job = super.createJobInstance(bundle);
        // 进行注入
        beanFactory.autowireBean(job);
        return job;
    }

    @Override
    public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException {
        beanFactory = applicationContext.getAutowireCapableBeanFactory();
    }
}
  • 具体的定时任务:TaskGroupJob .java
/**
 * @DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行.
 * 注org.quartz.threadPool.threadCount的数量有多个的情况,@DisallowConcurrentExecution才生效
 */

@Component
@Slf4j
@DisallowConcurrentExecution
public class TaskGroupJob implements Job {

    /**
     * 核心方法,Quartz Job真正的执行逻辑.
     * @param context 中封装有Quartz运行所需要的所有信息
     * @throws JobExecutionException execute()方法只允许抛出JobExecutionException异常
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDataMap jdMap = context.getJobDetail().getJobDataMap();
        String jobId = (String) jdMap.get("jobId");
        log.info("任务组定时任务执行start, jobId:{}", jobId);
        
        //具体定时任务逻辑处理
        
        log.info("任务组定时任务执行SUCCESS, jobId:{}", jobId);
    }
}

定时任务动态操作

import com.alibaba.fastjson.JSON;
import com.landray.data.dto.arrange.TaskJobDto;
import lombok.extern.slf4j.Slf4j;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;

import javax.annotation.Resource;

/**
 * @Author: huangyibo
 * @Date: 2024/5/11 14:29
 * @Description: Quartz定时任务动态API Handler
 */

@Component
@Slf4j
public class QuartzTaskGroupHandler {

    @Resource
    private Scheduler scheduler;

    /**
     * 新增定时任务
     * @param taskDto
     */
    public void addCronJob(TaskJobDto taskDto) {
        try {
            //构建TriggerKey, 封装Job的name和group
            TriggerKey triggerKey = TriggerKey.triggerKey(taskDto.getJobId(), taskDto.getGroupName());

            //构建job信息, 用于描叙Job实现类及其他的一些静态信息, 构建一个作业实例
            JobDetail job = JobBuilder.newJob(taskDto.getClazz()).withIdentity(taskDto.getJobId(), taskDto.getGroupName()).build();
            JobDataMap jobDataMap = job.getJobDataMap();
            jobDataMap.put("jobId", taskDto.getJobId());

            // 构建一个触发器,规定触发的规则
            CronTrigger trigger = TriggerBuilder.newTrigger()
                    .withIdentity(triggerKey)
                    .startNow()
                    .withSchedule(CronScheduleBuilder.cronSchedule(taskDto.getCron()).withMisfireHandlingInstructionDoNothing())
                    .build();

            //SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0).withIntervalInSeconds(20)//每隔多少秒执行一次; withRepeatCount 设置重复的次数
            //.startNow().withSchedule(cronScheduleBuilder)
            //交由Scheduler安排触发
            scheduler.scheduleJob(job, trigger);
            if(!scheduler.isStarted()){
                scheduler.start();
            }
            log.info("添加定时任务成功, taskDto:{}", JSON.toJSONString(taskDto));
        } catch (SchedulerException e) {
            log.error("添加定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
        }
    }

    /**
     * 更新定时任务, 定时任务不存在直接新增,定时任务存在删除后新增
     * @param taskDto
     */
    public void addCronJobNotExists(TaskJobDto taskDto) {
        try {
            boolean taskExist = scheduler.checkExists(JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName()));
            if(!taskExist){
                addCronJob(taskDto);
            }
        } catch (Exception e) {
            log.error("更新定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
        }
    }

    /**
     * 更新定时任务, 定时任务不存在直接新增,定时任务存在则更新
     * @param taskDto
     */
    public void updateCronJob(TaskJobDto taskDto) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(taskDto.getJobId(), taskDto.getGroupName());
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            //判断执行周期表达式是否一致
            if(trigger.getCronExpression().equals(taskDto.getCron())){
                return;
            }

            // 修改map
            trigger.getJobDataMap().put("jobId", taskDto.getJobId());

            // 表达式调度构建器
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(taskDto.getCron()).withMisfireHandlingInstructionDoNothing();
            // 按新的cronExpression表达式重新构建trigger
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).startNow().withSchedule(scheduleBuilder).build();

            // 按新的trigger重新设置job执行
            scheduler.rescheduleJob(triggerKey, trigger);
        } catch (Exception e) {
            log.error("更新定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
        }
    }

    /**
     * 立即运行一次
     * @param taskDto
     */
    public void runOnce(TaskJobDto taskDto) {
        try {
            scheduler.triggerJob(JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName()));
        } catch (SchedulerException e) {
            log.error("立即运行一次定时任务失败", e);
        }
    }


    /**
     * 删除任务
     * @param taskDto
     * @return
     */
    public void removeCronJob(TaskJobDto taskDto) {
        try {
            // TriggerKey 定义了trigger的名称和组别 ,通过任务名和任务组名获取TriggerKey
            TriggerKey triggerKey = TriggerKey.triggerKey(taskDto.getJobId(), taskDto.getGroupName());
            // 停止触发器
            scheduler.resumeTrigger(triggerKey);
            // 移除触发器
            scheduler.unscheduleJob(triggerKey);
            // 移除任务
            scheduler.deleteJob(JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName()));
            log.info("删除定时任务成功, taskDto:{}", JSON.toJSONString(taskDto));
        } catch (SchedulerException e) {
            log.error("删除定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
        }
    }

    /**
     * 暂停定时任务
     * @param taskDto
     */
    public void pauseJob(TaskJobDto taskDto) {
        try {
            JobKey jobKey = JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName());
            // 暂停任务
            scheduler.pauseJob(jobKey);
            log.info("暂停定时任务成功, taskDto:{}", JSON.toJSONString(taskDto));
        } catch (SchedulerException e) {
            log.error("暂停定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
        }
    }

    /**
     * 继续定时任务
     * @param taskDto
     */
    @GetMapping("/resumeJob")
    public void resumeJob(TaskJobDto taskDto) {
        try {
            // 通过任务名和任务组名获取jobKey
            JobKey jobKey = JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName());
            // 继续任务
            scheduler.resumeJob(jobKey);
            log.info("继续定时任务成功, taskDto:{}", JSON.toJSONString(taskDto));
        } catch (SchedulerException e) {
            log.error("继续定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
        }
    }
}
@ApiModel("任务管理Dto")
@Data
public class TaskJobDto implements Serializable {

    @ApiModelProperty(value = "任务id")
    private String jobId;

    @ApiModelProperty(value = "任务组名称")
    private String groupName = "TaskGroup";

    @ApiModelProperty(value = "执行周期")
    private String cron;

    @ApiModelProperty(value = "任务执行类")
    private Class<? extends Job> clazz;

}
  • 任务预热,预先加载已经启用的任务组任务
/**
 * @Author: huangyibo
 * @Date: 2024/5/10 18:26
 * @Description: 任务预热,预先加载已经启用的任务组任务
 */
 
@Component
@Slf4j
public class TaskGroupJobRunner {

    @Resource
    private TaskGroupHandler taskGroupHandler;

    @PostConstruct
    public void init() {
        log.info("任务预热,预先加载已经启用的任务组任务start");
        LambdaQueryWrapper<ArrangeTaskGroup> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(ArrangeTaskGroup::getFdTaskStatus, TaskStatusEnum.ENABLE);
        List<ArrangeTaskGroup> taskGroupList = XxxMapper.selectList(queryWrapper);
        if(CollectionUtils.isEmpty(taskGroupList)){
            log.info("任务组管理——任务预热,预先加载已经启用的任务组为空");
            return;
        }
        taskGroupList.forEach(taskGroup -> {
            TaskJobDto taskDto = new TaskJobDto();
            taskDto.setJobId(taskGroup.getFdJobId());
            taskDto.setCron(taskGroup.getFdCron());
            taskDto.setClazz(TaskGroupJob.class);
            taskGroupHandler.addCronJob(taskDto);
        });
        log.info("任务预热,预先加载已经启用的任务组SUCCESS, taskGroupList:{}", JSON.toJSONString(taskGroupList));
        
        log.info("任务预热,手动构造数据start");
        TaskJobDto taskDto = new TaskJobDto();
        taskDto.setJobId("DataAccess_06684bfdadb543f5982778dd663320a7");
        taskDto.setCron("0 0 1 * * ?");
        taskDto.setGroupName("DataAccessTask");
        taskDto.setClazz(DataAccessJob.class);
        taskGroupHandler.addCronJob(taskDto);
        log.info("任务预热,手动构造数据SUCCESS, taskDto:{}", JSON.toJSONString(taskDto));
    }
}
@Service
@Slf4j
public class TaskGroupServiceImpl extends ServiceImpl<TaskGroupMapper, ArrangeTaskGroup> implements TaskGroupService {

    @Resource
    private TaskGroupMapper taskGroupMapper;

    @Resource
    private TaskGroupHandler taskGroupHandler;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Boolean insert(TaskGroupInsert insert) {
        log.info("新增任务组start, insert:{}", JSON.toJSONString(insert));
        TaskGroup taskGroup = new TaskGroup();
        BeanUtils.copyProperties(insert, taskGroup);
        taskGroup.setFdId(IDUtil.getUuId());
        if(Objects.isNull(insert.getFdTaskStatus())){
            taskGroup.setFdTaskStatus(TaskStatusEnum.DISABLE);
        }
        taskGroup.setFdRunStatus(TaskRunStatusEnum.NOT_RUNNING);
        taskGroup.setFdJobId(IDUtil.getUuId());
        taskGroupMapper.insert(taskGroup);
        //任务组启用状态下添加任务组任务
        if(TaskStatusEnum.ENABLE.equals(taskGroup.getFdTaskStatus())){
            TaskJobDto taskDto = new TaskJobDto();
            taskDto.setJobId(taskGroup.getFdJobId());
            taskDto.setCron(taskGroup.getFdCron());
            taskDto.setClazz(TaskGroupJob.class);
            taskGroupHandler.addCronJob(taskDto);
        }
        log.info("新增任务组SUCCESS, insert:{}", JSON.toJSONString(insert));
        return Boolean.TRUE;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Boolean update(TaskGroupUpdate update) {
        log.info("更新任务组start, update:{}", JSON.toJSONString(update));
        TaskGroup taskGroup = new TaskGroup();
        BeanUtils.copyProperties(update, taskGroup);
        if(Objects.isNull(update.getFdTaskStatus())){
            taskGroup.setFdTaskStatus(TaskStatusEnum.DISABLE);
        }
        taskGroup.setFdRunStatus(TaskRunStatusEnum.NOT_RUNNING);
        taskGroupMapper.updateById(taskGroup);
        taskGroup = arrangeTaskGroupMapper.selectById(update.getFdId());
        //任务组启用状态下添加任务组任务
        if(TaskStatusEnum.ENABLE.equals(taskGroup.getFdTaskStatus())){
            TaskJobDto taskDto = new TaskJobDto();
            taskDto.setJobId(taskGroup.getFdJobId());
            taskDto.setCron(taskGroup.getFdCron());
            taskDto.setClazz(TaskGroupJob.class);
            taskGroupHandler.updateCronJob(taskDto);
        }
        log.info("更新任务组SUCCESS, update:{}", JSON.toJSONString(update));
        return Boolean.TRUE;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Boolean deleteById(FdIdForm idForm) {
        log.info("删除任务组start, idForm:{}", JSON.toJSONString(idForm));
        taskGroupMapper.deleteById(idForm.getFdId());

        //删除已经启动的定时任务
        TaskJobDto taskDto = new TaskJobDto();
        taskDto.setJobId(taskGroup.getFdJobId());
        taskDto.setCron(taskGroup.getFdCron());
        taskGroupHandler.removeCronJob(taskDto);
        log.info("删除任务组SUCCESS, idForm:{}", JSON.toJSONString(idForm));
        return Boolean.TRUE;
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Boolean batchUpdate(ArrangeTaskUpdate update) {
        log.info("任务组批量启停start, update:{}", JSON.toJSONString(update));
        LambdaQueryWrapper<TaskGroup> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.in(TaskGroup::getFdId, update.getGroupIdList());
        List<TaskGroup> taskGroupList = taskGroupMapper.selectList(queryWrapper);
        if(CollectionUtils.isEmpty(taskGroupList)){
            throw new BusinessException("批量启停操作任务组不存在!");
        }
        LambdaUpdateWrapper<TaskGroup> lambdaWrapper = new LambdaUpdateWrapper<>();
        lambdaWrapper.set(TaskGroup::getFdTaskStatus, update.getFdTaskStatus()).in(TaskGroup::getFdId, update.getGroupIdList());
        taskGroupMapper.update(null, lambdaWrapper);
        taskGroupList.forEach(taskGroup -> {
            TaskJobDto taskDto = new TaskJobDto();
            taskDto.setJobId(taskGroup.getFdJobId());
            taskDto.setCron(taskGroup.getFdCron());
            taskDto.setClazz(TaskGroupJob.class);
            taskGroupHandler.removeCronJob(taskDto);
            if(TaskStatusEnum.ENABLE.equals(update.getFdTaskStatus())){
                taskGroupHandler.addCronJob(taskDto);
            }
        });
        log.info("任务组批量启停SUCCESS, update:{}", JSON.toJSONString(update));
        return Boolean.TRUE;
    }
}

参考:
https://www.jb51.net/article/261554.htm

https://www.cnblogs.com/Alida/p/12986967.html

https://blog.csdn.net/lc1025082182/article/details/123656328

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容