Skip to content

引言

在项目中,可以通过集成 spring-boot-starter-quartz 实现定时任务调度。Quartz 是一个功能强大的任务调度框架,支持同步和异步任务执行。通过配置 JobDetailTriggerScheduler,可以灵活地管理定时任务。

添加依赖

pom.xml 配置文件中添加以下依赖:

js
<!-- quartz定时器 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

xiaomayi-common/xiaomayi-scheduler 模块中已经引入此依赖,在实际使用时直接引入以下依赖即可:

js
<!-- quartz任务调用 -->
<dependency>
    <groupId>com.xiaomayi</groupId>
    <artifactId>xiaomayi-scheduler</artifactId>
</dependency>

任务初始化事件

js
package com.xiaomayi.scheduler.event;

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * <p>
 * 任务调度初始化事件
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-06-13
 */
@Slf4j
@Component
public abstract class ScheduleJobEvent {

    /**
     * 项目启动时初始化
     * 备注:此方法在Bean实例化后会被立即调用
     */
    @PostConstruct
    private void init() {
        if (log.isInfoEnabled()) {
            log.info("任务调度初始化开始...");
        }
        // 初始化任务调度
        initScheduleJob();

        if (log.isInfoEnabled()) {
            log.info("任务调度初始化结束...");
        }
    }

    /**
     * 初始化任务调度
     */
    public abstract void initScheduleJob();

}

任务调度工厂

js
package com.xiaomayi.scheduler.factory;

import com.xiaomayi.core.utils.StringUtils;
import com.xiaomayi.scheduler.constant.ScheduleConstant;
import com.xiaomayi.scheduler.model.ScheduleJob;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;


/**
 * <p>
 * 任务调度抽象执行器
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-06-13
 */
@Slf4j
public abstract class ScheduleJobFactory extends QuartzJobBean {

    /**
     * 线程本地变量
     */
    private static ThreadLocal<LocalDateTime> threadLocal = new ThreadLocal<>();

    /**
     * 执行句柄
     *
     * @param context 任务执行
     * @throws JobExecutionException 异常处理
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            // 获取任务对象
            JobDataMap mergedJobDataMap = context.getMergedJobDataMap();
            // 获取任务参数
            ScheduleJob scheduleJob = (ScheduleJob) mergedJobDataMap.get(ScheduleConstant.TASK_SCHEDULE_PARAM);
            log.info("jobName:{}{}", scheduleJob.getJobName(), scheduleJob);
            // 执行前
            doBefore(context, scheduleJob);
            // 执行定时任务调度
            doExecute(context, scheduleJob);
            // 执行后
            doAfter(context, scheduleJob, null);
        } catch (Exception e) {
            log.error("任务执行异常:{}", e.getMessage());
            // 执行后
            doAfter(context, null, e);
        }
    }

    /**
     * 执行前
     *
     * @param context     工作执行上下文对象
     * @param scheduleJob 系统计划任务
     */
    protected void doBefore(JobExecutionContext context, ScheduleJob scheduleJob) {
        log.info("定时任务调度,任务名称:{},任务分组:{} 执行开始", scheduleJob.getJobName(), scheduleJob.getJobGroup());
        threadLocal.set(LocalDateTime.now());
    }

    /**
     * 执行后
     *
     * @param context     工作执行上下文对象
     * @param scheduleJob 系统计划任务
     */
    protected void doAfter(JobExecutionContext context, ScheduleJob scheduleJob, Exception e) {
        log.info("定时任务调度,任务名称:{},任务分组:{} 执行结束", scheduleJob.getJobName(), scheduleJob.getJobGroup());
        // 实例化任务调度日志VO
        ScheduleJobLogVO scheduleJobLogVO = new ScheduleJobLogVO();
        // 任务ID
        scheduleJobLogVO.setJobId(scheduleJob.getId());
        // 任务名称
        scheduleJobLogVO.setJobName(scheduleJob.getJobName());
        // 任务分组
        scheduleJobLogVO.setJobGroup(scheduleJob.getJobGroup());
        // 任务触发器
        scheduleJobLogVO.setJobTrigger(scheduleJob.getJobTrigger());
        // 执行任务正则表达式
        scheduleJobLogVO.setCronExpression(scheduleJob.getCronExpression());
        // 执行开始时间
        LocalDateTime startTime = threadLocal.get();
        // 清除本地线程
        threadLocal.remove();
        scheduleJobLogVO.setStartTime(startTime);
        // 执行结束时间
        scheduleJobLogVO.setEndTime(LocalDateTime.now());
        // 计算任务执行耗时计算,单位毫秒
        long consumeTime = ChronoUnit.MILLIS.between(scheduleJobLogVO.getStartTime(), scheduleJobLogVO.getEndTime());
        // 执行耗时
        scheduleJobLogVO.setConsumeTime(consumeTime);
        // 任务执行耗时
        scheduleJobLogVO.setJobMessage(scheduleJobLogVO.getJobName() + " 总共耗时:" + consumeTime + "毫秒");
        if (StringUtils.isNotNull(e)) {
            // 设置状态:失败
            scheduleJobLogVO.setStatus(1);
            // 获取执行异常错误描述信息,
            String errorMsg = StringUtils.substring(e.getMessage(), 0, 2000);
            // 执行异常错误信息
            scheduleJobLogVO.setExceptionInfo(errorMsg);
        } else {
            // 设置状态:成功
            scheduleJobLogVO.setStatus(0);
        }

        // 调用抽象方法,存储任务调度执行日志记录
        if (addJobLog(scheduleJobLogVO)) {
            log.info("任务调度:{}, 任务分组:{},执行日志存储成功!", scheduleJobLogVO.getJobName(), scheduleJobLogVO.getJobGroup());
        }
    }

