Spring定时任务动态取消/创建/修改

2021-06-05 12:03

阅读:565

标签:fun   model   public   tab   refresh   efi   core   prefix   entry   

 

首先配置ThreadPoolTaskScheduler线程池:

package cn.demo.support.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * Spring configuration that registers the application's {@link ThreadPoolTaskScheduler} bean.
 */
@Configuration
public class ScheduleConfig {

    /** Number of threads kept in the scheduler pool. */
    private static final int POOL_SIZE = 20;

    /** Name prefix given to the scheduler's threads. */
    private static final String THREAD_NAME_PREFIX = "taskExecutor-";

    /** Value passed to {@code setAwaitTerminationSeconds} for the shutdown phase. */
    private static final int AWAIT_TERMINATION_SECONDS = 60;

    /**
     * Creates the scheduler bean.
     *
     * <p>The bean is registered with {@code shutdown} as its destroy method and is configured to
     * wait for tasks to complete on shutdown, with {@value #AWAIT_TERMINATION_SECONDS} seconds
     * as the await-termination setting.
     *
     * @return the configured scheduler
     */
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolTaskScheduler taskExecutor() {
        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setThreadNamePrefix(THREAD_NAME_PREFIX);
        scheduler.setPoolSize(POOL_SIZE);
        scheduler.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        return scheduler;
    }

}

 

