Elastic-Job分布式定时任务

Elastic-Job是当当网大牛基于Zookepper,Quartz开发并且开源的Java分布式定时任务,解决Quartz不支持分布式的弊端。它由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

基本概念

  • 分片概念:任务分布式的执行,需要将一个任务拆分成多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项
  • 个性化参数:shardingItemParameter,可以和分片项匹配对应关系。比如:将商品的状态分成上架,下架。那么配置0=上架,1=下架,代码中直接使用上架下架的枚举值即可完成分片项与业务逻辑的对应关系
  • 作用高可用:将分片总数设置成1,多台服务器执行作业将采用1主n从的方式执行
  • 弹性扩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服器加入集群或有服务器宕机。Elastic-Job将保留本次任务不变,下次任务开始前重新分片。
  • 并行调度:采用任务分片方式实现。将一个任务拆分为n个独立的任务项,由分布式的服务器并行执行各自分配到的分片项。
  • 集中管理:采用基于zookepper的注册中心,集中管理和协调分布式作业的状态,分配和监听。外部系统可直接根据Zookeeper的数据管理和监控elastic-job。
  • 定制化流程任务:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。

20200423145846

Elastic-Job的具体模块的底层及如何实现

Elastic-Job采用去中心化设计,主要分为注册中心、数据分片、分布式协调、定时任务处理和定制化流程型任务等模块。

去中心化:指Elastic-Job没有调度中心这一概念。每个运行在集群中的作业服务器都是对等的,节点之间通过注册中心进行分布式协调。
但elastic-job有主节点的概念,主节点用于处理一些集中式任务,如分片,清理运行时信息等,并无调度功能,定时调度都是由作业服务器自行触发。

| 中心化 | 去中心化
-|-|-
实现难度 | 难 | 易
部署难度 | 难 | 易
触发时间统一控制 | 可以 | 不可以
触发延迟 | 有 | 无
异构语言支持 | 容易 | 困难

** 注册中心** :注册中心模块目前直接使用zookeeper,用于记录作业的配置,服务器信息以及作业运行状态。Zookeeper虽然很成熟,但原理复杂,使用较难,在海量数据支持的情况下也会有性能和网络问题。
** 数据分片** :数据分片是elastic-job中实现分布式的重要概念,将真实数据和逻辑分片对应,用于解耦作业框架和数据的关系。作业框架只负责将分片合理的分配给相关的作业服务器,而作业服务器需要根据所分配的分片匹配数据进行处理。服务器分片目前都存储在注册中心中,各个服务器根据自己的IP地址拉取分片。
** 分布式协调** :分布式协调模块用于处理作业服务器的动态扩容缩容。一旦集群中有服务器发生变化,分布式协调将自动监测并将变化结果通知仍存活的作业服务器。协调时将会涉及主节点选举,重分片等操作。目前使用的Zookeeper的临时节点和监听器实现主动检查和通知功能。
** 定时任务处理** :定时任务处理根据cron表达式定时触发任务,目前有防止任务同时触发,错过任务重出发等功能。主要还是使用Quartz本身的定时调度功能,为了便于控制,每个任务都使用独立的线程池。
** 定制化流程型任务** :定制化流程型任务将定时任务分为多种流程,有不经任何修饰的简单任务;有用于处理数据的fetchData/processData的数据流任务;以后还将增加消息流任务,文件任务,工作流任务等。用户能以插件的形式扩展并贡献代码。

作业开发

Elastic-Job提供Simple、Dataflow和Script 3种作业类型。方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。
Simple类型的作业:该类型意为简单实现,只需实现SimpleJob接口,重写它的execute方法即可
Dataflow类型作业:用于处理数据流,实现DataflowJob接口,并重写两个方法——用于抓取(fetchData方法)和处理(processData方法)数据。比如在fetchData方法里面查询没有上架的商品,在processData方法修改该商品的状态。
注意:可通过DataflowJobConfiguration配置是否流式处理。当配置成流式处理,fetchData方法返回值(返回值是集合)是null或长度是0,作业才停止抓取,否则将一直运行。非流式的则每次作业只执行一次这两个方法就结束该作业。
Script类型作业:意为脚本类型作业,支持shell、python、perl等类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。

Demo

创建一个项目 配置:

1
2
3
4
5
6
7
8
spring:
application:
name: job1

#配置zookeeper
regCenter:
serverList: localhost:2181
namespace: job1