    /**
     * 执行句柄,抽象方法,由子类重载
     *
     * @param context     工作执行上下文对象
     * @param scheduleJob 系统计划任务
     * @throws Exception 执行过程中的异常
     */
    protected abstract void doExecute(JobExecutionContext context, ScheduleJob scheduleJob) throws Exception;

    /**
     * 存储任务调度执行日志
     *
     * @param scheduleJobLogVO 参数
     * @return 返回结果
     */
    protected abstract boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO);

}

定义任务类

创建一个任务类,实现 QuartzJobBean 接口,底层继承的是 Job 接口 定义任务执行逻辑。

  • 同步任务
js
package com.xiaomayi.scheduler.factory;

import com.xiaomayi.core.utils.SpringUtils;
import com.xiaomayi.scheduler.model.ScheduleJob;
import com.xiaomayi.scheduler.service.JobResolverService;
import com.xiaomayi.scheduler.utils.JobExecuteUtils;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;

/**
 * <p>
 * 同步任务工厂,禁止并发执行
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-06-13
 */
@Slf4j
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SyncJobFactory extends ScheduleJobFactory {


    /**
     * 同步任务工厂执行器
     *
     * @param context     工作执行上下文对象
     * @param scheduleJob 系统计划任务
     * @throws Exception 异常处理
     */
    @Override
    protected void doExecute(JobExecutionContext context, ScheduleJob scheduleJob) throws Exception {
        log.info("SyncJobFactory execute");
        JobExecuteUtils.invokeMethod(scheduleJob);
    }

    /**
     * 存储任务调度执行日志
     *
     * @param scheduleJobLogVO 参数
     * @return 返回结果
     */
    @Override
    protected boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO) {
        JobResolverService jobResolverService = SpringUtils.getBean(JobResolverService.class);
        return jobResolverService.addJobLog(scheduleJobLogVO);
    }

}
  • 异步任务

异步任务可以通过在任务类中调用异步方法实现:

js
package com.xiaomayi.scheduler.factory;

import com.xiaomayi.core.utils.SpringUtils;
import com.xiaomayi.scheduler.model.ScheduleJob;
import com.xiaomayi.scheduler.service.JobResolverService;
import com.xiaomayi.scheduler.utils.JobExecuteUtils;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;

/**
 * <p>
 * 异步任务工厂,允许并发执行
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-06-13
 */
@Slf4j
public class AsyncJobFactory extends ScheduleJobFactory {

    /**
     * 异步工厂执行器
     *
     * @param context     工作执行上下文对象
     * @param scheduleJob 系统计划任务
     * @throws Exception 异常处理
     */
    @Override
    protected void doExecute(JobExecutionContext context, ScheduleJob scheduleJob) throws Exception {
        log.info("AsyncJobFactory execute");
        JobExecuteUtils.invokeMethod(scheduleJob);
    }

    /**
     * 存储任务调度执行日志
     *
     * @param scheduleJobLogVO 参数
     * @return 返回结果
     */
    @Override
    protected boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO) {
        JobResolverService jobResolverService = SpringUtils.getBean(JobResolverService.class);
        return jobResolverService.addJobLog(scheduleJobLogVO);
    }
}

任务调度辅助工具

js
package com.xiaomayi.scheduler.utils;

import com.xiaomayi.scheduler.constant.ScheduleConstant;
import com.xiaomayi.scheduler.exception.ScheduleException;
import com.xiaomayi.scheduler.factory.AsyncJobFactory;
import com.xiaomayi.scheduler.factory.SyncJobFactory;
import com.xiaomayi.scheduler.model.ScheduleJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;

/**
 * <p>
 * 定时任务调度 辅助工具
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-06-13
 */
@Slf4j
public class ScheduleUtils {

