ElasticJob和SpringBoot
2021-01-15 00:14
标签:pre eal encoding parameter average lis root job dash
本文以在SpringBoot下集成ElasticJob的方式对其进行浅析,仅仅是简单使用,不涉及源码级别研究。
原文地址:https://www.cnblogs.com/monument/p/12938925.html
事先必备:注册中心——zookeeper
简略结构:
代码目录结构:
├─.idea
└─src
└─main
├─java
│ └─com
│ └─sakura
│ ├─configuration --SpringJobScheduler、ZookeeperRegistryCenter
│ ├─job
│ │ ├─jobEventConfig --Job事件监听器
│ │ └─jobListener --Job执行监听器
│ └─properties --Zookeeper、Job的配置信息
└─resources --Zookeeper、Job的配置信息
初始化注册中心:
/**
 * Spring configuration that exposes the ElasticJob registry center backed by ZooKeeper.
 */
@Configuration
@Slf4j
public class ZookeeperRegistry {

    /**
     * Builds the {@link ZookeeperRegistryCenter} bean from the bound registry properties.
     *
     * <p>The bean is declared with {@code initMethod = "init"}, so Spring calls
     * {@code init()} on the returned instance after this method returns. The log
     * statement below is therefore emitted before {@code init()} runs.
     *
     * @param registryProperties connection settings bound from the {@code elastic.job.zk} prefix
     * @return a registry center configured from {@code registryProperties}
     */
    @Bean(name = "registryCenter", initMethod = "init")
    public ZookeeperRegistryCenter registryCenter(ZookeeperRegistryProperties registryProperties) {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(
                registryProperties.getServerLists(), registryProperties.getNamespace());
        zookeeperConfiguration.setDigest(registryProperties.getDigest());
        zookeeperConfiguration.setBaseSleepTimeMilliseconds(registryProperties.getBaseSleepTimeMilliseconds());
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(registryProperties.getConnectionTimeoutMilliseconds());
        zookeeperConfiguration.setMaxRetries(registryProperties.getMaxRetries());
        zookeeperConfiguration.setMaxSleepTimeMilliseconds(registryProperties.getMaxSleepTimeMilliseconds());
        // Read the session timeout from the bound properties. The previous code fed the
        // configuration's own getter back into its setter, which silently ignored the
        // configured value.
        zookeeperConfiguration.setSessionTimeoutMilliseconds(registryProperties.getSessionTimeoutMilliseconds());
        log.info("elasticJob注册中心——Zookeeper初始化成功。serverLists={}。nameSpace={}", registryProperties.getServerLists(), registryProperties.getNamespace());
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}
定义Job(以SimpleJob为例):
/**
 * Minimal {@link SimpleJob} implementation that only logs the job name and task id
 * of the sharding context it is invoked with.
 */
@Slf4j
@Component
public class MySimpleJob implements SimpleJob {

    /**
     * Logs a start banner followed by the job name and the task id.
     *
     * @param shardingContext context supplied for this execution
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("------开始执行定时任务------");

        final String currentJobName = shardingContext.getJobName();
        log.info("jobName:{}", currentJobName);

        final String currentTaskId = shardingContext.getTaskId();
        log.info("taskId:{}", currentTaskId);
    }
}
初始化SpringJobScheduler:
/**
 * Spring configuration that wires the job, registry center, job configuration,
 * event tracing and listener into a {@link SpringJobScheduler}.
 *
 * <p>All collaborators are {@code final} fields; Lombok's {@code @Data} generates the
 * constructor that receives them.
 */
@Configuration
@Data
public class SpringJobSchedulerInit {
    private final ZookeeperRegistryCenter registryCenter;
    private final ZookeeperRegistryProperties zookeeperRegistryProperties;
    private final SimpleJobProperties simpleJobProperties;
    private final ElasticJob mySimpleJob;
    private final JobEventConfiguration jobEventConfiguration;

