引言
在项目中,可以通过集成 spring-boot-starter-quartz
实现定时任务调度。Quartz
是一个功能强大的任务调度框架,支持同步和异步任务执行。通过配置 JobDetail
、Trigger
和 Scheduler
,可以灵活地管理定时任务。
添加依赖
在 pom.xml
配置文件中添加以下依赖:
js
<!-- quartz定时器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
在 xiaomayi-common/xiaomayi-scheduler
模块中已经引入此依赖,在实际使用时直接引入以下依赖即可:
js
<!-- quartz任务调用 -->
<dependency>
<groupId>com.xiaomayi</groupId>
<artifactId>xiaomayi-scheduler</artifactId>
</dependency>
任务初始化事件
js
package com.xiaomayi.scheduler.event;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* <p>
* 任务调度初始化事件
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-06-13
*/
@Slf4j
@Component
public abstract class ScheduleJobEvent {
/**
* 项目启动时初始化
* 备注:此方法在Bean实例化后会被立即调用
*/
@PostConstruct
private void init() {
if (log.isInfoEnabled()) {
log.info("任务调度初始化开始...");
}
// 初始化任务调度
initScheduleJob();
if (log.isInfoEnabled()) {
log.info("任务调度初始化结束...");
}
}
/**
* 初始化任务调度
*/
public abstract void initScheduleJob();
}
任务调度工厂
js
package com.xiaomayi.scheduler.factory;
import com.xiaomayi.core.utils.StringUtils;
import com.xiaomayi.scheduler.constant.ScheduleConstant;
import com.xiaomayi.scheduler.model.ScheduleJob;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
/**
* <p>
* 任务调度抽象执行器
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-06-13
*/
@Slf4j
public abstract class ScheduleJobFactory extends QuartzJobBean {
/**
* 线程本地变量
*/
private static ThreadLocal<LocalDateTime> threadLocal = new ThreadLocal<>();
/**
* 执行句柄
*
* @param context 任务执行
* @throws JobExecutionException 异常处理
*/
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
try {
// 获取任务对象
JobDataMap mergedJobDataMap = context.getMergedJobDataMap();
// 获取任务参数
ScheduleJob scheduleJob = (ScheduleJob) mergedJobDataMap.get(ScheduleConstant.TASK_SCHEDULE_PARAM);
log.info("jobName:{}, {}", scheduleJob.getJobName(), scheduleJob);
// 执行前
doBefore(context, scheduleJob);
// 执行定时任务调度
doExecute(context, scheduleJob);
// 执行后
doAfter(context, scheduleJob, null);
} catch (Exception e) {
log.error("任务执行异常:{}", e.getMessage());
// 执行后
doAfter(context, null, e);
}
}
/**
* 执行前
*
* @param context 工作执行上下文对象
* @param scheduleJob 系统计划任务
*/
protected void doBefore(JobExecutionContext context, ScheduleJob scheduleJob) {
log.info("定时任务调度,任务名称:{},任务分组:{} 执行开始", scheduleJob.getJobName(), scheduleJob.getJobGroup());
threadLocal.set(LocalDateTime.now());
}
/**
* 执行后
*
* @param context 工作执行上下文对象
* @param scheduleJob 系统计划任务
*/
protected void doAfter(JobExecutionContext context, ScheduleJob scheduleJob, Exception e) {
log.info("定时任务调度,任务名称:{},任务分组:{} 执行结束", scheduleJob.getJobName(), scheduleJob.getJobGroup());
// 实例化任务调度日志VO
ScheduleJobLogVO scheduleJobLogVO = new ScheduleJobLogVO();
// 任务ID
scheduleJobLogVO.setJobId(scheduleJob.getId());
// 任务名称
scheduleJobLogVO.setJobName(scheduleJob.getJobName());
// 任务分组
scheduleJobLogVO.setJobGroup(scheduleJob.getJobGroup());
// 任务触发器
scheduleJobLogVO.setJobTrigger(scheduleJob.getJobTrigger());
// 执行任务正则表达式
scheduleJobLogVO.setCronExpression(scheduleJob.getCronExpression());
// 执行开始时间
LocalDateTime startTime = threadLocal.get();
// 清除本地线程
threadLocal.remove();
scheduleJobLogVO.setStartTime(startTime);
// 执行结束时间
scheduleJobLogVO.setEndTime(LocalDateTime.now());
// 计算任务执行耗时计算,单位毫秒
long consumeTime = ChronoUnit.MILLIS.between(scheduleJobLogVO.getStartTime(), scheduleJobLogVO.getEndTime());
// 执行耗时
scheduleJobLogVO.setConsumeTime(consumeTime);
// 任务执行耗时
scheduleJobLogVO.setJobMessage(scheduleJobLogVO.getJobName() + " 总共耗时:" + consumeTime + "毫秒");
if (StringUtils.isNotNull(e)) {
// 设置状态:失败
scheduleJobLogVO.setStatus(1);
// 获取执行异常错误描述信息,
String errorMsg = StringUtils.substring(e.getMessage(), 0, 2000);
// 执行异常错误信息
scheduleJobLogVO.setExceptionInfo(errorMsg);
} else {
// 设置状态:成功
scheduleJobLogVO.setStatus(0);
}
// 调用抽象方法,存储任务调度执行日志记录
if (addJobLog(scheduleJobLogVO)) {
log.info("任务调度:{}, 任务分组:{},执行日志存储成功!", scheduleJobLogVO.getJobName(), scheduleJobLogVO.getJobGroup());
}
}
/**
* 执行句柄,抽象方法,由子类重载
*
* @param context 工作执行上下文对象
* @param scheduleJob 系统计划任务
* @throws Exception 执行过程中的异常
*/
protected abstract void doExecute(JobExecutionContext context, ScheduleJob scheduleJob) throws Exception;
/**
* 存储任务调度执行日志
*
* @param scheduleJobLogVO 参数
* @return 返回结果
*/
protected abstract boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO);
}
定义任务类
创建一个任务类,实现 QuartzJobBean
接口,底层继承的是 Job
接口 定义任务执行逻辑。
- 同步任务
js
package com.xiaomayi.scheduler.factory;
import com.xiaomayi.core.utils.SpringUtils;
import com.xiaomayi.scheduler.model.ScheduleJob;
import com.xiaomayi.scheduler.service.JobResolverService;
import com.xiaomayi.scheduler.utils.JobExecuteUtils;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
/**
* <p>
* 同步任务工厂,禁止并发执行
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-06-13
*/
@Slf4j
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SyncJobFactory extends ScheduleJobFactory {
/**
* 同步任务工厂执行器
*
* @param context 工作执行上下文对象
* @param scheduleJob 系统计划任务
* @throws Exception 异常处理
*/
@Override
protected void doExecute(JobExecutionContext context, ScheduleJob scheduleJob) throws Exception {
log.info("SyncJobFactory execute");
JobExecuteUtils.invokeMethod(scheduleJob);
}
/**
* 存储任务调度执行日志
*
* @param scheduleJobLogVO 参数
* @return 返回结果
*/
@Override
protected boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO) {
JobResolverService jobResolverService = SpringUtils.getBean(JobResolverService.class);
return jobResolverService.addJobLog(scheduleJobLogVO);
}
}
- 异步任务
异步任务可以通过在任务类中调用异步方法实现:
js
package com.xiaomayi.scheduler.factory;
import com.xiaomayi.core.utils.SpringUtils;
import com.xiaomayi.scheduler.model.ScheduleJob;
import com.xiaomayi.scheduler.service.JobResolverService;
import com.xiaomayi.scheduler.utils.JobExecuteUtils;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
/**
* <p>
* 异步任务工厂,允许并发执行
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-06-13
*/
@Slf4j
public class AsyncJobFactory extends ScheduleJobFactory {
/**
* 异步工厂执行器
*
* @param context 工作执行上下文对象
* @param scheduleJob 系统计划任务
* @throws Exception 异常处理
*/
@Override
protected void doExecute(JobExecutionContext context, ScheduleJob scheduleJob) throws Exception {
log.info("AsyncJobFactory execute");
JobExecuteUtils.invokeMethod(scheduleJob);
}
/**
* 存储任务调度执行日志
*
* @param scheduleJobLogVO 参数
* @return 返回结果
*/
@Override
protected boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO) {
JobResolverService jobResolverService = SpringUtils.getBean(JobResolverService.class);
return jobResolverService.addJobLog(scheduleJobLogVO);
}
}
任务调度辅助工具
js
package com.xiaomayi.scheduler.utils;
import com.xiaomayi.scheduler.constant.ScheduleConstant;
import com.xiaomayi.scheduler.exception.ScheduleException;
import com.xiaomayi.scheduler.factory.AsyncJobFactory;
import com.xiaomayi.scheduler.factory.SyncJobFactory;
import com.xiaomayi.scheduler.model.ScheduleJob;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
/**
* <p>
* 定时任务调度 辅助工具
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-06-13
*/
@Slf4j
public class ScheduleUtils {
/**
* 获取触发器KEY
*
* @param jobId 任务名称
* @param jobGroup 任务分组
* @return 返回结果
*/
public static TriggerKey getTriggerKey(Integer jobId, String jobGroup) {
return TriggerKey.triggerKey(ScheduleConstant.TASK_CLASS_NAME + jobId, jobGroup);
}
/**
* 获取表达式触发器
*
* @param scheduler 任务调度
* @param jobId 任务ID
* @param jobGroup 任务分组
* @return 返回结果
*/
public static CronTrigger getCronTrigger(Scheduler scheduler, Integer jobId, String jobGroup) {
try {
TriggerKey triggerKey = getTriggerKey(jobId, jobGroup);
return (CronTrigger) scheduler.getTrigger(triggerKey);
} catch (SchedulerException e) {
log.error("获取定时任务CronTrigger出现异常", e);
throw new ScheduleException("获取定时任务CronTrigger出现异常");
}
}
/**
* 创建任务
*
* @param scheduler 任务调度
* @param scheduleJob 调度对象
*/
public static void createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) {
createScheduleJob(scheduler, scheduleJob.getId(), scheduleJob.getJobGroup(),
scheduleJob.getCronExpression(), scheduleJob.getIsSync(), scheduleJob);
}
/**
* 创建定时任务
*
* @param scheduler 任务调度
* @param jobId 任务ID
* @param jobGroup 任务分组
* @param cronExpression 正则表达式
* @param isSync 是否同步任务:1-是 0-否
* @param param 任务参数
*/
public static void createScheduleJob(Scheduler scheduler, Integer jobId, String jobGroup,
String cronExpression, Integer isSync, Object param) {
// 同步或异步
Class<? extends Job> jobClass = isSync.equals(1) ? SyncJobFactory.class : AsyncJobFactory.class;
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup)).withSchedule(scheduleBuilder).build();
String jobTrigger = trigger.getKey().getName();
ScheduleJob scheduleJob = (ScheduleJob) param;
scheduleJob.setJobTrigger(jobTrigger);
// 放入参数,运行时的方法可以获取
jobDetail.getJobDataMap().put(ScheduleConstant.TASK_SCHEDULE_PARAM, scheduleJob);
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
log.error("创建定时任务失败", e);
throw new ScheduleException("创建定时任务失败");
}
}
/**
* 运行一次任务
*
* @param scheduler 任务调度
* @param jobId 任务ID
* @param jobGroup 任务分组
*/
public static void runOnce(Scheduler scheduler, Integer jobId, String jobGroup) {
JobKey jobKey = getJobKey(jobId, jobGroup);
try {
scheduler.triggerJob(jobKey);
} catch (SchedulerException e) {
log.error("运行一次定时任务失败", e);
throw new ScheduleException("运行一次定时任务失败");
}
}
/**
* 暂停任务
*
* @param scheduler 任务调度
* @param jobId 任务ID
* @param jobGroup 任务分组
*/
public static void pauseJob(Scheduler scheduler, Integer jobId, String jobGroup) {
JobKey jobKey = getJobKey(jobId, jobGroup);
try {
scheduler.pauseJob(jobKey);
} catch (SchedulerException e) {
log.error("暂停定时任务失败", e);
throw new ScheduleException("暂停定时任务失败");
}
}
/**
* 恢复任务
*
* @param scheduler 任务调度
* @param jobId 任务ID
* @param jobGroup 任务分组
*/
public static void resumeJob(Scheduler scheduler, Integer jobId, String jobGroup) {
JobKey jobKey = getJobKey(jobId, jobGroup);
try {
scheduler.resumeJob(jobKey);
} catch (SchedulerException e) {
log.error("暂停定时任务失败", e);
throw new ScheduleException("暂停定时任务失败");
}
}
/**
* 获取任务KEY
*
* @param jobId 任务名称
* @param jobGroup 任务分组
* @return 返回结果
*/
public static JobKey getJobKey(Integer jobId, String jobGroup) {
log.info("本次任务调度KEY:{},任务组:{}", ScheduleConstant.TASK_CLASS_NAME + jobId, jobGroup);
String keyName = String.format("%05d", jobId);
return JobKey.jobKey(ScheduleConstant.TASK_CLASS_NAME + keyName, jobGroup);
}
/**
* 更新定时任务
*
* @param scheduler 任务调度
* @param scheduleJob 调度对象
*/
public static void updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) {
updateScheduleJob(scheduler, scheduleJob.getId(), scheduleJob.getJobGroup(),
scheduleJob.getCronExpression(), scheduleJob.getIsSync(), scheduleJob);
}
/**
* 更新定时任务
*
* @param scheduler 任务调度
* @param jobId 任务ID
* @param jobGroup 任务分组
* @param cronExpression 正则表达式
* @param isSync 是否同步任务:1-是 0-否
* @param param 任务参数
*/
public static void updateScheduleJob(Scheduler scheduler, Integer jobId, String jobGroup,
String cronExpression, Integer isSync, Object param) {
try {
TriggerKey triggerKey = ScheduleUtils.getTriggerKey(jobId, jobGroup);
//表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
//按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
// 忽略状态为PAUSED的任务,解决集群环境中在其他机器设置定时任务为PAUSED状态后,集群环境启动另一台主机时定时任务全被唤醒的bug
if (!triggerState.name().equalsIgnoreCase("PAUSED")) {
//按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
}
} catch (SchedulerException e) {
log.error("更新定时任务失败", e);
throw new ScheduleException("更新定时任务失败");
}
}
/**
* 删除定时任务
*
* @param scheduler 任务调度
* @param jobId 任务ID
* @param jobGroup 任务分组
*/
public static void deleteScheduleJob(Scheduler scheduler, Integer jobId, String jobGroup) {
try {
scheduler.deleteJob(getJobKey(jobId, jobGroup));
} catch (SchedulerException e) {
log.error("删除定时任务失败", e);
throw new ScheduleException("删除定时任务失败");
}
}
/**
* 判断定时任务是否已存在
*
* @param scheduler 任务调度
* @param jobId 任务ID
* @param jobGroup 任务分组
* @return 返回结果
*/
public static boolean checkExists(Scheduler scheduler, String jobId, String jobGroup) {
try {
// 获取定时任务KEY
JobKey jobKey = JobKey.jobKey(jobId, jobGroup);
// 返回结果
return scheduler.checkExists(jobKey);
} catch (SchedulerException e) {
log.error("检查定时任务是否存在", e);
throw new ScheduleException("检查定时任务是否存在失败");
}
}
}
任务调度日志记录
创建任务调度日志记录接口类:
js
package com.xiaomayi.scheduler.service;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
/**
* <p>
* 任务调度执行接口
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-06-13
*/
public interface JobResolverService {
/**
* 添加任务调用执行日志
*
* @param scheduleJobLogVO 参数
* @return 返回结果
*/
boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO);
}
任务调度日志记录接口实现:
js
package com.xiaomayi.quartz.service.impl;
import com.xiaomayi.quartz.entity.JobLog;
import com.xiaomayi.quartz.service.JobLogService;
import com.xiaomayi.scheduler.service.JobResolverService;
import com.xiaomayi.scheduler.vo.ScheduleJobLogVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* <p>
* 任务调度执行接口实现
* </p>
*
* @author 小蚂蚁云团队
* @since 2024-06-13
*/
@Slf4j
@Service
public class JobResolverServiceImpl implements JobResolverService {
@Autowired
private JobLogService jobLogService;
/**
* 添加任务调度执行日志
*
* @param scheduleJobLogVO 参数
* @return 返回结果
*/
@Override
public boolean addJobLog(ScheduleJobLogVO scheduleJobLogVO) {
log.info("任务调度AOP日志处理中...");
try {
// 实例化操作日志对象
JobLog jobLog = new JobLog();
// 属性拷贝
BeanUtils.copyProperties(scheduleJobLogVO, jobLog);
// 保存任务调度日志
boolean result = jobLogService.save(jobLog);
// 返回结果
return result;
} catch (Exception e) {
log.error("任务调度AOP日志存储失败:{}", e.getMessage());
}
return false;
}
}
总结
通过以上步骤,你可以在项目中集成 Quartz
,实现同步和异步定时任务调度。
1. 核心功能:
通过 Quartz 实现定时任务调度。
支持同步和异步任务执行。
2. 优点:
灵活的任务调度配置,支持 Cron 表达式。
支持持久化任务,适合分布式环境。
3. 适用场景:
需要定时执行任务的业务场景。
需要异步执行任务的场景。
4. 注意事项:
异步任务需要启用 @EnableAsync 注解。
生产环境中建议使用数据库存储任务状态,避免任务丢失。