    /**
     * 获取触发器KEY
     *
     * @param jobId    任务名称
     * @param jobGroup 任务分组
     * @return 返回结果
     */
    public static TriggerKey getTriggerKey(Integer jobId, String jobGroup) {
        return TriggerKey.triggerKey(ScheduleConstant.TASK_CLASS_NAME + jobId, jobGroup);
    }

    /**
     * 获取表达式触发器
     *
     * @param scheduler 任务调度
     * @param jobId     任务ID
     * @param jobGroup  任务分组
     * @return 返回结果
     */
    public static CronTrigger getCronTrigger(Scheduler scheduler, Integer jobId, String jobGroup) {
        try {
            TriggerKey triggerKey = getTriggerKey(jobId, jobGroup);
            return (CronTrigger) scheduler.getTrigger(triggerKey);
        } catch (SchedulerException e) {
            log.error("获取定时任务CronTrigger出现异常", e);
            throw new ScheduleException("获取定时任务CronTrigger出现异常");
        }
    }

    /**
     * 创建任务
     *
     * @param scheduler   任务调度
     * @param scheduleJob 调度对象
     */
    public static void createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) {
        createScheduleJob(scheduler, scheduleJob.getId(), scheduleJob.getJobGroup(),
                scheduleJob.getCronExpression(), scheduleJob.getIsSync(), scheduleJob);
    }

    /**
     * 创建定时任务
     *
     * @param scheduler      任务调度
     * @param jobId          任务ID
     * @param jobGroup       任务分组
     * @param cronExpression 正则表达式
     * @param isSync         是否同步任务:1-是 0-否
     * @param param          任务参数
     */
    public static void createScheduleJob(Scheduler scheduler, Integer jobId, String jobGroup,
                                         String cronExpression, Integer isSync, Object param) {
        // 同步或异步
        Class<? extends Job> jobClass = isSync.equals(1) ? SyncJobFactory.class : AsyncJobFactory.class;

        // 构建job信息
        JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();

        // 表达式调度构建器
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);

        // 按新的cronExpression表达式构建一个新的trigger
        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup)).withSchedule(scheduleBuilder).build();

        String jobTrigger = trigger.getKey().getName();

        ScheduleJob scheduleJob = (ScheduleJob) param;
        scheduleJob.setJobTrigger(jobTrigger);

        // 放入参数,运行时的方法可以获取
        jobDetail.getJobDataMap().put(ScheduleConstant.TASK_SCHEDULE_PARAM, scheduleJob);

        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            log.error("创建定时任务失败", e);
            throw new ScheduleException("创建定时任务失败");
        }
    }

    /**
     * 运行一次任务
     *
     * @param scheduler 任务调度
     * @param jobId     任务ID
     * @param jobGroup  任务分组
     */
    public static void runOnce(Scheduler scheduler, Integer jobId, String jobGroup) {
        JobKey jobKey = getJobKey(jobId, jobGroup);
        try {
            scheduler.triggerJob(jobKey);
        } catch (SchedulerException e) {
            log.error("运行一次定时任务失败", e);
            throw new ScheduleException("运行一次定时任务失败");
        }
    }

    /**
     * 暂停任务
     *
     * @param scheduler 任务调度
     * @param jobId     任务ID
     * @param jobGroup  任务分组
     */
    public static void pauseJob(Scheduler scheduler, Integer jobId, String jobGroup) {
        JobKey jobKey = getJobKey(jobId, jobGroup);
        try {
            scheduler.pauseJob(jobKey);
        } catch (SchedulerException e) {
            log.error("暂停定时任务失败", e);
            throw new ScheduleException("暂停定时任务失败");
        }
    }

    /**
     * 恢复任务
     *
     * @param scheduler 任务调度
     * @param jobId     任务ID
     * @param jobGroup  任务分组
     */
    public static void resumeJob(Scheduler scheduler, Integer jobId, String jobGroup) {
        JobKey jobKey = getJobKey(jobId, jobGroup);
        try {
            scheduler.resumeJob(jobKey);
        } catch (SchedulerException e) {
            log.error("暂停定时任务失败", e);
            throw new ScheduleException("暂停定时任务失败");
        }
    }

    /**
     * 获取任务KEY
     *
     * @param jobId    任务名称
     * @param jobGroup 任务分组
     * @return 返回结果
     */
    public static JobKey getJobKey(Integer jobId, String jobGroup) {
        log.info("本次任务调度KEY:{},任务组:{}", ScheduleConstant.TASK_CLASS_NAME + jobId, jobGroup);
        String keyName = String.format("%05d", jobId);
        return JobKey.jobKey(ScheduleConstant.TASK_CLASS_NAME + keyName, jobGroup);
    }

    /**
     * 更新定时任务
     *
     * @param scheduler   任务调度
     * @param scheduleJob 调度对象
     */
    public static void updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) {
        updateScheduleJob(scheduler, scheduleJob.getId(), scheduleJob.getJobGroup(),
                scheduleJob.getCronExpression(), scheduleJob.getIsSync(), scheduleJob);
    }

    /**
     * 更新定时任务
     *
     * @param scheduler      任务调度
     * @param jobId          任务ID
     * @param jobGroup       任务分组
     * @param cronExpression 正则表达式
     * @param isSync         是否同步任务:1-是 0-否
     * @param param          任务参数
     */
    public static void updateScheduleJob(Scheduler scheduler, Integer jobId, String jobGroup,
                                         String cronExpression, Integer isSync, Object param) {

        try {

            TriggerKey triggerKey = ScheduleUtils.getTriggerKey(jobId, jobGroup);

            //表达式调度构建器
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);

            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);

            //按新的cronExpression表达式重新构建trigger
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
            Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
            // 忽略状态为PAUSED的任务,解决集群环境中在其他机器设置定时任务为PAUSED状态后,集群环境启动另一台主机时定时任务全被唤醒的bug
            if (!triggerState.name().equalsIgnoreCase("PAUSED")) {
                //按新的trigger重新设置job执行
                scheduler.rescheduleJob(triggerKey, trigger);
            }
        } catch (SchedulerException e) {
            log.error("更新定时任务失败", e);
            throw new ScheduleException("更新定时任务失败");
        }
    }

    /**
     * 删除定时任务
     *
     * @param scheduler 任务调度
     * @param jobId     任务ID
     * @param jobGroup  任务分组
     */
    public static void deleteScheduleJob(Scheduler scheduler, Integer jobId, String jobGroup) {
        try {
            scheduler.deleteJob(getJobKey(jobId, jobGroup));
        } catch (SchedulerException e) {
            log.error("删除定时任务失败", e);
            throw new ScheduleException("删除定时任务失败");
        }
    }

    /**
     * 判断定时任务是否已存在
     *
     * @param scheduler 任务调度
     * @param jobId     任务ID
     * @param jobGroup  任务分组
     * @return 返回结果
     */
    public static boolean checkExists(Scheduler scheduler, String jobId, String jobGroup) {
        try {
            // 获取定时任务KEY
            JobKey jobKey = JobKey.jobKey(jobId, jobGroup);
            // 返回结果
            return scheduler.checkExists(jobKey);
        } catch (SchedulerException e) {
            log.error("检查定时任务是否存在", e);
            throw new ScheduleException("检查定时任务是否存在失败");
        }
    }

}

