冬眠的笔记
首页文章分类书单项目关于
冬眠
X

© 2026 冬眠的笔记 · 用文字记录思考,用思考改变生活

首页>文章>Java
Java线程池ThreadPoolExecutor并发

Java 线程池详解

从 Executor 接口到 ThreadPoolExecutor 源码深度剖析:核心参数、ctl 状态位、addWorker/runWorker 主循环、四大拒绝策略、Hook 钩子方法,以及生产环境的动态线程池、监控指标、线程池隔离、虚拟线程等最佳实践

冬眠
冬眠
专注于技术、阅读与思考
2025-11-19
发布日期
79 min read
阅读时长
浏览量
2026-05-11
最近更新
Java 线程池详解

为什么需要线程池

池化技术一般是为了解决由于频繁的对象创建、连接创建导致的性能问题,如数据库连接池、HTTP连接池、JVM线程池、Integer的常量池等。通过池化技术,固定维持一批连接或者线程对象,重复利用这些对象,避免频繁的创建和销毁,从而提高效率。

线程池解决两个不同的问题:

  • 执行大量异步任务时提供更好的性能,这是因为减少了每个任务的调用开销;
  • 可以限制和管理执行任务集合时消耗的资源,包括线程。

Executor

Java 中提供了以 Executor 为顶级接口的线程池规范和实现。

Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分

线程池家族: 线程池接口与实现类的继承关系

Executor 相比显式的线程创建更加常用。比如,当你为一系列任务创建线程并启动的时候,你可以不用 new Thread(new RunnableTask()).start(),而是用Executor,如下所示

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

Executor 并没有严格要求任务异步执行,支持在当前线程中马上执行提交的任务

class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

但是更多的场景下,任务在调用方线程之外的其他线程中执行

class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

ExecutorService

ExecutorService是Java并发包中的核心接口,继承自Executor接口,为线程池提供了更加完善和强大的功能。它不仅包含了Executor的基本任务执行能力,还扩展了任务管理、生命周期控制和结果获取等高级特性。

接口定义与继承关系

ExecutorService 接口位于java.util.concurrent包中。

ExecutorService 支持异步提交任务,不阻塞调用线程;同时在 Executor 的 execute(Runnable r) 之外利用 Future 机制支持有返回值的任务,还支持带超时的任务执行和结果获取,支持任务执行过程中的异常捕获和处理,也支持批量提交和执行任务。

ExecutorService 还对线程池的生命周期进行管理

  • 优雅关闭:提供shutdown()方法进行优雅关闭
  • 强制关闭:提供shutdownNow()方法进行强制关闭
  • 状态查询:提供isShutdown()和isTerminated()方法查询状态

与Executor接口的区别

特性 Executor ExecutorService
任务提交 execute(Runnable) submit()系列方法
返回值 无 Future对象
生命周期管理 无 shutdown()、shutdownNow()
批量操作 无 invokeAll()、invokeAny()
状态查询 无 isShutdown()、isTerminated()

方法概览

ExecutorService接口定义了以下核心方法:

任务提交方法

任务提交方法有两种:

execute() 方法提交一个Runnable任务,无返回值;

submit() 方法可以提交 Callable 和 Runnable,以及一个结果参数,用于在任务完成时返回结果。该方法可以抛出异常,可以通过Future.get()获取结果。

// 继承自Executor接口
void execute(Runnable command);

// ExecutorService新增的submit方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

execute() 在将来的某个时间执行给定的命令。该命令可以在新线程、线程池中的线程或调用线程中执行,这取决于Executor的实现类。

⚠️ 再次注意,execute() 方法没有返回值。

批量执行方法

ExecutorService 还支持批量执行任务的方法,包括 invokeAll() 和 invokeAny()。

invokeAll() 执行给定的所有任务,当所有任务完成时,返回一个保存其状态和结果的Future列表。对于列表中的每一个元素,Future.isDone()都返回true。

特点:

  • 等待所有任务完成
  • 返回Future列表,顺序与输入任务顺序一致
  • 阻塞直到所有任务完成
  • 已完成的任务可以正常终止,也可以通过引发异常终止

invokeAny() 执行给定的任务,返回其中一个成功完成的任务的结果(即,没有抛出异常的任务)。一旦有任务正常完成,就会取消其他未完成的任务。

特点:

  • 只要有一个任务成功完成就返回
  • 取消其他未完成的任务
  • 如果所有任务都抛出异常,则抛出ExecutionException
  • 适用于多个方案解决同一问题的场景,比如ping多个服务器,那个服务器返回最快,就给那个服务器发请求

两个方法都有带超时的版本。

带超时版本的invokeAll方法如果在指定时间内所有任务都完成,则返回结果;否则取消未完成的任务。返回的Future列表中,未完成的任务Future.isCancelled()为true。

带超时版本的invokeAny方法。如果在指定时间内没有任务完成,则抛出TimeoutException。

// 执行所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

// 执行任意一个任务
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

使用场景对比

方法 使用场景 返回时机 返回值
invokeAll 需要所有任务的结果 所有任务完成 List<Future<T>>
invokeAny 只需要一个结果,多种方案 任意一个任务完成 T

生命周期管理方法

shutdown() 方法将允许之前提交的任务在终止之前执行,但不会接受新任务。如果已关闭,调用没有额外的效果。

shutdownNow()方法尝试停止所有正在执行的任务,停止处理等待任务,并返回正在等待执行的任务列表。此方法不等待正在执行的任务终止。

在终止时,执行器没有正在执行的任务,没有等待执行的任务,也没有新的任务可以提交。

未使用的 ExecutorService 应该被关闭,以允许回收其资源。

awaitTermination() 方法用于等待线程池的终止,超出指定时间将返回false。该方法会在线程池执行关闭操作、线程处理超时、处理线程被中断等条件发生之后,一直阻塞,直到线程池中的所有任务都执行完成。

isShutdown()方法检查线程池是否已经关闭,如果已关闭,则返回TRUE。

isTerminated()方法判断任务是否已关闭,如果所有任务已关闭,请返回True。请注意,除非shutdown 或 shutdownNow首次被调用,否则isTerminated()永远不会为true。

// 关闭线程池
void shutdown();
List<Runnable> shutdownNow();

// 状态查询
boolean isShutdown();
boolean isTerminated();

// 等待终止
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

这里先放一张线程池状态流转的图,这张图后面会再详细说到。

线程池状态流转图:RUNNING → SHUTDOWN/STOP → TIDYING → TERMINATED
线程池状态流转图:RUNNING → SHUTDOWN/STOP → TIDYING → TERMINATED

shutdown()、awaitTermination(long timeout, TimeUnit unit) 和 shutdownNow() 配合执行,能做到优雅的关闭线程池。

/**
* 优雅关闭线程池
* 
* @param executor 要关闭的线程池
* @param timeoutSeconds 等待超时时间(秒)
*/
public static void shutdownGracefully(ExecutorService executor, long timeoutSeconds) {
    if (executor == null || executor.isShutdown()) {
        return;
    }
    
    executor.shutdown();
    try {
        if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
            executor.shutdownNow();
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                System.err.println("线程池未能正常关闭");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

AbstractExecutorService

AbstractExecutorService 是 ExecutorService 的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

AbstractExecutorService 提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,JDK中提供了 FutureTask 类作为 RunnableFuture 的实现类。例如,submit(Runnable) 的实现会创建一个关联的 RunnableFuture,并执行和返回。子类可以重写 newTaskFor 方法,以返回除 FutureTask 以外的 RunnableFuture 实现。

newTaskFor()

newTaskFor() 方法通过将不同类型的任务(Runnable 和 Callable)统一包装为 RunnableFuture,使 AbstractExecutorService 能够以一致的方式处理任务提交和执行。

//返回给定Runnable和默认值的RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
	return new FutureTask<T>(runnable, value);
}
//返回给定可调用任务的RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  return new FutureTask<T>(callable);
}

AbstractExecutorService 子类可以通过覆盖 newTaskFor 方法来提供自定义的 RunnableFuture 实现。

下面是一个示例,该类自定义 ThreadPoolExecutor 以使用 CustomTask(RunnableFuture的子类)类而不是默认的FutureTask:

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    static class CustomTask<V> implements RunnableFuture<V> {...}

    protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
        return new CustomTask<V>(c);
    }
    protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
        return new CustomTask<V>(r, v);
    }
    // ... add constructors, etc.
}

