[java]定时任务代码注释
2021-05-30 15:04
标签:less sign signal nis keepalive dea unit 技术 EDA 对于ScheduledFutureTask类的继承关系如下图所示: 先看线程池的调度: 判断线程池处于关闭状态的话就拒绝任务,否则查看是否需要执行,需要执行则执行调度否则取消任务; 这里启动的是worker的run方法,worker继承关系如图: 这个woker方法中运行了ScheduledFutureTask的run方法,查看它的方法可知,它在周期性的情况下,执行后会将自己状态重置放入等待队列; 大体的流程就是:任务执行完毕后,会将自己重新放到等待队列中等待调度,同时再次调用ensurePrestart方法添加线程,如果线程池数量多于1的时候其实会添加出新的线程,这个线程也会去消费任务队列中的任务,消费的流程也是 一样的,如果一直只有一个定时任务,由于定时任务的执行是完毕后才会将自己放进队列,所以第二个线程相当于始终消费不到(第一个线程获取到任务的时候激活第二个线程,第二个线程再次陷入等待,第一个线程执行完任务后立即重新获取任务进行等待,所以如果仅有一个任务的情景,第二个线程无论如何也无法消费到任务,除非debug) [java]定时任务代码注释 标签:less sign signal nis keepalive dea unit 技术 EDA 原文地址:https://www.cnblogs.com/suifengek/p/14748611.html计时任务实现
//参数说明:线程池参数,要执行的方法,初始延迟时间,周期时间,时间单位
new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(() -> {
System.out.println("hello world");
}, 0, 5, TimeUnit.DAYS);
java代码实现说明
/**
首先参数如上所示,进行一些常规的参数校验,创建ScheduledFutureTask对象,该对象保留指令,计算结果(周期任务不关心结果,所以为null),初始延迟时间,周期时间,并对任务绑定一个序列号
**/
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period sft =
new ScheduledFutureTask
可以看是它实际上时Runnable对象,调度时执行的是它的run方法;private void delayedExecute(RunnableScheduledFuture> task) {
if (isShutdown())
reject(task);
else {
// 加入任务队列,任务队列对象为:ScheduledThreadPoolExecutor的内部静态类DelayedWorkQueue对象(实现阻塞的关键)
super.getQueue().add(task);
if (!canRunInCurrentRunState(task) && remove(task))
task.cancel(false);
else
// 执行调度
ensurePrestart();
}
}
// 判断线程池的尺寸,对任务进行添加(这里直接空添加,后续会直接选择队列中的任务执行)
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc
// 线程池中加入任务的核心算法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动任务运行,调用的是Worker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
本身也是个runnable对象,查看它的run方法:public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 在这里反复的获取task,这里的task及之前添加的ScheduledFutureTask对象
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 这里默认的是空方法
beforeExecute(wt, task);
try {
// 运行task的run方法(在这里,task运行完毕后又将自己添加进等待队列)
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
public void run() {
// 如果不能运行了就取消
if (!canRunInCurrentRunState(this))
cancel(false);
else if (!isPeriodic())
// 调度超类的run方法,主要是判断状态,执行指令,记录结果
super.run();
// 如果是周期任务的话,则不需要记录结果,同时,执行完毕后需要重置状态
else if (super.runAndReset()) {
//设置下一次运行时间
setNextRunTime();
// 运行outerTask(这个之前设置的是自己)
reExecutePeriodic(outerTask);
}
}
void reExecutePeriodic(RunnableScheduledFuture> task) {
if (canRunInCurrentRunState(task)) {
// 队列加入任务
super.getQueue().add(task);
if (canRunInCurrentRunState(task) || !remove(task)) {
// 这里又要周期性的调度,这里的
ensurePrestart();
return;
}
}
task.cancel(false);
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 在这里阻塞
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
public RunnableScheduledFuture> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture> first = queue[0];
if (first == null)
// 如果有第二个线程,在这里等待被激活,激活后再次循环
available.await();
else {
//获得任务延迟时间
long delay = first.getDelay(NANOSECONDS);
if (delay