任务调度日志记录

创建任务调度日志记录接口类:

js
package com.xiaomayi.scheduler.service;

import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;

/**
 * <p>
 * 任务调度执行接口
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-06-13
 */
public interface JobResolverService {

    /**
     * 添加任务调用执行日志
     *
     * @param scheduleJobLogVO 参数
     * @return 返回结果
     */
    boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO);

}

任务调度日志记录接口实现:

js
package com.xiaomayi.quartz.service.impl;

import com.xiaomayi.quartz.entity.JobLog;
import com.xiaomayi.quartz.service.JobLogService;
import com.xiaomayi.scheduler.service.JobResolverService;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


/**
 * <p>
 * 任务调度执行接口实现
 * </p>
 *
 * @author 小蚂蚁云团队
 * @since 2024-06-13
 */
@Slf4j
@Service
public class JobResolverServiceImpl implements JobResolverService {

    @Autowired
    private JobLogService jobLogService;

    /**
     * 添加任务调度执行日志
     *
     * @param scheduleJobLogVO 参数
     * @return 返回结果
     */
    @Override
    public boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO) {
        log.info("任务调度AOP日志处理中...");
        try {
            // 实例化操作日志对象
            JobLog jobLog = new JobLog();
            // 属性拷贝
            BeanUtils.copyProperties(scheduleJobLogVO, jobLog);
            // 保存任务调度日志
            boolean result = jobLogService.save(jobLog);
            // 返回结果
            return result;
        } catch (Exception e) {
            log.error("任务调度AOP日志存储失败:{}", e.getMessage());
        }
        return false;
    }
}

总结

通过以上步骤,你可以在项目中集成 Quartz,实现同步和异步定时任务调度。

1. 核心功能:
    通过 Quartz 实现定时任务调度。
    支持同步和异步任务执行。
2. 优点:
    灵活的任务调度配置,支持 Cron 表达式。
    支持持久化任务,适合分布式环境。
3. 适用场景:
    需要定时执行任务的业务场景。
    需要异步执行任务的场景。
4. 注意事项:
    异步任务需要启用 @EnableAsync 注解。
    生产环境中建议使用数据库存储任务状态,避免任务丢失。

小蚂蚁云团队 · 提供技术支持

小蚂蚁云 新品首发
新品首发,限时特惠,抢购从速! 全场95折
赋能开发者,助理企业发展,提供全方位数据中台解决方案。
获取官方授权