注册中心配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 配置zookeeper
*/
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0") //ConditionalOnExpression决定是否生效
public class JobRegistryCenterConfig {

@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
@Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}

}

定义一个任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 一个任务
*/
public class MySimpleJob implements SimpleJob {
Logger logger = LoggerFactory.getLogger(MySimpleJob.class);

private String name;

public MySimpleJob() {
super();
}
public MySimpleJob(String name) {
this.name = name;
}
@Override
public void execute(ShardingContext shardingContext) {
logger.info(String.format("任务:%s "+
"Thread ID: %s 作业分片总数: %s " +
"当前分片项: %s 当前参数: %s " +
"作业名称: %s 作业自定义参数: %s "
,
this.name,
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
}
}

配置任务作业:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 配置任务
*/
@Configuration
public class MyJobConfig {
/**
* 配置任务的时候,这里定义了四个参数,分别是:
*
* cron:cron表达式,用于控制作业触发时间。
* shardingTotalCount:作业分片总数
* shardingItemParameters:分片序列号和参数用等号分隔,多个键值对用逗号分隔
* 分片序列号从0开始,不可大于或等于作业分片总数
*
*/
private final String cron = "0/10 * * * * ?"; //每十秒
private final int shardingTotalCount = 2;//作业分片总数
private final String shardingItemParameters = "0=A,1=B";//分片序列号和参数用等号分隔,多个键值对用逗号分隔
private final String jobParameters = "hello";//作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业

@Autowired
private ZookeeperRegistryCenter regCenter;

@Bean
public SimpleJob stockJob() {
// 任务
return new MySimpleJob("One");
}

//初始化作业
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(),
cron, shardingTotalCount, shardingItemParameters, jobParameters));
}

private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters) {
// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount).
shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
return simpleJobRootConfig;

}
}

启动后,两个分片交替执行,并且每次都是新启动一个线程执行作业。

如果,修改端口,启动两个实例,查看日志如下:

1
2
3
4
5
6
7
8
2019-07-30 16:20:50.227  INFO 14088 --- [b.MySimpleJob-2] com.hu.job.MySimpleJob                   : 任务:One  Thread ID: 57   作业分片总数: 2   当前分片项: 1   当前参数: B  作业名称: com.hu.job.MySimpleJob   作业自定义参数: hello  
2019-07-30 16:20:50.226 INFO 14088 --- [b.MySimpleJob-1] com.hu.job.MySimpleJob : 任务:One Thread ID: 56 作业分片总数: 2 当前分片项: 0 当前参数: A 作业名称: com.hu.job.MySimpleJob 作业自定义参数: hello
2019-07-30 16:21:00.112 INFO 14088 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : 任务:One Thread ID: 26 作业分片总数: 2 当前分片项: 1 当前参数: B 作业名称: com.hu.job.MySimpleJob 作业自定义参数: hello
2019-07-30 16:21:10.052 INFO 14088 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : 任务:One Thread ID: 26 作业分片总数: 2 当前分片项: 1 当前参数: B 作业名称: com.hu.job.MySimpleJob 作业自定义参数: hello
2019-07-30 16:21:20.036 INFO 14088 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : 任务:One Thread ID: 26 作业分片总数: 2 当前分片项: 1 当前参数: B 作业名称: com.hu.job.MySimpleJob 作业自定义参数: hello
2019-07-30 16:21:30.039 INFO 14088 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : 任务:One Thread ID: 26 作业分片总数: 2 当前分片项: 1 当前参数: B 作业名称: com.hu.job.MySimpleJob 作业自定义参数: hello
2019-07-30 16:21:40.048 INFO 14088 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : 任务:One Thread ID: 26 作业分片总数: 2 当前分片项: 1 当前参数: B 作业名称: com.hu.job.MySimpleJob 作业自定义参数: hello
2019-07-30 16:21:50.043 INFO 14088 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : 任务:One Thread ID: 26 作业分片总数: 2 当前分片项: 1 当前参数: B 作业名称: com.hu.job.MySimpleJob 作业自定义参数: hello
1
2
3
4
2019-07-30 16:21:00.181  INFO 3348 --- [pleJob_Worker-1] com.hu.job.MySimpleJob                   : ▒▒▒▒One  Thread ID: 20   ▒▒ҵ▒▒Ƭ▒▒▒▒: 2   ▒▒ǰ▒▒Ƭ▒▒: 0   ▒▒ǰ▒▒▒▒: A  ▒▒ҵ▒▒▒▒: com.hu.job.MySimpleJob   ▒▒ҵ▒Զ▒▒▒▒▒▒: hello
2019-07-30 16:21:10.065 INFO 3348 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : ▒▒▒▒One Thread ID: 20 ▒▒ҵ▒▒Ƭ▒▒▒▒: 2 ▒▒ǰ▒▒Ƭ▒▒: 0 ▒▒ǰ▒▒▒▒: A ▒▒ҵ▒▒▒▒: com.hu.job.MySimpleJob ▒▒ҵ▒Զ▒▒▒▒▒▒: hello
2019-07-30 16:21:20.048 INFO 3348 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : ▒▒▒▒One Thread ID: 20 ▒▒ҵ▒▒Ƭ▒▒▒▒: 2 ▒▒ǰ▒▒Ƭ▒▒: 0 ▒▒ǰ▒▒▒▒: A ▒▒ҵ▒▒▒▒: com.hu.job.MySimpleJob ▒▒ҵ▒Զ▒▒▒▒▒▒: hello
2019-07-30 16:21:30.052 INFO 3348 --- [pleJob_Worker-1] com.hu.job.MySimpleJob : ▒▒▒▒One Thread ID: 20 ▒▒ҵ▒▒Ƭ▒▒▒▒: 2 ▒▒ǰ▒▒Ƭ▒▒: 0 ▒▒ǰ▒▒▒▒: A ▒▒ҵ▒▒▒▒: com.hu.job.MySimpleJob ▒▒ҵ▒Զ▒▒▒▒▒▒: hello

