Skip to content

Elastic-Job

Elastic-Job是当当网开源的分布式作业调度框架,基于Quartz和ZooKeeper实现,提供分片、弹性资源利用、作业监控、失效转移等企业级功能。

核心特性

分布式调度

  • 作业分片 - 任务自动分片,支持多节点并行执行
  • 弹性扩容 - 动态增减节点,自动重新分片
  • 故障转移 - 节点失效时自动转移分片到其他节点
  • 负载均衡 - 智能分配作业分片到不同节点

作业管理

  • 定时调度 - 基于Cron表达式的定时执行
  • 依赖作业 - 支持作业间的依赖关系
  • 串并行 - 支持串行和并行执行模式
  • 幂等性 - 保证作业执行的幂等性

运维监控

  • 作业监控 - 实时监控作业执行状态
  • 统计报告 - 作业执行统计和性能分析
  • 事件追踪 - 完整的作业执行事件链路
  • 失效转移 - 自动检测和处理节点故障

架构设计

核心组件

点击查看完整代码实现
Elastic-Job架构:
├── Elastic-Job-Lite
│   ├── 作业执行引擎
│   ├── 分片策略
│   ├── 故障转移
│   └── 事件监听
├── ZooKeeper
│   ├── 服务器列表
│   ├── 作业配置
│   ├── 分片信息
│   └── 故障转移
├── 作业服务器
│   ├── 作业实例
│   ├── 分片执行
│   ├── 心跳检测
│   └── 状态上报
└── 运维平台
    ├── 作业管理
    ├── 监控展示
    ├── 统计分析
    └── 配置管理

分片机制

作业分片策略:
├── 平均分配策略
│   ├── 按服务器数量平均分配
│   ├── 余数处理
│   └── 负载均衡
├── 作业名哈希策略
│   ├── 按作业名哈希分配
│   ├── 固定分片归属
│   └── 一致性保证
└── 轮转分配策略
    ├── 轮询分配分片
    ├── 动态调整
    └── 负载均衡

快速开始

Maven依赖

xml
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>

作业开发

简单作业

点击查看完整代码实现
java
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.api.ShardingContext;

public class MySimpleJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println("作业名称: " + shardingContext.getJobName());
        System.out.println("分片项: " + shardingContext.getShardingItem());
        System.out.println("分片参数: " + shardingContext.getShardingParameter());
        System.out.println("分片总数: " + shardingContext.getShardingTotalCount());
        
        // 执行业务逻辑
        processBusinessLogic(shardingContext.getShardingItem());
    }
    
    private void processBusinessLogic(int shardingItem) {
        // 根据分片项处理不同的业务逻辑
        switch (shardingItem) {
            case 0:
                System.out.println("处理用户数据");
                break;
            case 1:
                System.out.println("处理订单数据");
                break;
            case 2:
                System.out.println("处理商品数据");
                break;
            default:
                System.out.println("处理其他数据");
        }
    }
}

数据流作业

点击查看完整代码实现
java
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.api.ShardingContext;
import java.util.List;

public class MyDataflowJob implements DataflowJob<User> {
    
    @Override
    public List<User> fetchData(ShardingContext shardingContext) {
        // 获取待处理数据
        int shardingItem = shardingContext.getShardingItem();
        return userService.fetchUnprocessedUsers(shardingItem);
    }
    
    @Override
    public void processData(ShardingContext shardingContext, List<User> data) {
        // 处理数据
        for (User user : data) {
            System.out.println("处理用户: " + user.getName());
            // 处理业务逻辑
            userService.processUser(user);
        }
    }
}

脚本作业

点击查看完整代码实现
java
import com.dangdang.ddframe.job.api.script.ScriptJob;
import com.dangdang.ddframe.job.api.ShardingContext;

public class MyScriptJob implements ScriptJob {
    
