线程池

Riicarus大约 9 分钟JavaJUCJavaJUC线程池

线程池

概述

什么是线程池

线程池是一种基于池化思想管理线程的工具, 通常出现在多线程服务器中.

线程池维护多个线程, 等待管理者分配可以并发执行的任务. 这样一方面避免了处理任务时创建和销毁线程带来的开销, 另一方面避免了线程数量膨胀带来的过度调度问题, 保证了对内核的充分利用.

使用线程池的好处如下:

  1. 降低资源消耗;
  2. 提高相应速度;
  3. 提高线程的可管理性;
  4. 提供更多强大功能;

线程池的核心设计

总体设计

Java 线程池的核心实现类是 ThreadPoolExecutor, 下面是其继承关系图:

ThreadPoolExecutorUML
ThreadPoolExecutorUML

顶层接口 Executor 提供的是一种思想: 将任务提交和任务执行解耦. 用户无需关心线程的创建和调度, 只需要提供 Runnable 对象, 将任务的运行逻辑提交到执行器(Executor)中, 由执行器来完成线程的调度和任务执行.

ExecutorService 接口增加了一些功能:

  1. 扩充执行任务的能力, 补充可以为一个或者一批异步任务生成 Future 的方法;
  2. 提供了管控线程池的方法, 比如停止线程池的运行;

AbstractExecutorService 则是上层的抽象类, 将任务执行的流程串联了起来, 保证下层只需要关注一个执行任务的方法即可.

ThreadPoolExecutor 实现最复杂的运行部分, 一方面维护自身的生命周期, 一方面同时管理线程和任务.

如下是 ThreadPoolExecutor 的运行机制:

ThreadPoolExecutorOperatingMechanism
ThreadPoolExecutorOperatingMechanism

线程池在内部构建了一个生产者-消费者模型, 用于实现线程和任务的解耦, 可以更好的缓冲任务, 复用线程. 线程池的运行主要分为两部分: 任务管理和线程管理.
任务管理部分充当生产者的角色, 任务被提交后, 线程池会判断该任务后续的流转:

  1. 直接申请线程执行任务;
  2. 缓冲到队列中等待线程执行;
  3. 使用拒绝策略;

线程管理部分是消费者, 根据任务请求进行线程分配, 线程执行完任务之后会继续获取新的任务进行执行, 当线程拿不到任务的时候, 会被回收.

生命周期管理

线程池运行的状态不是用户显式设置的, 而是伴随着线程池的执行, 由内部进行维护. 线程池内部使用一个变量维护两个值: 运行状态(runState) 和 线程数量(workerCount):

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 中, 高 3 位保存 runState, 低 29 位保存 workerCount.

ThreadPoolExecutor 运行状态有 5 种:

  1. RUNNING: 能接受新提交的任务, 并且也能处理阻塞队列中的任务;
  2. SHUTDOWN: 关闭状态, 不再接收新提交的任务, 但可以继续处理阻塞队列中已保存的任务;
  3. STOP: 不能接收新任务, 也不会处理队列中的任务, 会中断正在处理任务的线程;
  4. TIDYING: 所有的任务都已经终止, workerCount 为 0;
  5. TERMINATED: terminated() 方法执行完后, 进入此状态;

生命周期转化如下图:

ThreadPoolExecutorLifeCircle
ThreadPoolExecutorLifeCircle

任务执行机制

任务调度

任务调度是线程池的主要入口, 当用户提交了一个任务, 接下来任务如何执行都由任务调度决定. 这一部分是线程池的核心机制.

首先, 所有任务的调度都是由 execute() 方法完成的, 这部分完成的工作是: 检查现在线程池的运行状态/运行线程数/运行策略, 决定接下来的执行流程, 是直接申请线程执行, 还是放入缓冲队列, 抑或是直接拒绝.

执行过程如下:

  1. 首先检查线程池运行状态, 如果不是 RUNNING, 则直接拒绝, 线程池要保证在 RUNNING 状态下完成任务;
  2. 如果 workerCount < corePoolSize, 则创建并启动一个线程来执行新提交的任务;
  3. 如果 workerCount >= corePoolSize, 且线程池的阻塞队列未满, 将任务添加到缓冲队列中;
  4. 如果 workerCount >= corePoolSize 并且 workerCount < maximumPoolSize, 且线程池内的缓冲队列已满, 则创建并启动一个线程来执行新提交的任务;
  5. 如果 workerCount >= maximumPoolSize, 并且线程池内的阻塞队列已满, 则根据拒绝策略来处理, 默认为拒绝并抛出异常;

任务缓冲

任务缓冲是线程池能够管理任务的核心部分.

