[java]定时任务代码注释

2021-05-30 15:04

阅读:384

标签:less   sign   signal   nis   keepalive   dea   unit   技术   EDA   

计时任务实现

//参数说明:线程池参数,要执行的方法,初始延迟时间,周期时间,时间单位
new ScheduledThreadPoolExecutor(1).scheduleAtFixedRate(() -> {
            System.out.println("hello world");
        }, 0, 5, TimeUnit.DAYS);

java代码实现说明

/**
首先参数如上所示,进行一些常规的参数校验,创建ScheduledFutureTask对象,该对象保留指令,计算结果(周期任务不关心结果,所以为null),初始延迟时间,周期时间,并对任务绑定一个序列号
**/
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period  sft =
        new ScheduledFutureTask(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period),
                                      sequencer.getAndIncrement());
    // 这里返回的还是sft,只是换了的引用类型罢了
    RunnableScheduledFuture t = decorateTask(command, sft);
    // 设置自己的出任务为自己(周期性)
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

对于ScheduledFutureTask类的继承关系如下图所示:
技术图片
可以看是它实际上时Runnable对象,调度时执行的是它的run方法;

先看线程池的调度:

private void delayedExecute(RunnableScheduledFuture> task) {
    if (isShutdown())
        reject(task);
    else {
        // 加入任务队列,任务队列对象为:ScheduledThreadPoolExecutor的内部静态类DelayedWorkQueue对象(实现阻塞的关键)
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(task) && remove(task))
            task.cancel(false);
        else
            // 执行调度
            ensurePrestart();
    }
}

判断线程池处于关闭状态的话就拒绝任务,否则查看是否需要执行,需要执行则执行调度否则取消任务;

// 判断线程池的尺寸,对任务进行添加(这里直接空添加,后续会直接选择队列中的任务执行)
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc 
// 线程池中加入任务的核心算法
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();

                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    workerAdded = true;
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动任务运行,调用的是Worker的run方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这里启动的是worker的run方法,worker继承关系如图:技术图片
本身也是个runnable对象,查看它的run方法:

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 在这里反复的获取task,这里的task及之前添加的ScheduledFutureTask对象
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 这里默认的是空方法
                    beforeExecute(wt, task);
                    try {
                        // 运行task的run方法(在这里,task运行完毕后又将自己添加进等待队列)
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这个woker方法中运行了ScheduledFutureTask的run方法,查看它的方法可知,它在周期性的情况下,执行后会将自己状态重置放入等待队列;

public void run() {
    // 如果不能运行了就取消
    if (!canRunInCurrentRunState(this))
        cancel(false);
    else if (!isPeriodic())
        // 调度超类的run方法,主要是判断状态,执行指令,记录结果
        super.run();
    // 如果是周期任务的话,则不需要记录结果,同时,执行完毕后需要重置状态
    else if (super.runAndReset()) {
        //设置下一次运行时间
        setNextRunTime();
        // 运行outerTask(这个之前设置的是自己)
        reExecutePeriodic(outerTask);
    }
}
void reExecutePeriodic(RunnableScheduledFuture> task) {
    if (canRunInCurrentRunState(task)) {
        // 队列加入任务
        super.getQueue().add(task);
        if (canRunInCurrentRunState(task) || !remove(task)) {
            // 这里又要周期性的调度,这里的
            ensurePrestart();
            return;
        }
    }
    task.cancel(false);
}

大体的流程就是:任务执行完毕后,会将自己重新放到等待队列中等待调度,同时再次调用ensurePrestart方法添加线程,如果线程池数量多于1的时候其实会添加出新的线程,这个线程也会去消费任务队列中的任务,消费的流程也是 一样的,如果一直只有一个定时任务,由于定时任务的执行是完毕后才会将自己放进队列,所以第二个线程相当于始终消费不到(第一个线程获取到任务的时候激活第二个线程,第二个线程再次陷入等待,第一个线程执行完任务后立即重新获取任务进行等待,所以如果仅有一个任务的情景,第二个线程无论如何也无法消费到任务,除非debug)

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            	// 在这里阻塞
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
public RunnableScheduledFuture> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture> first = queue[0];
            if (first == null)
                // 如果有第二个线程,在这里等待被激活,激活后再次循环
                available.await();
            else {
                //获得任务延迟时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay 

[java]定时任务代码注释

标签:less   sign   signal   nis   keepalive   dea   unit   技术   EDA   

原文地址:https://www.cnblogs.com/suifengek/p/14748611.html


评论


亲,登录后才可以留言!