配置定时任务,业务类中注入HiveClusterSyncScheduler即可调用: 

  1 package cn.jsfund.ngdp.support.batchSchedule;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Date;
  5 import java.util.List;
  6 import java.util.Map;
  7 import java.util.Map.Entry;
  8 import java.util.concurrent.ConcurrentHashMap;
  9 import java.util.concurrent.ScheduledFuture;
 10 
 11 import javax.annotation.PostConstruct;
 12 import javax.annotation.Resource;
 13 
 14 import org.apache.commons.lang.StringUtils;
 15 import org.slf4j.Logger;
 16 import org.slf4j.LoggerFactory;
 17 import org.springframework.beans.factory.annotation.Autowired;
 18 import org.springframework.jdbc.core.JdbcTemplate;
 19 import org.springframework.scheduling.Trigger;
 20 import org.springframework.scheduling.TriggerContext;
 21 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 22 import org.springframework.scheduling.support.CronSequenceGenerator;
 23 import org.springframework.scheduling.support.CronTrigger;
 24 import org.springframework.stereotype.Component;
 25 import org.springframework.util.ObjectUtils;
 26 
 27 import cn.jsfund.ngdp.support.config.BasicConfig;
 28 import cn.jsfund.ngdp.support.exception.ServiceException;
 29 import cn.jsfund.ngdp.support.model.bigdata.TaskDef;
 30 import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService;
 31 
 32 @SuppressWarnings("rawtypes")
 33 @Component
 34 public class HiveClusterSyncScheduler {
 35 
 36     @Resource
 37     private JdbcTemplate dmJdbcTemplate;
 38 
 39     @Resource
 40     private BasicConfig basicConfig;
 41 
 42     @Resource
 43     HiveClusterSyncService hiveClusterSyncService;//业务类
 44 
 45     @Autowired
 46     ThreadPoolTaskScheduler threadPoolTaskScheduler;
 47 
 48     private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class);
 49 
 50     private static final String MAPTASKKEY = "map_task_key";
 51 
 52     private static Map scheduledFutureMap = new ConcurrentHashMap();
 53 
 54     @PostConstruct
 55     public void init() {
 56         refreshTasks();
 57     }
 58 
 59     //    @Scheduled(cron = "${bigData.hive.backup.schduler.refresh.crontab}")
 60     public void refreshTasks() {
 61 
 62         if (!"true".equals(basicConfig.getBackupEnabled())) {
 63             logger.info("*************hive集群同步开关未开启,结束计划任务刷新.*************");
 64             return;
 65         }
 66         for (Entry entry : scheduledFutureMap.entrySet()) {
 67             ScheduledFuture sf = entry.getValue();
 68             if (sf != null) {
 69                 sf.cancel(false);
 70             }
 71         }
 72         scheduledFutureMap.clear();
 73 
 74         logger.info("*************开始扫描数据库,刷新定时任务*************");
 75 
 76         List list = new ArrayList();
 77 
 78         try {
 79             list = hiveClusterSyncService.queryTaskDefs(null, null, null, null, null, "1", null)
 80                     .getContent();
 81         } catch (Exception e) {
 82             logger.info("查询数据库异常,代码执行结束,异常信息:", e);
 83         }
 84         if (ObjectUtils.isEmpty(list)) {
 85             logger.info("查询启动状态的任务记录为空,代码执行结束。");
 86             return;
 87         }
 88         for (TaskDef taskDef : list) {
 89             String taskId = taskDef.getId();
 90             String crontab = taskDef.getCrontab();
 91             if (StringUtils.isBlank(crontab)) {
 92                 continue;
 93             }
 94 
 95             TaskThread taskThread = new TaskThread(taskId, crontab);
 96             boolean isValidExp = CronSequenceGenerator.isValidExpression(crontab);
 97             if (!isValidExp) {
 98                 logger.error("当前cron表达式无效,taskId={},cronExpression={}", taskId, crontab);
 99                 continue;
100             }
101             ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,
102                     new Trigger() {
103 
104                         @Override
105                         public Date nextExecutionTime(TriggerContext triggerContext) {
106 
107                             return new CronTrigger(crontab).nextExecutionTime(triggerContext);
108                         }
109                     });
110 
111             scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);
112         }
113         logger.info("*************刷新定时任务完成*************");
114     }
115 
116     //添加计划
117     public void addTask(String taskId, String crontab) {
118         if (!"true".equals(basicConfig.getBackupEnabled())) {
119             return;
120         }
121         TaskThread taskThread = new TaskThread(taskId, crontab);
122         ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,
123                 new Trigger() {
124                     @Override
125                     public Date nextExecutionTime(TriggerContext triggerContext) {
126                         return new CronTrigger(crontab).nextExecutionTime(triggerContext);
127                     }
128                 });
129         scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);
130     }
131 
132     //取消计划
133     public void cancelTask(Object... taskId) {
134         for (int i = 0; i ) {
135             ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId[i]);
136             if (sf != null) {
137                 sf.cancel(false);
138                 scheduledFutureMap.remove(MAPTASKKEY + taskId[i]);
139             }
140         }
141     }
142 
143     //更新计划:先取消再添加
144     public void updateScheduleTask(String taskId, String crontab) throws ServiceException {
145         if (!"true".equals(basicConfig.getBackupEnabled())) {
146             return;
147         }
148         ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId);
149         if (sf != null) {
150             sf.cancel(false);
151             scheduledFutureMap.remove(MAPTASKKEY + taskId);
152         }
153         TaskThread taskThread = new TaskThread(taskId, crontab);
154         ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,
155                 new Trigger() {
156                     @Override
157                     public Date nextExecutionTime(TriggerContext triggerContext) {
158                         return new CronTrigger(crontab).nextExecutionTime(triggerContext);
159                     }
160                 });
161         scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);
162     }
163 
164     class TaskThread extends Thread {
165 
166         private String taskId;
167 
168         private String crontab;
169 
170         public TaskThread(String taskId, String crontab) {
171             this.taskId = taskId;
172             this.crontab = crontab;
173         }
174 
175         public void run() {
176             try {
177                 hiveClusterSyncService.bootTask(taskId, crontab);
178             } catch (Exception e) {
179                 logger.info("任务(taskId={})结束初始化,异常信息:{}", taskId, e.getMessage());
180             }
181         }
182     }
183 
184 }

 