    @Override
    public void execute(ShardingContext shardingContext) {
        // 执行脚本
        String scriptCommand = shardingContext.getJobParameter();
        
        try {
            Process process = Runtime.getRuntime().exec(scriptCommand);
            int exitCode = process.waitFor();
            
            if (exitCode == 0) {
                System.out.println("脚本执行成功");
            } else {
                System.err.println("脚本执行失败,退出码: " + exitCode);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

作业配置

Java配置方式

点击查看完整代码实现
java
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

@Configuration
public class ElasticJobConfig {
    
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter() {
        return new ZookeeperRegistryCenter(
            new ZookeeperConfiguration("localhost:2181", "elastic-job-demo"));
    }
    
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(ZookeeperRegistryCenter regCenter) {
        
        // 作业核心配置
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(
                "mySimpleJob", "0/10 * * * * ?", 3)
                .shardingItemParameters("0=A,1=B,2=C")
                .build();
        
        // 简单作业配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(
                coreConfig, MySimpleJob.class.getCanonicalName());
        
        // Lite作业配置
        LiteJobConfiguration jobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig)
                .overwrite(true)
                .build();
        
        return new JobScheduler(regCenter, jobConfig);
    }
}

Spring XML配置

点击查看完整代码实现
xml
<!-- 注册中心 -->
<reg:zookeeper id="regCenter" 
    server-lists="localhost:2181" 
    namespace="elastic-job-example" 
    base-sleep-time-milliseconds="1000" 
    max-sleep-time-milliseconds="3000" 
    max-retries="3" />

<!-- 简单作业配置 -->
<job:simple id="mySimpleJob" 
    class="com.example.MySimpleJob" 
    registry-center-ref="regCenter" 
    cron="0/10 * * * * ?" 
    sharding-total-count="3" 
    sharding-item-parameters="0=A,1=B,2=C" />

<!-- 数据流作业配置 -->
<job:dataflow id="myDataflowJob" 
    class="com.example.MyDataflowJob" 
    registry-center-ref="regCenter" 
    cron="0/10 * * * * ?" 
    sharding-total-count="3" 
    process-count-interval-seconds="10" 
    concurrent-data-process-thread-count="10" />

作业属性配置

点击查看完整代码实现
properties
# 作业配置
job.name=myElasticJob
job.cron=0/30 * * * * ?
job.shardingTotalCount=3
job.shardingItemParameters=0=A,1=B,2=C
job.jobParameter=param1

# 故障转移
job.failover=true
job.misfire=true

# 监控
job.monitorExecution=true
job.monitorPort=9888

# 最大时间差
job.maxTimeDiffSeconds=60

# 作业分片策略
job.jobShardingStrategyClass=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy

# 线程池配置
job.executorServiceHandler=com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler

分片策略

内置分片策略

点击查看完整代码实现
java
// 平均分配策略(默认)
@Component
public class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(
            List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
        
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>();
        int itemCountPerSharding = shardingTotalCount / jobInstances.size();
        int offsetCount = shardingTotalCount % jobInstances.size();
        
        int index = 0;
        for (JobInstance jobInstance : jobInstances) {
            List<Integer> shardingItems = new ArrayList<>();
            
            for (int i = 0; i < itemCountPerSharding; i++) {
                shardingItems.add(index++);
            }
            
            if (offsetCount > 0) {
                shardingItems.add(index++);
                offsetCount--;
            }
            
            result.put(jobInstance, shardingItems);
        }
        
        return result;
    }
}

自定义分片策略

点击查看完整代码实现
java
public class CustomJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(
            List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
        
        Map<JobInstance, List<Integer>> result = new HashMap<>();
        
        // 按照服务器IP进行分片
        Collections.sort(jobInstances, (o1, o2) -> o1.getIp().compareTo(o2.getIp()));
        
        for (int i = 0; i < jobInstances.size(); i++) {
            List<Integer> shardingItems = new ArrayList<>();
            
            // 每个实例分配连续的分片
            int start = (shardingTotalCount / jobInstances.size()) * i;
            int end = i == jobInstances.size() - 1 ? shardingTotalCount : start + (shardingTotalCount / jobInstances.size());
            
            for (int j = start; j < end; j++) {
                shardingItems.add(j);
            }
            
            result.put(jobInstances.get(i), shardingItems);
        }
        
        return result;
    }
}

事件监听

作业监听器

点击查看完整代码实现
java
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;

public class MyJobListener implements ElasticJobListener {
    
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        System.out.println("作业执行前: " + shardingContexts.getJobName());
        
        // 记录开始时间
        long startTime = System.currentTimeMillis();
        shardingContexts.setTaskId(String.valueOf(startTime));
        
        // 其他前置处理
        prepareResources();
    }
    
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        System.out.println("作业执行后: " + shardingContexts.getJobName());
        
        // 计算执行时间
        long startTime = Long.parseLong(shardingContexts.getTaskId());
        long duration = System.currentTimeMillis() - startTime;
        System.out.println("执行耗时: " + duration + "ms");
        
        // 清理资源
        cleanupResources();
    }
    
    private void prepareResources() {
        // 准备资源
    }
    
    private void cleanupResources() {
        // 清理资源
    }
}

分布式监听器

点击查看完整代码实现
java
import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener;

public class MyDistributeListener extends AbstractDistributeOnceElasticJobListener {
    
    public MyDistributeListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }
    
    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("最后一个作业开始前执行");
        
        // 全局前置处理
        globalPrepare();
    }
    
    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.out.println("最后一个作业完成后执行");
        
        // 全局后置处理
        globalCleanup();
    }
    
    private void globalPrepare() {
        // 全局准备工作
    }
    
    private void globalCleanup() {
        // 全局清理工作
    }
}