    /**
     * Creates the scheduler bean. Spring calls {@code init()} on it afterwards
     * because of {@code initMethod = "init"}.
     *
     * @return a scheduler for {@code mySimpleJob} using the configuration from
     *         {@link #getLiteJobConfiguration()}
     */
    @Bean(initMethod = "init")
    public SpringJobScheduler springJobScheduler() {
        return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(),
                // Job event tracing, optional
                jobEventConfiguration,
                // Job execution listener, optional
                new MySimpleJobListener());
    }

    /**
     * Assembles the {@link LiteJobConfiguration} for {@link MySimpleJob} from
     * {@link SimpleJobProperties}.
     *
     * <p>{@code misfire} is fixed to {@code true} and {@code overwrite} to {@code false};
     * every other setting is read from {@code simpleJobProperties}.
     *
     * @return the job configuration passed to the scheduler
     */
    public LiteJobConfiguration getLiteJobConfiguration() {
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
                .newBuilder(simpleJobProperties.getJobName(), simpleJobProperties.getCron(),
                        simpleJobProperties.getShardingTotalCount())
                .failover(simpleJobProperties.isFailover())
                .jobParameter(simpleJobProperties.getJobParameter())
                .misfire(true)
                .shardingItemParameters(simpleJobProperties.getShardingItemParameters())
                .build();
        JobTypeConfiguration jobTypeConfiguration =
                new SimpleJobConfiguration(jobCoreConfiguration, MySimpleJob.class.getName());
        // maxTimeDiffSeconds is set once; the original chain called it twice with the same value.
        return LiteJobConfiguration.newBuilder(jobTypeConfiguration)
                .jobShardingStrategyClass(simpleJobProperties.getJobShardingStrategyClass())
                .maxTimeDiffSeconds(simpleJobProperties.getMaxTimeDiffSeconds())
                .monitorExecution(simpleJobProperties.isMonitorExecution())
                .monitorPort(simpleJobProperties.getMonitorPort())
                // Whether the local configuration should overwrite the remote ElasticJob configuration
                .overwrite(false)
                .build();
    }
}
Job事件追踪——存储到数据库:
/**
 * Spring configuration that publishes the job event tracing configuration,
 * built on top of the injected {@link DataSource}.
 */
@Configuration
@Slf4j
@Data
public class JobEventConfig {
    private final DataSource dataSource;

    /**
     * Exposes a {@link JobEventRdbConfiguration} constructed from the injected data source.
     *
     * @return the event configuration bean
     */
    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        final JobEventConfiguration rdbEventConfiguration = new JobEventRdbConfiguration(this.dataSource);
        return rdbEventConfiguration;
    }
}
Job执行监听器:
/**
 * {@link ElasticJobListener} that logs a reflective dump of the sharding contexts
 * before and after each job execution.
 */
@Slf4j
public class MySimpleJobListener implements ElasticJobListener {

    /**
     * Logs the sharding contexts before the job runs.
     *
     * @param shardingContexts contexts supplied by the framework
     */
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        final String contextsDump = describe(shardingContexts);
        log.info("Job执行之前:{}", contextsDump);
    }

    /**
     * Logs the sharding contexts after the job has run.
     *
     * @param shardingContexts contexts supplied by the framework
     */
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        final String contextsDump = describe(shardingContexts);
        log.info("Job执行之后:{}", contextsDump);
    }

    /** Renders the given contexts via {@link ReflectionToStringBuilder#toString(Object)}. */
    private static String describe(ShardingContexts contexts) {
        return ReflectionToStringBuilder.toString(contexts);
    }
}
properties配置信息:
/**
 * Settings for the simple job, bound from properties with the {@code simple.job} prefix.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@ConfigurationProperties(prefix = "simple.job")
@Data
public class SimpleJobProperties {
/** Cron expression handed to {@code JobCoreConfiguration.newBuilder}. */
private String cron;
/** Total number of shards handed to {@code JobCoreConfiguration.newBuilder}. */
private int shardingTotalCount;
/** Value passed to the builder's {@code shardingItemParameters(...)}. */
private String shardingItemParameters;
/** Value passed to the builder's {@code jobParameter(...)}. */
private String jobParameter;
/** Value passed to the builder's {@code failover(...)}. */
private boolean failover;
/** Value passed to the builder's {@code monitorExecution(...)}. */
private boolean monitorExecution;
/** Value passed to the builder's {@code monitorPort(...)}. */
private int monitorPort;
/** Value passed to the builder's {@code maxTimeDiffSeconds(...)}. */
private int maxTimeDiffSeconds;
/** Class name passed to the builder's {@code jobShardingStrategyClass(...)}. */
private String jobShardingStrategyClass;
/** Job name handed to {@code JobCoreConfiguration.newBuilder}. */
private String jobName;
}
/**
 * ZooKeeper registry center settings, bound from properties with the
 * {@code elastic.job.zk} prefix. Accessors are generated by Lombok's {@code @Data}.
 * Fields with an initializer keep that value when the property is not set.
 */