submit()

submit() 方法用于提交一个 Callable 任务或 Runnable 任务,并返回一个 Future 表示异步计算的结果。这个在 ExecutorService 中已经说过了。

这里主要讲一下 submit() 的实现,以 submit(Runnable task) 为例,submit(Runnable task, T result) 和 submit(Callable<T> task) 的实现和 submit(Runnable task) 类似,这里就不详细说了。

public Future<?> submit(Runnable task) {
    //判空
    if (task == null) throw new NullPointerException();
    //调用newTaskFor方法返回RunnableFuture
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //提交任务
    execute(ftask);
    return ftask;
}

AbstractExecutorService 没有对 execute() 方法进行实现,具体实现是在 ThreadPoolExecutor 中进行的,这里先不说了。

invokeAny()

invokeAny() 主要基于私有方法 doInvokeAny() 实现,通过 ExecutorCompletionService 来管理任务的完成状态。

  1. 先提交一个任务
  2. 进入主循环
  3. 先判断是否有任务已经完成了,如果有直接结束
  4. 如果没有,提交新的任务,继续主循环
  5. 如果所有任务都提交了,任务仍然都在运行中,则根据timed配置进行超时等待或者阻塞等待
  6. 阻塞等待,拿到结果后,活跃任务数减一,判断是否执行成功,成功则直接返回,失败则记录异常,继续循环
  7. 超时等待时,如果有任务完成了,则重新计算超时时间,并拿到结果,活跃任务数减一,判断是否执行成功,成功则直接返回,失败则记录异常,继续循环

详细逻辑都在代码注释里了,请看:

/**
 * doInvokeAny方法的核心实现
 * 执行给定的任务集合,返回其中任意一个成功完成的任务结果
 * 一旦有任务成功完成,就会取消其他未完成的任务
 */
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) 
    throws InterruptedException, ExecutionException, TimeoutException {
    
    // 参数校验:空的任务列表,抛出NPE
    if (tasks == null)
        throw new NullPointerException();
    
    // 获取任务的数量
    int ntasks = tasks.size();
    
    // 参数校验:任务数量为0,抛出IllegalArgumentException
    if (ntasks == 0)
        throw new IllegalArgumentException();
    
    // 创建对应的Future列表,用于存储已提交的任务
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    
    // 创建ExecutorCompletionService,用于管理任务的完成状态
    // ExecutorCompletionService内部使用队列来存储已完成的任务
    ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);

    // 为了提高效率,特别是在并行度有限的执行器中,
    // 在提交更多任务之前检查先前提交的任务是否已完成。
    // 这种交错执行加上异常处理机制解释了主循环的复杂性。

    try {
        // 如果所有任务都失败了,抛出最后记录的异常
        ExecutionException ee = null;
        
        // 如果有超时限制,计算截止时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        
        // 获取任务迭代器
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // 首先提交一个任务确保有任务在执行,其余任务逐步提交
        // 这样可以避免一次性提交所有任务造成的资源浪费
        futures.add(ecs.submit(it.next()));
        --ntasks;  // 减少待提交任务数
        int active = 1;  // 当前活跃(正在执行)的任务数

        // 主循环:持续检查任务完成情况并管理任务提交
        for (;;) {
            // 尝试获取一个已完成的任务(非阻塞)
            Future<T> f = ecs.poll();
            
            if (f == null) {
                // 没有任务完成
                if (ntasks > 0) {
                    // 还有任务未提交,继续提交下一个任务
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;  // 增加活跃任务数
                }
                else if (active == 0) {
                    // 没有活跃任务了,跳出循环
                    break;
                }
                else if (timed) {
                    // 有超时限制,带超时地等待任务完成
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();  // 超时抛出异常
                    // 重新计算剩余时间
                    nanos = deadline - System.nanoTime();
                }
                else {
                    // 无超时限制,阻塞等待任务完成
                    f = ecs.take();
                }
            }
            
            if (f != null) {
                // 有任务完成了
                --active;  // 减少活跃任务数
                try {
                    // 尝试获取任务结果,如果成功则直接返回
                    return f.get();
                } catch (ExecutionException eex) {
                    // 任务执行异常,记录异常但继续尝试其他任务
                    ee = eex;
                } catch (RuntimeException rex) {
                    // 运行时异常,包装成ExecutionException
                    ee = new ExecutionException(rex);
                }
            }
        }

        // 如果到这里说明所有任务都失败了
        if (ee == null)
            ee = new ExecutionException();  // 创建一个默认异常
        throw ee;  // 抛出最后记录的异常

    } finally {
        // 无论如何都要取消所有未完成的任务,释放资源
        // 这确保了一旦有任务成功,其他任务会被及时取消
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);  // true表示允许中断正在执行的任务
    }
}

不想看代码的可以看看这张图

invokeAny 主循环流程图:提交首任务 → 检查完成 → 继续提交或等待 → 取消未完成任务
invokeAny 主循环流程图:提交首任务 → 检查完成 → 继续提交或等待 → 取消未完成任务

invokeAll()

invokeAll() 方法用于批量执行任务并等待所有任务完成,它会一次性提交所有任务,等待所有任务完成后返回Future列表。在任务未完成时会进行阻塞等待,直到所有任务全部执行完毕,该方法不关注任务成功或失败,单个任务的失败不会影响其他任务的执行。如果程序执行问题,会在finally代码块中自动取消未完成的任务。

提供了带超时版本和不带超时版本的 invokeAll() 方法,两者的区别如下

特性 无超时版本 带超时版本
执行策略 立即提交所有任务 先创建Future,再逐个提交
等待方式 无限等待直到完成 在超时时间内等待
返回时机 所有任务完成后返回 所有任务完成或超时后返回
超时处理 无 超时后返回部分完成的结果
适用场景 数据批处理、文件批量操作 批量网络请求、有截止时间的计算任务
/**
 * invokeAll方法的无超时版本
 * 执行给定的所有任务,等待所有任务完成后返回Future列表
 * 该方法会阻塞直到所有任务都完成(成功或失败)
 */
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
    // 参数校验:空任务集合抛出NPE
    if (tasks == null)
        throw new NullPointerException();
    
    // 创建Future列表,容量等于任务数量
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    
    // 标记是否正常完成,用于finally块中的清理逻辑
    boolean done = false;
    
    try {
        // 第一阶段:提交所有任务
        // 为每个任务创建RunnableFuture并立即执行
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);  // 将Callable包装成RunnableFuture
            futures.add(f);  // 添加到Future列表
            execute(f);      // 立即提交执行
        }
        
        // 第二阶段:等待所有任务完成
        // 遍历所有Future,确保每个任务都已完成
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {  // 如果任务还未完成
                try {
                    f.get();  // 阻塞等待任务完成(忽略结果)
                } catch (CancellationException ignore) {
                    // 忽略取消异常,任务被取消也算完成
                } catch (ExecutionException ignore) {
                    // 忽略执行异常,任务失败也算完成
                }
            }
        }
        
        // 所有任务都已完成,标记为成功
        done = true;
        return futures;  // 返回包含所有Future的列表
        
    } finally {
        // 如果没有正常完成(发生异常),取消所有未完成的任务
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);  // 强制取消任务
    }
}

