# 线程池与ThreadPoolExecutor类浅析

# 一、抛砖引玉

既然Java中支持以多线程的方式来执行相应的任务,但为什么在JDK1.5中又提供了线程池技术呢?这个问题大家自行脑补,多动脑,肯定没坏处,哈哈哈。。。

说起Java中的线程池技术,在很多框架和异步处理中间件中都有涉及,而且性能经受起了长久的考验。可以这样说,Java的线程池技术是Java最核心的技术之一。

在Java的高并发领域中,Java的线程池技术是一个永远绕不开的话题。既然Java的线程池技术这么重要(怎么能说是这么重要呢?那是相当的重要,那家伙老重要了,哈哈哈)。

那么,本文我们就来简单的说下线程池与ThreadPoolExecutor类。至于线程池中的各个技术细节和ThreadPoolExecutor的底层原理和源码解析,我们会在【高并发专题】专栏中进行深度解析。

引言:本文是高并发中线程池的开篇之作,就暂时先不深入讲解,只是让大家从整体上认识下线程池中最核心的类之一——ThreadPoolExecutor,关于ThreadPoolExecutor的底层原理和源码实现,以及线程池中的其他技术细节的底层原理和源码实现,我们会在接下来的文章中,进行死磕。

# 二、Thread直接创建线程的弊端

# 1. 缺乏线程管理

  • 直接使用 Thread 启动线程后,无法方便地进行线程池管理,可能导致线程过多,消耗大量系统资源,影响性能。

# 2. 无法复用

  • Thread 对象不能复用,每次执行任务都要创建新的线程,增加了线程创建和销毁的开销。

# 3. 资源消耗大

  • 频繁创建和销毁线程会带来较高的系统开销(包括 CPU 资源和内存分配),影响程序的稳定性。

# 4. 缺乏灵活性

  • 无法控制线程数量、任务调度等,不适用于需要高并发或线程复用的场景。

# 5. 不利于异常处理

  • 如果线程内部抛出未捕获的异常,可能导致整个应用程序崩溃,而线程池可以提供更好的异常处理机制。

# 6. 难以进行任务调度

  • 不能方便地进行任务的优先级控制、任务超时、定期执行等,而 ExecutorService 提供了更强大的管理能力。

# 三、线程池的好处

# 1. 降低资源消耗

  • 线程池复用已有线程,避免频繁创建和销毁线程所带来的 CPU 和内存开销,提高系统性能。

# 2. 提高系统响应速度

  • 由于线程池中有空闲线程可用,任务可以快速获取线程执行,而无需等待新线程创建。

# 3. 线程管理

  • 线程池提供了统一的线程管理机制,包括:
    • 控制最大并发数(核心线程数 & 最大线程数)。
    • 设置任务队列,防止无限制创建线程导致内存溢出。
    • 配置线程超时时间,避免长时间占用资源。

# 4. 避免系统过载

  • 线程池可以限制最大线程数量,防止由于创建过多线程导致 CPU 过载、内存溢出等问题。

# 5. 提供更好的任务调度

  • 线程池支持:
    • 定时执行(如 ScheduledThreadPoolExecutor)。
    • 任务优先级管理(通过自定义 BlockingQueue)。
    • 拒绝策略(当任务队列满时的处理策略)。

# 6. 便于异常处理

  • ThreadPoolExecutor 提供 afterExecute() 等回调方法,可用于异常捕获和监控,避免线程因未处理异常而终止。

# 7. 增强代码可维护性

  • 线程池使并发代码结构更加清晰,避免了散乱的线程创建和管理,提高可读性和可维护性。

# 四、线程池

# 1.线程池类结构关系

线程池中的一些接口和类的结构关系如下图所示。

后文会死磕这些接口和类的底层原理和源码。

# 2.创建线程池常用的类——Executors

  • Executors.newCachedThreadPool:创建一个可缓存的线程池,如果线程池的大小超过了需要,可以灵活回收空闲线程,如果没有可回收线程,则新建线程