使用不同的队列可以实现不同的任务存取策略:

  1. ArrayBlockingQueue: 一个数组实现的有界队列, 按照 FIFO 规则对元素排序. 支持公平锁和非公平锁;
  2. LinkedBlockingQueue: 一个链表实现的有界队列, 按照 FIFO 规则对元素排序. 默认长度为 Integer.MAX_VALUE, 由 OOM 风险;
  3. PriorityBlockingQueue: 一个支持线程优先级排序的无界队列, 默认自然序进行排序, 也可以自定义实现 compareTo() 排序方法, 不能保证同优先级元素的顺序;
  4. DelayQueue: 一个实现 PriorityBlockingQueue 的可延迟获取的无界队列, 创建元素时, 可以指定多久才可以从队列中获取当前元素. 只有延时期满后才可以从队列中获取元素;
  5. SynchronousQueue: 一个不存储元素的队列, 每个 put 操作都需要等待 take, 否则不能添加元素. 支持公平锁和非公平锁;
  6. LinkedTransferQueue: 一个由链表组成的无界阻塞队列, 只是多了 transfer()tryTransfer() 方法;
  7. LinkedBlockingDeque: 一个链表组成的双向阻塞队列, 头尾都可以添加/移除元素, 降低并发的锁竞争.

任务获取

线程需要从任务缓冲模块获取任务来执行, 这部分策略由 getTask() 实现, 流程如下:

ThreadPoolGetTask
ThreadPoolGetTask

getTask() 需要平衡当前线程池中的线程数, 如果认为线程池中线程数量过多, 就返回 null. 工作线程 Worker 会不断获取新任务执行, 如果没有可执行任务 就会被回收.

任务拒绝

任务拒绝是线程池保护模块的重要部分, 当任务的缓冲队列已满, 并且 workerCount >= maximumPoolSize 时, 就会开始拒绝所有任务, 采取任务拒绝策略;

拒绝策略是一个接口, 设计如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

JDK 提供了四种拒绝策略:

  1. AbortPolicy: 默认拒绝策略, 丢弃任务并且抛出异常;
  2. DiscordPolicy: 丢弃任务, 但是不抛出异常;
  3. DiscordOldestPolicy: 丢弃队列最前面的任务, 然后重新提交被拒绝的任务;
  4. CallerRunsPolicy: 由调用线程(提交任务的线程)处理该任务;

工作线程管理

工作线程

线程池为了掌握线程的状态并且维护线程的生命周期, 设计了线程池的工作线程 worker. 定义如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    //Worker持有的线程
    final Thread thread;
    //初始化的任务,可以为null
    Runnable firstTask;
}

thread 是调用构造方法时, 通过 ThreadFactory 创建的线程, 可以用来执行任务.
firstTask 是创建时传入的第一个任务, 可以为 null. 如果值是非空的, 工作线程就会在启动初期立即执行这个任务, 也就是对应核心线程创建时的情况; 如果为空值, 就会创建一个线程去执行任务列表中的任务, 也就是非核心线程的创建.

ThreadPoolWorkerGetTaskProcess
ThreadPoolWorkerGetTaskProcess

线程池需要管理线程的生命周期, 需要在线程长时间不运行时进行回收. 线程池使用一张 hashMap 去维护线程的引用, 这样可以通过添加引用/移除引用这样的操作来控制线程的生命周期. 最重要的事就是如何判断这个线程是否运行.

Worker 通过继承 AbstractQueueSynchronizer 来实现独占锁这个功能, 而不是使用可重入锁, 目的是使用不可重入的特性来反映当前线程的执行状态.

  1. lock 方法一旦获取了独占锁, 就表示当前线程正在执行任务;
  2. 如果正在执行任务, 则不应该中断线程;
  3. 如果不是独占锁的状态, 也就是处于空闲状态, 这时候可以对线程进行中断;
  4. 线程池在执行 shutdown()tryTerminate() 方法时会调用 interruptIdleWorkers() 方法来中断空闲的线程, interruptIdleWorkers() 会调用 tryLock() 方法来判断线程池中的线程是否处于空闲状态; 如果是空闲状态就可以安全回收;

工作线程增加

通过 addWorker() 方法来增加工作线程, 不考虑线程池在哪个阶段增加的线程, 仅仅完成增加线程并使它执行, 最后返回是否成功的结果.

ThreadPoolWorkerThreadIncrement
ThreadPoolWorkerThreadIncrement

工作线程回收

线程池中线程的销毁依赖 JVM 的垃圾回收, 线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收. 线程池只需要消除对线程的引用即可使其被回收.

Worker 被创建出来后, 就会不断地进行轮询并且获取任务去执行, 核心线程可以无限等待获取任务, 非核心线程要限时获取任务. 当 Worker 无法获取到任务时, 就会结束循环, 并且主动将自己在线程池中的引用消除.

try {
    while (task != null || (task = getTask()) != null) {
        //执行任务
    }
} finally {
    //获取不到任务时,主动回收自己
    processWorkerExit(w, completedAbruptly);
}

线程回收的工作在 processWorkerExit() 方法中完成, 线程池会根据销毁的情景来动态改变自己的状态, 或者平衡线程.

工作线程执行任务

Worker 类中的 run() 方法调用了 runWorker() 方法来执行任务, runWorker() 方法的执行过程如下:

ThreadPoolWorkerExecuteTask
ThreadPoolWorkerExecuteTask

参考资料:

  1. Java 线程池实现原理及其在美团业务中的实践open in new window