/**
 * invokeAll方法的带超时版本
 * 执行给定的所有任务,在指定时间内等待任务完成
 * 超时后会返回当前状态的Future列表(可能包含未完成的任务)
 */
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
    // 参数校验:空任务集合抛出NPE
    if (tasks == null)
        throw new NullPointerException();
    
    // 将超时时间转换为纳秒
    long nanos = unit.toNanos(timeout);
    
    // 创建Future列表
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    
    // 标记是否正常完成
    boolean done = false;
    
    try {
        // 第一阶段:创建所有Future对象(但不立即执行)
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        // 计算截止时间
        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // 第二阶段:逐个提交任务并检查超时
        // 交错进行时间检查和任务执行,适用于并行度有限的执行器
        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));  // 提交任务执行
            nanos = deadline - System.nanoTime();  // 重新计算剩余时间
            if (nanos <= 0L)  // 如果已经超时
                return futures;  // 直接返回当前状态的Future列表
        }

        // 第三阶段:等待所有任务完成或超时
        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {  // 如果任务还未完成
                if (nanos <= 0L)  // 检查是否超时
                    return futures;  // 超时则直接返回
                try {
                    // 带超时地等待任务完成
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                    // 忽略取消异常
                } catch (ExecutionException ignore) {
                    // 忽略执行异常
                } catch (TimeoutException toe) {
                    // 单个任务超时,返回当前状态
                    return futures;
                }
                // 重新计算剩余时间
                nanos = deadline - System.nanoTime();
            }
        }
        
        // 所有任务都在超时时间内完成
        done = true;
        return futures;
        
    } finally {
        // 如果没有正常完成,取消所有未完成的任务
        if (!done) {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
}

ThreadPoolExecutor

ThreadPoolExecutor 是 ExecutorService的实现类,线程池的核心实现。ThreadPoolExecutor 中对于线程池的线程数、工作队列和拒绝策略进行定义,同时还提供了线程池的管理方法,如关闭线程池、中断线程池等,此外还维护一些基本统计信息,例如已完成任务的数量。

JDK 中也提供了 Executors 工厂类来创建线程池

方法名 说明
newCachedThreadPool() 无限线程池,具有自动线程回收的功能
newFixedThreadPool(int) 固定大小的线程池
newSingleThreadExecutor() 单后台线程
newScheduledThreadPool() 定时执行的线程池
newSingleThreadScheduledExecutor() 定时执行的线程池,但是只有一个线程

但是不建议直接使用 Executors 工厂类来创建线程池,比如 newFixedThreadPool 使用的是无界的 LinkedBlockingQueue ,可能会导致 OOM。

如果要自定义线程池参数,请参考核心参数章节。

ctl 属性

Java线程池的核心控制状态ctl是一个原子整数(AtomicInteger),它通过位操作将workerCount 和 runState 两个概念字段打包在一个32位整数中:

  1. ​高3位​​表示线程池状态(runState)
  2. 低29位​​表示工作线程数(workerCount),最大支持(2^29)-1(大约5亿)个线程

workerCount

workerCount,指示线程的有效数量。 workerCount是允许启动和不允许停止的工作线程数量。该值可能暂时不同于活动线程的实际数量,例如正在退出的线程在终止前仍在执行任务时。用户可见的池大小是工作集的当前大小。

runState

runState,指示线程是否运行、关闭等

状态 二进制高3位 十进制值 描述
RUNNING 111 -536870912 接受新任务并处理队列任务(初始化默认状态)
SHUTDOWN 000 0 拒绝新任务但处理队列任务(调用shutdown()触发)
STOP 001 536870912 拒绝新任务、中断进行中任务、丢弃队列任务(调用shutdownNow()触发)
TIDYING 010 1073741824 所有任务终止,准备执行terminated()钩子方法
TERMINATED 011 1610612736 terminated()方法执行完成

线程池状态流转图见前文 ExecutorService 一节,这里不再重复贴图。

状态值严格递增(RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED),便于直接通过整数比较判断状态,比如 isRunning(int c) 方法通过比较 c < SHUTDOWN(即数值是否小于 0)判断是否处于 RUNNING 状态。

//前3位是runStatus  后29位是workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//计算workerCount的位数,Integer.SIZE=32,则COUNT_BITS=29,workerCount最大值为2^29-1,大约5亿
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大的线程数量限制,2^29-1,大约5亿
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState存储在ctl的高位,前3位
private static final int RUNNING    = -1 << COUNT_BITS;//11100000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;//00000000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;//00100000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;//01000000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;//01100000000000000000000000000000

// 对ctl进行包装和拆包
//计算当前的运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//计算当前线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
//通过runStatus和workerCount获取ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池核心参数

参数 描述
corePoolSize 核心线程数,线程池中始终存活的线程数,即使它们处于空闲状态。如果将allowCoreThreadTimeOut设置为true,那么核心线程也会在等待新任务时保持空闲状态。
maximumPoolSize 最大线程数,线程池中允许的最大线程数。当线程数达到corePoolSize,且任务队列已满时,线程池会创建新的线程,直到线程数达到maximumPoolSize。
keepAliveTime 空闲线程的存活时间,当线程数大于corePoolSize时,空闲线程的最大存活时间。如果将allowCoreThreadTimeOut设置为true,那么核心线程也会在空闲时被终止。
unit keepAliveTime的时间单位,例如TimeUnit.SECONDS。
workQueue 任务队列,用于保存等待执行的任务。当所有核心线程都在处理任务时,新任务将被放入队列中等待执行。
threadFactory 线程工厂,用于创建新线程。
RejectedExecutionHandler 拒绝策略,当线程池无法接受新任务时,会触发拒绝策略。常见的拒绝策略有AbortPolicy(默认)、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。

execute()

ThreadPoolExecutor 运行机制:

  1. 如果线程池中的线程数量少于 corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;
  2. 如果线程池中的线程数量大于等于 corePoolSize,但缓冲队列 workQueue 未满,则将新添加的任务放到 workQueue 中,按照 FIFO 的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);
  3. 如果线程池中的线程数量大于等于 corePoolSize,且缓冲队列 workQueue 已满,但线程池中的线程数量小于 maximumPoolSize,则会创建新的线程来处理被添加的任务;
  4. 如果线程池中的线程数量等于了 maximumPoolSize,则执行拒绝策略。
线程池任务执行流程:核心线程 → 队列 → 非核心线程 → 拒绝策略
线程池任务执行流程:核心线程 → 队列 → 非核心线程 → 拒绝策略

线程池的运行机制,其实就是靠 execute() 方法实现的。execute(Runnable command) 主要负责提交任务,任务提交到线程池之后,线程池会根据当前线程池的状态,以及线程池的参数,来决定如何处理任务。

先看上面的运行机制流程,然后拿着这个流程去看 execute() 的代码逻辑。

