Spring定时任务动态取消/创建/修改
2021-06-05 12:03
标签:fun model public tab refresh efi core prefix entry
原文地址:https://www.cnblogs.com/itfeng813/p/14629006.html

本文分两步:首先配置 ThreadPoolTaskScheduler 线程池(ScheduleConfig);然后配置定时任务(HiveClusterSyncScheduler,位于 package cn.jsfund.ngdp.support.batchSchedule),在业务类中注入 HiveClusterSyncScheduler 即可调用。

package cn.demo.support.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
 * Spring configuration that registers the {@link ThreadPoolTaskScheduler} bean.
 */
@Configuration
public class ScheduleConfig {

    /**
     * Creates the scheduler bean.
     *
     * <p>The bean is configured with a pool size of 20, the thread name prefix
     * {@code "taskExecutor-"}, and is told to wait for running tasks on shutdown
     * with an await-termination setting of 60 seconds. The container invokes
     * {@code shutdown} on the bean when it is destroyed.
     *
     * @return the configured scheduler
     */
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolTaskScheduler taskExecutor() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();

        // Thread identity and capacity.
        scheduler.setThreadNamePrefix("taskExecutor-");
        scheduler.setPoolSize(20);

        // Shutdown behaviour: wait for tasks to complete, awaiting termination for 60 seconds.
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);

        return scheduler;
    }
}
1 package cn.jsfund.ngdp.support.batchSchedule;
2
3 import java.util.ArrayList;
4 import java.util.Date;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.Map.Entry;
8 import java.util.concurrent.ConcurrentHashMap;
9 import java.util.concurrent.ScheduledFuture;
10
11 import javax.annotation.PostConstruct;
12 import javax.annotation.Resource;
13
14 import org.apache.commons.lang.StringUtils;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17 import org.springframework.beans.factory.annotation.Autowired;
18 import org.springframework.jdbc.core.JdbcTemplate;
19 import org.springframework.scheduling.Trigger;
20 import org.springframework.scheduling.TriggerContext;
21 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
22 import org.springframework.scheduling.support.CronSequenceGenerator;
23 import org.springframework.scheduling.support.CronTrigger;
24 import org.springframework.stereotype.Component;
25 import org.springframework.util.ObjectUtils;
26
27 import cn.jsfund.ngdp.support.config.BasicConfig;
28 import cn.jsfund.ngdp.support.exception.ServiceException;
29 import cn.jsfund.ngdp.support.model.bigdata.TaskDef;
30 import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService;
31
32 @SuppressWarnings("rawtypes")
33 @Component
34 public class HiveClusterSyncScheduler {
35
36 @Resource
37 private JdbcTemplate dmJdbcTemplate;
38
39 @Resource
40 private BasicConfig basicConfig;
41
42 @Resource
43 HiveClusterSyncService hiveClusterSyncService;//业务类
44
45 @Autowired
46 ThreadPoolTaskScheduler threadPoolTaskScheduler;
47
48 private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class);
49
50 private static final String MAPTASKKEY = "map_task_key";
51
52 private static Map
import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.Map;import java.util.Map.Entry;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;import javax.annotation.Resource;
import org.apache.commons.lang.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.scheduling.Trigger;import org.springframework.scheduling.TriggerContext;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.scheduling.support.CronSequenceGenerator;import org.springframework.scheduling.support.CronTrigger;import org.springframework.stereotype.Component;import org.springframework.util.ObjectUtils;
import cn.jsfund.ngdp.support.config.BasicConfig;import cn.jsfund.ngdp.support.exception.ServiceException;import cn.jsfund.ngdp.support.model.bigdata.TaskDef;import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService;
@SuppressWarnings("rawtypes")@Componentpublic class HiveClusterSyncScheduler {
@Resource private JdbcTemplate dmJdbcTemplate;
@Resource private BasicConfig basicConfig;
@Resource HiveClusterSyncService hiveClusterSyncService;
@Autowired ThreadPoolTaskScheduler threadPoolTaskScheduler;
private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class);
private static final String MAPTASKKEY = "map_task_key";
private static Map
@PostConstruct public void init() { refreshTasks(); }
// @Scheduled(cron = "${bigData.hive.backup.schduler.refresh.crontab}") public void refreshTasks() {
if (!"true".equals(basicConfig.getBackupEnabled())) { logger.info("*************hive集群同步开关未开启,结束计划任务刷新.*************"); return; } for (Entry
logger.info("*************开始扫描数据库,刷新定时任务*************");
List
try { list = hiveClusterSyncService.queryTaskDefs(null, null, null, null, null, "1", null) .getContent(); } catch (Exception e) { logger.info("查询数据库异常,代码执行结束,异常信息:", e); } if (ObjectUtils.isEmpty(list)) { logger.info("查询启动状态的任务记录为空,代码执行结束。"); return; } for (TaskDef taskDef : list) { String taskId = taskDef.getId(); String crontab = taskDef.getCrontab(); if (StringUtils.isBlank(crontab)) { continue; }
TaskThread taskThread = new TaskThread(taskId, crontab); boolean isValidExp = CronSequenceGenerator.isValidExpression(crontab); if (!isValidExp) { logger.error("当前cron表达式无效,taskId={},cronExpression={}", taskId, crontab); continue; } ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, new Trigger() {
@Override public Date nextExecutionTime(TriggerContext triggerContext) {
return new CronTrigger(crontab).nextExecutionTime(triggerContext); } });
scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); } logger.info("*************刷新定时任务完成*************"); }
//添加计划 public void addTask(String taskId, String crontab) { if (!"true".equals(basicConfig.getBackupEnabled())) { return; } TaskThread taskThread = new TaskThread(taskId, crontab); ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, new Trigger() { @Override public Date nextExecutionTime(TriggerContext triggerContext) { return new CronTrigger(crontab).nextExecutionTime(triggerContext); } }); scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); }
//取消计划 public void cancelTask(Object... taskId) { for (int i = 0; i //更新计划:先取消再添加 public void updateScheduleTask(String taskId, String crontab) throws ServiceException { if (!"true".equals(basicConfig.getBackupEnabled())) { return; } ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId); if (sf != null) { sf.cancel(false); scheduledFutureMap.remove(MAPTASKKEY + taskId); } TaskThread taskThread = new TaskThread(taskId, crontab); ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread, new Trigger() { @Override public Date nextExecutionTime(TriggerContext triggerContext) { return new CronTrigger(crontab).nextExecutionTime(triggerContext); } }); scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture); }
class TaskThread extends Thread {
private String taskId;
private String crontab;
public TaskThread(String taskId, String crontab) { this.taskId = taskId; this.crontab = crontab; }
public void run() { try { hiveClusterSyncService.bootTask(taskId, crontab); } catch (Exception e) { logger.info("任务(taskId={})结束初始化,异常信息:{}", taskId, e.getMessage()); } } }
}
上一篇:Java 线程实现原理
下一篇:JavaSE总结(3)