@JobHandler(value="offlineTaskJobHandler")
@Component
public class OfflineTaskJobHandler extends IJobHandler {
@Reference(check = false,version = "cms-dev",group="cms-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
@Override
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log(" offlineTaskJobHandler start.");
try {
offlineTaskExecutorFacade.executeOfflineTask();
} catch (Exception e) {
XxlJobLogger.log("offlineTaskJobHandler-->exception." , e);
return FAIL;
}
XxlJobLogger.log("XXL-JOB, offlineTaskJobHandler end.");
return SUCCESS;
}
}
(滑动可查看)
示例2:分片广播任务。
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return SUCCESS;
}
}
2.4.3.2 整合dubbo
(1)引入dubbo-spring-boot-starter和业务facade jar包依赖。
<dependency>
<groupId>com.alibaba.spring.boot</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.demo.service</groupId>
<artifactId>xxx-facade</artifactId>
<version>1.9-SNAPSHOT</version>
</dependency>
(2)配置文件加入dubbo消费端配置(可根据环境定义多个配置文件,通过profile切换)。
## Dubbo 服务消费者配置
spring.dubbo.application.name=xxl-job
spring.dubbo.registry.address=zookeeper://zookeeper.xyz:2183
spring.dubbo.port=20880
spring.dubbo.version=demo
spring.dubbo.group=demo-service
3)代码中通过@Reference注入facade接口即可。
@Reference(check = false,version = "demo",group="demo-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
4)启动程序加入@EnableDubboConfiguration注解。
@SpringBootApplication
@EnableDubboConfiguration
public class XxlJobExecutorApplication {
public static void main(String[] args) {
SpringApplication.run(XxlJobExecutorApplication.class, args);
}
}
2.4.4 任务可视化配置
内置了平台项目,方便了开发者对任务的管理和执行日志的监控,并提供了一些便于测试的功能。
(1)任务监控和报表的优化。
(2)任务报警方式的扩展,比如加入告警中心,提供内部消息,短信告警。
(3)对实际业务内部执行出现异常情况下的不同监控告警和重试策略。
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。
可惜的是已经两年没有迭代更新记录。
2.5.3 实践说明
2.5.3.1 demo使用
(1)安装zookeeper,配置注册中心config,配置文件加入注册中心zk的配置。
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
@Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
spring.application.name=demo_elasticjob
regCenter.serverList=localhost:2181
regCenter.namespace=demo_elasticjob
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd
(2)配置数据源config,并配置文件中加入数据源配置。
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
public class DataSourceProperties {
private String url;
private String username;
private String password;
@Bean
@Primary
public DataSource getDataSource() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
return dataSource;
}
}
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl-job?Unicode=true&characterEncoding=UTF-8
spring.datasource.username=user
spring.datasource.password=pwd
(3)配置事件config。
@Configuration
public class JobEventConfig {
@Autowired
private DataSource dataSource;
@Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
}
(4)为了便于灵活配置不同的任务触发事件,加入ElasticSimpleJob注解。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticSimpleJob {
@AliasFor("cron")
String value() default "";
@AliasFor("value")
String cron() default "";
String jobName() default "";
int shardingTotalCount() default 1;
String shardingItemParameters() default "";
String jobParameter() default "";
}
(5)对配置进行初始化。
@Configuration
@ConditionalOnExpression("'${elaticjob.zookeeper.server-lists}'.length() > 0")
public class ElasticJobAutoConfiguration {
@Value("${regCenter.serverList}")
private String serverList;
@Value("${regCenter.namespace}")
private String namespace;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private DataSource dataSource;
@PostConstruct
public void initElasticJob() {
ZookeeperRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
regCenter.init();
Map<String, SimpleJob> map = applicationContext.getBeansOfType(SimpleJob.class);
for (Map.Entry<String, SimpleJob> entry : map.entrySet()) {
SimpleJob simpleJob = entry.getValue();
ElasticSimpleJob elasticSimpleJobAnnotation = simpleJob.getClass().getAnnotation(ElasticSimpleJob.class);
String cron = StringUtils.defaultIfBlank(elasticSimpleJobAnnotation.cron(), elasticSimpleJobAnnotation.value());
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(simpleJob.getClass().getName(), cron, elasticSimpleJobAnnotation.shardingTotalCount()).shardingItemParameters(elasticSimpleJobAnnotation.shardingItemParameters()).build(), simpleJob.getClass().getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource);
SpringJobScheduler jobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration, jobEventRdbConfiguration);
jobScheduler.init();
}
}
}
(滑动可查看)
(6)实现 SimpleJob接口,按上文中方法整合dubbo, 完成业务逻辑。
@ElasticSimpleJob(
cron = "*/10 * * * * ?",
jobName = "OfflineTaskJob",
shardingTotalCount = 2,
jobParameter = "测试参数",
shardingItemParameters = "0=A,1=B")
@Component
public class MySimpleJob implements SimpleJob {
Logger logger = LoggerFactory.getLogger(OfflineTaskJob.class);
@Reference(check = false, version = "cms-dev", group = "cms-service")
private OfflineTaskExecutorFacade offlineTaskExecutorFacade;
@Override
public void execute(ShardingContext shardingContext) {
offlineTaskExecutorFacade.executeOfflineTask();
logger.info(String.format("Thread ID: %s, 作业分片总数: %s, " +
"当前分片项: %s.当前参数: %s," +
"作业名称: %s.作业自定义参数: %s"
,
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
));
}
}
(1)Saturn:Saturn是唯品会开源的一个分布式任务调度平台,在Elastic Job的基础上进行了改造。
(2)SIA-TASK:是宜信开源的分布式任务调度平台。
丰富任务监控数据和告警策略。
接入统一登录和权限控制。
进一步简化业务接入步骤。
对于并发场景不是特别高的系统来说,xxl-job配置部署简单易用,不需要引入多余的组件,同时提供了可视化的控制台,使用起来非常友好,是一个比较好的选择。希望直接利用开源分布式框架能力的系统,建议根据自身的情况来进行合适的选型。