public void execute(Runnable command) {
  	//空判断,不能传入null
    if (command == null)
        throw new NullPointerException();
    /*
     * 分3步执行
     * 
     * 1. 如果池中线程数量小于corePoolSize,则尝试去启动一个新线程执行给定的第一个任务。
     * 对addWorker的调用会自动检查runState和workerCount,从而通过返回false来防止误报,
     * 因为误报会在不应该添加线程的时候添加线程。 —— 核心线程创建
     *
     * 2. 如果一个任务可以成功入队,那么我们仍然需要再次检查是否应该添加一个线程
     * (因为现有的线程在上次检查之后已经死亡),或者在进入这个方法之后线程池已经关闭。
     * 因此,我们将重新检查状态,并在必要时回滚队列,或者在没有队列时启动一个新线程。
     * 
     * 3. 如果任务无法入队,则尝试去创建一个新的线程。 —— 非核心线程创建
     * 4. 如果创建失败,我们知道线程池已经关闭或饱和,所以拒绝了新的任务 —— 拒绝策略
     */
  	//获取ctl的值
    int c = ctl.get();
  	//线程数小于corePoolSize时,创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        //true 表示创建核心线程 false 创建非核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //这里做线程池状态检测,是为了防止在线程池关闭时或关闭后提交任务,线程池在shutdown过程中不可提交任务
    //workQueue.offer() 是判断工作队列是否已满,如果入队失败,则去尝试创建非核心线程
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //这里的二次检查是为了防止在入队成功后,线程池状态发生了改变
        if (!isRunning(recheck) && remove(command))
            reject(command);
        /**
         * 这里是为了防止任务饿死,当任务成功入队后,如果此时线程池中的工作线程数量为0,
         * 那么队列中的任务将永远不会被执行,因为没有线程去消费这些任务。
         * 为什么会出现线程数为0的情况:
         * 在高并发环境下,可能出现以下时序:
         * 线程池中最后一个工作线程刚好执行完任务并退出
         * 同时有新任务提交并成功入队
         * 此时线程池状态仍然是RUNNING,但工作线程数为0
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

添加线程的方法是 addWorker(Runnable firstTask, boolean core), 第二个参数core标记是否是核心线程。

addWorker 通过CAS的方式添加线程,添加成功直接跳出循环,失败则重新获取ctl的值,然后进行自旋。CAS 是为了防止多线程并发添加线程时,线程数超过maximumPoolSize,导致添加失败。

先说下JDK21的实现。

private boolean addWorker(Runnable firstTask, boolean core) {
    // label,退出循环时用 —— 这个语法,感觉是10年的概念了,真的很少见
    retry:
    for (int c = ctl.get();;) {
        // 检查线程池状态,只有在必要时才检查队列是否为空
        // 如果线程池状态 >= SHUTDOWN,且满足以下任一条件则返回false:
        // 1. 线程池状态 >= STOP
        // 2. firstTask不为null(SHUTDOWN状态下不接受新任务)
        // 3. 工作队列为空(SHUTDOWN状态下如果队列为空则不需要新线程)
        if (runStateAtLeast(c, SHUTDOWN)  && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
            return false;

        // 内层循环:尝试增加工作线程数量
        for (;;) {
            // 检查当前工作线程数是否已达到限制
            // core为true时检查是否超过corePoolSize,false时检查是否超过maximumPoolSize
            if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            
            // 使用CAS操作尝试增加工作线程计数
            if (compareAndIncrementWorkerCount(c))
                break retry; // CAS成功,跳出外层循环
            
            // CAS失败,重新读取ctl值
            c = ctl.get();
            
            // 如果线程池状态发生变化(>=SHUTDOWN),重新开始外层循环
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // 否则是因为workerCount变化导致的CAS失败,继续内层循环重试
        }
    }

    // 工作线程创建和启动的相关标志
    boolean workerStarted = false;  // 工作线程是否成功启动
    boolean workerAdded = false;    // 工作线程是否成功添加到workers集合
    Worker w = null;
    
    try {
        // 创建新的Worker对象,Worker内部会创建一个Thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        
        if (t != null) {
            // 获取主锁,确保线程安全地操作workers集合
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 持有锁期间再次检查线程池状态
                // 防止在获取锁之前线程池状态发生变化或ThreadFactory创建线程失败
                int c = ctl.get();

                // 允许添加工作线程的条件:
                // 1. 线程池正在运行,或者
                // 2. 线程池状态 < STOP 且 firstTask为null(SHUTDOWN状态下可以添加线程处理队列中的任务)
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 检查线程状态,新创建的线程应该处于NEW状态
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    
                    // 将Worker添加到workers集合中
                    workers.add(w);
                    workerAdded = true;
                    
                    // 更新线程池历史最大线程数记录  largestPoolSize没有用volatile修饰,所以必须在mainLock中进行修改
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                // 释放主锁
                mainLock.unlock();
            }
            
            // 如果Worker成功添加到集合中,启动工作线程
            if (workerAdded) {
                container.start(t);  // 启动线程
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作线程启动失败,进行清理工作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    
    // 返回工作线程是否成功启动
    return workerStarted;
}

compareAndIncrementWorkerCount 方法通过CAS的方式增加线程数。

private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

这里要说明一点,我们总是说线程池里面有核心线程和非核心线程,其实线程池内部是没有进行核心线程和非核心线程的区分的,在 addWorker() 被调用时传入 true 表示创建核心线程,false 表示创建非核心线程,但是实际在addWorker() 中只是做一个数量判断依据,如下:

if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))

所以,核心线程不会回收的说法也是不准确的,因为根本就没办法区分核心线程和非核心线程,实际区分的其实是一个数量,线程数达到核心线程数之后,再进行线程回收,线程池的线程数也不会低于核心线程数了(当然,设置了allowCoreThreadTimeOut=true的情况例外)。

对于 Worker,本质上是一个基于 AQS 实现的非可重入独占锁,重写了 tryAcquire、tryRelease,并对外提供 lock、tryLock、unlock 等方法。

那为什么 Worker 不直接用 ReentrantLock,而要自己用 AQS 实现一个非可重入锁?核心原因有两点:

  1. 防止任务执行中被误中断:shutdown() 内部会调用 interruptIdleWorkers() 中断"空闲" Worker,判断空闲的方式就是 w.tryLock()——如果能拿到锁,说明该 Worker 当前没有在跑任务,可以安全中断;如果拿不到锁,说明 Worker 正在执行任务,不能中断。runWorker 在执行任务前会调用 w.lock(),执行完再 w.unlock(),从而把"是否在跑任务"映射成"锁是否被持有"。
  2. 避免重入引发的状态混乱:如果使用可重入锁,Worker 在执行任务时调用 setCorePoolSize 等需要操作 worker 集合的方法时,可能因为重入而被 shutdown() 误判成空闲然后中断,所以这里必须用非可重入的独占锁。

实现本身不复杂,感兴趣可以去翻 ThreadPoolExecutor.Worker 的源码,配合 AQS 一起看会更清楚。

runWorker:工作线程的主循环

addWorker 启动线程后,Worker 的 run() 方法会调用 runWorker(this)。runWorker 是整个线程池真正"干活"的入口,它做的事情可以归纳为四步:

  1. 取任务:先执行构造时传入的 firstTask,之后通过 getTask() 从队列里循环取任务,取不到(返回 null)就退出循环。
  2. 加锁标记忙碌:执行任务前 w.lock(),让 shutdown 时的 interruptIdleWorkers 跳过自己。
  3. 执行任务:调用 beforeExecute → task.run() → afterExecute。任务抛出的异常会被记录到 thrown 变量,但不会让 Worker 直接挂掉。
  4. 退出清理:进入 finally 后 w.unlock(),循环外异常或 getTask 返回 null 时,由 processWorkerExit 收尾。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断(构造时 state=-1 禁止中断,这里释放)
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 如果线程池处于 STOP 及以上状态,确保线程被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
                && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false; // 正常退出(getTask 返回 null)
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

processWorkerExit 负责把 Worker 从 workers 集合移除、累计 completedTaskCount、调用 tryTerminate() 判断是否需要把线程池推进到 TIDYING / TERMINATED 状态。如果是异常退出(completedAbruptly = true),还会调用 addWorker(null, false) 补一个新的 Worker,保证线程数符合配置。

tryTerminate() 是状态流转的关键:当线程池处于 SHUTDOWN 且队列为空,或者处于 STOP 时,会通过 CAS 把状态推进到 TIDYING,调用 terminated() 钩子,再推进到 TERMINATED 并唤醒所有 awaitTermination 的等待者。

核心线程数 corePoolSize

接下来咱们再看核心线程数这个属性。

在创建了线程池后,默认情况下,线程池中并没有 任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者 prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

调用 prestartAllCoreThreads() 或者 prestartCoreThread() 来实现提前创建线程,这种方式叫做线程池预热。线程池预热主要用于在系统启动或负载增加前预先创建和初始化线程池中的线程,以减少任务到来时的延迟和资源竞争,如在促销活动、秒杀或突发流量场景下,线程池需要快速扩容以应对高并发。如果线程池未预热,临时创建线程会导致任务堆积或超时。

线程池预热的代码比较简单,就是在调用方法时,通过 addWorker(null, true) 来创建核心线程,实现比较简单。

public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

最大线程数 maximumPoolSize

maximumPoolSize 指示线程池中最多能创建多少个线程

通过将corePoolSize和maximumPoolSize设置相同,可以创建一个固定大小的线程池 —— Executors.newFixedThreadPool(int) 就是将两者设置为相同值。

通过将maximumPoolSize设置为本质上无界的值,例如Integer.MAX_VALUE(Executors.newCachedThreadPool()就是这么做的),允许线程池容纳任意数量的并发任务 —— 但是会有OOM的风险,生产中千万不要这样使用。

corePoolSize 和 maximumPoolSize 一般只在 ThreadPoolExecutor 创建时配置,但也可以使用 setCorePoolSize(int) 和setMaximumPoolSize(int) 动态更改 —— 动态线程池。

任务队列 workQueue

workQueue 是一个 BlockingQueue<Runnable> ,存储等待执行的任务。通过 workQueue,可以平滑处理突发流量,避免因任务激增导致系统崩溃,也可以限制任务数量,防止线程池过载(如内存耗尽)。

目前 Executors 中提供的几个工厂方法,主要用到了SynchronousQueue、LinkedBlockingQueue 和 DelayQueue

队列类型​ ​​特性​ ​​适用场景​ 应用
SynchronousQueue 无缓冲队列,直接移交任务给线程,不存储任务。需配合无界线程数使用。 高并发、低延迟场景 如实时请求处理,但需避免任务堆积导致拒绝。 Executors.newCachedThreadPool()
LinkedBlockingQueue 默认无界,容量为Integer.MAX_VALUE,也可设为有界。链表结构,吞吐量高。 任务量波动大但需避免资源耗尽需手动设置合理容量。 Executors.newFixedThreadPool()
ArrayBlockingQueue 有界队列,数组实现,插入/删除效率稳定。 固定资源下的任务控制如数据库连接池,防止内存溢出。 暂无
DelayQueue 无界队列,任务按过期时间排序,仅到期任务可被取出。 定时任务如订单超时取消、延迟执行场景。 Executors.newScheduledThreadPool()

空闲存活时间 keepAliveTime

线程空闲存活时间。当 worker 队列中没有待执行任务、线程持续空闲超过 keepAliveTime 时,超出核心线程数的那部分线程会被回收;线程数等于核心线程数时不再回收,以保证池中始终有 corePoolSize 个线程可立即响应。

如果希望连核心线程也参与回收,可以调用 allowCoreThreadTimeOut(true),此后核心线程在空闲超时后也会被销毁。

public boolean allowsCoreThreadTimeOut() {
  return allowCoreThreadTimeOut;
}

public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();
    }
}

线程池中的工作线程(Worker)通过 getTask() 方法从任务队列(workQueue)中获取任务。keepAliveTime 的生效逻辑主要体现在此方法中。

getTask 方法的 keepAliveTime 生效逻辑:核心线程 take 阻塞、非核心线程 poll 超时退出
getTask 方法的 keepAliveTime 生效逻辑:核心线程 take 阻塞、非核心线程 poll 超时退出

我们知道 execute() 方法提交任务后,会执行 addWorker() 方法向任务队列添加任务,在添加成功之后,会通过 SharedThreadContainer 的 start(Thread t) 启动线程,执行 Worker 的 run() 方法。

而 Worker 的 run() 方法只有一行代码

public void run() {
    runWorker(this);
}

runWoker 方法通过 getTask() 从任务队列中获取待执行的任务信息

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//省略了部分代码
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

可以看到,如果开启了 allowCoreThreadTimeOut 或者当前线程数大于核心线程数,则会以 poll(keepAliveTime, ...) 的方式带超时阻塞获取任务,如果超过 keepAliveTime 时间仍未取到任务,poll 返回 null,getTask() 随即返回 null,runWorker 主循环因为拿不到任务而退出,Worker 通过 processWorkerExit 走线程回收流程;否则以 take() 的方式无超时阻塞等待,直到获取到任务信息。

注意:poll 超时返回 null,而不是抛出 InterruptedException。只有线程被中断时才会抛出该异常。不了解 poll() 和 take() 区别的,可以去找 BlockingQueue 的文章看看。

线程工厂 ThreadFactory

线程工厂 ThreadFactory 主要通过 newThread() 方法创建线程,在创建线程时,可以定制线程的属性,比如线程名、线程优先级、是否守护线程等。JVM 中默认使用 DefaultThreadFactory 来创建线程。

不建议使用 DefaultThreadFactory,因为需要根据业务场景设置线程名称,而 DefaultThreadFactory 线程名称是 "pool-1001-thread-1002 这种形式的,没有业务意义。

以下是一个线程工厂的案例,可以做为参考。

/**
 * 增强版线程工厂
 * 
 * @param namePrefix 线程名称前缀
 * @param daemon 是否为守护线程
 * @param priority 线程优先级
 * @return ThreadFactory
 */
public static ThreadFactory createEnhancedThreadFactory(String namePrefix, 
                                                       boolean daemon, 
                                                       int priority) {
    return new ThreadFactory() {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final ThreadGroup group = Thread.currentThread().getThreadGroup();
        
        @Override
        public Thread newThread(Runnable r) {
            String threadName = namePrefix + "-thread-pool-" + threadNumber.getAndIncrement();
            Thread thread = new Thread(group, r, threadName, 0);
            
            // 设置守护线程
            thread.setDaemon(daemon);
            
            // 设置线程优先级 不建议,因为用处不大,如果设置不好,甚至会导致线程饥饿
            if (thread.getPriority() != priority) {
                thread.setPriority(priority);
            }
            
            // 设置异常处理器
            thread.setUncaughtExceptionHandler((t, e) -> {
                log.error("Thread {} threw exception {} ", t.getName(), e.getMessage());
            });
            return thread;
        }
    };
}

也可以直接使用 Guava 的 ThreadFactoryBuilder。

// 使用Guava的ThreadFactoryBuilder
ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MyPool-%d")
    .setDaemon(false)
    .setPriority(Thread.NORM_PRIORITY)
    .setUncaughtExceptionHandler((t, e) -> {
        log.error("Thread exception: ", e);
    })
    .build();

或者 Apache Commons 的 BasicThreadFactory:

ThreadFactory threadFactory = new BasicThreadFactory.Builder()
    .namingPattern("worker-%d")
    .daemon(false)
    .priority(Thread.NORM_PRIORITY)
    .uncaughtExceptionHandler((t, e) -> {
        // 异常处理逻辑
    })
    .build();

这里正好讲一下关于线程名和守护线程。

线程名最好是使用有意义的线程名前缀,包含业务模块信息,方便后期问题排查和监控。

是否设置为守护线程,看业务选择和场景。

  • 对于业务线程,建议设置为非守护线程;
  • 对于后台清理线程,建议设置为守护线程。

同时需要注意,要避免守护线程执行重要业务逻辑。因为当所有非守护线程结束时,JVM会立即退出,不会等待守护线程完成;这时候,如果守护线程运行了一些重要业务逻辑,那么会被强制中断,可能会出现丢失数据的情况。

public class DaemonThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread daemonThread = new Thread(() -> {
            try {
                // 模拟重要的业务逻辑:保存用户数据
                System.out.println("开始保存用户数据...");
                Thread.sleep(5000); // 模拟耗时操作
                System.out.println("用户数据保存完成!"); // 这行可能永远不会执行
            } catch (InterruptedException e) {
                System.out.println("数据保存被中断!");
            }
        });
        
        daemonThread.setDaemon(true); // 设置为守护线程
        daemonThread.start();
        
        // 主线程很快结束
        Thread.sleep(1000);
        System.out.println("主线程结束,JVM即将退出");
        // JVM退出,守护线程被强制终止,数据可能丢失
    }
}