@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class ZookeeperRegistryProperties {
/** Server list string passed as the first {@code ZookeeperConfiguration} constructor argument. */
private String serverLists;
/** Namespace passed as the second {@code ZookeeperConfiguration} constructor argument. */
private String namespace;
/** Maximum number of retries; defaults to 3. */
private int maxRetries = 3;
/** Connection timeout in milliseconds; defaults to 15000. */
private int connectionTimeoutMilliseconds = 15000;
/** Session timeout in milliseconds; defaults to 60000. */
private int sessionTimeoutMilliseconds = 60000;
/** Base sleep time in milliseconds; defaults to 1000. */
private int baseSleepTimeMilliseconds = 1000;
/** Maximum sleep time in milliseconds; defaults to 3000. */
private int maxSleepTimeMilliseconds = 3000;
/** Digest string; defaults to the empty string. */
private String digest = "";
}
# Application-level settings
server.port=8099
spring.application.name=elasticJobTest
#ZK
# Keys under elastic.job.zk are bound to ZookeeperRegistryProperties.
# Timeouts, retries and digest are not set here, so that class's field defaults apply.
elastic.job.zk.serverLists=192.168.204.140:2181,192.168.204.141:2181,192.168.204.142:2181
elastic.job.zk.namespace=elastic-job
#ElasticJob
# Keys under simple.job are bound to SimpleJobProperties.
# NOTE(review): camelCase and kebab-case keys are mixed below; presumably both bind
# through Spring Boot relaxed binding - confirm.
simple.job.jobName=simpleJob
# Cron expression; 0/5 in the seconds field means every 5 seconds.
simple.job.cron=0/5 * * * * ?
simple.job.shardingTotalCount=3
# One entry per shard index (0..2), matching shardingTotalCount=3.
simple.job.shardingItemParameters=0=beijing,1=shanghai,2=changchun
simple.job.job-parameter=source1=public,source2=private
simple.job.failover=true
simple.job.monitor-execution=true
simple.job.monitor-port=8889
# NOTE(review): presumably -1 disables the time-difference check - confirm against ElasticJob docs.
simple.job.max-time-diff-seconds=-1
simple.job.job-sharding-strategy-class=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
# Datasource settings; presumably the source of the DataSource injected into JobEventConfig.
spring.datasource.url=jdbc:mysql://localhost:3306/elasticjob?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC&useSSL=false
# NOTE(review): com.mysql.jdbc.Driver is the legacy driver class name; MySQL Connector/J 8.x
# uses com.mysql.cj.jdbc.Driver - confirm against the driver version in pom.xml.
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=123456
pom.xml依赖:
启动类:
/**
 * Spring Boot entry point. Enables binding of {@link ZookeeperRegistryProperties}
 * and {@link SimpleJobProperties}.
 */
// NOTE(review): the scan pattern "com.sakura.*" looks like it is meant to cover the
// sub-packages of com.sakura; confirm whether plain "com.sakura" was intended.
@SpringBootApplication(scanBasePackages = {"com.sakura.*"})
@EnableConfigurationProperties(value = {ZookeeperRegistryProperties.class, SimpleJobProperties.class})
public class Application {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, forwarded to Spring Boot so that overrides
     *             such as {@code --server.port=...} take effect
     */
    public static void main(String[] args) {
        // Forward args; the previous call dropped them, so command-line properties were ignored.
        SpringApplication.run(Application.class, args);
    }
}