特价版线程池ThreadPoolExecutor实现

2021-03-10 05:29

阅读:644

标签:状态   人事   tin   vat   tran   keepalive   move   退出   tun   

  线程池的实现原理无非复用二字,类似数据库连接池,都是将一些重复创建的东西拿来重复使用。其中最关键的问题就两个:一个是怎么复用;一个是怎么回收。在数据库连接池中,一个连接的生命周期是我们可以手动控制的,相对来说容易一些。我们通过使用一个链表来持有连接并复用,超过最大连接数就回收。线程池不同,线程的生命周期不可控,当run方法运行结束了,线程就自然消亡了,因此麻烦一些,我们需要通过一个循环来让run方法不停歇的运行着,跑完一个又一个的任务。线程和任务由worker持有,当一个工人的所有任务(包括队列中的)都跑完了就会被回收。

  JDK8的线程池ThreadPoolExecutor类2000多行,当然其中注释占了大头。其中5个内部类,除去4个饱和策略,分量最重的是一个私有的内部类Worker,我叫它工具人类,它是关键。看下它的说明:

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

  工具人类继承AQS是为了实现不可重入的互斥锁(见标黄外文),因为我们不想要工具人在执行任务时被人打断,比如setCorePoolSize方法里的interruptIdleWorkers方法。

  工具人类实现了Runnable接口是为了让它自己先顶替真正的任务,在runWorker方法中实现线程的复用。工具人投入池中,用自己的线程出力,执行自己的任务,直到超过规定正式员工人数,所有任务进入缓存队列中排队。此时队列是动态的,只要工具人手头任务处理完了,就会从队列中拿到新任务进行处理。一旦任务爆仓,就得外包工具人上场。把ThreadPoolExecutor精简一下,去掉线程池的生命周期,去掉饱和策略,从2000多行变成了300多行。直接看代码:

import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadPool implements Executor {

    private AtomicInteger workerCounts = new AtomicInteger(1); // 正在忙着的工具人数量
    private final BlockingQueue workQueue; // 缓存队列,用于缓存超过核心线程数的任务
    private final HashSet workers = new HashSet(); // 工具人池
    private int completedTaskCount = 1; // 已完成任务总数,初始值为1
    private int corePoolSize; // 核心线程数
    private int maxPoolSize; // 最大线程数
    private volatile long keepAliveTime; // 多出核心线程数的线程保命的时间
    private Lock mainLock = new ReentrantLock(); // 线程池的全局锁

    ThreadPool(int corePoolSize, int maxPoolSize, int queueSize, long keepAliveTime, TimeUnit unit) {
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.workQueue = new ArrayBlockingQueue(queueSize);
        this.keepAliveTime = unit.toNanos(keepAliveTime);
    }

    /**
     * 线程池的主方法,也是入口
     *
     * @param command
     */
    @Override
    public void execute(Runnable command) {
        int currentWork = workerCounts.get();

        // 先处理核心线程
        if (currentWork  corePoolSize) {
            if (addWork(command, true)) {
                System.out.println("execute >>> 核心线程数内执行完毕。");
                return;
            }
        }

        // 正式员工忙不过来,就把任务加入缓存队列中
        if (workQueue.offer(command)) {
            System.out.println("execute >>> 已加入缓存队列中,队列中有 " + workQueue.size() + " 个任务。");
            if (workerCounts.get() == 0) { // 当前已经没有活着的工具人了,因为当前任务都跑完了,需要再创建工具人
                addWork(null, false);  // 此时无需给工具人分配任务了,因为任务入队缓存队列中,只需从缓存队列中取出即可
            }
        } else if (!addWork(command, false)) { // 缓存队列满了,让外包工具人处理,如果外包工具人名额也满了,那就真搞不定了
            System.err.println("execute >>> 外包工具人名额也超额,线程池搞不定了。");
        }
    }

    /**
     * 任务入池
     *
     * @param firstTask      初始任务
     * @param isCorePoolSize 是否核心线程数
     * @return 任务是否已执行
     */
    public boolean addWork(Runnable firstTask, boolean isCorePoolSize) {

        // 双循环,通过设置标记来直接从内层循环跳出外层循环
        retry:
        for (; ; ) {
            // 校验任务是否为空.若缓存队列不为空时,任务可以为空,见execute方法
            if (firstTask == null && workQueue.isEmpty())
                return false;

            for (; ; ) {
                int c = workerCounts.get(); // 获取在跑的工具人数量
                System.out.println("addWork >>> 工具人数 : " + c + " 个;是否核心线程数 : " + isCorePoolSize);

                // 若在核心线程数内,判断工具人数是否已超过;同理,已经超过核心线程数,则看是否超过最大任务数
                if (c > (isCorePoolSize ? corePoolSize : maxPoolSize))
                    return false;

                // 先将在忙的工具人数+1,跳出到最外层,执行循环下面的逻辑
                if (compareAndIncrementWorkerCount(c))
                    break retry;
            }
        }

        boolean workerStarted = false; // 任务是否已开始执行
        boolean workerAdded; // 任务是否已添加到池子里
        Worker w = null;
        final Thread t;

        try {
            w = new ThreadPool.Worker(firstTask); // 实例化工具人,将初始任务分配给该工具人
            t = w.thread; // 从工具人那里领取线程,等会儿把任务跑起来
            if (t != null) {
                final Lock mainLock = this.mainLock; // 使用全局锁来添加工具人到池子里
                mainLock.lock();
                try {
                    workers.add(w); // 工具人入池
                    System.out.println("addWork >>> 工具人池里有 " + workers.size() + " 个工具人。");
                    workerAdded = true;
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { // 工具人入池成功,可以让他的线程跑起来了
                    System.out.println("addWork >>> 开始拉起线程,准备进入Workder.run()...");
                    t.start(); // 关键点1:工具人的线程跑起来了,怎么跑?工具人自己的run(),他也是个Runnable
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted) // 任务没跑起来,回滚:回收该工具人,在忙着的工具人数-1
                addWorkerFailed(w);
        }
        return workerStarted;

    }

    /**
     * 新增工具人失败,执行回滚操作
     *
     * @param w
     */
    private void addWorkerFailed(Worker w) {
        final Lock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w); // 刚进去的工具人,从哪来回哪去
            decrementWorkerCount(); // 刚加的在忙着的工具人数减回去
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * 使用CAS将工具人数+1
     *
     * @param expect
     * @return
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return workerCounts.compareAndSet(expect, expect + 1);
    }

    /**
     * 工具人执行任务。1个关键点。
     *
     * @param worker
     */
    final void runWork(Worker worker) {
        Runnable task = worker.task; // 获取该工具人要执行的任务
        worker.task = null; // 清空该工具人的任务,所以firstTask就是一次的任务,后面都得从缓存队列中取任务
        worker.unlock(); // 这里还是允许中断的
        boolean completedAbruptly = true; // 任务执行是否被打断
        try {
            // 关键点3:线程复用——若任务不为空,或者队列中的任务不为空,工具人的线程将一直执行下去。一旦断了,工具人也就可以去死了
            while (task != null || (task = getTask()) != null) {
                worker.lock(); // 每次跑之前,工具人先锁住(不可重复锁),这里就不允许中断了
                try {
                    task.run();
                    System.out.println("runWork >>> 任务真的跑起来了,目前已完成 " + completedTaskCount + " 个任务。");
                } finally {
                    task = null; // 任务完成,清空
                    worker.completedTasks++; // 累计该工具人所完成的任务数量
                    worker.unlock(); // 工具人解锁(不可重复锁)
                }
            }
            completedAbruptly = false; // 没有被打断
        } finally {
            System.out.println("runWork >>> 这个工具人没任务可做了,要死了...");
            processWorkerExit(worker, completedAbruptly);
        }
    }

    /**
     * 将濒死的工具人从池中剔除
     *
     * @param w
     * @param completedAbruptly 工具人在干活时是否被人打断过
     */
    private void processWorkerExit(ThreadPool.Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 不曾被人打断,在干活的人数-1
            decrementWorkerCount();

        final Lock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks; // 死后记账,把该工具人所完成的任务数汇总到总完成任务数中去
            workers.remove(w); // 回收工具人的尸体
            System.err.println("processWorkerExit >>> 工具人被回收了。");
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * 空转,直到工具人数-1
     */
    private void decrementWorkerCount() {
        do {
        } while (!compareAndDecrementWorkerCount(workerCounts.get()));
    }

    /**
     * 使用CAS将工具人数-1
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return workerCounts.compareAndSet(expect, expect - 1);
    }

    /**
     * 获取队列中的工作任务。两个关键点。
     *
     * @return
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (; ; ) {
            int wc = workerCounts.get(); // 取得在忙工具人数
            boolean timed = wc > corePoolSize; // 超过核心线程池数,超时的外包工具人就危险了

            if ((wc > maxPoolSize || (timed && timedOut)) // 要么超过最大数了,要么出现了要炒掉的外包工具人
                    && (wc > 1 || workQueue.isEmpty())) { // 要么还有忙的工具人,要么缓存队列清空了
                if (compareAndDecrementWorkerCount(wc))
                    return null; // 超时回收这里返回null,runWork就得退出while循环,进入回收工具人阶段,也就是炒人
                continue; // 没有炒掉,继续清点外包工具人
            }

            System.out.println("getTask >>> 工作队列中还有 " + workQueue.size() + " 个任务。");
            try {
                Runnable r = timed ? // 什么时候会出现poll超时?当然是队列为空的时候。关键点4——确认这个外包工具人是否要被裁
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take(); // 缓存队列空的时候,这里会阻塞。关键点5——线程池靠它不让JVM退出
                if (r != null)
                    return r;
                timedOut = true; // 走到这里说明超时都获取不到任务了,那么说明这个外包工具人可以炒掉了
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    /**
     * 工具人,持有线程池中的线程和任务。1个关键点。
     */
    private class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private Thread thread; // 线程,用来处理任务,消费者用来消费任务的
        private Runnable task; // 任务,生产者创建的
        volatile long completedTasks; // 已完成的任务数量

        public Worker(Runnable task) {
            setState(-1); // 设置线程状态:SIGNAL
            this.task = task; // 设置工具人的任务
            thread = new Thread(this); // 创建新线程,这里很关键,必须将工具人本身作为任务,分配给这个线程
        }

        @Override
        public void run() {
            System.out.println("Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...");
            runWork(this); // 关键点2:让工具人线程运行工具人任务:工具人.run() -> 线程池.runWork(工具人) -> 工具人.task.run()
        }

        /**
         * 不可重入锁
         */
        public void lock() {
            acquire(1);
        }

        /**
         * 解不可重入锁
         */
        public void unlock() {
            release(1);
        }

        // 以下都是实现AQS中的方法
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }


    public static void main(String[] args) {
        int threadNum = 2;
        ThreadPool threadPool = new ThreadPool(4, 8, 6, 4, TimeUnit.SECONDS);
        for (int i = 0; i ) {
            final int j = i + 1;
            threadPool.execute(() -> {
                System.out.println("hello, world.我是任务" + (j));
            });
        }
    }
}

  运行结果:

addWork >>> 工具人数 : 1 个;是否核心线程数 : true
addWork >>> 工具人池里有 1 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
addWork >>> 工具人数 : 2 个;是否核心线程数 : true
addWork >>> 工具人池里有 2 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务1
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务2
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
getTask >>> 工作队列中还有 0 个任务。

  我们只生产了两个任务,没有超过核心线程数4,所以召唤的两个工具人事做完了后,常驻内存了。把main中的threadNum再分别改为10:

addWork >>> 工具人数 : 1 个;是否核心线程数 : true
addWork >>> 工具人池里有 1 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
addWork >>> 工具人数 : 2 个;是否核心线程数 : true
addWork >>> 工具人池里有 2 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
addWork >>> 工具人数 : 3 个;是否核心线程数 : true
addWork >>> 工具人池里有 3 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
execute >>> 已加入缓存队列中,队列中有 1 个任务。
execute >>> 已加入缓存队列中,队列中有 2 个任务。
execute >>> 已加入缓存队列中,队列中有 3 个任务。
execute >>> 已加入缓存队列中,队列中有 4 个任务。
execute >>> 已加入缓存队列中,队列中有 5 个任务。
execute >>> 已加入缓存队列中,队列中有 6 个任务。
addWork >>> 工具人数 : 4 个;是否核心线程数 : false
addWork >>> 工具人池里有 4 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务1
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 6 个任务。
hello, world.我是任务4
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 5 个任务。
hello, world.我是任务5
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 4 个任务。
hello, world.我是任务6
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 3 个任务。
hello, world.我是任务7
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 2 个任务。
hello, world.我是任务8
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 1 个任务。
hello, world.我是任务9
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务2
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务3
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务10
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
runWork >>> 这个工具人没任务可做了,要死了...
getTask >>> 工作队列中还有 0 个任务。
getTask >>> 工作队列中还有 0 个任务。
processWorkerExit >>> 工具人被回收了。
getTask >>> 工作队列中还有 0 个任务。

  这次跑了10个任务,核心线程数4,所以只有3个正式工具人被召唤,6个任务进入了缓存队列,最后一个任务召唤出来了一个外包工具人处理第10个任务。正式员工常驻内存,而外包的下场的悲惨的。

 

特价版线程池ThreadPoolExecutor实现

标签:状态   人事   tin   vat   tran   keepalive   move   退出   tun   

原文地址:https://www.cnblogs.com/wuxun1997/p/14157693.html


评论


亲,登录后才可以留言!