跑远了,前面说了线程工厂是创建线程用的,那线程工厂是在哪个环节起作用的呢?

在 new Worker() 的时候!

在 Worker 的构造器中,通过 getThreadFactory().newThread(this) 创建线程,而 getThreadFactory() 方法返回的是线程工厂,默认是 DefaultThreadFactory。

Worker(Runnable firstTask) {
    // 省略其他代码
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

拒绝策略RejectedExecutionHandler

RejectedExecutionHandler 对象,拒绝执行处理器,通常叫做拒绝策略。

ThreadPoolExecutor 中内置了四种拒绝策略

拒绝策略 作用 适用场景
CallerRunsPolicy 由调用者线程直接执行被拒绝的任务 不允许丢任务、希望对上游产生反压(back-pressure):调用方线程被占住后无法继续提交,从而拖慢生产速度,给下游消化时间
AbortPolicy 直接抛出 RejectedExecutionException,丢弃新任务 默认策略,适合让调用方感知失败、由业务层决定重试或降级
DiscardPolicy 静默丢弃新任务,不抛异常 任务可丢、对调用方无感的场景,如埋点上报
DiscardOldestPolicy 丢弃队列中最早的任务,再尝试提交新任务 只关心最新数据的场景,如实时行情推送

关于 CallerRunsPolicy:它最大的价值不在于"代为执行",而在于反压——把任务塞回调用线程后,调用线程在执行完之前无法再次调用 execute,相当于天然限流。在 MQ 消费、批量导入等"宁可慢,不可丢"的场景里非常好用。

AbortPolicy 是默认的拒绝策略

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

几种拒绝策略的实现如下

public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    //由调用者线程来执行新来的工作任务
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //检查线程池是否被关闭
        if (!e.isShutdown()) {
            //注意:这里是R,而不是e,线程直接执行run方法
            r.run();
        }
    }
}

public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }

    //直接抛出RejectedExecutionException异常,丢弃掉新来的工作任务
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + e.toString());
    }
}

public static class DiscardPolicy implements RejectedExecutionHandler {
  
    public DiscardPolicy() { }

