ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

为什么阿里巴巴不允许使用Executors创建线程池

2021-12-01 19:02:02  阅读:217  来源: 互联网

标签:线程 阿里巴巴 thread Executors 队列 创建 threads new


线程池:业务代码常见的问题

在程序中,我们会使用各种池优化缓存创建昂贵的对象,比如线程池、连接池、内存池。一般是预先创建一些对象放入池中,使用的时候直接取出使用,用完归还以便复用,还会通过一定策略调整池中缓存的对象数量,实现动态伸缩。

由于线程的创建比较昂贵,随意、没有控制地创建大量线程会造成性能问题,因此短平快的任务一般优先考虑使用线程池来处理,而不是直接创建线程

1. 线程池的声明需要手动进行

Java 中的 Executors 类定义了一些快捷的工具方法,来帮助我们快速创建线程池。《阿里 巴巴 Java开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 new ThreadPoolExecutor来创建线程池。这一条规则的背后,是大量血淋淋的生产事故,最典 型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因为资源耗尽导致 OOM 问题。

阿里巴巴文档:

在这里插入图片描述

 

测试 OOM问题 :

  • 来初始化一个单线程的 FixedThreadPool,循环 1 亿次向线程池提
    交任务,每个任务都会创建一个比较大的字符串然后休眠一小时


import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j
public class threadPooltest extends RecordLogApplicationTest {


    /**
     * 测试 OOM问题
     */
    @Test
    public void threadPooltest() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(100000);
        System.out.println("开始执行");
        for (int i = 0; i < 100000000; i++) {
            executorService.execute(() -> {
                String payload = IntStream.rangeClosed(1, 1000000)
                        .mapToObj(__ -> "a").collect(Collectors.joining("")) + UUID.randomUUID().toString();
                System.out.println("等待一小时开始");
                try {
                    TimeUnit.HOURS.sleep(1);
                } catch (Exception e) {
                    log.info(payload);
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
    }


}

结果:java.lang.OutOfMemoryError 错误

在这里插入图片描述

 

首先我们看下 newFixedThreadPool 方法的源码,发现,线程池的工作队列直接 new 了一个 LinkedBlockingQueue,

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

点进去查看 LinkedBlockingQueue构造方法 是一个 Integer.MAX_VALUE长度的队列,可以认为是无界的

 /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

虽然 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是无界但。如果任务较多并且执行较慢但话,队列可能会快速积压,撑爆内存导致OOM

测试newCachedThreadPool

如果我们把 newFixedThreadPool 改成 newCachedThreadPool方法来获取线程池。程序运行不久后,同样会看到 OOM 异常

java.lang.OutOfMemoryError: unable to create new native thread

源码:

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

这种线程池的最大线程数是 Integer.MAX_VALUE ,认为是没有上限的,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。由于我们的任务需要一小时才能完成,大量的任务进来后会创建大量的线程,我们知道线程是分配一定的内存空间做为线程栈,比如 1MB,因此无限创建线程必然会导致OOM

我们不建议使用 Executors 提供的两种快捷的线程池,原因如下:

  • 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合要求,一般都需要设置有界的工作队列和 可控的线程数。
  • 任何时候,都于根伟自定义线程池指定有意思的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量CPU 、线程执行出现异常等问题时,我们往往会抓取线程栈,此时,有意义的线程名称,就可以方便我们定位问题。

总结 线程池工作行为:

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果队列已经满了,则在总线程数不大于maximumPoolSize的前提下,则创建新的线程
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

2.确认线程池是否在复用

  • 在生产环境中,监控一直报警当前使用线程数太多,一会又将下来,但是当前用户访问量也不是很大

通过代码排查 发现项目中使用了 Executors.newCachedThreadPool(); 创建线程池使用,我们知道newCachedThreadPool 会在需要时创建必要多的线程,业务代码的一次业务操作会向线程池提交多个慢任务,这样执行一次业务操作就会开启多个线程,如果业务操作量较大,的确有可能一下子开启几千个线程

源码发现

/**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

  • 它的核心线程数是0,而最大线程数 Integer的最大值,一般来说机器都没那么大内存给它不断使用,而 keepAliveTime 是60秒,也就是在 60秒之后所有的线程都是可以回收的,采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

所以我们在使用线程池要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:
: 1. 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队
列。
: 2. 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数 *2(理
由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过
多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做
缓冲。

标签:线程,阿里巴巴,thread,Executors,队列,创建,threads,new
来源: https://blog.csdn.net/qq_40428665/article/details/121661788

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有