标签:状态 人事 tin vat tran keepalive move 退出 tun
线程池的实现原理无非复用二字,类似数据库连接池,都是将一些重复创建的东西拿来重复使用。其中最关键的问题就两个:一个是怎么复用;一个是怎么回收。在数据库连接池中,一个连接的生命周期是我们可以手动控制的,相对来说容易一些。我们通过使用一个链表来持有连接并复用,超过最大连接数就回收。线程池不同,线程的生命周期不可控,当run方法运行结束了,线程就自然消亡了,因此麻烦一些,我们需要通过一个循环来让run方法不停歇的运行着,跑完一个又一个的任务。线程和任务由worker持有,当一个工人的所有任务(包括队列中的)都跑完了就会被回收。
JDK8的线程池ThreadPoolExecutor类2000多行,当然其中注释占了大头。其中5个内部类,除去4个饱和策略,分量最重的是一个私有的内部类Worker,我叫它工具人类,它是关键。看下它的说明:
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
工具人类继承AQS是为了实现不可重入的互斥锁(见标黄外文),因为我们不想要工具人在执行任务时被人打断,比如setCorePoolSize方法里的interruptIdleWorkers方法。
工具人类实现了Runnable接口是为了让它自己先顶替真正的任务,在runWorker方法中实现线程的复用。工具人投入池中,用自己的线程出力,执行自己的任务,直到超过规定正式员工人数,所有任务进入缓存队列中排队。此时队列是动态的,只要工具人手头任务处理完了,就会从队列中拿到新任务进行处理。一旦任务爆仓,就得外包工具人上场。把ThreadPoolExecutor精简一下,去掉线程池的生命周期,去掉饱和策略,从2000多行变成了300多行。直接看代码:
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadPool implements Executor {
private AtomicInteger workerCounts = new AtomicInteger(1); // 正在忙着的工具人数量
private final BlockingQueue workQueue; // 缓存队列,用于缓存超过核心线程数的任务
private final HashSet workers = new HashSet(); // 工具人池
private int completedTaskCount = 1; // 已完成任务总数,初始值为1
private int corePoolSize; // 核心线程数
private int maxPoolSize; // 最大线程数
private volatile long keepAliveTime; // 多出核心线程数的线程保命的时间
private Lock mainLock = new ReentrantLock(); // 线程池的全局锁
ThreadPool(int corePoolSize, int maxPoolSize, int queueSize, long keepAliveTime, TimeUnit unit) {
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
this.workQueue = new ArrayBlockingQueue(queueSize);
this.keepAliveTime = unit.toNanos(keepAliveTime);
}
/**
* 线程池的主方法,也是入口
*
* @param command
*/
@Override
public void execute(Runnable command) {
int currentWork = workerCounts.get();
// 先处理核心线程
if (currentWork corePoolSize) {
if (addWork(command, true)) {
System.out.println("execute >>> 核心线程数内执行完毕。");
return;
}
}
// 正式员工忙不过来,就把任务加入缓存队列中
if (workQueue.offer(command)) {
System.out.println("execute >>> 已加入缓存队列中,队列中有 " + workQueue.size() + " 个任务。");
if (workerCounts.get() == 0) { // 当前已经没有活着的工具人了,因为当前任务都跑完了,需要再创建工具人
addWork(null, false); // 此时无需给工具人分配任务了,因为任务入队缓存队列中,只需从缓存队列中取出即可
}
} else if (!addWork(command, false)) { // 缓存队列满了,让外包工具人处理,如果外包工具人名额也满了,那就真搞不定了
System.err.println("execute >>> 外包工具人名额也超额,线程池搞不定了。");
}
}
/**
* 任务入池
*
* @param firstTask 初始任务
* @param isCorePoolSize 是否核心线程数
* @return 任务是否已执行
*/
public boolean addWork(Runnable firstTask, boolean isCorePoolSize) {
// 双循环,通过设置标记来直接从内层循环跳出外层循环
retry:
for (; ; ) {
// 校验任务是否为空.若缓存队列不为空时,任务可以为空,见execute方法
if (firstTask == null && workQueue.isEmpty())
return false;
for (; ; ) {
int c = workerCounts.get(); // 获取在跑的工具人数量
System.out.println("addWork >>> 工具人数 : " + c + " 个;是否核心线程数 : " + isCorePoolSize);
// 若在核心线程数内,判断工具人数是否已超过;同理,已经超过核心线程数,则看是否超过最大任务数
if (c > (isCorePoolSize ? corePoolSize : maxPoolSize))
return false;
// 先将在忙的工具人数+1,跳出到最外层,执行循环下面的逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
}
}
boolean workerStarted = false; // 任务是否已开始执行
boolean workerAdded; // 任务是否已添加到池子里
Worker w = null;
final Thread t;
try {
w = new ThreadPool.Worker(firstTask); // 实例化工具人,将初始任务分配给该工具人
t = w.thread; // 从工具人那里领取线程,等会儿把任务跑起来
if (t != null) {
final Lock mainLock = this.mainLock; // 使用全局锁来添加工具人到池子里
mainLock.lock();
try {
workers.add(w); // 工具人入池
System.out.println("addWork >>> 工具人池里有 " + workers.size() + " 个工具人。");
workerAdded = true;
} finally {
mainLock.unlock();
}
if (workerAdded) { // 工具人入池成功,可以让他的线程跑起来了
System.out.println("addWork >>> 开始拉起线程,准备进入Workder.run()...");
t.start(); // 关键点1:工具人的线程跑起来了,怎么跑?工具人自己的run(),他也是个Runnable
workerStarted = true;
}
}
} finally {
if (!workerStarted) // 任务没跑起来,回滚:回收该工具人,在忙着的工具人数-1
addWorkerFailed(w);
}
return workerStarted;
}
/**
* 新增工具人失败,执行回滚操作
*
* @param w
*/
private void addWorkerFailed(Worker w) {
final Lock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 刚进去的工具人,从哪来回哪去
decrementWorkerCount(); // 刚加的在忙着的工具人数减回去
} finally {
mainLock.unlock();
}
}
/**
* 使用CAS将工具人数+1
*
* @param expect
* @return
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return workerCounts.compareAndSet(expect, expect + 1);
}
/**
* 工具人执行任务。1个关键点。
*
* @param worker
*/
final void runWork(Worker worker) {
Runnable task = worker.task; // 获取该工具人要执行的任务
worker.task = null; // 清空该工具人的任务,所以firstTask就是一次的任务,后面都得从缓存队列中取任务
worker.unlock(); // 这里还是允许中断的
boolean completedAbruptly = true; // 任务执行是否被打断
try {
// 关键点3:线程复用——若任务不为空,或者队列中的任务不为空,工具人的线程将一直执行下去。一旦断了,工具人也就可以去死了
while (task != null || (task = getTask()) != null) {
worker.lock(); // 每次跑之前,工具人先锁住(不可重复锁),这里就不允许中断了
try {
task.run();
System.out.println("runWork >>> 任务真的跑起来了,目前已完成 " + completedTaskCount + " 个任务。");
} finally {
task = null; // 任务完成,清空
worker.completedTasks++; // 累计该工具人所完成的任务数量
worker.unlock(); // 工具人解锁(不可重复锁)
}
}
completedAbruptly = false; // 没有被打断
} finally {
System.out.println("runWork >>> 这个工具人没任务可做了,要死了...");
processWorkerExit(worker, completedAbruptly);
}
}
/**
* 将濒死的工具人从池中剔除
*
* @param w
* @param completedAbruptly 工具人在干活时是否被人打断过
*/
private void processWorkerExit(ThreadPool.Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 不曾被人打断,在干活的人数-1
decrementWorkerCount();
final Lock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 死后记账,把该工具人所完成的任务数汇总到总完成任务数中去
workers.remove(w); // 回收工具人的尸体
System.err.println("processWorkerExit >>> 工具人被回收了。");
} finally {
mainLock.unlock();
}
}
/**
* 空转,直到工具人数-1
*/
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(workerCounts.get()));
}
/**
* 使用CAS将工具人数-1
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return workerCounts.compareAndSet(expect, expect - 1);
}
/**
* 获取队列中的工作任务。两个关键点。
*
* @return
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) {
int wc = workerCounts.get(); // 取得在忙工具人数
boolean timed = wc > corePoolSize; // 超过核心线程池数,超时的外包工具人就危险了
if ((wc > maxPoolSize || (timed && timedOut)) // 要么超过最大数了,要么出现了要炒掉的外包工具人
&& (wc > 1 || workQueue.isEmpty())) { // 要么还有忙的工具人,要么缓存队列清空了
if (compareAndDecrementWorkerCount(wc))
return null; // 超时回收这里返回null,runWork就得退出while循环,进入回收工具人阶段,也就是炒人
continue; // 没有炒掉,继续清点外包工具人
}
System.out.println("getTask >>> 工作队列中还有 " + workQueue.size() + " 个任务。");
try {
Runnable r = timed ? // 什么时候会出现poll超时?当然是队列为空的时候。关键点4——确认这个外包工具人是否要被裁
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // 缓存队列空的时候,这里会阻塞。关键点5——线程池靠它不让JVM退出
if (r != null)
return r;
timedOut = true; // 走到这里说明超时都获取不到任务了,那么说明这个外包工具人可以炒掉了
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* 工具人,持有线程池中的线程和任务。1个关键点。
*/
private class Worker extends AbstractQueuedSynchronizer implements Runnable {
private Thread thread; // 线程,用来处理任务,消费者用来消费任务的
private Runnable task; // 任务,生产者创建的
volatile long completedTasks; // 已完成的任务数量
public Worker(Runnable task) {
setState(-1); // 设置线程状态:SIGNAL
this.task = task; // 设置工具人的任务
thread = new Thread(this); // 创建新线程,这里很关键,必须将工具人本身作为任务,分配给这个线程
}
@Override
public void run() {
System.out.println("Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...");
runWork(this); // 关键点2:让工具人线程运行工具人任务:工具人.run() -> 线程池.runWork(工具人) -> 工具人.task.run()
}
/**
* 不可重入锁
*/
public void lock() {
acquire(1);
}
/**
* 解不可重入锁
*/
public void unlock() {
release(1);
}
// 以下都是实现AQS中的方法
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
public static void main(String[] args) {
int threadNum = 2;
ThreadPool threadPool = new ThreadPool(4, 8, 6, 4, TimeUnit.SECONDS);
for (int i = 0; i ) {
final int j = i + 1;
threadPool.execute(() -> {
System.out.println("hello, world.我是任务" + (j));
});
}
}
}
运行结果:
addWork >>> 工具人数 : 1 个;是否核心线程数 : true
addWork >>> 工具人池里有 1 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
addWork >>> 工具人数 : 2 个;是否核心线程数 : true
addWork >>> 工具人池里有 2 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务1
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务2
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
getTask >>> 工作队列中还有 0 个任务。
我们只生产了两个任务,没有超过核心线程数4,所以召唤的两个工具人事做完了后,常驻内存了。把main中的threadNum再分别改为10:
addWork >>> 工具人数 : 1 个;是否核心线程数 : true
addWork >>> 工具人池里有 1 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
addWork >>> 工具人数 : 2 个;是否核心线程数 : true
addWork >>> 工具人池里有 2 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
addWork >>> 工具人数 : 3 个;是否核心线程数 : true
addWork >>> 工具人池里有 3 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
execute >>> 核心线程数内执行完毕。
execute >>> 已加入缓存队列中,队列中有 1 个任务。
execute >>> 已加入缓存队列中,队列中有 2 个任务。
execute >>> 已加入缓存队列中,队列中有 3 个任务。
execute >>> 已加入缓存队列中,队列中有 4 个任务。
execute >>> 已加入缓存队列中,队列中有 5 个任务。
execute >>> 已加入缓存队列中,队列中有 6 个任务。
addWork >>> 工具人数 : 4 个;是否核心线程数 : false
addWork >>> 工具人池里有 4 个工具人。
addWork >>> 开始拉起线程,准备进入Workder.run()...
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务1
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 6 个任务。
hello, world.我是任务4
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 5 个任务。
hello, world.我是任务5
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 4 个任务。
hello, world.我是任务6
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 3 个任务。
hello, world.我是任务7
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 2 个任务。
hello, world.我是任务8
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 1 个任务。
hello, world.我是任务9
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务2
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务3
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
Worker.run() >>> ....工具人的任务跑起来了,进入runWork(工具人)...
hello, world.我是任务10
runWork >>> 任务真的跑起来了,目前已完成 1 个任务。
getTask >>> 工作队列中还有 0 个任务。
runWork >>> 这个工具人没任务可做了,要死了...
getTask >>> 工作队列中还有 0 个任务。
getTask >>> 工作队列中还有 0 个任务。
processWorkerExit >>> 工具人被回收了。
getTask >>> 工作队列中还有 0 个任务。
这次跑了10个任务,核心线程数4,所以只有3个正式工具人被召唤,6个任务进入了缓存队列,最后一个任务召唤出来了一个外包工具人处理第10个任务。正式员工常驻内存,而外包的下场的悲惨的。
特价版线程池ThreadPoolExecutor实现
标签:状态 人事 tin vat tran keepalive move 退出 tun
原文地址:https://www.cnblogs.com/wuxun1997/p/14157693.html