/**
创建一个线程池,该线程池根据需要创建新线程,但之前构建的线程也可重用它们。这些池通常会提高执行许多短期异步任务的程序的性能。如果没有可重用的现有线程,将创建一个新线程并将其添加到池中。60 秒未使用的线程将终止并从缓存中删除。因此,保持空闲足够长时间的池不会消耗任何资源
*/
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
1
2
3
4
5
6
7
8

代码:

package io.binghe.concurrent.lab06;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 可缓存的线程池,其特点是线程数量不固定,根据任务数量动态创建或回收线程,适用于大量短生命周期的任务。
 * @author CurleyG
 * @date 2025/3/25 23:48
 */
public class CachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId);
                try {
                    Thread.sleep(2000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
  • Executors.newFixedThreadPool:创建一个定长的线程池,可以控制线程的最大并发数,超出的线程会在队列中等待
/**
该方法用于创建一个固定大小的线程池,核心线程数和最大线程数相同,线程池中的线程会复用,并且使用无界队列存储待执行的任务
    1.线程池始终维持 nThreads 个线程,不会动态扩展或收缩。
    2.线程不会因空闲而被销毁,除非手动 shutdown() 线程池
    3.使用 LinkedBlockingQueue 作为任务队列
      3.1队列是无界的(实际上是 Integer.MAX_VALUE),意味着任务不会因队列满而被拒绝,但可能导致 OOM(内存溢出)。
      3.2当所有线程都在执行任务时,新任务会被存入队列,等待线程可用时执行。
    4.如果线程池中的某个线程在执行任务时发生异常而终止(崩溃),那么线程池会自动创建一个新的线程来代替它,以确保线程池仍然维持固定数量的线	  程,并继续执行后续任务	
  
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

代码:

/**
 * 创建一个固定大小的线程池
 * @author CurleyG
 * @date 2025/3/25 23:46
 */
public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池,包含 3 个线程
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 提交 5 个任务
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId);
                try {
                    Thread.sleep(2000); // 模拟任务执行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown(); // 关闭线程池
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  • Executors.newScheduledThreadPool:创建一个定长的线程池,支持定时、周期性的任务执行
  /**
  创建一个可调度的线程池,可以执行定时任务或周期性任务,类似于 Timer,但功能更强大。
	corePoolSize:线程池的核心线程数,表示线程池会始终保持的线程数量(即使线程空闲)

  */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
   //最终调用的方法
  public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

代码:

package io.binghe.concurrent.lab06;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author CurleyG
 * @date 2025/3/25 23:51
 */

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        Runnable task = () -> System.out.println("任务执行时间:" + System.currentTimeMillis());

        // 延迟 3 秒执行
        scheduler.schedule(task, 3, TimeUnit.SECONDS);

        scheduler.shutdown();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  • Executors.newSingleThreadExecutor: 创建一个单线程化的线程池,使用一个唯一的工作线程执行任务,保证所有任务按照指定顺序(先入先出或者优先级)执行
//创建一个单线程的线程池,确保所有任务按顺序执行,适用于需要顺序执行任务或确保线程安全的场景。    
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
1
2
3
4
5
6
7

代码:

package io.binghe.concurrent.lab06;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 单线程的线程池,确保所有任务按顺序执行,适用于需要顺序执行任务或确保线程安全的场景
 * @author CurleyG
 * @date 2025/3/25 23:53
 */

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskId);
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  • Executors.newSingleThreadScheduledExecutor:创建一个单线程化的线程池,支持定时、周期性的任务执行
// 创建一个单线程的调度线程池,支持定时任务和周期任务,但始终只有一个线程,任务按顺序执行  
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

1
2
3
4
5
6

代码:

package io.binghe.concurrent.lab06;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 创建一个单线程的调度线程池,支持定时任务和周期任务,但始终只有一个线程,任务按顺序执行
 * @author CurleyG
 * @date 2025/3/25 23:55
 */
public class SingleThreadScheduledExecutorExample {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        Runnable task = () -> System.out.println(Thread.currentThread().getName() + " 执行任务,时间:" + System.currentTimeMillis());

        // 延迟 2 秒执行
        scheduler.schedule(task, 2, TimeUnit.SECONDS);

        scheduler.shutdown();
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  • Executors.newWorkStealingPool:创建一个具有并行级别的work-stealing线程池
//创建一个 ForkJoinPool 线程池,支持任务并行执行,并且使用**"工作窃取"**(work-stealing)算法来提高 CPU 利用率
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

1
2
3
4
5
6
7
8

代码:

package io.binghe.concurrent.lab06;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 *  创建一个 ForkJoinPool 线程池,支持任务并行执行,并且使用work-stealing)算法来提高 CPU 利用率
 * @author CurleyG
 * @date 2025/3/25 23:58
 */


public class WorkStealingPoolExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newWorkStealingPool();

        // 提交多个任务
        IntStream.range(1, 10).forEach(i ->
                executor.submit(() -> {
                    System.out.println(Thread.currentThread().getName() + " 执行任务 " + i);
                    try {
                        Thread.sleep(1000); // 模拟任务执行
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                })
        );

        // 等待子任务执行完
        Thread.sleep(3000);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 3.线程池实例的几种状态

ThreadPoolExecutor 中,线程池有 5 种状态,由 ctl 变量(AtomicInteger)控制:

状态 描述
RUNNING 线程池正常运行,可以接受新任务 & 执行队列中的任务。
SHUTDOWN 线程池停止接收新任务,但仍执行队列中的任务
STOP 线程池不再接收新任务 & 丢弃队列中的任务,中断正在执行的任务。
TIDYING 所有任务完成,线程数=0,即将调用 terminated() 回调。
TERMINATED 线程池完全终止,已执行 terminated()

注意:不需要对线程池的状态做特殊的处理,线程池的状态是线程池内部根据方法自行定义和处理的。

线程池状态转换关系

RUNNING  -> SHUTDOWN   (调用 shutdown())
RUNNING  -> STOP       (调用 shutdownNow())
SHUTDOWN -> TIDYING    (任务执行完,线程数变为 0)
STOP     -> TIDYING    (任务中断后,线程数变为 0)
TIDYING  -> TERMINATED (执行 terminated() 方法)
1
2
3
4
5
状态 触发方式 能否接收任务 执行队列任务 中断执行中任务
RUNNING 默认 ✅ 是 ✅ 是 ❌ 否
SHUTDOWN shutdown() ❌ 否 ✅ 是 ❌ 否
STOP shutdownNow() ❌ 否 ❌ 否 ✅ 是
TIDYING 任务执行完 & 线程=0 ❌ 否 ❌ 否 ❌ 否
TERMINATED 执行 terminated() ❌ 否 ❌ 否 ❌ 否

正常关闭shutdown()(推荐)

紧急停止shutdownNow()

检测是否关闭isShutdown() & isTerminated()

# 4.合理配置线程的一些建议

(1)CPU密集型任务,就需要尽量压榨CPU,参考值可以设置为NCPU+1(CPU的数量加1)。 (2)IO密集型任务,参考值可以设置为2*NCPU(CPU数量乘以2)

# 五、线程池最核心的类之一——ThreadPoolExecutor

# 1.构造方法

ThreadPoolExecutor参数最多的构造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler rejectHandler) 
1
2
3
4
5
6
7

其他的构造方法都是调用的这个构造方法来实例化对象,可以说,我们直接分析这个方法之后,其他的构造方法我们也明白是怎么回事了!接下来,就对此构造方法进行详细的分析。

注意:为了更加深入的分析ThreadPoolExecutor类的构造方法,会适当调整参数的顺序进行解析,以便于大家更能深入的理解ThreadPoolExecutor构造方法中每个参数的作用。

上述构造方法接收如下参数进行初始化:

(1)corePoolSize:核心线程数量。

(2)maximumPoolSize:最大线程数。

(3)workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响。

其中,上述三个参数的关系如下所示:

  • 如果运行的线程数小于corePoolSize,直接创建新线程处理任务,即使线程池中的其他线程是空闲的。
  • 如果运行的线程数大于等于corePoolSize,并且小于maximumPoolSize,此时,只有当workQueue满时,才会创建新的线程处理任务。
  • 如果设置的corePoolSize与maximumPoolSize相同,那么创建的线程池大小是固定的,此时,如果有新任务提交,并且workQueue没有满时,就把请求放入到workQueue中,等待空闲的线程,从workQueue中取出任务进行处理。
  • 如果运行的线程数量大于maximumPoolSize,同时,workQueue已经满了,会通过拒绝策略参数rejectHandler来指定处理策略。

根据上述三个参数的配置,线程池会对任务进行如下处理方式:

当提交一个新的任务到线程池时,线程池会根据当前线程池中正在运行的线程数量来决定该任务的处理方式。处理方式总共有三种:直接切换、使用无限队列、使用有界队列。

  • 直接切换常用的队列就是SynchronousQueue。
  • 使用无限队列就是使用基于链表的队列,比如:LinkedBlockingQueue,如果使用这种方式,线程池中创建的最大线程数就是corePoolSize,此时maximumPoolSize不会起作用。当线程池中所有的核心线程都是运行状态时,提交新任务,就会放入等待队列中。
  • 使用有界队列使用的是ArrayBlockingQueue,使用这种方式可以将线程池的最大线程数量限制为maximumPoolSize,可以降低资源的消耗。但是,这种方式使得线程池对线程的调度更困难,因为线程池和队列的容量都是有限的了。

根据上面三个参数,我们可以简单得出如何降低系统资源消耗的一些措施:

  • 如果想降低系统资源的消耗,包括CPU使用率,操作系统资源的消耗,上下文环境切换的开销等,可以设置一个较大的队列容量和较小的线程池容量。这样,会降低线程处理任务的吞吐量。
  • 如果提交的任务经常发生阻塞,可以考虑调用设置最大线程数的方法,重新设置线程池最大线程数。如果队列的容量设置的较小,通常需要将线程池的容量设置的大一些,这样,CPU的使用率会高些。如果线程池的容量设置的过大,并发量就会增加,则需要考虑线程调度的问题,反而可能会降低处理任务的吞吐量。

接下来,我们继续看ThreadPoolExecutor的构造方法的参数。

(4)keepAliveTime:线程没有任务执行时最多保持多久时间终止 当线程池中的线程数量大于corePoolSize时,如果此时没有新的任务提交,核心线程外的线程不会立即销毁,需要等待,直到等待的时间超过了keepAliveTime就会终止。

(5)unit:keepAliveTime的时间单位

(6)threadFactory:线程工厂,用来创建线程 默认会提供一个默认的工厂来创建线程,当使用默认的工厂来创建线程时,会使新创建的线程具有相同的优先级,并且是非守护的线程,同时也设置了线程的名称

(7)rejectHandler:拒绝处理任务时的策略

如果workQueue阻塞队列满了,并且没有空闲的线程池,此时,继续提交任务,需要采取一种策略来处理这个任务。

线程池总共提供了四种策略:

  • 直接抛出异常,这也是默认的策略。实现类为AbortPolicy。
  • 用调用者所在的线程来执行任务。实现类为CallerRunsPolicy。
  • 丢弃队列中最靠前的任务并执行当前任务。实现类为DiscardOldestPolicy。
  • 直接丢弃当前任务。实现类为DiscardPolicy。

# 2.ThreadPoolExecutor提供的启动和停止任务的方法

(1)execute():提交任务,交给线程池执行 (2)submit():提交任务,能够返回执行结果 execute+Future (3)shutdown():关闭线程池,等待任务都执行完 (4)shutdownNow():立即关闭线程池,不等待任务执行完

# 3.ThreadPoolExecutor提供的适用于监控的方法

(1)getTaskCount():线程池已执行和未执行的任务总数 (2)getCompletedTaskCount():已完成的任务数量 (3)getPoolSize():线程池当前的线程数量 (4)getCorePoolSize():线程池核心线程数 (5)getActiveCount():当前线程池中正在执行任务的线程数量