Java 并发编程 --- ThreadPoolExecutor(五)

2021-06-15 14:04

阅读:697

标签:holding   cer   限制   释放   his   二次   add   row   readwrite   

使用线程池的好处

引用自 http://ifeve.com/java-threadpool/ 的说明:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Java中的线程池是用ThreadPoolExecutor类来实现的. 本文就结合JDK 1.8对该类的源码来分析一下这个类内部对于线程的创建, 管理以及后台任务的调度等方面的执行原理。ThreadPoolExecutor结构如下图:

技术分享图片

Executor接口

此接口提供了一种将任务提交与每个任务的运行机制分离的方法,包括线程使用,调度等的详细信息。该接口中只有execute(Runnable command)方法,用来替代通常创建或启动线程的方法。例如使用Thread创建线程

Thread thread = new Thread();
thread.start();

使用execute创建运行线程,具体的线程执行会由相应的实现类去执行(jdk默认线程池execute的实现是由ThreadPoolExecutor来实现的)

Thread thread = new Thread();
executor.execute(thread);

ExecutorService接口

 技术分享图片

ExecutorService接口提供管理终止的方法和可以生成Future的方法,用于跟踪一个或多个异步任务的进度, 它继承了Executor接口,同时增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。

shutDown() : 允许之前提交的任务继续执行(执行完后shutDown,不会再接收新的任务)

shutDownNow():立即停止正在执行的任务

invokeAll():执行给定的任务,当所有任务完成后返回任务状态和结果的Futures列表
invokeAny():执行给定的任务,返回已完成的任务的结果 submit():提交线程

AbstractExecutorService类

ExecutorService接口的默认实现,同时也是线程池实现类ThreadPoolExecutor的父类,主要看下submit()方法与invokeAll()方法:

submit:

/**不管参数是Callable还是Runable, 执行方法都一样,生成一个task,然后执行task,execute方法的具体实现在ThreadPoolExecutor中,后续分析**/
public  Future submit(Callable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

invokeAll :

/**代码很简单,将给定的任务线程封装成Future对象,等待所有任务执行完成,统一返回Future对象,如果出现异常,会将未完成的任务取消**/
public  List> invokeAll(Collection extends Callable> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList> futures = new ArrayList>(tasks.size());
    boolean done = false;
    try {
        for (Callable t : tasks) {
            RunnableFuture f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i ) {
            Future f = futures.get(i);
            if (!f.isDone()) {
                try {
            /** 没有完成,阻塞**/
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i )
                futures.get(i).cancel(true);
    }
}

ThreadPoolExecutor类

在关注ThreadPoolExecutor之前,先来了解下线程的基本状态信息。

线程总的来说有NEW(初始)、RUNNABLE(运行)、WAITING(等待)、TIME_WAITING(超时等待)、BLOCKED(阻塞)、TERMINATED(终止)6种状态。

NEW:初始状态,线程被构建,但是还没有调用 start 方法

RUNNABLED:运行状态,JAVA 线程把操作系统中的就绪和运行两种状态统一称为“运行中” BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了 CPU 使用权,阻塞也分为几种情况 等待阻塞:运行的线程执行 wait 方法,jvm 会把当前线程放入到等待队列 同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占用了,那么 jvm 会把当前的线程放入到锁池中 其他阻塞:运行的线程执行 Thread.sleep 或者 Thread.join 方法,或者发出了 I

/O请求时,JVM 会把当前线程设置为阻塞状态,当 sleep 结束、join 线程终止、
io 处理完毕则线程恢复

WAITING:等待,需要主动唤醒 TIME_WAITING:超时等待状态,超时以后自动返回. TERMINATED:终止状态,表示当前线程执行完毕

具体的转化关系如下图:

技术分享图片

对于线程池而言,也有五种种不同的状态,分别为RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

RUNNING:运行状态,可以处理任务,并且接收任务(前提阻塞队列处于未满状态,阻塞队列一旦满了,会根据相应的饱和策略进行不同的处理)

SHUTDOWN:关闭状态,不能接收新的任务,但是能处理队列中的任务(shutdow方法)

STOP:停止状态,不能接收行的任务,不能处理队列中的任务并且会中断正在运行的任务(shutdownNow方法)

TIDYING:所有的任务都终止了,workCount为0,会进入该状态,将调用terminated方法进入TERMINATED状态

TERMINATED:terminated()方法执行完成

各个状态之间的转化关系(借用这里的图)

技术分享图片

 

ThreadPoolExcecutor类有一些重要的属性:

corePoolSize:线程池中核心线程的数量

maximumPoolSize:线程池中最大线程的数量

defaultHandler:默认的线程池饱和执行策略,一般是阻塞队列满了后且没有空闲线程,再有任务提交是抛出异常,还是直接丢弃等,默认的策略是抛出异:

ctl:对线程池运行状态以及线程池中有效线程数进行记录的一个原子性int变量,主要记录两部分:线程池中的有效线程(workerCount);线程的状态(runstate)包含运行,shutdown
等状态。该变量的高3位用来记录runstate,低29位用来记录有效线程数(约5亿条)(其实这个地方与ReentReadWriteLock中的state变量相似)

COUNT_BITS:workerCount计数位数,低29位

CAPACITY:workerCount的最大值2^29 - 1

饱和策略(内部类)

ThreadPoolExecutor中提供了四种可选择的饱和策略(拒绝策略),用来处理阻塞队列已满且没有空闲线程,后续新来任务的处理

AbortPolicy:直接抛出异常(默认策略)

CallerRunsPolicy:用调用者所在的线程执行任务

CallerRunsPolicy:丢弃队列中最靠前的任务,执行该任务

DiscardPolicy:直接丢弃

worker类(内部类)

worker类是实现线程池的重要类,它继承了AQS类并实现了Runnable接口,结构如下:

技术分享图片

Worker内部类主要是用来将运行线程封装,维护运行任务线程中断状态的类,该类继承了AQS类并实现了Runnable接口

变量:

firstTask: 提交的任务线程;

thread: worker类封装后的线程,用来处理任务线程;

completeTasks: 完成的任务数;

构造方法:

Worker(Runnable firstTask) {
   /**初始化锁的获取次数**/ setState(
-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }

 获取锁、释放锁

从Worker类获取锁的方式可以看到worker类只会去获取独占锁,也就是说不支持重入的,这也是为什么Worker不直接使用ReentrantLock的原因,ReentrantLock是可重入的;当worker获取到锁时表明工作线程正在运行,不允许中断(可以在runWorker中查看);

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

 

构造方法

ThreadPoolExecutor总共有四种构造方法

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue workQueue)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,
                          RejectedExecutionHandler handler)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,
                          ThreadFactory threadFactory)

/**所有的构造方法调用的都是该方法**/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue workQueue,
                          ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize 0 ||
        maximumPoolSize 0 ||
        maximumPoolSize 
        keepAliveTime 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

字段说明:
corePoolSize:线程池初始化核心线程数

maximumPoolSize:线程池最大线程数

keepAliveTime:空闲线程存活时间

workQueue:存放任务的队列(阻塞队列)

threadFactory:线程池的类型

handler:饱和处理策略

execute方法

执行给定的任务,可能是用的是新创建的线程,也可能是已存在的线程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /**获取ctl,记录workCount以及runState, 为32**/   
    int c = ctl.get();
    /**判断线程池中的线程数是否小于核心线程数**/
    if (workerCountOf(c)  corePoolSize) {
     /**添加一个工作线程线程**/
        if (addWorker(command, true))
            return;
     /**添加失败重新获取ctl**/
        c = ctl.get();
    }
   /**线程池是运行状态,并且线程成功添加到队列(线程池中线程数大于核心线程或者小于核心线程且添加线程失败)**/
    if (isRunning(c) && workQueue.offer(command)) {
     /**重新获取ctl**/
        int recheck = ctl.get();
     /**该处的二次检查是为了防止线程池被shutdown或者上次检查后有线程死亡**/
     /**重新判断线程池是否是运行状态,如果不是运行状态,将成功添加到队列中的线程从队列中移除,同时通过对应的饱和策略处理**/
        if (! isRunning(recheck) && remove(command))
       /**执行拒绝策略**/
            reject(command);
     /**如果工作线程为0,执行添加工作线程操作**/
        else if (workerCountOf(recheck) == 0)
        /**添加一个工作线程但不启动**/
            addWorker(null, false);
    }
   /** 执行到这里说有存在两种情况
     * 1.线程池是running状态,工作线程数大于核心线程数且阻塞队列已满导致添加任务失败。
     * 2.线程池不是工作状态
   **/
    else if (!addWorker(command, false))
        reject(command);
}

addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
      /**获取线程池的运行状态**/
        int c = ctl.get();
        int rs = runStateOf(c);

        /** 判断是否需要添加新的线程(不在添加需要满足两个条件:rs >= shutdown; 第二个条件整体为false)
          * 1.rs >= SHUTDOWN 即线程池是shutdown、stop、tidying、terminated状态,表示线程池不在接收新的任务。
          *
          * 2.rs == SHUTDOWN 即线程池不在接收新的任务;firstTask == null 即提交执行的线程为空;!workQueue.isEmpty() 即阻塞队列不为空只要三个条件有
          *    一个不满足,则返回false。
          *   2.1. 能执行到这里表名rs一定是>=SHUTDOWN的,如果rs不是SHUTDOWN状态,线程池不会接受新的任务,以及正在处理的任务一会停掉,所以不需要添加新的
          *        工作线程。
          *   2.2. fistTask为空,没必要为该任务创建新的工作线程
          *   2.3. 阻塞队列为空,进行该判断表明rs = SHUTDOWN且阻塞队列中的任务已经处理完,不会创建新的工作线程
         **/
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            /**获取线程池中的工作线程**/
            int wc = workerCountOf(c);
            /**判断工作线程是否超限**/
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /**通过cas方法添加一个工作线程数**/
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
     /**根据firstTask创建一个工作线程**/
        w = new Worker(firstTask);
        final Thread t = w.thread;
     /**firstTask为null只创建,不启动**/
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
          /**1. 线程池是running状态
            *2. 线程池是shutdown状态并且firstTask为null
                  *满足上面任意一个条件,会去添加工作线程,对于第二个条件来说,不会去接收新的任务,但阻塞队列可能没有处理完,可以添加新的工作线程
                 **/
                if (rs 
                    (rs == SHUTDOWN && firstTask == null)) {
            /**线程是否已经启动**/
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
          /**启动线程**/
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            /**添加工作线程失败,进行回滚操作
              *1.将添加的工作线程从工作线程集合中移除
              *2.线程池工作线程数减一
              *3.重新执行线程池的terminate状态转换
             **/
            addWorkerFailed(w);
    }
    return workerStarted;
}

 runWorker方法(执行任务)

