Java如何让线程池满后再存放队列
2021-06-05 03:02
标签:RoCE 成功 lex 自定义 作用 sub shu code lis 核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、超出核心线程数量的存活时间(keepAliveTime)、 超出核心线程数量的存活时间单位(unit)、存放任务队列(workQueue)、执行程序创建新线程时使用的工厂(threadFactory)、当线程边界和队列容量达到时拒绝策略(handler) 正常线程池工作流程 1:当提交的任务小于核心线程池数量的时候,使用线程池中的核心线程。 2:当提交的任务大于线程池中核心线程数量的时候,会将新任务存放到队列中。 3:当队列存满后,会开启新线程直到达到设置的最大线程池数量。 4:当队列存满后,且线程池中的最大线程数量达到最大的时候,这时候在提交过来任务,直接采用线程池设置的拒绝策略。 由上面可得,如果队列在没有存满的情况下我们的最大线程数量是没有开启的,这时候并没有达到我们想要的多线程的效果。所以我们需要改写一下逻辑 1:自定义线程池继承ThreadPoolExecutor类,改写核心的逻辑。 2:自定义队列继承LinkedBlockingQueue,改写 offer 方法。 自定义队列方法: 自定义线程池类: 测试类: 自定义线程池核心逻辑: 当提交任务到CustomThreadPoolExecutor的时候,执行 submittedTaskCount.incrementAndGet(); 将线程池中数量+1处理,然后调用父类 super.execute(command); 执行。 自定义队列核心逻辑: 当执行到 workQueue.offer(command) 方法的时候走的我们自定义队列TaskQueue的offer方法,而offer方法的返回值决定着是否创建更多的线程:返回true,代表入队成功,不创建线程;返回false,代表入队失败,需要创建线程。 核心逻辑:当前线程数小于最大线程数就返回false,代表入队失败,需要创建线程。 因此,总结起来就是:自定义的CustomThreadPoolExecutor依赖自定义的TaskQueue的offer返回值来决定是否创建更多的线程,达到先判断maximumPoolSize再判断队列的目的。 tomcat 源码中的线程池也是使用的这样的思想,该例子来源于tomcat源码思想。 tomcat 线程池 源码: tomcat 源码中 TaskQueue 源码: Java如何让线程池满后再存放队列 标签:RoCE 成功 lex 自定义 作用 sub shu code lis 原文地址:https://www.cnblogs.com/ming-blogs/p/14636960.html1.线程池源码分析:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue
2.场景分析
package com.example.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @Description: 线程池工作队列重写
* @Author: mingtian
* @CreateDate: 2021/4/9 13:22
* @Version: 1.0
*/
public class TaskQueue
package com.example.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description: 自定义线程池 重写线程池执行顺序
* @Author: mingtian
* @CreateDate: 2021/4/9 13:21
* @Version: 1.0
*/
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
/**
* 打印日志
*/
private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutor.class);
/**
* 定义一个成员变量,用于记录当前线程池中已提交的任务数量
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
/**
* 自定义线程池
*
* @param corePoolSize 核心线程池数量
* @param maximumPoolSize 最大线程池数量
* @param keepAliveTime 超过核心线程池数量存活时间
* @param unit 超过核心线程池数量存活时间单位
* @param workQueue 存放任务的队列
* @param threadFactory 线程工厂 可以定义线程池名称
* @param handler 当队列满时执行拒绝策略
*/
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue
package com.example.util;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description: 自定义线程池队列
* @Author: mingtian
* @CreateDate: 2021/4/9 13:28
* @Version: 1.0
*/
public class CustomThreadPoolExecutorUtil {
/**
* 打印日志
*/
private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutorUtil.class);
/**
* 默认 CPU 核心数
*/
private static int threadPoolSize = 0;
static {
// 获取服务器 CPU 核心数
threadPoolSize = Runtime.getRuntime().availableProcessors();
logger.info("服务器 CPU 核心数量:{}", threadPoolSize);
}
public static int getThreadPoolSize() {
return threadPoolSize;
}
/**
* 线程工厂,用来创建线程
*/
private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("port-pool-%d").build();
private static TaskQueue taskQueue = new TaskQueue(10);
/**
* 自定义线程池
*/
private static CustomThreadPoolExecutor CustomThreadPoolExecutor = new CustomThreadPoolExecutor(2, 2 * 2,
60L, TimeUnit.SECONDS, taskQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());
/**
* 获取线程池对象方法
*
* @return
*/
public static CustomThreadPoolExecutor getCustomThreadPoolExecutor() {
return CustomThreadPoolExecutor;
}
/**
* 模拟发送消息方法
*/
public static class SendMessage implements Runnable {
private int i;
public SendMessage(int i) {
this.i = i;
}
@SneakyThrows
@Override
public void run() {
logger.info("我是第{}条消息,poolSize:{},queueSize:{},activeCount:{},completedTaskCount:{}", i,
CustomThreadPoolExecutor.getPoolSize(), CustomThreadPoolExecutor.getQueue().size(),
CustomThreadPoolExecutor.getActiveCount(), CustomThreadPoolExecutor.getCompletedTaskCount());
}
}
public static void main(String[] args) throws InterruptedException {
logger.info("-------------------------------开始测试--------------------------------------");
taskQueue.setExecutor(CustomThreadPoolExecutor);
for (int i = 1; i ) {
CustomThreadPoolExecutorUtil.SendMessage sendMessage = new CustomThreadPoolExecutorUtil.SendMessage(i);
CustomThreadPoolExecutor.execute(sendMessage);
}
Thread.sleep(10000);
CustomThreadPoolExecutor.shutdown();
logger.info("-------------------------------测试结束--------------------------------------");
}
}
1 // 代码运行到此处,说明线程数 >= corePoolSize, 此时workQueue为自定义的TaskQueue
2 if (isRunning(c) && workQueue.offer(command)) {
3 int recheck = ctl.get();
4 if (! isRunning(recheck) && remove(command))
5 reject(command);
6 else if (workerCountOf(recheck) == 0)
7 addWorker(null, false);
8 }
1 public boolean offer(Runnable runnable) {
2 if (threadPoolExecutor == null) {
3 throw new RejectedExecutionException("The task queue does not have executor!");
4 }
5 // 线程池的当前线程数
6 int currentPoolThreadSize = threadPoolExecutor.getPoolSize();
7 if (threadPoolExecutor.getSubmittedTaskCount() currentPoolThreadSize) {
8 // 已提交的任务数量小于当前线程数,意味着线程池中有空闲线程,直接扔进队列里,让线程去处理
9 return super.offer(runnable);
10 }
11
12 // return false to let executor create new worker.
13 if (currentPoolThreadSize threadPoolExecutor.getMaximumPoolSize()) {
14 // 重点: 当前线程数小于 最大线程数 ,返回false,暗含入队失败,让线程池去创建新的线程
15 return false;
16 }
17 // 重点: 代码运行到此处,说明当前线程数 >= 最大线程数,需要真正的提交到队列中
18 return super.offer(runnable);
19 }
3.参考文献
private final AtomicInteger submittedCount = new AtomicInteger(0);
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
public boolean offer(Runnable o) {
//we can‘t do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()
上一篇:滴水逆向初级-C语言(二)
下一篇:springboot面试题