package cn.jsfund.ngdp.support.batchSchedule;
import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.Map;import java.util.Map.Entry;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;import javax.annotation.Resource;
import org.apache.commons.lang.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.scheduling.Trigger;import org.springframework.scheduling.TriggerContext;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.scheduling.support.CronSequenceGenerator;import org.springframework.scheduling.support.CronTrigger;import org.springframework.stereotype.Component;import org.springframework.util.ObjectUtils;
import cn.jsfund.ngdp.support.config.BasicConfig;import cn.jsfund.ngdp.support.exception.ServiceException;import cn.jsfund.ngdp.support.model.bigdata.TaskDef;import cn.jsfund.ngdp.support.web.service.bigdata.HiveClusterSyncService;
@SuppressWarnings("rawtypes")@Componentpublic class HiveClusterSyncScheduler {
    @Resource    private JdbcTemplate dmJdbcTemplate;
    @Resource    private BasicConfig basicConfig;
    @Resource    HiveClusterSyncService hiveClusterSyncService;
    @Autowired    ThreadPoolTaskScheduler threadPoolTaskScheduler;
    private static final Logger logger = LoggerFactory.getLogger(HiveClusterSyncScheduler.class);
    private static final String MAPTASKKEY = "map_task_key";
    private static Map scheduledFutureMap = new ConcurrentHashMap();
    @PostConstruct    public void init() {        refreshTasks();    }
    //    @Scheduled(cron = "${bigData.hive.backup.schduler.refresh.crontab}")    public void refreshTasks() {
        if (!"true".equals(basicConfig.getBackupEnabled())) {            logger.info("*************hive集群同步开关未开启,结束计划任务刷新.*************");            return;        }        for (Entry entry : scheduledFutureMap.entrySet()) {            ScheduledFuture sf = entry.getValue();            if (sf != null) {                sf.cancel(false);            }        }        scheduledFutureMap.clear();
        logger.info("*************开始扫描数据库,刷新定时任务*************");
        List list = new ArrayList();
        try {            list = hiveClusterSyncService.queryTaskDefs(null, null, null, null, null, "1", null)                    .getContent();        } catch (Exception e) {            logger.info("查询数据库异常,代码执行结束,异常信息:", e);        }        if (ObjectUtils.isEmpty(list)) {            logger.info("查询启动状态的任务记录为空,代码执行结束。");            return;        }        for (TaskDef taskDef : list) {            String taskId = taskDef.getId();            String crontab = taskDef.getCrontab();            if (StringUtils.isBlank(crontab)) {                continue;            }
            TaskThread taskThread = new TaskThread(taskId, crontab);            boolean isValidExp = CronSequenceGenerator.isValidExpression(crontab);            if (!isValidExp) {                logger.error("当前cron表达式无效,taskId={},cronExpression={}", taskId, crontab);                continue;            }            ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,                    new Trigger() {
                        @Override                        public Date nextExecutionTime(TriggerContext triggerContext) {
                            return new CronTrigger(crontab).nextExecutionTime(triggerContext);                        }                    });
            scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);        }        logger.info("*************刷新定时任务完成*************");    }
    //添加计划    public void addTask(String taskId, String crontab) {        if (!"true".equals(basicConfig.getBackupEnabled())) {            return;        }        TaskThread taskThread = new TaskThread(taskId, crontab);        ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,                new Trigger() {                    @Override                    public Date nextExecutionTime(TriggerContext triggerContext) {                        return new CronTrigger(crontab).nextExecutionTime(triggerContext);                    }                });        scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);    }
    //取消计划    public void cancelTask(Object... taskId) {        for (int i = 0; i     //更新计划:先取消再添加    public void updateScheduleTask(String taskId, String crontab) throws ServiceException {        if (!"true".equals(basicConfig.getBackupEnabled())) {            return;        }        ScheduledFuture sf = scheduledFutureMap.get(MAPTASKKEY + taskId);        if (sf != null) {            sf.cancel(false);            scheduledFutureMap.remove(MAPTASKKEY + taskId);        }        TaskThread taskThread = new TaskThread(taskId, crontab);        ScheduledFuture> scheduledFuture = threadPoolTaskScheduler.schedule(taskThread,                new Trigger() {                    @Override                    public Date nextExecutionTime(TriggerContext triggerContext) {                        return new CronTrigger(crontab).nextExecutionTime(triggerContext);                    }                });        scheduledFutureMap.put(MAPTASKKEY + taskId, scheduledFuture);    }
    class TaskThread extends Thread {
        private String taskId;
        private String crontab;
        public TaskThread(String taskId, String crontab) {            this.taskId = taskId;            this.crontab = crontab;        }
        public void run() {            try {                hiveClusterSyncService.bootTask(taskId, crontab);            } catch (Exception e) {                logger.info("任务(taskId={})结束初始化,异常信息:{}", taskId, e.getMessage());            }        }    }
}

 

Spring定时任务动态取消/创建/修改

标签:fun   model   public   tab   refresh   efi   core   prefix   entry   

原文地址:https://www.cnblogs.com/itfeng813/p/14629006.html


评论


亲,登录后才可以留言!