java 线程池
标签:线程 i++ 服务 ons stat hand service forkjoin execution
两种创建线程池的方法
ThreadPoolExecutor 和 FokJoinPool
1.ThreadPoolExecutor 创建线程池
1) 无返回值的情况
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 无返回值的线程池
*/
public class ThreadPoolTest {
/**
* 任务先拿corePoolSize里面的 2个线程,然后往 workQueue里面放两个,然后往这里面放 maxPoolSize - corePoolSize = 2个
* 这样就达到了线程的饱和数,其他的任务来了没有线程执行,阻塞队列也满了所以其他任务都会被拒绝
*
*/
public static void main(String[] args) {
//核心线程池的大小
int corePoolSize = 2;
// 线程池的最大线程数
int maxPoolSize = 4;
//线程最大空闲数
long keepAliveTime = 2;
//时间单位
TimeUnit unit = TimeUnit.SECONDS;
//阻塞对垒 容量为2
BlockingQueue workQueue = new ArrayBlockingQueue(2);
//线程创建工厂
ThreadFactory threadFactory = new NameThreadFactory();
//线程池拒绝策略
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = null;
try{
//推荐的使用创建线程池方式
//不推荐使用现成的api创建,会默认最大线程事2的31次方,容易导致服务器挂
executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
//预开启所有的核心线程 提升效率
executor.prestartAllCoreThreads();
//任务数量
int count = 12;
for (int i = 1; i ) {
//执行任务
RunnableTask task = new RunnableTask(String.valueOf(i));
executor.submit(task);
System.out.println("i::"+i+" active::"+executor.getActiveCount()+" core::"+executor.getCorePoolSize()+" size::"+executor.getPoolSize()+" queue::"+executor.getQueue().size());
System.out.println();
}
}finally{
if(executor !=null){
executor.shutdown();
}
}
}
/**
* @Description 线程工厂
* @Param
*/
static class NameThreadFactory implements ThreadFactory {
//线程id
private final AtomicInteger threadId = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r,"线程—"+threadId.getAndIncrement());
System.out.println(t.getName()+" 已经被创建");
return t;
}
}
/**
* @Description 线程池拒绝策略
* @Param
*/
static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//打印被拒绝的任务
System.out.println(r.toString() + " 被拒绝");
}
}
static class RunnableTask implements Runnable{
private String name ;
RunnableTask(String name ){
this.name = name ;
}
@Override
public void run() {
System.out.println(this.toString() + " is running!");
// 让任务执行慢点
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString(){
return "RunnableTask [name = "+ name + "]";
}
}
}
View Code
2)有返回值的情况
import java.util.concurrent.*;
/**
* 有返回值的线程池
*/
public class ThreadPoolCallableTest {
public static void main(String[] args) throws ExecutionException {
ExecutorService executorService = null ;
int cout = 10 ;
try{
//不推荐这种创建方式,这是默认创建线程池,默认线程数事2的31次方
executorService = Executors.newCachedThreadPool();
//保存执行完成的结果数据
CompletionService
View Code
2.ForkJoinPool 创建线程池
1)无返回值的情况
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
/**
* 无返回值
*/
public class ForkJoinPoolAction {
private static Set resultSet = new CopyOnWriteArraySet();
public static void main(String[] args) throws InterruptedException {
//打印1-3000 的数字
PrintTask task = new PrintTask(0,3000);
//穿见线程池,这里线程数默认为服务器当前处理器数
ForkJoinPool pool = new ForkJoinPool();
pool.submit(task);
// 线程阻塞 等待所有的任务完成,这里有个等待时间
pool.awaitTermination(2,TimeUnit.SECONDS);
System.out.printf("result_set 的大小 = %s",resultSet.size());
pool.shutdown();
}
static class PrintTask extends RecursiveAction {
private int start ;
private int end;
private static final int THRESHOLD = 50 ;
@Override
protected void compute() {
//小于50 就直接将50个数字放集合里
if(end -start THRESHOLD){
for(int i = start; i ){
ForkJoinPoolAction.resultSet.add(i);
System.out.println(Thread.currentThread().getName()+ "的i 值 \t" + i);
}
}
//如果大约50就递归切分
else{
//递归切分
int mid = (start+end )/2;
PrintTask leftTask = new PrintTask(start,mid);
PrintTask rightTask = new PrintTask(mid,end);
leftTask.fork();
rightTask.fork();
}
}
public PrintTask(int start,int end){
this.start = start;
this.end = end;
}
}
}
View Code
2)有返回值的情况
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 有返回的情况
*/
public class ForkJoinPoolTask {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//需求: 对长度为10000的元素数组进行累加
int[] nums = new int[10000];
Random random = new Random();
int total = 0;
//获取纳秒时间
long start = System.nanoTime();
for(int i = 0 ;i ){
int temp = random.nextInt(100);
nums[i] = temp;
total += nums[i];
}
long end = System.nanoTime();
System.out.println("初始化数组用时:"+ (end-start)+" 纳秒,初始化数组总和:"+total);
long startTask = System.nanoTime();
SumTask task = new SumTask(nums,0,nums.length);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask forkJoinTask = forkJoinPool.submit(task);
long endTask = System.nanoTime();
System.out.println("线程计算用时::"+(endTask - startTask)+ " 纳秒,线程执行结果:"+ forkJoinTask.get());
forkJoinPool.shutdown();
}
static class SumTask extends RecursiveTask {
private int [] nums;
private int start;
private int end ;
private static final int THRESHOLD = 50 ;
SumTask( int [] nums, int start,int end){
this.nums = nums;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if(end -start THRESHOLD){
for(int i = start; i ){
sum+= nums[i];
}
return sum;
}
else{
//递归切分
int mid = (start+end )/2;
SumTask leftTask = new SumTask(nums,start,mid);
SumTask rightTask = new SumTask(nums,mid,end);
leftTask.fork();
rightTask.fork();
//把两个小人物累加合并
return leftTask.join() + rightTask.join();
}
}
}
}
View Code
两种创建线程池方式对比:
ThreadPoolExecutor 适用于IO密集型任务如:
1.http
2.rpc
3.DB
4.Redis 等
ForkJoinPool 适用于CPU密集型任务
1.处理大量的文本数据
java 线程池
标签:线程 i++ 服务 ons stat hand service forkjoin execution
原文地址:https://www.cnblogs.com/w-ting/p/12775139.html
文章来自:
搜素材网的
编程语言模块,转载请注明文章出处。
文章标题:
java 线程池
文章链接:http://soscw.com/index.php/essay/52282.html
评论