Java线程池详解
本文详解Java线程池,重点解析ThreadPoolExecutor的核心参数、工作原理与拒绝策略。同时涵盖了合理配置、Executors使用风险及优雅关闭等最佳实践。
这是一份关于 Java 线程池的详细解析,从核心概念、工作原理到实践建议,力求全面且易于理解。
一、为什么需要线程池?
在不使用线程池的情况下,我们通常会为每个任务创建一个新线程:new Thread(runnable).start()。这种方式存在几个严重问题:
- 高昂的资源开销:线程的创建和销毁是重量级操作,需要与操作系统交互,涉及内存分配、上下文切换等,开销很大。频繁创建和销毁线程会严重影响性能。
- 资源耗尽风险:如果并发请求量巨大,无限制地创建线程会耗尽系统内存和 CPU 资源,可能导致
OutOfMemoryError或系统崩溃。 - 缺乏统一管理:无法方便地控制并发线程的数量、监控其状态、或进行统一的关闭和异常处理。
线程池正是为了解决这些问题而设计的。它的核心思想是:将线程的创建和管理与任务的执行解耦,通过复用已创建的线程来执行任务。
二、线程池的核心优势
- 降低资源消耗:通过复用线程,避免了频繁创建和销毁线程的开销。
- 提高响应速度:任务到达时,无需等待线程创建,可以直接使用池中的空闲线程执行,缩短了响应时间。
- 提高可管理性:可以统一分配、调优和监控线程,控制最大并发数,防止资源耗尽。
- 提供更多功能:支持定时执行、周期性执行、并发数控制等高级功能。
三、核心类:ThreadPoolExecutor
Java 线程池的核心实现是 java.util.concurrent.ThreadPoolExecutor。理解它,就理解了 Java 线程池的精髓。我们先来看它最复杂的构造函数,因为它包含了所有核心参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
1. 七大核心参数详解
corePoolSize(核心线程数)- 线程池中保持存活的常驻线程数量,即使它们是空闲的。
- 除非设置了
allowCoreThreadTimeOut,否则核心线程不会被销毁。
maximumPoolSize(最大线程数)- 线程池能容纳的线程最大数量。
- 当工作队列满了,且当前线程数小于最大线程数时,才会创建新线程。
keepAliveTime(空闲线程存活时间)- 当线程池中的线程数量超过
corePoolSize时,多余的空闲线程在被销毁前可以存活的时间。
- 当线程池中的线程数量超过
unit(时间单位)keepAliveTime的时间单位(如TimeUnit.SECONDS)。
workQueue(工作队列)- 用于存放等待执行的任务的阻塞队列。这是线程池的核心缓冲机制。常见的队列类型有:
ArrayBlockingQueue:基于数组的有界阻塞队列,必须指定容量,按 FIFO(先进先出)排序。LinkedBlockingQueue:基于链表的阻塞队列,容量可选(默认是Integer.MAX_VALUE,相当于无界),按 FIFO 排序。当使用无界队列时,maximumPoolSize参数会失效,因为任务永远不会填满队列。SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等待一个移除操作,反之亦然。通常配合较大的maximumPoolSize使用(如Executors.newCachedThreadPool())。PriorityBlockingQueue:具有优先级的无界阻塞队列。
- 用于存放等待执行的任务的阻塞队列。这是线程池的核心缓冲机制。常见的队列类型有:
threadFactory(线程工厂)- 用于创建新线程的工厂。可以自定义线程的名称、是否为守护线程、优先级等。自定义线程名对于调试和问题排查非常重要。
handler(拒绝策略)- 当工作队列已满且线程数达到
maximumPoolSize后,新任务将无法处理,此时会调用拒绝策略。JDK 提供了四种默认策略:AbortPolicy(默认):直接抛出RejectedExecutionException异常,阻止系统正常工作。CallerRunsPolicy:"调用者运行"策略。既不抛弃任务,也不抛出异常,而是将任务回退给调用者(提交任务的线程),由它来执行。这是一种有效的流量削峰和反压机制。DiscardPolicy:直接丢弃任务,不进行任何处理,也没有任何异常。DiscardOldestPolicy:丢弃队列中最老的任务(队首任务),然后尝试重新提交当前任务。
- 当工作队列已满且线程数达到
2. 工作原理(任务提交流程)
当一个新任务通过 execute() 方法提交给 ThreadPoolExecutor 时,处理流程如下:
判断核心线程池是否已满:检查当前运行的线程数是否小于
corePoolSize。- 如果是,则创建新的核心线程来执行任务,即使其他核心线程是空闲的。
- 如果否,则进入下一步。
判断工作队列是否已满:尝试将任务添加到
workQueue中。- 如果添加成功,任务将在队列中等待被空闲线程取出执行。
- 如果添加失败(通常因为队列是有界的且已满),则进入下一步。
判断最大线程池是否已满:检查当前运行的线程数是否小于
maximumPoolSize。- 如果是,则创建新的非核心线程(也叫“救急线程”)来执行任务。
- 如果否(即当前线程数已等于
maximumPoolSize),则进入最后一步。
执行拒绝策略:调用
RejectedExecutionHandler来处理这个无法执行的任务。
四、Executors 工具类(不推荐在生产环境使用)
Executors 类提供了一些方便的静态工厂方法来创建不同类型的线程池,但它们存在一些潜在风险,阿里巴巴《Java开发手册》中强制要求不使用 Executors 创建线程池。
Executors.newFixedThreadPool(int nThreads)- 特点:创建固定大小的线程池。
corePoolSize和maximumPoolSize相等。 - 队列:使用
LinkedBlockingQueue(无界队列)。 - 风险:由于队列是无界的,当任务堆积时可能导致大量任务在队列中等待,最终耗尽内存,引发
OutOfMemoryError。
- 特点:创建固定大小的线程池。
Executors.newSingleThreadExecutor()- 特点:创建只有一个线程的线程池,保证所有任务按顺序执行。
- 队列:使用
LinkedBlockingQueue(无界队列)。 - 风险:与
newFixedThreadPool相同,存在内存耗尽的风险。
Executors.newCachedThreadPool()- 特点:创建一个可缓存的线程池。
corePoolSize为 0,maximumPoolSize为Integer.MAX_VALUE。 - 队列:使用
SynchronousQueue。 - 风险:
maximumPoolSize几乎是无限的,当并发量巨大时,会创建大量线程,可能耗尽系统资源,导致OutOfMemoryError。
- 特点:创建一个可缓存的线程池。
最佳实践:始终通过直接构造 ThreadPoolExecutor 的方式创建线程池,这样可以明确指定所有参数,让线程池的行为完全可控。
五、线程池的生命周期与关闭
线程池有5个状态:
- RUNNING:能接受新任务,并处理队列中的任务。
- SHUTDOWN:不接受新任务,但会处理完队列中的已有任务。调用
shutdown()方法会进入此状态。 - STOP:不接受新任务,不处理队列中的任务,并中断正在执行的任务。调用
shutdownNow()方法会进入此状态。 - TIDYING:所有任务都已终止,工作线程数量为0,即将调用
terminated()方法。 - TERMINATED:
terminated()方法执行完毕。
如何关闭线程池?
shutdown(): 平滑关闭。不再接受新任务,但会等待队列中的任务全部执行完毕。这是最常用的关闭方式。shutdownNow(): 强制关闭。立即停止所有正在执行的任务,清空队列,并返回未执行的任务列表。这种方式比较暴力,可能导致数据不一致。awaitTermination(long timeout, TimeUnit unit): 阻塞当前线程,直到线程池状态变为TERMINATED,或者等待超时。通常与shutdown()配合使用,实现优雅停机。
六、核心实践建议
合理配置线程池大小:
- CPU密集型任务:线程数 ≈ CPU核心数 + 1。这能充分利用CPU,同时减少因线程过多导致的上下文切换开销。
- I/O密集型任务:线程数可以设置得更大,例如
CPU核心数 * 2或CPU核心数 / (1 - 阻塞系数)。因为线程在等待I/O时,CPU是空闲的,可以切换给其他线程执行任务。 - 最佳方式:通过性能压测来确定最优线程数。
使用自定义线程工厂:
- 强烈建议使用自定义的
ThreadFactory为线程命名,格式如业务名-pool-线程编号。这样在排查问题(如分析线程堆栈、日志)时,可以快速定位是哪个线程池出了问题。
- 强烈建议使用自定义的
处理任务中的异常:
- 通过
execute()提交的任务,如果内部抛出未捕获的异常,会导致该线程终止并被线程池移除,然后线程池会创建一个新线程来替代它。 - 通过
submit()提交的任务,异常会被捕获并封装在Future对象中,只有在调用Future.get()时才会抛出。 - 最佳实践:在
Runnable或Callable的run/call方法内部使用try-catch块捕获所有异常,确保线程不会意外终止。
- 通过
七、简单示例代码
import java.util.concurrent.*;
public class ThreadPoolExample {
public static void main(String[] args) {
// 自定义线程工厂,方便调试
ThreadFactory namedThreadFactory = new ThreadFactory() {
private int count = 0;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("my-pool-thread-" + count++);
return t;
}
};
// 创建一个核心线程数为2,最大线程数为5,队列容量为10的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
5, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit
new ArrayBlockingQueue<>(10), // workQueue
namedThreadFactory, // threadFactory
new ThreadPoolExecutor.CallerRunsPolicy() // handler
);
// 提交20个任务
for (int i = 0; i < 20; i++) {
final int taskIndex = i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is running task " + taskIndex);
try {
// 模拟任务执行
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
System.out.println("All tasks submitted.");
// 优雅地关闭线程池
executor.shutdown();
try {
// 等待线程池完全终止,最多等待1分钟
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate in 60 seconds");
executor.shutdownNow(); // 强制关闭
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("Thread pool shut down.");
}
}