java 线程池

2021-02-07 17:18

阅读:367

标签:线程   i++   服务   ons   stat   hand   service   forkjoin   execution   

两种创建线程池的方法 

ThreadPoolExecutor 和 FokJoinPool

1.ThreadPoolExecutor 创建线程池
1) 无返回值的情况
技术图片技术图片
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 无返回值的线程池
 */
public class ThreadPoolTest {
    /**
    *  任务先拿corePoolSize里面的 2个线程,然后往 workQueue里面放两个,然后往这里面放 maxPoolSize - corePoolSize = 2个
     *  这样就达到了线程的饱和数,其他的任务来了没有线程执行,阻塞队列也满了所以其他任务都会被拒绝
    *
    */
    public static void main(String[] args) {
        //核心线程池的大小
        int corePoolSize = 2;
        // 线程池的最大线程数
        int maxPoolSize = 4;
        //线程最大空闲数
        long keepAliveTime = 2;
        //时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //阻塞对垒 容量为2
        BlockingQueue workQueue = new ArrayBlockingQueue(2);
        //线程创建工厂
        ThreadFactory threadFactory = new NameThreadFactory();
        //线程池拒绝策略
        RejectedExecutionHandler handler = new MyIgnorePolicy();
        ThreadPoolExecutor executor = null;
        try{
            //推荐的使用创建线程池方式
            //不推荐使用现成的api创建,会默认最大线程事2的31次方,容易导致服务器挂
            executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            //预开启所有的核心线程 提升效率
            executor.prestartAllCoreThreads();

            //任务数量
            int count = 12;
            for (int i = 1; i ) {
                //执行任务
                RunnableTask task = new RunnableTask(String.valueOf(i));
                executor.submit(task);
                System.out.println("i::"+i+"    active::"+executor.getActiveCount()+"   core::"+executor.getCorePoolSize()+"    size::"+executor.getPoolSize()+"   queue::"+executor.getQueue().size());
                System.out.println();

            }
        }finally{
            if(executor !=null){
                executor.shutdown();
            }

        }
    }

    /**
     * @Description 线程工厂
     * @Param
     */
    static class NameThreadFactory implements ThreadFactory {
        //线程id
         private final AtomicInteger threadId = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r,"线程—"+threadId.getAndIncrement());
            System.out.println(t.getName()+" 已经被创建");
            return t;
        }
    }
    /**
    * @Description 线程池拒绝策略
    * @Param
    */
    static class MyIgnorePolicy implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //打印被拒绝的任务
            System.out.println(r.toString() + "  被拒绝");
        }
    }
    static class RunnableTask implements Runnable{
            private String name ;
        RunnableTask(String name ){
            this.name = name ;
        }
        @Override
        public void run() {
            System.out.println(this.toString() + " is running!");
            // 让任务执行慢点
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public String toString(){
            return "RunnableTask [name = "+ name + "]";
        }
    }
}
View Code

  2)有返回值的情况

技术图片技术图片
import java.util.concurrent.*;

/**
 * 有返回值的线程池
 */
public class ThreadPoolCallableTest {
    public static void main(String[] args) throws ExecutionException {
        ExecutorService executorService = null ;
        int cout = 10 ;
        try{
            //不推荐这种创建方式,这是默认创建线程池,默认线程数事2的31次方
            executorService = Executors.newCachedThreadPool();
            //保存执行完成的结果数据
            CompletionService completionService = new ExecutorCompletionService(executorService);
            for(int i = 1 ; i){
                FactoryCalculator factoryCalculator = new FactoryCalculator(i);
                completionService.submit(factoryCalculator);
            }
            for(int i = 1 ; i){
                Future future = completionService.take();
                System.out.println(future.get());
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            if(executorService != null ){
                executorService.shutdown();
            }
        }
    }
    static  class  FactoryCalculator implements Callable {
        Object val = null;
        public FactoryCalculator(Object val){
            this.val = val;
        }
        @Override
        public Object call() throws Exception {
            return Thread.currentThread().getName() + "::"+val;
        }
    }
}
View Code

 2.ForkJoinPool 创建线程池

1)无返回值的情况

技术图片技术图片
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * 无返回值
 */
public class ForkJoinPoolAction {
    private static Set resultSet = new CopyOnWriteArraySet();
    public static void main(String[] args) throws InterruptedException {
        //打印1-3000 的数字
        PrintTask task = new PrintTask(0,3000);
        //穿见线程池,这里线程数默认为服务器当前处理器数
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(task);
        // 线程阻塞 等待所有的任务完成,这里有个等待时间
        pool.awaitTermination(2,TimeUnit.SECONDS);
        System.out.printf("result_set 的大小 = %s",resultSet.size());
        pool.shutdown();
    }
    static  class  PrintTask extends RecursiveAction {
        private  int start ;
        private int end;
        private static  final  int THRESHOLD = 50 ;
        @Override
        protected void compute() {
            //小于50 就直接将50个数字放集合里
            if(end -start  THRESHOLD){
                for(int i = start; i  ){
                    ForkJoinPoolAction.resultSet.add(i);
                    System.out.println(Thread.currentThread().getName()+ "的i 值 \t" + i);
                }
            }
            //如果大约50就递归切分
            else{
                //递归切分
                int mid = (start+end )/2;
                PrintTask leftTask = new PrintTask(start,mid);
                PrintTask rightTask = new PrintTask(mid,end);
                leftTask.fork();
                rightTask.fork();
            }
        }
        public PrintTask(int start,int end){
                this.start = start;
                this.end = end;
        }
    }
}
View Code

2)有返回值的情况

技术图片技术图片
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * 有返回的情况
 */
public class ForkJoinPoolTask {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //需求: 对长度为10000的元素数组进行累加
        int[] nums = new int[10000];
        Random random =  new Random();
        int total = 0;
        //获取纳秒时间
        long start = System.nanoTime();
        for(int i = 0 ;i ){
            int temp = random.nextInt(100);
            nums[i] = temp;
            total += nums[i];

        }
        long end = System.nanoTime();
        System.out.println("初始化数组用时:"+ (end-start)+" 纳秒,初始化数组总和:"+total);

        long startTask = System.nanoTime();
        SumTask task = new SumTask(nums,0,nums.length);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask forkJoinTask = forkJoinPool.submit(task);

        long endTask = System.nanoTime();
        System.out.println("线程计算用时::"+(endTask - startTask)+ " 纳秒,线程执行结果:"+ forkJoinTask.get());
        forkJoinPool.shutdown();

    }
    static  class  SumTask extends RecursiveTask {
        private  int [] nums;
        private int start;
        private int end ;
        private static  final  int THRESHOLD = 50 ;

         SumTask( int [] nums, int start,int end){
            this.nums = nums;
            this.start = start;
            this.end = end;
        }
        @Override
        protected Integer compute() {
             int sum = 0;
            if(end -start  THRESHOLD){
                for(int i = start; i  ){
                    sum+= nums[i];
                }
                return sum;
            }
            else{
                //递归切分
                int mid = (start+end )/2;
                SumTask  leftTask = new SumTask(nums,start,mid);
                SumTask rightTask = new SumTask(nums,mid,end);
                leftTask.fork();
                rightTask.fork();
                //把两个小人物累加合并
                return  leftTask.join() + rightTask.join();
            }
        }
    }
}
View Code

 

两种创建线程池方式对比:

ThreadPoolExecutor 适用于IO密集型任务如:
  1.http
  2.rpc
  3.DB
  4.Redis 等
ForkJoinPool 适用于CPU密集型任务
1.处理大量的文本数据





java 线程池

标签:线程   i++   服务   ons   stat   hand   service   forkjoin   execution   

原文地址:https://www.cnblogs.com/w-ting/p/12775139.html


评论


亲,登录后才可以留言!