    //将新来的工作任务丢弃
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        //检查线程池是否被关闭
        if (!e.isShutdown()) {
            //线程池没有关闭时,去获取工作队列中第一个工作任务,出队, 但是不执行,相当于将最早的一个工作任务丢弃了
            e.getQueue().poll();
            //提交新来的工作任务 —— 这里也不是马上执行新来的任务,而是重新提交任务,可能立即执行,也可能入队,取决于线程池的状态
            e.execute(r);
        }
    }
}

自定义拒绝策略

当然,开发者也可以自定义拒绝策略,参考上面几个拒绝策略的实现即可。

以下是我基于 Spring Retry 实现的一个线程池拒绝策略,可以在提交任务失败时,根据 RetryRejectedExecutionConfig 的配置进行重试。RetryRejectedExecutionConfig 中指定了重试次数、重试间隔时间、重试间隔递增因子、最大重试间隔时间等参数。 如果重试仍然失败,则打印日志和堆栈信息,并抛出异常。

import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 基于Spring Retry的线程池拒绝策略
 * 当线程池无法接受新任务时,会进行重试,最多重试指定次数
 * 重试间隔时间、重试间隔递增因子、最大重试间隔时间等参数可以根据实际情况进行配置
 * 重试完成后仍未提交成功,则抛出异常,并打印日志
 */
@Slf4j
@Component
public class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
    
    private final RetryRejectedExecutionConfig config;
    private final RetryTemplate retryTemplate;
    private final AtomicLong rejectedCount = new AtomicLong(0);
    private final AtomicLong retrySuccessCount = new AtomicLong(0);
    private final AtomicLong retryFailedCount = new AtomicLong(0);
    
    public RetryRejectedExecutionHandler(RetryRejectedExecutionConfig config) {
        this.config = config;
        this.retryTemplate = createRetryTemplate();
    }
    
    /**
     * 创建重试模板
     */
    private RetryTemplate createRetryTemplate() {
        RetryTemplate template = new RetryTemplate();
        
        // 设置重试策略
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(config.getMaxAttempts());
        template.setRetryPolicy(retryPolicy);
        
        // 设置退避策略(指数退避)
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(config.getRetryInterval());
        backOffPolicy.setMultiplier(config.getBackoffMultiplier());
        backOffPolicy.setMaxInterval(config.getMaxRetryInterval());
        template.setBackOffPolicy(backOffPolicy);
        
        return template;
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!config.isEnabled()) {
            // 如果未启用重试策略,则直接抛出异常
            log.error("线程池任务被拒绝,重试策略未启用。线程池状态: 核心线程数={}, 最大线程数={}, 当前线程数={}, 队列大小={}",
                    executor.getCorePoolSize(), executor.getMaximumPoolSize(), 
                    executor.getPoolSize(), executor.getQueue().size());
            throw new java.util.concurrent.RejectedExecutionException(
                    "Task " + r.toString() + " rejected from " + executor.toString());
        }
        
        long rejectedNumber = rejectedCount.incrementAndGet();
        log.warn("线程池任务被拒绝,开始重试。拒绝次数: {}, 任务: {}", rejectedNumber, r.toString());
        
        try {
            // 使用Spring Retry进行重试
            retryTemplate.execute(new RetryCallback<Void, Exception>() {
                @Override
                public Void doWithRetry(RetryContext context) throws Exception {
                    try {
                        // 尝试重新提交任务
                        executor.execute(r);
                        if (context.getRetryCount() > 0) {
                            log.info("任务重试成功,重试次数: {}, 任务: {}", context.getRetryCount(), r.toString());
                            retrySuccessCount.incrementAndGet();
                        }
                        return null;
                    } catch (java.util.concurrent.RejectedExecutionException e) {
                        log.warn("任务重试失败,重试次数: {}/{}, 任务: {}, 错误: {}", 
                                context.getRetryCount(), config.getMaxAttempts(), r.toString(), e.getMessage());
                        throw e;
                    }
                }
            });
        } catch (Exception e) {
            // 所有重试都失败了,记录错误日志并打印堆栈
            retryFailedCount.incrementAndGet();
            log.error("任务重试全部失败,已达到最大重试次数: {}。线程池状态: 核心线程数={}, 最大线程数={}, 当前线程数={}, 活跃线程数={}, 队列大小={}, 已完成任务数={}",
                    config.getMaxAttempts(),
                    executor.getCorePoolSize(),
                    executor.getMaximumPoolSize(),
                    executor.getPoolSize(),
                    executor.getActiveCount(),
                    executor.getQueue().size(),
                    executor.getCompletedTaskCount());
            log.error("被拒绝的任务详情: {}", r.toString());
            log.error("重试失败堆栈信息: ", e);
            
            // 最终还是要抛出异常,让调用方知道任务执行失败
            throw new java.util.concurrent.RejectedExecutionException(
                    "Task " + r.toString() + " rejected from " + executor.toString() + 
                    " after " + config.getMaxAttempts() + " retry attempts", e);
        }
    }
    
    /**
     * 获取拒绝统计信息
     */
    public String getStatistics() {
        return String.format("拒绝策略统计 - 总拒绝次数: %d, 重试成功次数: %d, 重试失败次数: %d", 
                rejectedCount.get(), retrySuccessCount.get(), retryFailedCount.get());
    }
    
    /**
     * 重置统计信息
     */
    public void resetStatistics() {
        rejectedCount.set(0);
        retrySuccessCount.set(0);
        retryFailedCount.set(0);
        log.info("重试拒绝策略统计信息已重置");
    }
}

RetryRejectedExecutionConfig 的源码如下


import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * 重试拒绝策略配置
 */
@Data
@Component
@ConfigurationProperties(prefix = "moon.threadpool.retry")
public class RetryRejectedExecutionConfig {
    
    /**
     * 最大重试次数,默认5次
     */
    private int maxAttempts = 5;
    
    /**
     * 重试间隔时间(毫秒),默认100ms
     */
    private long retryInterval = 100L;
    
    /**
     * 重试间隔递增因子,默认1.5
     */
    private double backoffMultiplier = 1.5;
    
    /**
     * 最大重试间隔时间(毫秒),默认1秒
     */
    private long maxRetryInterval = 1000L;
    
    /**
     * 是否启用重试策略,默认true
     */
    private boolean enabled = true;
}

Hook methods

ThreadPoolExecutor 提供了受保护的可重写 beforeExecute(Thread,Runnable) 和 afterExecute(Runnable,Throwable) 方法,这两个方法在每个任务执行之前和之后都会被调用。

这些可以用来操纵执行环境;例如,重新初始化 ThreadLocals、收集统计信息或添加日志条目。此外,可以重写 terminated() 以执行 Executor 完全终止后需要执行的任何特殊处理。

如果钩子或回调方法抛出异常,内部工作线程可能会失败并突然终止。具体路径是:beforeExecute 抛异常会导致任务根本不执行,异常顺着 runWorker 抛出,Worker 在 finally 中被 processWorkerExit 当成异常退出(completedAbruptly=true),线程池随后补一个新的 Worker。afterExecute 抛异常也是类似的链路。所以在钩子里要么自己 try-catch 兜底,要么明确知道异常会带来 Worker 重建的开销。

此类的大多数扩展都会覆盖一个或多个受保护的钩子方法。例如,下面是一个子类,它添加了一个简单的暂停/恢复功能:

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public PausableThreadPoolExecutor(...) { super(...); }

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            while (isPaused) 
                unpaused.await();
        } catch (InterruptedException ie) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}

实际这种设计思路可以在日常开发中进行借鉴,比如封装一个公共的保存数据的工具类,提供一个保存方法,进行数据库的更新,同时定义一个保存前方法,进行保存前的数据校验;定义一个保存后方法,进行数据保存后的处理,比如发送邮件、短信等。

通过这种方法,业务方只需要继承该工具类,实现保存前方法和保存后方法,关注校验和后续的处理,而不需要关心数据库的更新逻辑。

//定义一个保存前方法
public void saveBefore() {
    //进行数据校验
    //...
}

//定义一个保存方法
public void save(T t) {
    saveBefore();
    common.save(t);
    saveAfter();
}

//定义一个保存后方法
public void saveAfter() {
    //进行数据保存后的处理
    //...
}

Executors

