线程池基础篇
2020-12-13 04:09
标签:建议 修改 oid 时机 实现 atom bcd access 适用于 1、 降低资源的消耗。降低线程创建和销毁的资源消耗; 2、 提高响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间 3、 提高线程的可管理性。 ThreadPoolExecutor,jdk所有线程池实现的父类 先看构造函数: 再看看参数的具体解释: 参数详解: 1.核心线程 int corePoolSize:核心池大小 线程池在new 出来后是没有线程的,仅当执行任务的时候才会创建线程; 当然也可以调用线程池的prestartAllCoreThreads()方法,让线程池在创建时就创建corePoolSize数目的线程; 核心线程在任务完成时,会new 新的线程,并且不会销毁,除非设置allowCoreThreadTimeOut 2 最大线程数 int maximuxPoolSize:最大线程池大小 3 存活时间 long keepAliveTime:线程存活时间 当线程池中的线程数量大于核心池大小后,超过核心数量的线程在keepAliveTime的时间内可以等待一个新任务,超过时间就会销毁; 4. TimeUnit timeUnit:keepAliveTime的单位, TimeUnit 枚举值有:DAYS、HOURS、MINUTES、SECONDS、MILLISECONDS(毫秒)、MICROSECONDS(微秒)、NANOSECONDS(纳秒); 5.工作队列 BlockingQueue workQueue:阻塞任务队列 执行前的任务,存储在队列中(线程池会先使用核心线程,当核心线程用完才会使用队列); 线程池只会execute Runnable任务,Callable任务也会包装成Runnable任务 主要实现类有: 1)LinkedBlockingQueue:基于链表的无界(默认构造函数为:最大值Integer.MAX_VALUE容量)阻塞队列,按FIFO(先进先出)的规则存取任务 2)ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO的规则对任务进行存取,必须传入参数来定义队列大小 3)DelayedWorkQueue:基于堆的延迟队列,Executors.newScheduledThreadPool(...)中使用了该队列 4)PriorityBlockingQueue:具有优先级的阻塞队列 5)SynchronousQueue:不存储任务的阻塞队列,每一个存入对应一个取出,串行化队列 吞吐量:SynchronousQueue > LinkedBlockingQueue > ArrayBlockingQueue 6 线程创建工厂 ThreadFactory threadFactory:线程工厂 用来创建线程,可以通过自定义线程工厂给新创建的线程设置更合理的名字、设置优先级和是否守护线程。可以根据需要自定义线程工厂。 7 拒绝策略 RejectedExecutionHandler handler:拒绝任务的接口处理器 达到队列和线程池的最大线程数限制,就执行拒绝策略 拒绝策略有: 1)AbortPolicy:拒绝任务并抛出异常,默认的策略 2)DiscardPolicy:直接拒绝不抛出异常 3)DiscardOldestPolicy:丢弃队列中最远的一个任务(最先进入队列的,FIFO),并执行当前任务; 4)CallerRunsPolicy:只用调用者所在的线程来执行任务,不管其他线程的事。 看上面的英文注释,有3步: 1) 检查core线程池数量 2)如果corePoolSize线程数量已使用,如果队列容量未满,则加入队列。 3)队列已满,创建maximumPoolSize线程数量执行;如果失败则执行关闭线程池或者拒绝策略。 继续跟踪 private boolean addWorker(Runnable firstTask, boolean core) Worker 是啥 Worker实现AQS和Runnable接口 ,是线程接口实现 追踪 run方法 跟踪 getTask 跟踪 processWorkerExit(w, completedAbruptly); addWorker 失败处理 tryTerminate 大量的CAS和自旋锁;位运算算法和重入锁AQS。 看看销毁线程池方法 执行图解: 创建固定线程数量的,适用于负载较重的服务器,使用了无界队列 创建单个线程,需要顺序保证执行任务,不会有多个线程活动,使用了无界队列 会根据需要来创建新线程的,执行很多短期异步任务的程序,使用了SynchronousQueue 基于ForkJoinPool实现 需要定期执行周期任务,Timer不建议使用了。 newSingleThreadScheduledExecutor:只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务 newScheduledThreadPool 可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候 方法说明: schedule:只执行一次,任务还可以延时执行 scheduleAtFixedRate:提交固定时间间隔的任务 scheduleWithFixedDelay:提交固定延时间隔执行的任务一、什么是线程池?为什么要用线程池?
二、线程池的创建
三、参数介绍
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize
* {@code keepAliveTime
* {@code maximumPoolSize
* {@code maximumPoolSize @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue// 阻塞队列
private final BlockingQueue
the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime )
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
the maximum number of threads to allow in the pool 线程池所允许创建的最大线程数;
when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
//线程组
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//名称前缀,这就是我们打印的线程池中线程名称前缀
//可以修改的,如果自定义线程工厂
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
//工厂创建线程
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
//非守护线程
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
//常规优先级
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
四、我们看线程池的主要方法
1.execute()方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn‘t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//AtomicInteger存
int c = ctl.get();
//工作线程小于corePoolSize
if (workerCountOf(c) corePoolSize) {
//添加一个core线程,此处参数为true,表示添加的线程是core容量下的线程
if (addWorker(command, true))
return;
//刷新数据,乐观锁就是没有锁
c = ctl.get();
}
//判断线程池运行状态,工作队列是否有空间
if (isRunning(c) && workQueue.offer(command)) {
//再次检测
int recheck = ctl.get();
//线程池已停止,就移除队列
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
//线程池在运行,有效线程数为0
else if (workerCountOf(recheck) == 0)
//添加一个空线程进线程池,使用非core容量线程
//仅有一种情况,会走这步,core线程数为0,max线程数>0,队列容量>0
//创建一个非core容量的线程,线程池会将队列的command执行
addWorker(null, false);
}
//线程池停止了或者队列已满,添加maximumPoolSize容量工作线程,如果失败,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
rivate final ReentrantLock mainLock = new ReentrantLock();
private final HashSet
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//此处创建了线程,线程工厂,使用this
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
*
* 1. We may start out with an initial task, in which case we
* don‘t need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread‘s
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread‘s UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//任务线程的锁状态默认为-1(构造函数设置的),此时解锁+1,变为0,是锁打开状态,允许中断。
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果添加的任务存在或者队列取
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//线程池正在停止,当前线程未中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断当前线程
wt.interrupt();
try {
//准备,空方法,可自定义实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//本质,直接调用run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//结束,空方法,可自定义实现
afterExecute(task, thrown);
}
} finally {
task = null;
//记录任务数
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//worker结束处理
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//自旋锁
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//检查线程池是否关闭,队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//减少工作线程数量
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//工作线程数超过core或者max,或者队列为空,工作线程存在
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//减少任务数
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//超时机制控制队列取元素
//take 移除并返回队列头部的元素 如果队列为空,则阻塞
//poll 移除并返问队列头部的元素 如果队列为空,则返回null
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//正常执行,此处设置为法false
if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//任务数增加
completedTaskCount += w.completedTasks;
//移除HashSet的线程
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试停止线程池
tryTerminate();
int c = ctl.get();
//如果没有停止线程池
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//core线程数量为0或者队列为空,默认1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//工作线程比core线程多,直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//当前线程运行结束,增加空线程,容量maximumPoolSize
addWorker(null, false);
}
}
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
//工作线程移除任务HashSet
workers.remove(w);
//工作线程数量-1
decrementWorkerCount();
//尝试停止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//线程池在运行或者状态在变化中,或者正在停止但队列有任务,直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//有任务工作线程
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断所有线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//线程池状态改变
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//结束线程池,空方法,does nothing
terminated();
} finally {
//设置线程池状态,结束
ctl.set(ctlOf(TERMINATED, 0));
//唤醒所有wait线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() { }
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查权限
checkShutdownAccess();
//CAS 更新线程池状态
advanceRunState(SHUTDOWN);
//中断所有线程
interruptIdleWorkers();
//关闭,此处是do nothing
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试结束,上面代码已分析
tryTerminate();
}
2. submit方法
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture
五、预定义的线程池
FixedThreadPool
SingleThreadExecutor
CachedThreadPool
WorkStealingPool(JDK7以后)
ScheduledThreadPoolExecutor
package com.youyou.test.demo01;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import