线程池
线程池
概述
什么是线程池
线程池是一种基于池化思想管理线程的工具, 通常出现在多线程服务器中.
线程池维护多个线程, 等待管理者分配可以并发执行的任务. 这样一方面避免了处理任务时创建和销毁线程带来的开销, 另一方面避免了线程数量膨胀带来的过度调度问题, 保证了对内核的充分利用.
使用线程池的好处如下:
- 降低资源消耗;
- 提高相应速度;
- 提高线程的可管理性;
- 提供更多强大功能;
线程池的核心设计
总体设计
Java 线程池的核心实现类是 ThreadPoolExecutor
, 下面是其继承关系图:
顶层接口 Executor
提供的是一种思想: 将任务提交和任务执行解耦. 用户无需关心线程的创建和调度, 只需要提供 Runnable
对象, 将任务的运行逻辑提交到执行器(Executor
)中, 由执行器来完成线程的调度和任务执行.
ExecutorService
接口增加了一些功能:
- 扩充执行任务的能力, 补充可以为一个或者一批异步任务生成
Future
的方法; - 提供了管控线程池的方法, 比如停止线程池的运行;
AbstractExecutorService
则是上层的抽象类, 将任务执行的流程串联了起来, 保证下层只需要关注一个执行任务的方法即可.
ThreadPoolExecutor
实现最复杂的运行部分, 一方面维护自身的生命周期, 一方面同时管理线程和任务.
如下是 ThreadPoolExecutor
的运行机制:
线程池在内部构建了一个生产者-消费者模型, 用于实现线程和任务的解耦, 可以更好的缓冲任务, 复用线程. 线程池的运行主要分为两部分: 任务管理和线程管理.
任务管理部分充当生产者的角色, 任务被提交后, 线程池会判断该任务后续的流转:
- 直接申请线程执行任务;
- 缓冲到队列中等待线程执行;
- 使用拒绝策略;
线程管理部分是消费者, 根据任务请求进行线程分配, 线程执行完任务之后会继续获取新的任务进行执行, 当线程拿不到任务的时候, 会被回收.
生命周期管理
线程池运行的状态不是用户显式设置的, 而是伴随着线程池的执行, 由内部进行维护. 线程池内部使用一个变量维护两个值: 运行状态(runState
) 和 线程数量(workerCount
):
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
在 ctl
中, 高 3 位保存 runState
, 低 29 位保存 workerCount
.
ThreadPoolExecutor
运行状态有 5 种:
RUNNING
: 能接受新提交的任务, 并且也能处理阻塞队列中的任务;SHUTDOWN
: 关闭状态, 不再接收新提交的任务, 但可以继续处理阻塞队列中已保存的任务;STOP
: 不能接收新任务, 也不会处理队列中的任务, 会中断正在处理任务的线程;TIDYING
: 所有的任务都已经终止,workerCount
为 0;TERMINATED
:terminated()
方法执行完后, 进入此状态;
生命周期转化如下图:
任务执行机制
任务调度
任务调度是线程池的主要入口, 当用户提交了一个任务, 接下来任务如何执行都由任务调度决定. 这一部分是线程池的核心机制.
首先, 所有任务的调度都是由 execute()
方法完成的, 这部分完成的工作是: 检查现在线程池的运行状态/运行线程数/运行策略, 决定接下来的执行流程, 是直接申请线程执行, 还是放入缓冲队列, 抑或是直接拒绝.
执行过程如下:
- 首先检查线程池运行状态, 如果不是
RUNNING
, 则直接拒绝, 线程池要保证在RUNNING
状态下完成任务; - 如果
workerCount
<corePoolSize
, 则创建并启动一个线程来执行新提交的任务; - 如果
workerCount
>=corePoolSize
, 且线程池的阻塞队列未满, 将任务添加到缓冲队列中; - 如果
workerCount
>=corePoolSize
并且workerCount
<maximumPoolSize
, 且线程池内的缓冲队列已满, 则创建并启动一个线程来执行新提交的任务; - 如果
workerCount
>=maximumPoolSize
, 并且线程池内的阻塞队列已满, 则根据拒绝策略来处理, 默认为拒绝并抛出异常;
任务缓冲
任务缓冲是线程池能够管理任务的核心部分.
使用不同的队列可以实现不同的任务存取策略:
ArrayBlockingQueue
: 一个数组实现的有界队列, 按照 FIFO 规则对元素排序. 支持公平锁和非公平锁;LinkedBlockingQueue
: 一个链表实现的有界队列, 按照 FIFO 规则对元素排序. 默认长度为Integer.MAX_VALUE
, 由 OOM 风险;PriorityBlockingQueue
: 一个支持线程优先级排序的无界队列, 默认自然序进行排序, 也可以自定义实现compareTo()
排序方法, 不能保证同优先级元素的顺序;DelayQueue
: 一个实现PriorityBlockingQueue
的可延迟获取的无界队列, 创建元素时, 可以指定多久才可以从队列中获取当前元素. 只有延时期满后才可以从队列中获取元素;SynchronousQueue
: 一个不存储元素的队列, 每个put
操作都需要等待take
, 否则不能添加元素. 支持公平锁和非公平锁;LinkedTransferQueue
: 一个由链表组成的无界阻塞队列, 只是多了transfer()
和tryTransfer()
方法;LinkedBlockingDeque
: 一个链表组成的双向阻塞队列, 头尾都可以添加/移除元素, 降低并发的锁竞争.
任务获取
线程需要从任务缓冲模块获取任务来执行, 这部分策略由 getTask()
实现, 流程如下:
getTask()
需要平衡当前线程池中的线程数, 如果认为线程池中线程数量过多, 就返回 null
. 工作线程 Worker
会不断获取新任务执行, 如果没有可执行任务 就会被回收.
任务拒绝
任务拒绝是线程池保护模块的重要部分, 当任务的缓冲队列已满, 并且 workerCount
>= maximumPoolSize
时, 就会开始拒绝所有任务, 采取任务拒绝策略;
拒绝策略是一个接口, 设计如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
JDK 提供了四种拒绝策略:
AbortPolicy
: 默认拒绝策略, 丢弃任务并且抛出异常;DiscordPolicy
: 丢弃任务, 但是不抛出异常;DiscordOldestPolicy
: 丢弃队列最前面的任务, 然后重新提交被拒绝的任务;CallerRunsPolicy
: 由调用线程(提交任务的线程)处理该任务;
工作线程管理
工作线程
线程池为了掌握线程的状态并且维护线程的生命周期, 设计了线程池的工作线程 worker
. 定义如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
//Worker持有的线程
final Thread thread;
//初始化的任务,可以为null
Runnable firstTask;
}
thread
是调用构造方法时, 通过 ThreadFactory
创建的线程, 可以用来执行任务.firstTask
是创建时传入的第一个任务, 可以为 null
. 如果值是非空的, 工作线程就会在启动初期立即执行这个任务, 也就是对应核心线程创建时的情况; 如果为空值, 就会创建一个线程去执行任务列表中的任务, 也就是非核心线程的创建.
线程池需要管理线程的生命周期, 需要在线程长时间不运行时进行回收. 线程池使用一张 hashMap
去维护线程的引用, 这样可以通过添加引用/移除引用这样的操作来控制线程的生命周期. 最重要的事就是如何判断这个线程是否运行.
Worker
通过继承 AbstractQueueSynchronizer
来实现独占锁这个功能, 而不是使用可重入锁, 目的是使用不可重入的特性来反映当前线程的执行状态.
lock
方法一旦获取了独占锁, 就表示当前线程正在执行任务;- 如果正在执行任务, 则不应该中断线程;
- 如果不是独占锁的状态, 也就是处于空闲状态, 这时候可以对线程进行中断;
- 线程池在执行
shutdown()
或tryTerminate()
方法时会调用interruptIdleWorkers()
方法来中断空闲的线程,interruptIdleWorkers()
会调用tryLock()
方法来判断线程池中的线程是否处于空闲状态; 如果是空闲状态就可以安全回收;
工作线程增加
通过 addWorker()
方法来增加工作线程, 不考虑线程池在哪个阶段增加的线程, 仅仅完成增加线程并使它执行, 最后返回是否成功的结果.
工作线程回收
线程池中线程的销毁依赖 JVM 的垃圾回收, 线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收. 线程池只需要消除对线程的引用即可使其被回收.
Worker
被创建出来后, 就会不断地进行轮询并且获取任务去执行, 核心线程可以无限等待获取任务, 非核心线程要限时获取任务. 当 Worker
无法获取到任务时, 就会结束循环, 并且主动将自己在线程池中的引用消除.
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
//获取不到任务时,主动回收自己
processWorkerExit(w, completedAbruptly);
}
线程回收的工作在 processWorkerExit()
方法中完成, 线程池会根据销毁的情景来动态改变自己的状态, 或者平衡线程.
工作线程执行任务
在 Worker
类中的 run()
方法调用了 runWorker()
方法来执行任务, runWorker()
方法的执行过程如下:
参考资料: