简单聊一聊Java线程池ThreadPoolExecutor

2022-11-13 10:11:43 java 线程 简单

简介

ThreadPoolExecutor是一个实现ExecutorService接口的线程池,ExecutorService是主要用来处理多线程任务的一个接口,通常比较简单是用法是由Executors工厂类去创建。

线程池主要解决了两个不同的问题:

  • 在执行大量异步任务时,为了能够提高性能,通常会减少每个任务的调用开销。
  • 提供了一系列多线程任务的管理方法,便于多任务执行时合理分配资源以及一些异常情况的处理。每个ThreadPoolExecutor还维护一些基本统计信息。例如:已完成任务的数量,当前获得线程数等。

参数说明

ThreadPoolExecutor提供了几个核心参数,方便开发人员根据具体场景合理分配线程资源。

  • corePoolSize:核心线程数,在线程池创建时就已初始化好的n个核心线程,即使线程空闲着也会一直保留在线程池中不被销毁,除非调用线程池方法设置了java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(true)(允许核心线程超时销毁)。
  • maximumPoolSize:线程池允许存在最大线程数。
  • keepAliveTime:当线程数大于核心线程数时,多余的线程在执行任务结束后等待新任务的最大等待时间。
  • unitTimeUnit类型,是keepAliveTime多余线程最大空余时间单位。
  • workQueue:必须指定一个阻塞队列,在线程池执行execute方法时新进来的任务在执行前都会保留到此队列里进入等待。
  • threadFactory:创建线程的工厂,默认采用Executors.defaultThreadFactory()创建线程。
  • handler:拒绝策略,当最大线程数已占满,且队列已满,此时线程池将触发拒绝策略,对新进来的任务做拒绝处理,具体的处理方案在后面详细分析(默认使用java.util.concurrent.ThreadPoolExecutor.AbortPolicy直接抛出异常拒绝处理)。

注:maximumPoolSize如果大于corePoolSize,则多出的部分线程数只有在阻塞队列workQueue占满时才会创建核心线程之外的线程去执行任务,如果我们设置的阻塞队列为无界队列(默认大小为Integer.MAX_VALUE),则队列永远无法占满,就不会去创建额外的线程进行工作,一般情况如果任务数足够,那么也是在队列大小还没达到Integer.MAX_VALUE时就已经出现内存溢出了。Executors线程池工厂中的newFixedThreadPool()、newSingleThreadExecutor()方法就是使用了无界队列LinkedBlockingQueue,防止内存溢出在日常开发过程中一般是不建议直接去使用Executors去创建线程池。

如何创建线程池

上面我们提到的可以使用Executors工厂直接创建线程池,但是Executors提供的创建线程池都是不可控的,我们还是得按自己的业务做好分析自定义一个线程池。

以下是线程池创建的一个案例:

@Slf4j
@Configuration
public class ThreadPoolConfig {

    @Value("${threadPool.corePoolSize:8}")
    private int corePoolSize;

    @Value("${threadPool.maximumPoolSize:16}")
    private int maximumPoolSize;

    @Value("${threadPool.keepAliveTime:60}")
    private int keepAliveTime;

    @Value("${threadPool.queueSize:99999}")
    private int queueSize;

    @Bean
    public ThreadPoolExecutor testExecutor() {
        LinkedBlockingQueue queue = new LinkedBlockingQueue(queueSize);
        return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, 
                TimeUnit.SECONDS, queue, getThreadFactory(), getRejectedExecutionHandler());
    }

    
    private ThreadFactory getThreadFactory() {
        return new ThreadFactory() {

            @Override
            public Thread newThread(Runnable r) {
                log.info("===> Create new thread ...");
                return new Thread(r);
            }
        };
    }

    
    private RejectedExecutionHandler getRejectedExecutionHandler() {
        return new RejectedExecutionHandler() {

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 继续往队列里添加任务,这里只是一个案例,这种方式并不友好,会抛出队列已满的异常
                log.info("===> Handler runnable ......");
                executor.getQueue().add(r);
            }
        };
    }
}

application.properties配置文件

threadPool:
  corePoolSize: 8
  maximumPoolSize: 16
  keepAliveTime: 60
  # 为方便测试这里我们配置队列数小一点
  queueSize: 99

由以上的线程池配置,我们写一个demo测试一下:

截取部分运行日志

  • 红框我们可以看到执行到了线程创建工厂部分代码块
  • 蓝色框日志我们可以看到largestPoolSize=11,这是由于我们配置的maximumPoolSize=16 > corePoolSize=8,我们demo执行的是110个任务并发,队列大小是99,由此分析得出(需要额外出创建线程数 = 并发任务总数110 - 核心线程数8 - 队列大小99 = 3),所以线程池在队列已满时会多创建3个线程用于执行任务,在达到keepAliveTime配置的最大空闲时间后这3个线程即会自动销毁。

注:可能有的同学会想线程池使用后需要销毁吗?在这里补充一下,如果我们是作为局部变量创建出来的线程池(如:在执行的方法内使用Executors.newFixedThreadPool(10)创建临时的线程池),这种情况我们用完就必须将它立即销毁,否则主线程就会一直处于运行状态。如果是全局配置的线程池,那么就是为整个系统中诸多业务提供使用的,这种就不需要对线程池做销毁,因为一旦销毁了其他的任务就无法继续使用该线程池执行任务。

  • 销毁线程池主要有两种方式:
    • shutdown():此方法对线程池做销毁,线程池会优先将剩余未完成的任务执行完才会执行销毁任务。
    • shutdownNow():此方法会对线程池做立即销毁,无论线程池中的任务是否执行完成。

拒绝策略

通常我们在配置好有限队列大小后,就会有可能出现队列占满的情况,这时候我们的拒绝策略就会起到作用,接下来我们就来分析一下RejectedExecutionHandler接口具体有哪一些实现方式:

  • AbortPolicy:线程池的默认拒绝策略,在jdk提供的ThreadPoolExecutor线程池中有一个默认线程池变量private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();作为默认拒绝策略,查看如下图源码可知它就是直接抛出RejectedExecutionException异常,并且会直接丢弃当前任务,如果担心异常影响后续任务执行开发人员需自行捕获异常处理

  • CallerRunsPolicy:只要在当线程池未被销毁的情况下,不丢弃任务直接使用主线程(调用线程池执行的线程)执行该任务。因为该策略是由主线程直接执行任务的,所以不建议在并发度高的情况下使用,建议在并发度较低且任务不允许失败的情况下才使用此策略

  • DiscardPolicy:直接丢弃当前任务,不做任何处理。直接丢弃任务的情况下,开发人员也无法排查到哪些任务被丢弃掉,一般不建议使用,除非是无关紧要的任务即使丢弃也无所谓的。

  • DiscardOldestPolicy:在线程池未被销毁的情况下,丢弃最早进入队列的一个任务(即最久未执行的任务),然后再重新将此任务加入线程池,在此策略下需注意被丢弃的任务的重要性,如果任务不重要可直接丢弃。

  • 自定义策略:在以上JDK提供的四种默认拒绝策略之外,我们还可以通过自定义的方式来处理被拒绝的任务。如果担心任务被拒绝或者被丢弃造成不可预估的问题,在时效性没有太大要求的情况下我们可以先将任务内容转换成数据入库做好日志记录,后续可以使用定时任务或者通过MQ消息延迟处理。由以上的线程池配置Demo中的拒绝策略改造伪代码如下:
private RejectedExecutionHandler getRejectedExecutionHandler() {
    return new RejectedExecutionHandler() {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 伪代码
            log.info("===> 可根据任务的重要性区分对待,将任务做转换入库延迟处理 ......");
        }
    };
}

总结

线程池是为了充分利用CPU资源,在合理分批使用的情况下能够极大的提高我们程序的性能,以上的参数配置仅作为参考,并没有一个标准的依据,只能在实际开发过程中开发人员自行多做一些测试来判断参数如何配置更加合理。

在拒绝策略配置方面,如果被拒绝的任务相对紧急且重要不可丢弃的情况下,此类任务可独立做一个线程池处理保证任务不丢失,程序只能慢慢优化变得越来越好,不可能有完美的程序即保证高性能又保证安全可靠。

到此这篇关于Java线程池ThreadPoolExecutor的文章就介绍到这了,更多相关Java线程池ThreadPoolExecutor内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

相关文章