Executors 是线程池的工厂类,提供了几个静态方法快速创建线程池。但生产环境强烈不建议直接使用,《阿里巴巴 Java 开发手册》也明确禁用。原因如下:

工厂方法 内部实现 风险
newFixedThreadPool(int) 固定线程数 + 无界 LinkedBlockingQueue 任务无限堆积,可能 OOM
newSingleThreadExecutor() 1 个线程 + 无界 LinkedBlockingQueue 同上
newCachedThreadPool() maximumPoolSize = Integer.MAX_VALUE + SynchronousQueue 线程数失控,耗尽系统资源(OOM 或操作系统拒绝创建线程)
newScheduledThreadPool(int) DelayedWorkQueue(无界) 延迟任务堆积导致 OOM

正确做法是直接 new ThreadPoolExecutor(...),显式指定有界队列和合理的拒绝策略,并用自定义的 ThreadFactory 设置有意义的线程名。具体参考前文「自定义线程池」一节。

线程池实践

线程池死锁

一个典型的线程池死锁的场景就是主子任务共用一个线程池。

比如线程池中核心线程数和最大线程数都为1,任务队列长度为10

此时将主任务提交到线程池中,线程池创建核心线程执行主任务,此时核心线程数为1,最大线程数为1,任务队列中没有任务。

主任务中提交了3个子任务到线程池中,此时因为核心线程数已满,所以3个子任务进入任务队列,等待主任务释放线程资源;

而主任务又必须等3个子任务执行完成才能继续执行,所以此时主任务也在等待。

这样主任务等待子任务执行完成,子任务又无法获取线程资源,无法执行,最终陷入无限等待,引起线程池死锁。

这就是线程池死锁的本质:线程池中的线程在等待其他任务完成,但这些任务又需要线程池中的线程来执行,而线程池已经没有可用线程了。

线程池死锁通常发生在以下场景:

  1. 任务嵌套提交:正在执行的任务又向同一个线程池提交新任务
  2. 线程池容量有限:线程池的线程数量不足以同时处理所有层级的任务
  3. 同步等待:外层任务同步等待内层任务完成(如使用Future.get())

这一点在适用 CompletableFuture 时尤为常见,因为 CompletableFuture 在多核CPU服务器上默认使用的线程池是 ForkJoinPool,如下代码所示

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(ASYNC_POOL, supplier);
}

private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

private static final boolean USE_COMMON_POOL = (ForkJoinPool.getCommonPoolParallelism() > 1);

ForkJoinPool.getCommonPoolParallelism() 返回的是 ForkJoinPool 中线程的并行级别,这个并行级别是在 JVM 启动时就确定的,通常是 CPU 核心数 - 1,如果是单核服务器,则返回1,所以在超过2核的服务器上执行时 USE_COMMON_POOL 就为true。

虽然可以通过java.util.concurrent.ForkJoinPool.common.parallelism参数进行设置,但是不建议这样做,因为这样做会影响到整个应用程序的线程池并行级别,可能会导致一些未知的问题。

所以在 CompletableFuture 提交任务时,如果不指定线程池,则统一使用 ForkJoinPool.commonPool(),当涉及到主子任务执行时就特别容易出现前面所说的线程池死锁问题。

public class ThreadPoolDeadlockExample {
    private static final ExecutorService executor = Executors.newFixedThreadPool(1);
    
    public static void main(String[] args) {
        Future<?> future = executor.submit(() -> {
            System.out.println("主任务开始执行");
            // 在主任务中提交子任务并等待结果
            Future<?> subTask = executor.submit(() -> {
                System.out.println("子任务执行");
            });
            try {
                subTask.get(); // 死锁发生在这里
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("主任务结束");
        });
        
        try {
            future.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            System.out.println("发生死锁,任务超时");
        }
    }
}

解决这个问题的方式也很简单,就是在使用 CompletableFuture 时指定线程池,且主子任务的线程池要区分开来。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    
}, customThreadPool);

线程池隔离的实战做法:除了主子任务隔离,生产环境中还推荐做以下层级的隔离,避免单个线程池故障扩散导致全链路雪崩。

  1. 按业务模块隔离:核心交易、支付、消息推送各用独立线程池,避免推送任务积压拖垮交易;
  2. 按任务类型隔离:I/O 密集型与 CPU 密集型分开,参数配置差异大,混用会互相干扰;
  3. 按调用层级隔离:外部接口调用(HTTP、RPC)与内部计算分开,外部超时不影响内部;
  4. 按重要程度隔离:核心链路与非核心链路分开,必要时给非核心线程池配 DiscardPolicy 让位。

Hystrix / Sentinel 等熔断框架本质上也是基于线程池隔离实现资源保护的。

线程池参数配置

可能网上有很多资料都说线程池配置的时候要区分I/O密集型和CPU密集型,CPU密集型核心数设置为CPU核数+1,I/O密集型设置为核心数的两倍,或者按照 CPU核心数 * (1 + 平均I/O等待时间 / 平均CPU计算时间) 来计算。

其实这些都有一定的道理,但是不能一概而论。

我们在使用线程池时确实要按照执行的任务类型对线程池进行区分配置。

I/O 密集型,主要阻塞在I/O操作上,CPU空闲的时间比较长,比如读取文件、HTTP请求、数据库读取等,主要等待外部的资源,所以可以将核心线程数设置大一点,保证更多的任务去同时执行;

CPU 密集型,线程持续占用CPU,几乎不阻塞,比如数学计算、字符串处理、压缩、排序等,所以不能将核心线程数设置的过大,否则会导致CPU占用率过高,影响其他任务的执行。

但是不能粗暴的完全按照 CPU核数+1 或 核心数2倍 的方式来设置,因为服务中可能同时会存在多个线程池,线程池之间也会进行资源抢占,影响执行效率。

最好的方法是先根据历史经验设置一个初值,然后进行大量的压测,根据压测结果进行调整。

JDK 21 起虚拟线程(Virtual Threads)正式 GA,对 I/O 密集型场景的线程池配置思路有较大冲击:虚拟线程极其轻量(几 KB 栈),I/O 阻塞不再占用平台线程,传统"核数 × 2"的公式失去意义。在 I/O 密集型任务上可以直接用 Executors.newVirtualThreadPerTaskExecutor(),不再需要精细调参;但 CPU 密集型仍然要用平台线程池,因为虚拟线程不能真正并行计算。这是新项目选型时值得关注的方向。

那,如果生产环境由于突发的情况导致的线程资源耗尽或任务等待、执行的时间过长,应该怎么处理呢?

答案是动态线程池。

动态线程池

我们前面提到过 ThreadPoolExecutor 提供了一些 get set 方法

方法 作用
setCorePoolSize(int) 设置核心线程数
getCorePoolSize() 获取核心线程数
setMaximumPoolSize(int) 设置最大线程数
getMaximumPoolSize() 获取最大线程数
setKeepAliveTime(long,TimeUnit) 设置空闲线程存活时间
getKeepAliveTime(TimeUnit) 获取空闲线程存活时间
getActiveCount() 获取活跃线程数
getQueue() 获取任务队列

可以基于这些方法进行线程池的监管,当需要调整线程池参数时,通过 set 方法进行重新设置。

网上也有一些开源的动态线程池方案,如 Dynamic TP、Hippo4 等,可以去了解下。

生产环境关键监控指标

只暴露 set/get 接口还不够,要做到"能调"必须先做到"看得见"。生产环境建议至少埋点以下指标,并配置告警:

指标 计算方式 告警建议
活跃比 activeCount / maximumPoolSize 持续 > 0.8 提示扩容
队列堆积率 queue.size() / queueCapacity 持续 > 0.8 提示队列将满
拒绝次数 自定义 RejectedExecutionHandler 内 AtomicLong 累加 出现非零即告警
任务平均执行耗时 beforeExecute 记录开始时间到 ThreadLocal,afterExecute 计算并打点 同比突增 50%+ 告警
任务排队耗时 在 Runnable 包装类里记 submit 时间,runWorker 真正开始时计算差值 反映线程池是否处理不过来
历史最大线程数 largestPoolSize 接近 maximumPoolSize 提示需要扩容

任务执行耗时可以通过继承 ThreadPoolExecutor 并重写 beforeExecute / afterExecute 实现:

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTime.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long cost = System.nanoTime() - startTime.get();
        startTime.remove();
        // 上报到 Micrometer / Prometheus
        metrics.recordTaskDuration(cost);
    }
}

