Java常见编程错误:线程池

2021-02-04 19:17

阅读:475

标签:tar   指定   extends   exce   oob   term   解决问题   zab   请求   

由于线程的创建?较昂贵,随意、没有控制地创建?量线程会造成性能问题,因此短平快的任务?般考虑使 ?线程池来处理,?不是直接创建线程。

通过三个?产事故,来看看使?线程池应该注意些什么。

 

线程池的声明需要?动进?

Java中的Executors类定义了?些快捷的?具?法,来帮助我们快速创建线程池。《阿?巴巴Java开发? 册》中提到,禁?使?这些?法来创建线程池,?应该?动new ThreadPoolExecutor来创建线程池。

最典型的就是newFixedThreadPool和 newCachedThreadPool,可能因为资源耗尽导致OOM问题。

场景描述

写?段测试代码,来初始化?个单线程的FixedThreadPool,循环1亿次向线程池提交任务,每个任务都会创建?个?较?的字符串然后休眠??时:

执?程序后不久,?志中就出现OOM

public void oom1() throws InterruptedException {
        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        for (int i = 0; i  {
                String payload = IntStream.rangeClosed(1, 1000000)
                        .mapToObj(__ -> "a")
                        .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                try {
                    TimeUnit.HOURS.sleep(1);
                } catch (InterruptedException e) {
                }

            });
        }
        threadPool.shutdown();
        threadPool.awaitTermination(1, TimeUnit.HOURS);
    }

 

看newFixedThreadPool?法的源码发现,线程池的?作队列直接new了?个 LinkedBlockingQueue,?默认构造?法的LinkedBlockingQueue是?个Integer.MAX_VALUE?度的队列,可以认为是?界的:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue());
    }
    public class LinkedBlockingQueue extends AbstractQueue
            implements BlockingQueue, java.io.Serializable {
        ...
        /**
         * Creates a {@code LinkedBlockingQueue} with a capacity of
         * {@link Integer#MAX_VALUE}.
         */
        public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
        ...
    }    

 

虽然使?newFixedThreadPool可以把?作线程控制在固定的数量上,但任务队列是?界的。如果任务较多 并且执?较慢的话,队列可能会快速积压,撑爆内存导致OOM。

把刚才的例?稍微改?下,改为使?newCachedThreadPool?法来获得线程池。程序运?不久后, 同样看到了OOM异常

这次OOM的原因是?法创建线程,翻看newCachedThreadPool的源码可以看到,这种线程池的最?线程数是Integer.MAX_VALUE,可以认为是没有上限的,?其?作队列 SynchronousQueue是?个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到?条?作线程来处理,如果当前没有空闲的线程就再创建?条新的。

由于任务需要1?时才能执?完成,?量的任务进来后会创建?量的线程。线程是需要分配?定的内存空间作为线程栈的,?如1MB,因此?限制创建线程必然会导致OOM

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue());
}

事故描述:

抱有侥幸?理,觉得只是使?线程池做?些轻量级的任务,不可能造成队列积压或开启?量线程。

??注册后,调??个外部服务去发送短信, 发送短信接?正常时可以在100毫秒内响应,TPS 100的注册量,CachedThreadPool能稳定在占?10个左 右线程的情况下满?需求。在某个时间点,外部短信服务不可?了,我们调?这个服务的超时?特别?, ?如1分钟,1分钟可能就进来了6000??,产?6000个发送短信的任务,需要6000个线程,没多久就因为?法创建线程导致了OOM,整个应?程序崩溃。

 

不建议使?Executors提供的两种快捷的线程池,原因如下:

  • 我们需要根据??的场景、并发情况来评估线程池的?个核?参数,包括核?线程数、最?线程数、线程 回收策略、?作队列的类型,以及拒绝策略,确保线程池的?作?为符合需求,?般都需要设置有界的? 作队列和可控的线程数。
  • 任何时候,都应该为?定义线程池指定有意义的名称,以?便排查问题。当出现线程数量暴增、线程死锁、线程占??量CPU、线程执?出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以?便我们定位问题。

线程池线程管理策略详解

除了?动声明线程池以外,还可以??些监控?段来观察线程池的状态。线程池这个组件除?是出现了拒绝策略,否则压?再?都不会抛出?个异常。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题。

程池线程管理策略详解

??个printStats?法实现了最简陋的监控,每秒输出?次线程池的基本内部信息,包括线程数、活跃线程数、完成了多少任务,以及队列中还有多少积压任务等信息:

private void printStats(ThreadPoolExecutor threadPool) {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            log.info("=========================");
            log.info("Pool Size: {}", threadPool.getPoolSize());
            log.info("Active Threads: {}", threadPool.getActiveCount());
            log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
            log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
            log.info("=========================");
        }, 0, 1, TimeUnit.SECONDS);
    }

验证方案

?定义?个线程池。这个线程池具有2个核?线程、5个最?线程、使?容量为10的ArrayBlockingQueue阻塞队列作为?作队列,使?默认的AbortPolicy拒绝策略,也就是任务添加到线程池失败会抛出RejectedExecutionException。此外,借助Jodd类库的ThreadFactoryBuilder?法来构造?个线程??,实现线程池线程的?定义命名。

写?段测试代码来观察线程池管理线程的策略。测试代码的逻辑为,每次间隔1秒向线程池提交 任务,循环20次,每个任务需要10秒才能执?完成,代码如下:

public static int right() throws InterruptedException {
        //  使用一个计数器跟踪完成的任务数
        AtomicInteger atomicInteger = new AtomicInteger();

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2, 5,
                5, TimeUnit.SECONDS,
                new ArrayBlockingQueue(10),
                new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").build(),
                new ThreadPoolExecutor.AbortPolicy()
        );

        printStatus(threadPool);
        //每隔1s提交一次任务,一共提交20次

        IntStream.rangeClosed(1, 20).forEach(
                i -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int id = atomicInteger.incrementAndGet();
                    try {
                        threadPool.submit(() -> {
                                    System.out.println(id + " started.");

                                    try {
                                        TimeUnit.SECONDS.sleep(10);
                                    } catch (InterruptedException e) {

                                    }

                                    System.out.println(id + " finished.");
                                }
                        );
                    } catch (Exception ex) {
                        //提交出现异常的话,打印出错信息并为计数器减?
                        System.out.println("error submitting task " + id + " " + ex.toString());
                        atomicInteger.decrementAndGet();
                    }
                }
        );

        TimeUnit.SECONDS.sleep(60);
        return atomicInteger.intValue();


    }

    private static void printStatus(ThreadPoolExecutor threadPool) {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
                () -> {
                    System.out.println("=========================");
                    System.out.println("Pool Size: " + threadPool.getPoolSize());
                    System.out.println("Active Threads: " + threadPool.getActiveCount());
                    System.out.println("Number of Tasks Completed: " + threadPool.getCompletedTaskCount());
                    System.out.println(("Number of Tasks in Queue: " + threadPool.getQueue().size()));
                    System.out.println("=========================");
                }, 0, 1, TimeUnit.SECONDS
        );
    }

60秒后??输出了17,有3次提交失败了,把printStats?法打印出的?志绘制成图表,得出如下曲线:

技术图片

 

可以总结出线程池默认的?作?为:

  • 不会初始化corePoolSize个线程,有任务来了才创建?作线程;
  • 当核?线程满了之后不会?即扩容线程池,?是把任务堆积到?作队列中;
  • 当?作队列满了后扩容线程池,?直到线程个数达到maximumPoolSize为?;
  • 如果队列已满且达到了最?线程后还有任务进来,按照拒绝策略处理;
  • 当线程数?于核?线程数时,线程等待keepAliveTime后还是没有任务需要处理的话,收缩线程到核?线程数。 

也可以 通过?些?段来改变线程池的默认?作?为:

  • 声明线程池后?即调?prestartAllCoreThreads?法,来启动所有核?线程;
  • 传?true给allowCoreThreadTimeOut?法,来让线程池在空闲的时候同样回收核?线程。

务必确认清楚线程池本?是不是复?的

 

Java常见编程错误:线程池

标签:tar   指定   extends   exce   oob   term   解决问题   zab   请求   

原文地址:https://www.cnblogs.com/liekkas01/p/12791700.html


评论


亲,登录后才可以留言!