运维管理

运维控制台

Elastic-Job提供了Web运维控制台,用于作业管理和监控:

xml
<!-- 运维控制台依赖 -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-console</artifactId>
    <version>2.1.5</version>
</dependency>

功能特性:

  • 作业维度:作业配置、触发、暂停、删除等操作
  • 服务器维度:查看作业服务器状态,禁用服务器等操作
  • 分片维度:查看分片和服务器分配关系
  • 历史轨迹:查看作业历史执行轨迹和执行状态

API管理

点击查看完整代码实现
java
// 获取作业API
JobAPIFactory jobAPIFactory = new JobAPIFactory(regCenter);
JobOperateAPI jobOperateAPI = jobAPIFactory.createJobOperateAPI();

// 触发作业
jobOperateAPI.trigger(Optional.of("myJob"), Optional.of("0"));

// 禁用作业  
jobOperateAPI.disable(Optional.of("myJob"), Optional.of("server01"));

// 启用作业
jobOperateAPI.enable(Optional.of("myJob"), Optional.of("server01"));

// 删除作业
jobOperateAPI.remove("myJob", Optional.of("server01"));

统计分析

java
// 作业统计API
JobStatisticsAPI jobStatisticsAPI = jobAPIFactory.createJobStatisticsAPI();

// 获取作业总数
int totalCount = jobStatisticsAPI.getJobsTotalCount();

// 获取OK数量
int okCount = jobStatisticsAPI.getJobsOkCount();  

// 获取CRASHED数量
int crashedCount = jobStatisticsAPI.getJobsCrashedCount();

// 获取分片统计信息
JobBriefInfo jobBrief = jobStatisticsAPI.getJobBriefInfo("myJob");

监控告警

JMX监控

java
// 开启JMX监控
LiteJobConfiguration jobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig)
    .monitorExecution(true)
    .monitorPort(9888)
    .build();

自定义监控

点击查看完整代码实现
java
@Component
public class JobMonitor {
    
    private final MeterRegistry meterRegistry;
    
    public JobMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    public void recordJobExecution(String jobName, boolean success, long duration) {
        Tags tags = Tags.of("job", jobName, "status", success ? "success" : "failure");
        
        // 记录执行次数
        Counter.builder("job.execution.count")
            .tags(tags)
            .register(meterRegistry)
            .increment();
        
        // 记录执行时间
        Timer.builder("job.execution.duration")
            .tags(tags)
            .register(meterRegistry)
            .record(duration, TimeUnit.MILLISECONDS);
    }
}

告警配置

点击查看完整代码实现
java
@Component  
public class JobAlarmHandler {
    
    public void handleJobFailure(String jobName, String serverIp, Throwable cause) {
        // 发送告警邮件
        sendAlarmEmail(jobName, serverIp, cause);
        
        // 发送钉钉告警
        sendDingTalkAlarm(jobName, serverIp, cause);
        
        // 记录告警日志
        logAlarm(jobName, serverIp, cause);
    }
    
    private void sendAlarmEmail(String jobName, String serverIp, Throwable cause) {
        // 实现邮件告警
    }
    
    private void sendDingTalkAlarm(String jobName, String serverIp, Throwable cause) {
        // 实现钉钉告警
    }
    
    private void logAlarm(String jobName, String serverIp, Throwable cause) {
        // 记录告警日志
    }
}

应用场景

数据处理

  • 大批量数据处理和迁移
  • 定时数据同步和备份
  • 数据清洗和转换
  • 报表生成和统计分析

业务处理

  • 定时任务和批处理
  • 订单超时处理
  • 用户积分结算
  • 系统资源清理

监控运维

  • 系统健康检查
  • 日志文件清理
  • 缓存预热和刷新
  • 定时监控和告警

最佳实践

作业设计

  • 保证作业幂等性,支持重复执行
  • 合理设计分片策略和分片参数
  • 避免分片间的数据依赖
  • 实现作业执行状态检查

性能优化

  • 合理设置分片数量,避免过多或过少
  • 优化数据处理逻辑,提高执行效率
  • 使用数据流作业处理大量数据
  • 监控作业执行时间和资源使用

运维管理

  • 建立完善的监控和告警体系
  • 定期检查作业执行状态和日志
  • 制定作业故障应急处理流程
  • 建立作业执行统计和分析

高可用部署

  • 部署多个作业服务器实现高可用
  • 配置ZooKeeper集群保证注册中心可用
  • 实现作业执行监控和自动恢复
  • 建立数据备份和恢复机制

Elastic-Job凭借其强大的分片功能、完善的故障转移机制和丰富的监控特性,成为Java生态中重要的分布式作业调度框架,特别适合需要高可用性和弹性扩展的企业级批处理场景。

正在精进