Elastic-Job
Elastic-Job是当当网开源的分布式作业调度框架,基于Quartz和ZooKeeper实现,提供分片、弹性资源利用、作业监控、失效转移等企业级功能。
核心特性
分布式调度
- 作业分片 - 任务自动分片,支持多节点并行执行
- 弹性扩容 - 动态增减节点,自动重新分片
- 故障转移 - 节点失效时自动转移分片到其他节点
- 负载均衡 - 智能分配作业分片到不同节点
作业管理
- 定时调度 - 基于Cron表达式的定时执行
- 依赖作业 - 支持作业间的依赖关系
- 串并行 - 支持串行和并行执行模式
- 幂等性 - 保证作业执行的幂等性
运维监控
- 作业监控 - 实时监控作业执行状态
- 统计报告 - 作业执行统计和性能分析
- 事件追踪 - 完整的作业执行事件链路
- 失效转移 - 自动检测和处理节点故障
架构设计
核心组件
点击查看完整代码实现
Elastic-Job架构:
├── Elastic-Job-Lite
│ ├── 作业执行引擎
│ ├── 分片策略
│ ├── 故障转移
│ └── 事件监听
├── ZooKeeper
│ ├── 服务器列表
│ ├── 作业配置
│ ├── 分片信息
│ └── 故障转移
├── 作业服务器
│ ├── 作业实例
│ ├── 分片执行
│ ├── 心跳检测
│ └── 状态上报
└── 运维平台
├── 作业管理
├── 监控展示
├── 统计分析
└── 配置管理分片机制
作业分片策略:
├── 平均分配策略
│ ├── 按服务器数量平均分配
│ ├── 余数处理
│ └── 负载均衡
├── 作业名哈希策略
│ ├── 按作业名哈希分配
│ ├── 固定分片归属
│ └── 一致性保证
└── 轮转分配策略
├── 轮询分配分片
├── 动态调整
└── 负载均衡快速开始
Maven依赖
xml
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>作业开发
简单作业
点击查看完整代码实现
java
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.api.ShardingContext;
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("作业名称: " + shardingContext.getJobName());
System.out.println("分片项: " + shardingContext.getShardingItem());
System.out.println("分片参数: " + shardingContext.getShardingParameter());
System.out.println("分片总数: " + shardingContext.getShardingTotalCount());
// 执行业务逻辑
processBusinessLogic(shardingContext.getShardingItem());
}
private void processBusinessLogic(int shardingItem) {
// 根据分片项处理不同的业务逻辑
switch (shardingItem) {
case 0:
System.out.println("处理用户数据");
break;
case 1:
System.out.println("处理订单数据");
break;
case 2:
System.out.println("处理商品数据");
break;
default:
System.out.println("处理其他数据");
}
}
}数据流作业
点击查看完整代码实现
java
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.api.ShardingContext;
import java.util.List;
public class MyDataflowJob implements DataflowJob<User> {
@Override
public List<User> fetchData(ShardingContext shardingContext) {
// 获取待处理数据
int shardingItem = shardingContext.getShardingItem();
return userService.fetchUnprocessedUsers(shardingItem);
}
@Override
public void processData(ShardingContext shardingContext, List<User> data) {
// 处理数据
for (User user : data) {
System.out.println("处理用户: " + user.getName());
// 处理业务逻辑
userService.processUser(user);
}
}
}脚本作业
点击查看完整代码实现
java
import com.dangdang.ddframe.job.api.script.ScriptJob;
import com.dangdang.ddframe.job.api.ShardingContext;
public class MyScriptJob implements ScriptJob {
@Override
public void execute(ShardingContext shardingContext) {
// 执行脚本
String scriptCommand = shardingContext.getJobParameter();
try {
Process process = Runtime.getRuntime().exec(scriptCommand);
int exitCode = process.waitFor();
if (exitCode == 0) {
System.out.println("脚本执行成功");
} else {
System.err.println("脚本执行失败,退出码: " + exitCode);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}作业配置
Java配置方式
点击查看完整代码实现
java
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
@Configuration
public class ElasticJobConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter() {
return new ZookeeperRegistryCenter(
new ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
}
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(ZookeeperRegistryCenter regCenter) {
// 作业核心配置
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(
"mySimpleJob", "0/10 * * * * ?", 3)
.shardingItemParameters("0=A,1=B,2=C")
.build();
// 简单作业配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(
coreConfig, MySimpleJob.class.getCanonicalName());
// Lite作业配置
LiteJobConfiguration jobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig)
.overwrite(true)
.build();
return new JobScheduler(regCenter, jobConfig);
}
}Spring XML配置
点击查看完整代码实现
xml
<!-- 注册中心 -->
<reg:zookeeper id="regCenter"
server-lists="localhost:2181"
namespace="elastic-job-example"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000"
max-retries="3" />
<!-- 简单作业配置 -->
<job:simple id="mySimpleJob"
class="com.example.MySimpleJob"
registry-center-ref="regCenter"
cron="0/10 * * * * ?"
sharding-total-count="3"
sharding-item-parameters="0=A,1=B,2=C" />
<!-- 数据流作业配置 -->
<job:dataflow id="myDataflowJob"
class="com.example.MyDataflowJob"
registry-center-ref="regCenter"
cron="0/10 * * * * ?"
sharding-total-count="3"
process-count-interval-seconds="10"
concurrent-data-process-thread-count="10" />作业属性配置
点击查看完整代码实现
properties
# 作业配置
job.name=myElasticJob
job.cron=0/30 * * * * ?
job.shardingTotalCount=3
job.shardingItemParameters=0=A,1=B,2=C
job.jobParameter=param1
# 故障转移
job.failover=true
job.misfire=true
# 监控
job.monitorExecution=true
job.monitorPort=9888
# 最大时间差
job.maxTimeDiffSeconds=60
# 作业分片策略
job.jobShardingStrategyClass=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
# 线程池配置
job.executorServiceHandler=com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler分片策略
内置分片策略
点击查看完整代码实现
java
// 平均分配策略(默认)
@Component
public class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
@Override
public Map<JobInstance, List<Integer>> sharding(
List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
if (jobInstances.isEmpty()) {
return Collections.emptyMap();
}
Map<JobInstance, List<Integer>> result = new LinkedHashMap<>();
int itemCountPerSharding = shardingTotalCount / jobInstances.size();
int offsetCount = shardingTotalCount % jobInstances.size();
int index = 0;
for (JobInstance jobInstance : jobInstances) {
List<Integer> shardingItems = new ArrayList<>();
for (int i = 0; i < itemCountPerSharding; i++) {
shardingItems.add(index++);
}
if (offsetCount > 0) {
shardingItems.add(index++);
offsetCount--;
}
result.put(jobInstance, shardingItems);
}
return result;
}
}自定义分片策略
点击查看完整代码实现
java
public class CustomJobShardingStrategy implements JobShardingStrategy {
@Override
public Map<JobInstance, List<Integer>> sharding(
List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
Map<JobInstance, List<Integer>> result = new HashMap<>();
// 按照服务器IP进行分片
Collections.sort(jobInstances, (o1, o2) -> o1.getIp().compareTo(o2.getIp()));
for (int i = 0; i < jobInstances.size(); i++) {
List<Integer> shardingItems = new ArrayList<>();
// 每个实例分配连续的分片
int start = (shardingTotalCount / jobInstances.size()) * i;
int end = i == jobInstances.size() - 1 ? shardingTotalCount : start + (shardingTotalCount / jobInstances.size());
for (int j = start; j < end; j++) {
shardingItems.add(j);
}
result.put(jobInstances.get(i), shardingItems);
}
return result;
}
}事件监听
作业监听器
点击查看完整代码实现
java
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
public class MyJobListener implements ElasticJobListener {
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
System.out.println("作业执行前: " + shardingContexts.getJobName());
// 记录开始时间
long startTime = System.currentTimeMillis();
shardingContexts.setTaskId(String.valueOf(startTime));
// 其他前置处理
prepareResources();
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
System.out.println("作业执行后: " + shardingContexts.getJobName());
// 计算执行时间
long startTime = Long.parseLong(shardingContexts.getTaskId());
long duration = System.currentTimeMillis() - startTime;
System.out.println("执行耗时: " + duration + "ms");
// 清理资源
cleanupResources();
}
private void prepareResources() {
// 准备资源
}
private void cleanupResources() {
// 清理资源
}
}分布式监听器
点击查看完整代码实现
java
import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;
public class MyDistributeListener extends AbstractDistributeOnceElasticJobListener {
public MyDistributeListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
}
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
System.out.println("最后一个作业开始前执行");
// 全局前置处理
globalPrepare();
}
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
System.out.println("最后一个作业完成后执行");
// 全局后置处理
globalCleanup();
}
private void globalPrepare() {
// 全局准备工作
}
private void globalCleanup() {
// 全局清理工作
}
}运维管理
运维控制台
Elastic-Job提供了Web运维控制台,用于作业管理和监控:
xml
<!-- 运维控制台依赖 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-console</artifactId>
<version>2.1.5</version>
</dependency>功能特性:
- 作业维度:作业配置、触发、暂停、删除等操作
- 服务器维度:查看作业服务器状态,禁用服务器等操作
- 分片维度:查看分片和服务器分配关系
- 历史轨迹:查看作业历史执行轨迹和执行状态
API管理
点击查看完整代码实现
java
// 获取作业API
JobAPIFactory jobAPIFactory = new JobAPIFactory(regCenter);
JobOperateAPI jobOperateAPI = jobAPIFactory.createJobOperateAPI();
// 触发作业
jobOperateAPI.trigger(Optional.of("myJob"), Optional.of("0"));
// 禁用作业
jobOperateAPI.disable(Optional.of("myJob"), Optional.of("server01"));
// 启用作业
jobOperateAPI.enable(Optional.of("myJob"), Optional.of("server01"));
// 删除作业
jobOperateAPI.remove("myJob", Optional.of("server01"));统计分析
java
// 作业统计API
JobStatisticsAPI jobStatisticsAPI = jobAPIFactory.createJobStatisticsAPI();
// 获取作业总数
int totalCount = jobStatisticsAPI.getJobsTotalCount();
// 获取OK数量
int okCount = jobStatisticsAPI.getJobsOkCount();
// 获取CRASHED数量
int crashedCount = jobStatisticsAPI.getJobsCrashedCount();
// 获取分片统计信息
JobBriefInfo jobBrief = jobStatisticsAPI.getJobBriefInfo("myJob");监控告警
JMX监控
java
// 开启JMX监控
LiteJobConfiguration jobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig)
.monitorExecution(true)
.monitorPort(9888)
.build();自定义监控
点击查看完整代码实现
java
@Component
public class JobMonitor {
private final MeterRegistry meterRegistry;
public JobMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public void recordJobExecution(String jobName, boolean success, long duration) {
Tags tags = Tags.of("job", jobName, "status", success ? "success" : "failure");
// 记录执行次数
Counter.builder("job.execution.count")
.tags(tags)
.register(meterRegistry)
.increment();
// 记录执行时间
Timer.builder("job.execution.duration")
.tags(tags)
.register(meterRegistry)
.record(duration, TimeUnit.MILLISECONDS);
}
}告警配置
点击查看完整代码实现
java
@Component
public class JobAlarmHandler {
public void handleJobFailure(String jobName, String serverIp, Throwable cause) {
// 发送告警邮件
sendAlarmEmail(jobName, serverIp, cause);
// 发送钉钉告警
sendDingTalkAlarm(jobName, serverIp, cause);
// 记录告警日志
logAlarm(jobName, serverIp, cause);
}
private void sendAlarmEmail(String jobName, String serverIp, Throwable cause) {
// 实现邮件告警
}
private void sendDingTalkAlarm(String jobName, String serverIp, Throwable cause) {
// 实现钉钉告警
}
private void logAlarm(String jobName, String serverIp, Throwable cause) {
// 记录告警日志
}
}应用场景
数据处理
- 大批量数据处理和迁移
- 定时数据同步和备份
- 数据清洗和转换
- 报表生成和统计分析
业务处理
- 定时任务和批处理
- 订单超时处理
- 用户积分结算
- 系统资源清理
监控运维
- 系统健康检查
- 日志文件清理
- 缓存预热和刷新
- 定时监控和告警
最佳实践
作业设计
- 保证作业幂等性,支持重复执行
- 合理设计分片策略和分片参数
- 避免分片间的数据依赖
- 实现作业执行状态检查
性能优化
- 合理设置分片数量,避免过多或过少
- 优化数据处理逻辑,提高执行效率
- 使用数据流作业处理大量数据
- 监控作业执行时间和资源使用
运维管理
- 建立完善的监控和告警体系
- 定期检查作业执行状态和日志
- 制定作业故障应急处理流程
- 建立作业执行统计和分析
高可用部署
- 部署多个作业服务器实现高可用
- 配置ZooKeeper集群保证注册中心可用
- 实现作业执行监控和自动恢复
- 建立数据备份和恢复机制
Elastic-Job凭借其强大的分片功能、完善的故障转移机制和丰富的监控特性,成为Java生态中重要的分布式作业调度框架,特别适合需要高可用性和弹性扩展的企业级批处理场景。
