Quartz
Quartz是一个功能强大的开源作业调度库,完全用Java编写,设计用于J2SE和J2EE应用中。它提供了巨大的灵活性而不牺牲简单性,是企业级应用中最广泛使用的调度框架。
核心概念
基本组件
- Scheduler - 调度器,作业调度的主要接口
- Job - 作业,需要被调度执行的任务
- JobDetail - 作业详情,描述Job的具体信息
- Trigger - 触发器,定义作业执行的时间规则
- JobDataMap - 作业数据映射,在作业执行时传递数据
触发器类型
- SimpleTrigger - 简单触发器,支持固定间隔执行
- CronTrigger - Cron触发器,支持复杂的时间表达式
- CalendarIntervalTrigger - 日历间隔触发器
- DailyTimeIntervalTrigger - 每日时间间隔触发器
存储方式
- RAMJobStore - 内存存储,性能最佳但不持久化
- JobStoreTX - 数据库存储,支持事务
- JobStoreCMT - 容器管理事务存储
- TerracottaJobStore - Terracotta集群存储
架构设计
核心架构
点击查看完整代码实现
Quartz架构:
├── Scheduler Factory
│ ├── 调度器工厂
│ ├── 配置管理
│ └── 实例创建
├── Scheduler
│ ├── 作业管理
│ ├── 触发器管理
│ ├── 监听器管理
│ └── 生命周期控制
├── Job Store
│ ├── 作业存储
│ ├── 触发器存储
│ ├── 调度状态
│ └── 持久化机制
├── Thread Pool
│ ├── 工作线程池
│ ├── 并发控制
│ └── 资源管理
└── Trigger
├── 时间计算
├── 触发逻辑
└── 状态管理执行流程
作业执行流程:
1. Scheduler启动
2. 检查Trigger触发时间
3. 从JobStore获取JobDetail
4. 分配Worker Thread
5. 创建JobExecutionContext
6. 执行Job.execute()
7. 更新作业状态
8. 处理异常和完成状态快速开始
Maven依赖
xml
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>
<!-- Spring集成 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>5.3.21</version>
</dependency>简单示例
创建作业类
点击查看完整代码实现
java
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
public class HelloJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("Hello Quartz! " + new Date());
// 获取作业数据
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String message = dataMap.getString("message");
System.out.println("Message: " + message);
// 获取触发器信息
System.out.println("Trigger Name: " + context.getTrigger().getKey().getName());
System.out.println("Fire Time: " + context.getFireTime());
System.out.println("Next Fire Time: " + context.getNextFireTime());
}
}调度器配置
点击查看完整代码实现
java
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
public class QuartzExample {
public static void main(String[] args) throws Exception {
// 创建调度器工厂
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
// 定义作业
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
.withIdentity("helloJob", "group1")
.usingJobData("message", "Hello World!")
.build();
// 定义触发器 - 每5秒执行一次
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("helloTrigger", "group1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(5)
.repeatForever())
.build();
// 调度作业
scheduler.scheduleJob(jobDetail, trigger);
// 启动调度器
scheduler.start();
// 运行一段时间后关闭
Thread.sleep(30000);
scheduler.shutdown();
}
}Cron表达式示例
点击查看完整代码实现
java
public class CronExample {
public static void main(String[] args) throws Exception {
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
// 定义作业
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
.withIdentity("cronJob", "cronGroup")
.build();
// Cron触发器 - 每天上午10:15执行
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withIdentity("cronTrigger", "cronGroup")
.withSchedule(CronScheduleBuilder.cronSchedule("0 15 10 * * ?"))
.build();
scheduler.scheduleJob(jobDetail, cronTrigger);
scheduler.start();
// 其他Cron表达式示例
scheduleDailyCronJob(scheduler);
scheduleWeeklyCronJob(scheduler);
scheduleMonthlyCronJob(scheduler);
}
private static void scheduleDailyCronJob(Scheduler scheduler) throws Exception {
// 每天22:00执行
JobDetail job = JobBuilder.newJob(HelloJob.class)
.withIdentity("dailyJob")
.build();
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("dailyTrigger")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 22 * * ?"))
.build();
scheduler.scheduleJob(job, trigger);
}
private static void scheduleWeeklyCronJob(Scheduler scheduler) throws Exception {
// 每周一上午9:00执行
JobDetail job = JobBuilder.newJob(HelloJob.class)
.withIdentity("weeklyJob")
.build();
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("weeklyTrigger")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 9 ? * MON"))
.build();
scheduler.scheduleJob(job, trigger);
}
private static void scheduleMonthlyCronJob(Scheduler scheduler) throws Exception {
// 每月1号凌晨2:00执行
JobDetail job = JobBuilder.newJob(HelloJob.class)
.withIdentity("monthlyJob")
.build();
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("monthlyTrigger")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 1 * ?"))
.build();
scheduler.scheduleJob(job, trigger);
}
}作业开发
有状态作业
点击查看完整代码实现
java
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@DisallowConcurrentExecution // 禁止并发执行
public class StatefulJob implements Job {
private static int counter = 0;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
counter++;
System.out.println("StatefulJob executed " + counter + " times");
// 模拟长时间运行的任务
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("StatefulJob finished execution " + counter);
}
}持久化作业
点击查看完整代码实现
java
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@PersistJobDataAfterExecution // 执行后持久化作业数据
@DisallowConcurrentExecution // 禁止并发执行
public class PersistentJob implements Job {
private static final String COUNT_KEY = "count";
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
int count = dataMap.getIntValue(COUNT_KEY);
count++;
System.out.println("PersistentJob count: " + count);
// 更新作业数据
dataMap.put(COUNT_KEY, count);
// 如果执行了10次就删除作业
if (count >= 10) {
try {
context.getScheduler().deleteJob(context.getJobDetail().getKey());
System.out.println("Job deleted after 10 executions");
} catch (SchedulerException e) {
e.printStackTrace();
}
}
}
}中断作业
点击查看完整代码实现
java
import org.quartz.InterruptableJob;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.UnableToInterruptJobException;
public class InterruptableJobExample implements InterruptableJob {
private volatile boolean interrupted = false;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("InterruptableJob started");
for (int i = 0; i < 100; i++) {
if (interrupted) {
System.out.println("Job was interrupted at step " + i);
return;
}
// 模拟工作
try {
Thread.sleep(1000);
System.out.println("Step " + i + " completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
System.out.println("InterruptableJob completed");
}
@Override
public void interrupt() throws UnableToInterruptJobException {
System.out.println("Job interruption requested");
interrupted = true;
}
}集群配置
数据库集群配置
点击查看完整代码实现
properties
# quartz.properties
# 调度器配置
org.quartz.scheduler.instanceName = MyClusteredScheduler
org.quartz.scheduler.instanceId = AUTO
# 线程池配置
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.threadPool.threadPriority = 5
# 作业存储配置
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties = false
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
# 数据源配置
org.quartz.dataSource.myDS.driver = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/quartz?useSSL=false
org.quartz.dataSource.myDS.user = quartz
org.quartz.dataSource.myDS.password = quartz
org.quartz.dataSource.myDS.maxConnections = 10
org.quartz.dataSource.myDS.validationQuery = select 1数据库表创建
点击查看完整代码实现
sql
-- MySQL建表脚本
-- Quartz表结构
-- 作业详情表
CREATE TABLE QRTZ_JOB_DETAILS (
SCHED_NAME varchar(120) NOT NULL,
JOB_NAME varchar(200) NOT NULL,
JOB_GROUP varchar(200) NOT NULL,
DESCRIPTION varchar(250) NULL,
JOB_CLASS_NAME varchar(250) NOT NULL,
IS_DURABLE varchar(1) NOT NULL,
IS_NONCONCURRENT varchar(1) NOT NULL,
IS_UPDATE_DATA varchar(1) NOT NULL,
REQUESTS_RECOVERY varchar(1) NOT NULL,
JOB_DATA blob NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);
-- 触发器表
CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME varchar(120) NOT NULL,
TRIGGER_NAME varchar(200) NOT NULL,
TRIGGER_GROUP varchar(200) NOT NULL,
JOB_NAME varchar(200) NOT NULL,
JOB_GROUP varchar(200) NOT NULL,
DESCRIPTION varchar(250) NULL,
NEXT_FIRE_TIME bigint(13) NULL,
PREV_FIRE_TIME bigint(13) NULL,
PRIORITY int(11) NULL,
TRIGGER_STATE varchar(16) NOT NULL,
TRIGGER_TYPE varchar(8) NOT NULL,
START_TIME bigint(13) NOT NULL,
END_TIME bigint(13) NULL,
CALENDAR_NAME varchar(200) NULL,
MISFIRE_INSTR smallint(2) NULL,
JOB_DATA blob NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
-- 其他表结构...Spring Boot集成
点击查看完整代码实现
java
@Configuration
@EnableScheduling
public class QuartzConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean() {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setConfigLocation(new ClassPathResource("quartz.properties"));
factory.setStartupDelay(2);
factory.setAutoStartup(true);
factory.setOverwriteExistingJobs(true);
// 设置数据源
factory.setDataSource(dataSource());
return factory;
}
@Bean
public DataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/quartz");
dataSource.setUsername("quartz");
dataSource.setPassword("quartz");
dataSource.setMaximumPoolSize(10);
return dataSource;
}
@Bean
public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) {
return schedulerFactoryBean.getScheduler();
}
}监听器
作业监听器
点击查看完整代码实现
java
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
public class MyJobListener implements JobListener {
private static final String LISTENER_NAME = "MyJobListener";
@Override
public String getName() {
return LISTENER_NAME;
}
@Override
public void jobToBeExecuted(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().toString();
System.out.println("Job " + jobName + " is about to be executed");
// 记录开始时间
context.put("startTime", System.currentTimeMillis());
}
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().toString();
System.out.println("Job " + jobName + " was vetoed");
}
@Override
public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
String jobName = context.getJobDetail().getKey().toString();
// 计算执行时间
Long startTime = (Long) context.get("startTime");
if (startTime != null) {
long duration = System.currentTimeMillis() - startTime;
System.out.println("Job " + jobName + " was executed in " + duration + " ms");
}
if (jobException != null) {
System.err.println("Job " + jobName + " threw exception: " + jobException.getMessage());
}
}
}触发器监听器
点击查看完整代码实现
java
import org.quartz.JobExecutionContext;
import org.quartz.Trigger;
import org.quartz.TriggerListener;
public class MyTriggerListener implements TriggerListener {
private static final String LISTENER_NAME = "MyTriggerListener";
@Override
public String getName() {
return LISTENER_NAME;
}
@Override
public void triggerFired(Trigger trigger, JobExecutionContext context) {
String triggerName = trigger.getKey().toString();
System.out.println("Trigger " + triggerName + " fired");
}
@Override
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
// 返回true会阻止作业执行
return false;
}
@Override
public void triggerMisfired(Trigger trigger) {
String triggerName = trigger.getKey().toString();
System.out.println("Trigger " + triggerName + " misfired");
}
@Override
public void triggerComplete(Trigger trigger, JobExecutionContext context,
Trigger.CompletedExecutionInstruction triggerInstructionCode) {
String triggerName = trigger.getKey().toString();
System.out.println("Trigger " + triggerName + " completed");
}
}调度器监听器
点击查看完整代码实现
java
import org.quartz.*;
public class MySchedulerListener implements SchedulerListener {
@Override
public void jobScheduled(Trigger trigger) {
System.out.println("Job scheduled: " + trigger.getJobKey());
}
@Override
public void jobUnscheduled(TriggerKey triggerKey) {
System.out.println("Job unscheduled: " + triggerKey);
}
@Override
public void triggerFinalized(Trigger trigger) {
System.out.println("Trigger finalized: " + trigger.getKey());
}
@Override
public void triggerPaused(TriggerKey triggerKey) {
System.out.println("Trigger paused: " + triggerKey);
}
@Override
public void triggerResumed(TriggerKey triggerKey) {
System.out.println("Trigger resumed: " + triggerKey);
}
@Override
public void jobAdded(JobDetail jobDetail) {
System.out.println("Job added: " + jobDetail.getKey());
}
@Override
public void jobDeleted(JobKey jobKey) {
System.out.println("Job deleted: " + jobKey);
}
@Override
public void jobPaused(JobKey jobKey) {
System.out.println("Job paused: " + jobKey);
}
@Override
public void jobResumed(JobKey jobKey) {
System.out.println("Job resumed: " + jobKey);
}
@Override
public void schedulerError(String msg, SchedulerException cause) {
System.err.println("Scheduler error: " + msg);
cause.printStackTrace();
}
@Override
public void schedulerInStandbyMode() {
System.out.println("Scheduler in standby mode");
}
@Override
public void schedulerStarted() {
System.out.println("Scheduler started");
}
@Override
public void schedulerStarting() {
System.out.println("Scheduler starting");
}
@Override
public void schedulerShutdown() {
System.out.println("Scheduler shutdown");
}
@Override
public void schedulerShuttingdown() {
System.out.println("Scheduler shutting down");
}
@Override
public void schedulingDataCleared() {
System.out.println("Scheduling data cleared");
}
}高级功能
作业链
点击查看完整代码实现
java
public class JobChainExample {
public static void main(String[] args) throws Exception {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 第一个作业
JobDetail job1 = JobBuilder.newJob(Job1.class)
.withIdentity("job1", "chain")
.build();
// 第二个作业
JobDetail job2 = JobBuilder.newJob(Job2.class)
.withIdentity("job2", "chain")
.build();
// 第三个作业
JobDetail job3 = JobBuilder.newJob(Job3.class)
.withIdentity("job3", "chain")
.build();
// 链式触发器
Trigger trigger1 = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "chain")
.startNow()
.build();
scheduler.scheduleJob(job1, trigger1);
scheduler.scheduleJob(job2, Set.of(), true);
scheduler.scheduleJob(job3, Set.of(), true);
scheduler.start();
}
}
public static class Job1 implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("Job1 executed");
// 触发下一个作业
try {
Trigger trigger2 = TriggerBuilder.newTrigger()
.withIdentity("trigger2", "chain")
.forJob("job2", "chain")
.startNow()
.build();
context.getScheduler().scheduleJob(trigger2);
} catch (SchedulerException e) {
throw new JobExecutionException(e);
}
}
}日历排除
点击查看完整代码实现
java
import org.quartz.Calendar;
import org.quartz.impl.calendar.HolidayCalendar;
import org.quartz.impl.calendar.WeeklyCalendar;
public class CalendarExample {
public static void main(String[] args) throws Exception {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 创建节假日日历
HolidayCalendar holidayCalendar = new HolidayCalendar();
holidayCalendar.addExcludedDate(new Date("2024/01/01")); // 元旦
holidayCalendar.addExcludedDate(new Date("2024/10/01")); // 国庆节
// 创建周末日历
WeeklyCalendar weeklyCalendar = new WeeklyCalendar();
weeklyCalendar.setDayExcluded(java.util.Calendar.SATURDAY, true);
weeklyCalendar.setDayExcluded(java.util.Calendar.SUNDAY, true);
// 添加日历到调度器
scheduler.addCalendar("holidays", holidayCalendar, false, false);
scheduler.addCalendar("weekends", weeklyCalendar, false, false);
// 创建使用日历的触发器
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("businessDayTrigger")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 9 * * ?"))
.modifiedByCalendar("holidays") // 排除节假日
.build();
JobDetail job = JobBuilder.newJob(BusinessDayJob.class)
.withIdentity("businessDayJob")
.build();
scheduler.scheduleJob(job, trigger);
scheduler.start();
}
}监控管理
JMX监控
java
import org.quartz.management.ManagementRESTServiceConfiguration;
// 启用JMX监控
Properties props = new Properties();
props.setProperty("org.quartz.scheduler.jmx.export", "true");
props.setProperty("org.quartz.scheduler.jmx.objectName", "quartz:type=QuartzScheduler,name=MyScheduler,instance=NON_CLUSTERED");
SchedulerFactory factory = new StdSchedulerFactory(props);
Scheduler scheduler = factory.getScheduler();Web监控控制台
点击查看完整代码实现
java
// 基于Spring Boot的监控端点
@RestController
@RequestMapping("/quartz")
public class QuartzMonitorController {
@Autowired
private Scheduler scheduler;
@GetMapping("/jobs")
public List<JobInfo> getAllJobs() throws SchedulerException {
List<JobInfo> jobs = new ArrayList<>();
for (String groupName : scheduler.getJobGroupNames()) {
for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) {
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
JobInfo jobInfo = new JobInfo();
jobInfo.setJobName(jobKey.getName());
jobInfo.setJobGroup(jobKey.getGroup());
jobInfo.setJobClass(jobDetail.getJobClass().getSimpleName());
if (!triggers.isEmpty()) {
Trigger trigger = triggers.get(0);
jobInfo.setNextFireTime(trigger.getNextFireTime());
jobInfo.setPreviousFireTime(trigger.getPreviousFireTime());
jobInfo.setState(scheduler.getTriggerState(trigger.getKey()).name());
}
jobs.add(jobInfo);
}
}
return jobs;
}
@PostMapping("/jobs/{jobName}/{jobGroup}/trigger")
public ResponseEntity<String> triggerJob(@PathVariable String jobName, @PathVariable String jobGroup) {
try {
scheduler.triggerJob(new JobKey(jobName, jobGroup));
return ResponseEntity.ok("Job triggered successfully");
} catch (SchedulerException e) {
return ResponseEntity.badRequest().body("Failed to trigger job: " + e.getMessage());
}
}
@PostMapping("/jobs/{jobName}/{jobGroup}/pause")
public ResponseEntity<String> pauseJob(@PathVariable String jobName, @PathVariable String jobGroup) {
try {
scheduler.pauseJob(new JobKey(jobName, jobGroup));
return ResponseEntity.ok("Job paused successfully");
} catch (SchedulerException e) {
return ResponseEntity.badRequest().body("Failed to pause job: " + e.getMessage());
}
}
@PostMapping("/jobs/{jobName}/{jobGroup}/resume")
public ResponseEntity<String> resumeJob(@PathVariable String jobName, @PathVariable String jobGroup) {
try {
scheduler.resumeJob(new JobKey(jobName, jobGroup));
return ResponseEntity.ok("Job resumed successfully");
} catch (SchedulerException e) {
return ResponseEntity.badRequest().body("Failed to resume job: " + e.getMessage());
}
}
}最佳实践
作业设计原则
- 保持作业无状态,避免共享资源冲突
- 实现幂等性,支持重复执行
- 合理处理异常,避免作业中断
- 使用JobDataMap传递参数,避免硬编码
性能优化
- 合理配置线程池大小
- 使用数据库连接池优化存储性能
- 避免在作业中进行耗时的I/O操作
- 定期清理历史执行记录
集群部署
- 使用数据库存储确保集群一致性
- 合理设置集群检查间隔
- 监控集群节点状态和负载分布
- 建立节点故障自动恢复机制
运维管理
- 建立完善的监控告警体系
- 定期备份作业配置和执行历史
- 制定作业执行异常处理流程
- 实现作业执行统计和分析报告
Quartz作为Java生态中最成熟稳定的调度框架,凭借其强大的功能特性、灵活的配置选项和完善的集群支持,在企业级应用中得到了广泛应用,是构建可靠任务调度系统的理想选择。