可以看出,刚开始没有启动第二个实例的时候,分片交替作业,,当第二个实例启动以后,由于分成两片,就分别执行。

注册中心数据结构

20200423145907

作业名称节点下又包含4个数据子节点,分别是config, instances, sharding, servers和leader。

config节点

作业配置信息,以JSON格式存储

instances节点

作业运行实例信息,子节点是当前作业运行实例的主键。作业运行实例主键由作业运行服务器的IP地址和PID构成。作业运行实例主键均为临时节点,当作业实例上线时注册,下线时自动清理。注册中心监控这些节点的变化来协调分布式作业的分片以及高可用。 可在作业运行实例节点写入TRIGGER表示该实例立即执行一次。

sharding节点

作业分片信息,子节点是分片项序号,从零开始,至分片总数减一。分片项序号的子节点存储详细信息。每个分片项下的子节点用于控制和记录分片运行状态。节点详细信息说明:

子节点名 临时节点 描述
instance 执行该分片项的作业运行实例主键
running 分片项正在运行的状态 仅配置monitorExecution时有效
failover 如果该分片项被失效转移分配给其他作业服务器,则此节点值记录执行此分片的作业服务器IP
misfire 是否开启错过任务重新执行
disabled 是否禁用此分片项

servers节点

作业服务器信息,子节点是作业服务器的IP地址。可在IP地址节点写入DISABLED表示该服务器禁用。 在新的cloud native架构下,servers节点大幅弱化,仅包含控制服务器是否可以禁用这一功能。为了更加纯粹的实现job核心,servers功能未来可能删除,控制服务器是否禁用的能力应该下放至自动化部署系统。

leader节点

作业服务器主节点信息,分为election,sharding和failover三个子节点。分别用于主节点选举,分片和失效转移处理。

leader节点是内部使用的节点,如果对作业框架原理不感兴趣,可不关注此节点。

子节点名 临时节点 描述
election\instance 主节点服务器IP地址
一旦该节点被删除将会触发重新选举
重新选举的过程中一切主节点相关的操作都将阻塞
election\latch 主节点选举的分布式锁
为curator的分布式锁使用
sharding\necessary 是否需要重新分片的标记
如果分片总数变化,或作业服务器节点上下线或启用/禁用,以及主节点选举,会触发设置重分片标记
作业在下次执行时使用主节点重新分片,且中间不会被打断
作业执行时不会触发分片
sharding\processing 主节点在分片时持有的节点
如果有此节点,所有的作业执行都将阻塞,直至分片结束
主节点分片结束或主节点崩溃会删除此临时节点
failover\items\分片项 一旦有作业崩溃,则会向此节点记录
当有空闲作业服务器时,会从此节点抓取需失效转移的作业项
failover\items\latch 分配失效转移分片项时占用的分布式锁
为curator的分布式锁使用

https://blog.csdn.net/qq924862077/article/details/82956790

代码见:https://github.com/huingsn/tech-point-record