/**仅仅会在addWorker()成功时调用,内容比较简单,需要注意三个地方getTask()、beforeExecute()、afterExecute()(后两个可以自己重写)**/
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
   /** 释放锁,对应于worker类构造方法中的setState(-1), 将state状态恢复为0,允许中断
     *  线程池正在初始化任务线程时,会将锁的初始值设置为-1,这样做的目的是禁止执行前对任务进行中断
    **/
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
       /**通过getTask()方法获取任务**/
        while (task != null || (task = getTask()) != null) {
            w.lock();
            /**判断线程/线程池是否处于中断/stop状态**/
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
              /**获得锁并运行任务**/
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
              /**释放锁,任务完成数加1**/
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

getTask方法

private Runnable getTask() {
    /**从阻塞队列中获取任务是否超时的变量设置**/
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
       /**如果线程池不是运行状态
         *1.线程是是否是stop、TIDYING、terminate状态
         *2.阻塞队列是否为空
         *满足以上条件 1||2,表明线程池不处理任务,不接受新的任务,线程池任务线程数-1
        **/
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        /**allowCoreThreadTimeOut为false表示线程池中核心线程数不需要进行超时判断**/
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        
       /**获取任务(都会阻塞)
         * 如果设置了核心线程运行超时,或者是线程池中任务线程数多于核心线程数,通过pool设置超时时间获取任务。
         * 没事设置超时时间,通过take方法获取任务
         **/
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

shutdownNow方法

与shutdown方法相比,多了一个drainQueue清空阻塞队列的方法,并且所有线程进行中断操作

/**shutdown方法主要调用了四个方法**/
public List shutdownNow() {
    List tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        /**如果存在安全管理器,判断是否有权限interrupt权限**/
        checkShutdownAccess();
        /**设置线程池运行状态**/
        advanceRunState(STOP);
     /**中断任务线程**/
        interruptWorkers();
        /**清空阻塞队列**/
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    /**尝试将线程池设置为terminate状态**/
    tryTerminate();
    return tasks;
}

/**该方法是worker类中的方法,直接中断,与shutdown方法相比,改方法是对所有的任务线程进行中断操作,
  *shutdown方法会去先尝试获取锁,如果获取锁成功,表示当前线程正在等待任务,对于这种任务线程进行中断操作**/
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

 

tryTerminate方法

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /**1.线程池是否是运行状态
          *2.线程池是都是Tidying、terminate状态
          *3.线程池是否是shutdown状态,并且阻塞队列不为空
          *满足上述3个条件任意一个立即返回:
          *运行状态,线程池允许任务的处理以及添加,不能直接转换到terminate
          *shutdown状态,阻塞队列不为空,表示还在处理任务,不能直接转换到terminate
        **/
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        /**线程池为shutdown或者stop状态,且阻塞队列为空
          *如果线程池工作线程数不为0,至少中断一个工作线程, 此处可能存在getTask获取任务是一直处于阻塞的任务线程,避免队列为空,任务线程一直阻塞的情况
        **/
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            /**设置为tidying状态**/
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    /**设置成terminated状态**/
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

线程池的监控

getPoolSize() : 获取当前线程池的工作线程数量

getQueue() : 获取线程池中阻塞队列(间接获取阻塞队列中任务的数量)

getCompletedTaskCount() : 获取也完成的任务数量

getTaskCount() : 获取已运行、未运行的任务总数

getLargestPoolSize() : 线程池线程数最大值

getActiveCount():当前线程池中正在执行任务的线程数量。

getCorePoolSize() : 线程池核心线程数

 

常见的线程池(Executors)

Executors是线程池的工厂类,通过Executors可以创建四种不同的线程池 (newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool、newSingleThreadExecutor、newWorkStealingPool(也是一种线程池,但不是通过ThreadPoolExecutor实现,不做讨论))

阻塞队列(引用这里)

SynchronousQueue:newCachedThreadPool
LinkedBlockingQueue(无界队列):基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,
而maximumPoolSize就不会起作用了(后面也会说到)。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
newFixedThreadPool使用
ArrayBlockingQueue(有界队列):使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变
得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可
能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
1. 如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量,
但这样也会降低线程处理任务的吞吐量。
2. 如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。
3. 如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任
务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。

DelayedWorkQueue : ScheduledThreadPoolExecutor使用

newFixedThreadPool

固定线程数量的线程池,corePoolSize==maximumPoolSize

1.所有工作线程都在执行任务,新来任务需要在队列中等待直到有空闲工作线程

2.工作线程在执行任务时被shutdown了,新来任务是会创建一个新的任务线程

newCachedThreadPool

可缓存线程池,corePoolSize==0, maximumPoolSize=Integer.MAX_VALUE

1.没有核心任务处理线程

2.新来任务是如果有空闲的处理线程,直接使用已有的处理线程,否则创建一个处理线程

3.当超过60s工作线程没有任务处理,将会被销毁

该线程池适合处理执行时间短,数量多的任务

 newScheduledThreadPool

调度线程池,jdk中单独一个类实现,初始化对象时设置corePoolSize,maximumPoolSize=Integer.MAX_VALUE

用来设置给定延迟时间后执行

newSingleThreadExecutor

只有一个工作线程来处理任务的线程池,corePoolSize==maximumPoolSize==1

 

Java 并发编程 --- ThreadPoolExecutor(五)

标签:holding   cer   限制   释放   his   二次   add   row   readwrite   

原文地址:https://www.cnblogs.com/kaneziki/p/9698781.html


评论


亲,登录后才可以留言!