这些指标接入 Prometheus + Grafana 或公司监控平台后,配合动态参数调整能力,就构成了一个完整的动态线程池闭环。

自定义线程池

这里提供一个线程池工厂,提供创建 I/O 密集型(createIOIntensiveThreadPoolWithRetry)、CPU 密集型(createCPUIntensiveThreadPoolWithRetry)的线程池,也可以根据业务需要自由指定线程池的参数(createCustomThreadPoolWithRetry),同时提供了优雅关闭线程池的方法(shutdownGracefully)。

import com.moon.cloud.threadpool.registry.ThreadPoolRegistry;
import com.moon.cloud.threadpool.rejector.RetryRejectedExecutionHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义线程池工厂类,支持创建IO密集型和CPU密集型线程池
 */
@Slf4j
public class MoonThreadPoolFactory {

    /**
     * 创建IO密集型线程池(使用重试拒绝策略)
     * IO密集型任务特点:大量的网络请求、文件读写等,线程经常处于阻塞状态
     * 线程数配置:通常设置为 2 * CPU核心数,因为IO操作会阻塞线程
     * 
     * @param threadNamePrefix 线程名称前缀
     * @param retryHandler 重试拒绝策略处理器
     * @return ExecutorService
     */
    public static ExecutorService createIOIntensiveThreadPoolWithRetry(String threadNamePrefix, 
                                                                       RetryRejectedExecutionHandler retryHandler) {
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        int maximumPoolSize = corePoolSize * 2;
        long keepAliveTime = 60L;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                createThreadFactory(threadNamePrefix + "-io-"),
                retryHandler != null ? retryHandler : new ThreadPoolExecutor.CallerRunsPolicy()
        );
        ThreadPoolRegistry.register(threadNamePrefix, executor);
        return executor;
    }

    /**
     * 创建CPU密集型线程池(使用重试拒绝策略)
     * CPU密集型任务特点:大量的计算操作,线程持续占用CPU资源
     * 线程数配置:通常设置为 CPU核心数 + 1,避免过多线程导致上下文切换开销
     * 
     * @param threadNamePrefix 线程名称前缀
     * @param retryHandler 重试拒绝策略处理器
     * @return ExecutorService
     */
    public static ExecutorService createCPUIntensiveThreadPoolWithRetry(String threadNamePrefix,
                                                                        RetryRejectedExecutionHandler retryHandler) {
        //当某个线程因为页缺失(page fault)或其它短暂阻塞(如系统调用)时,额外的线程可以立即填补这个空档,保持CPU处于忙碌状态。
        int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
        int maximumPoolSize = corePoolSize;
        long keepAliveTime = 60L;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(500),
                createThreadFactory(threadNamePrefix + "-cpu-"),
                retryHandler != null ? retryHandler : new ThreadPoolExecutor.AbortPolicy()
        );
        ThreadPoolRegistry.register(threadNamePrefix, executor);
        return executor;
    }

    /**
     * 创建自定义线程池(使用重试拒绝策略)
     * 
     * @param corePoolSize 核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime 线程空闲时间
     * @param queueCapacity 队列容量
     * @param threadNamePrefix 线程名称前缀
     * @param retryHandler 重试拒绝策略处理器
     * @return ExecutorService
     */
    public static ExecutorService createCustomThreadPoolWithRetry(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            int queueCapacity,
            String threadNamePrefix,
            RetryRejectedExecutionHandler retryHandler) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                createThreadFactory(threadNamePrefix + "-custom-"),
                retryHandler != null ? retryHandler : new ThreadPoolExecutor.CallerRunsPolicy()
        );
        ThreadPoolRegistry.register(threadNamePrefix, executor);
        return executor;
    }

    /**
     * 创建线程工厂
     * 
     * @param namePrefix 线程名称前缀
     * @return ThreadFactory
     */
    private static ThreadFactory createThreadFactory(String namePrefix) {
        return new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final ThreadGroup group = Thread.currentThread().getThreadGroup();


            @Override
            public Thread newThread(Runnable r) {
                String threadName = namePrefix + "-thread-" + threadNumber.getAndIncrement();
                Thread thread = new Thread(group, r, threadName, 0);

                // 设置守护线程
                thread.setDaemon(true);

                // 设置异常处理器
                thread.setUncaughtExceptionHandler((t, e) -> {
                    System.err.println("Thread " + t.getName() + " threw exception: " + e.getMessage());
                    e.printStackTrace();
                });

                return thread;
            }
        };
    }

    /**
     * 优雅关闭线程池
     * 
     * @param executor 要关闭的线程池
     * @param timeoutSeconds 等待超时时间(秒)
     */
    public static void shutdownGracefully(ExecutorService executor, long timeoutSeconds) {
        if (executor == null || executor.isShutdown()) {
            return;
        }
        
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                    System.err.println("线程池未能正常关闭");
                }
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

ThreadPoolRegistry 主要用于线程池的注册,在创建线程池时,会将线程池注册到 ThreadPoolRegistry 中,以便后续进行管理。线程池注册到 ThreadPoolRegistry 后,可以根据线程池的名称获取线程池实例对象,再进行线程池的一些其他操作,比如我在 SpringBoot 自定义 Endpoint 实现线程池动态管理 一文中就基于 ThreadPoolRegistry 实现了一个线程池监控 Endpoint,也可以通过 Endpoint 进行线程池的动态管理。


import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

import com.moon.cloud.threadpool.factory.MoonThreadPoolFactory;

public class ThreadPoolRegistry {

    private static final ConcurrentHashMap<String, ThreadPoolExecutor> registry = new ConcurrentHashMap<>();

    /**
     * 注册线程池
     * @param poolName
     * @param executor
     */
    public static void register(String poolName, ThreadPoolExecutor executor) {
        registry.put(poolName, executor);
    }

    /**
     * 获取线程池实例
     * @param poolName
     * @return
     */
    public static ThreadPoolExecutor getExecutor(String poolName) {
        return registry.get(poolName);
    }

    /**
     * 获取所有线程池名称
     * @return
     */
    public static Set<String> getAllPoolNames() {
        return registry.keySet();
    }

    public static void shutdown(String poolName) {
        ThreadPoolExecutor executor = registry.get(poolName);
        if (executor != null) {
            MoonThreadPoolFactory.shutdownGracefully(executor, 120);
        }
    }

    public static void shutdown(ThreadPoolExecutor executor) {
        if (executor != null) {
            MoonThreadPoolFactory.shutdownGracefully(executor, 120);
        }
    }
}

其他

关于线程池,暂时想到的就这么多,后面如果对线程池有了更深的认识,再进行补充吧。

这篇文章内容比较多,前前后后整理了近 2 个星期,真是太难了。

参考资料

  • 美团技术博客:Java 线程池实现原理及其在美团业务中的实践
  • OpenJDK 21 源码:java.util.concurrent.ThreadPoolExecutor、AbstractExecutorService
  • 《Java 并发编程实战》(Brian Goetz)第 6、8 章
  • SpringBoot 自定义 Endpoint 实现线程池动态管理
  • 本系列其他文章:ThreadLocal、AbstractQueuedSynchronizer (AQS)、BlockingQueue

文章标签

Java线程池ThreadPoolExecutor并发
线程 FullGC 问题排查
上一篇

线程 FullGC 问题排查

2025-11-19

自定义线程池工厂类
下一篇

自定义线程池工厂类

2025-11-19

冬眠

冬眠

博主

专注于技术、阅读与思考。在这里记录学习、思考与生活。

116
文章
2
分类
关注我
系列:Java 线程池

第 3 篇,共 3 篇

上一篇

自定义线程池工厂类

已是最后一篇

文章目录

目录

  • 为什么需要线程池
  • Executor
  • ExecutorService
  • AbstractExecutorService
  • ThreadPoolExecutor
  • 线程池核心参数
  • Hook methods
  • Executors
  • 线程池实践
  • 其他
  • 参考资料

相关文章

查看更多
自定义线程池工厂类

自定义线程池工厂类

2025-11-19 · 4 min read

ThreadLocal 详解

ThreadLocal 详解

2025-11-19 · 25 min read

JUC 中的队列

JUC 中的队列

2025-11-19 · 25 min read