ElasticJob and SpringBoot

2021-01-15 00:14

This article takes a brief look at ElasticJob by integrating it into SpringBoot. It covers basic usage only and does not go into the source code.

Prerequisites:

Registry center: ZooKeeper

Architecture overview:

[Figure: architecture diagram]

Code directory structure:

├─.idea
└─src
    └─main
        ├─java
        │  └─com
        │      └─sakura
        │          ├─configuration    --SpringJobScheduler, ZookeeperRegistryCenter
        │          ├─job
        │          │  ├─jobEventConfig    --Job event listener
        │          │  └─jobListener    --Job execution listener
        │          └─properties    --Zookeeper and Job configuration
        └─resources    --Zookeeper and Job configuration

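Initializing the registry center:

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// plus the project's own ZookeeperRegistryProperties class under com.sakura

@Configuration
@Slf4j
public class ZookeeperRegistry {

    // initMethod = "init" makes Spring call ZookeeperRegistryCenter.init() after the bean is created
    @Bean(name = "registryCenter", initMethod = "init")
    public ZookeeperRegistryCenter registryCenter(ZookeeperRegistryProperties registryProperties) {
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(
                registryProperties.getServerLists(), registryProperties.getNamespace());
        zookeeperConfiguration.setDigest(registryProperties.getDigest());
        zookeeperConfiguration.setBaseSleepTimeMilliseconds(registryProperties.getBaseSleepTimeMilliseconds());
        zookeeperConfiguration.setConnectionTimeoutMilliseconds(registryProperties.getConnectionTimeoutMilliseconds());
        zookeeperConfiguration.setMaxRetries(registryProperties.getMaxRetries());
        zookeeperConfiguration.setMaxSleepTimeMilliseconds(registryProperties.getMaxSleepTimeMilliseconds());
        // Read the session timeout from the properties; the original copied the configuration's own value back onto itself
        zookeeperConfiguration.setSessionTimeoutMilliseconds(registryProperties.getSessionTimeoutMilliseconds());
        log.info("ElasticJob registry center (Zookeeper) initialized. serverLists={}, namespace={}", registryProperties.getServerLists(), registryProperties.getNamespace());
        return new ZookeeperRegistryCenter(zookeeperConfiguration);
    }
}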
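The retry-related settings above (baseSleepTimeMilliseconds, maxRetries, maxSleepTimeMilliseconds) are, as far as I can tell, handed to Curator's exponential backoff retry policy when the registry center connects. The following is a stand-alone sketch of how such a policy spaces out its retries, using the default values from ZookeeperRegistryProperties. The class BackoffSketch and the method sleepMs are made up for illustration and are not part of ElasticJob or Curator.

```java
import java.util.Random;

public class BackoffSketch {

    // Approximates a jittered exponential backoff: the wait grows with the
    // retry count, is randomized, and is capped at maxSleepMs.
    public static long sleepMs(int retryCount, int baseSleepMs, int maxSleepMs, Random random) {
        // Random factor in [1, 2^(retryCount + 1) - 1]
        long jitterFactor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(maxSleepMs, (long) baseSleepMs * jitterFactor);
    }

    public static void main(String[] args) {
        // Defaults taken from ZookeeperRegistryProperties in this article
        int maxRetries = 3;
        int baseSleepMs = 1000;
        int maxSleepMs = 3000;
        Random random = new Random();
        for (int retry = 0; retry < maxRetries; retry++) {
            System.out.println("retry " + retry + " waits "
                    + sleepMs(retry, baseSleepMs, maxSleepMs, random) + " ms");
        }
    }
}
```

With these defaults, every wait falls between 1000 ms and 3000 ms, so three failed retries delay startup by at most roughly nine seconds on top of the connection timeout.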

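Defining a Job (using SimpleJob as an example):

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MySimpleJob implements SimpleJob {
    // Called once per sharding item assigned to this instance on every trigger
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("------Scheduled task started------");
        log.info("jobName:{}", shardingContext.getJobName());
        log.info("taskId:{}", shardingContext.getTaskId());
    }
}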
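The job above only logs its name and task ID. In practice a job usually branches on its sharding item, which ShardingContext exposes through getShardingItem() and getShardingParameter(). The sketch below shows, without any ElasticJob dependency, how a value such as the shardingItemParameters string configured later in this article maps items to parameters. The class ShardingParamsSketch and its methods are hypothetical helpers for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ShardingParamsSketch {

    // Parses "0=beijing,1=shanghai" into {0=beijing, 1=shanghai}
    public static Map<Integer, String> parse(String shardingItemParameters) {
        Map<Integer, String> result = new LinkedHashMap<>();
        for (String pair : shardingItemParameters.split(",")) {
            String[] keyValue = pair.trim().split("=", 2);
            result.put(Integer.parseInt(keyValue[0].trim()), keyValue[1].trim());
        }
        return result;
    }

    // Stands in for the work a job would do for one sharding item
    public static String describe(int shardingItem, Map<Integer, String> parameters) {
        return "sharding item " + shardingItem + " processes " + parameters.get(shardingItem);
    }

    public static void main(String[] args) {
        Map<Integer, String> parameters = parse("0=beijing,1=shanghai,2=changchun");
        for (int item = 0; item < parameters.size(); item++) {
            System.out.println(describe(item, parameters));
        }
    }
}
```

Inside a real execute method the lookup is unnecessary, since the framework has already resolved the parameter for the current item; the sketch only makes the mapping visible.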

Initializing SpringJobScheduler:

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.Data;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// plus the project's own classes under com.sakura (properties, job, listener)

@Configuration
@Data
public class SpringJobSchedulerInit {
    private final ZookeeperRegistryCenter registryCenter;
    private final ZookeeperRegistryProperties zookeeperRegistryProperties;
    private final SimpleJobProperties simpleJobProperties;
    private final ElasticJob mySimpleJob;
    private final JobEventConfiguration jobEventConfiguration;

    @Bean(initMethod = "init")
    public SpringJobScheduler springJobScheduler() {
        return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(),
                // Job event tracing, optional
                jobEventConfiguration,
                // Job execution listener, optional
                new MySimpleJobListener());
    }

    public LiteJobConfiguration getLiteJobConfiguration() {
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
                        simpleJobProperties.getJobName(),
                        simpleJobProperties.getCron(),
                        simpleJobProperties.getShardingTotalCount())
                .failover(simpleJobProperties.isFailover())
                .jobParameter(simpleJobProperties.getJobParameter())
                .misfire(true)
                .shardingItemParameters(simpleJobProperties.getShardingItemParameters())
                .build();
        JobTypeConfiguration jobTypeConfiguration =
                new SimpleJobConfiguration(jobCoreConfiguration, MySimpleJob.class.getName());
        return LiteJobConfiguration.newBuilder(jobTypeConfiguration)
                .jobShardingStrategyClass(simpleJobProperties.getJobShardingStrategyClass())
                .maxTimeDiffSeconds(simpleJobProperties.getMaxTimeDiffSeconds())
                .monitorExecution(simpleJobProperties.isMonitorExecution())
                .monitorPort(simpleJobProperties.getMonitorPort())
                // Whether the local configuration overwrites the ElasticJob configuration stored in the registry center
                .overwrite(false)
                .build();
    }
}
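The jobShardingStrategyClass used in this article's configuration is AverageAllocationJobShardingStrategy, which spreads sharding items evenly across the running instances and gives any leftover items to the first instances in order. The sketch below re-implements that idea in plain Java so the result can be checked without a ZooKeeper cluster. It is my own approximation of the documented behaviour, not the library's code, and the class AverageAllocationSketch and the instance names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AverageAllocationSketch {

    // Assigns items 0..shardingTotalCount-1 to instances: an equal contiguous
    // range each, then one leftover item apiece to the earliest instances.
    public static Map<String, List<Integer>> allocate(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (instances.isEmpty()) {
            return result;
        }
        int perInstance = shardingTotalCount / instances.size();
        int index = 0;
        for (String instance : instances) {
            List<Integer> items = new ArrayList<>();
            for (int item = index * perInstance; item < (index + 1) * perInstance; item++) {
                items.add(item);
            }
            result.put(instance, items);
            index++;
        }
        int remainder = shardingTotalCount % instances.size();
        int firstLeftover = shardingTotalCount - remainder;
        int handedOut = 0;
        for (List<Integer> items : result.values()) {
            if (handedOut < remainder) {
                items.add(firstLeftover + handedOut);
                handedOut++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // shardingTotalCount=3 as configured in this article, on two instances
        System.out.println(allocate(Arrays.asList("instance-a", "instance-b"), 3));
        // prints {instance-a=[0, 2], instance-b=[1]}
    }
}
```

With three instances each one gets exactly one of the three items, which is why the sample configuration runs one city per node on a three-node deployment.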

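Job event tracing, persisted to a database:

import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
@Data
public class JobEventConfig {
    // The DataSource auto-configured by Spring Boot from the spring.datasource.* settings
    private final DataSource dataSource;

    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }
}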

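Job execution listener:

import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;

@Slf4j
public class MySimpleJobListener implements ElasticJobListener {
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        log.info("Before job execution: {}", ReflectionToStringBuilder.toString(shardingContexts));
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        log.info("After job execution: {}", ReflectionToStringBuilder.toString(shardingContexts));
    }
}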

Properties configuration:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "simple.job")
@Data
public class SimpleJobProperties {
    private String cron;
    private int shardingTotalCount;
    private String shardingItemParameters;
    private String jobParameter;
    private boolean failover;
    private boolean monitorExecution;
    private int monitorPort;
    private int maxTimeDiffSeconds;
    private String jobShardingStrategyClass;
    private String jobName;
}

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class ZookeeperRegistryProperties {

    private String serverLists;

    private String namespace;

    private int maxRetries = 3;

    private int connectionTimeoutMilliseconds = 15000;

    private int sessionTimeoutMilliseconds = 60000;

    private int baseSleepTimeMilliseconds = 1000;

    private int maxSleepTimeMilliseconds = 3000;

    private String digest = "";

}

server.port=8099
spring.application.name=elasticJobTest

#ZK
elastic.job.zk.serverLists=192.168.204.140:2181,192.168.204.141:2181,192.168.204.142:2181
elastic.job.zk.namespace=elastic-job

#ElasticJob
simple.job.jobName=simpleJob
simple.job.cron=0/5 * * * * ?
simple.job.shardingTotalCount=3
simple.job.shardingItemParameters=0=beijing,1=shanghai,2=changchun
simple.job.job-parameter=source1=public,source2=private
simple.job.failover=true
simple.job.monitor-execution=true
simple.job.monitor-port=8889
simple.job.max-time-diff-seconds=-1
simple.job.job-sharding-strategy-class=com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy

spring.datasource.url=jdbc:mysql://localhost:3306/elasticjob?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC&useSSL=false
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=123456

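pom.xml dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>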

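Startup class:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication(scanBasePackages = {"com.sakura.*"})
@EnableConfigurationProperties(value = {ZookeeperRegistryProperties.class, SimpleJobProperties.class})
public class Application {
    public static void main(String[] args) {
        // Pass args through so command-line properties (for example --server.port) are honored
        SpringApplication.run(Application.class, args);
    }
}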

 

Original article: https://www.cnblogs.com/monument/p/12938925.html

