TimeWheel
TimeWheel
基本思路
时间轮首先是一个“轮”, 即环形数组. 就像手表一样, 用 buckets[]
代表刻度, 以 tick
为单位递进增加时间, 触发对应 bucket
上的任务, 以此无限循环.
每圈时间轮会经过 interval = tickMs * bucketSize
的时间间隔, 对于超过时间间隔的任务, 基本有两种实现方式----多层时间轮和单层复用时间轮.
多层时间轮通过增加上级时间轮的方式来应对 delay
超过 interval
的情况. 下层时间轮每经过一圈, 就将上层时间轮推进一个 tick
. 如果上层时间轮的任务被触发, 就放入到下层时间轮中, 直到在最底层时间轮被触发.
单层复用时间轮通过为 bucket
中的任务增加 round
字段来存放 delay
超过一个 interval
时间的任务, 时间轮每走过一个 tick
, 就将对应 bucket
上的任务的 round
值减一, 当 round == 0
时, 触发对应任务.
相关组件
TimeWheel 在各种 Java 中间件中使用很多, 最具代表性的应该是 Netty 和 Kafka 的时间轮, 分别代表了两个不同的设计思路.
Netty 的时间轮是单层的, 但是增加了通过 hash
计算任务在时间轮中对应 bucket
的 index
的算法, 优化了添加任务的性能(类似于 HashMap
中使用 mask
的方式).
// buckets.length is any power of 2
int mask = buckets.length;
int index = mask & ((delay % interval) / tickMs);
Kafka 的时间轮是多层的. Kafka 的优化点在于使用 DelayQueue
的方式推进时间轮. 如果用每个 tick
进行 sleep()
的方式进行推进, 可能推进很多个没有注册任何任务的空 bucket
, 造成性能浪费. DelayQueue
存储了每个时间轮的 bucket
, 这样通过 DelayQueue
进行 take()
的方式, 只会推进到下一个触发任务对应的 bucket
, 略过其他的没有注册任务的 bucket
.
设计思路
基于刚才的分析, 对于总体时间跨度不大的任务注册调度需求来说, 不需要使用多层时间轮的方式进行设计. 但是我们依然想要基于 DelayQueue
进行 tick
推进的优化, 做出如下设计:
TimeWheel
的 delayQueue
存储注册了任务的 bucket
, 每次阻塞取出下一次会触发的任务所在的 bucket
, 取出该 bucket
中所有相同 round
的任务(注意取出后重新放回 delayQueue
).
在将 task
放入 bucket
的过程中, 由于一个 bucket
中存放了不同 round
的 task
, 因此新放入的 task
可能会成为该 round
中最先被触发的. 但是 DelayQueue
基于 PriorityQueue
进行设计, 无法动态更新已经添加到其中的元素, 因此, 需要先取出被修改(下一次被触发时间被更新到更早)的 bucket
, 然后重新放进去.
不同于其他的时间轮, TimeWheel
只维护一个启动时间戳, 后续一切的时间计算都基于启动时间戳进行. 这样, 在推进时间轮的时候, 不需要每次去更新推进时间范围内(一次推进是一个 tick
范围, 可能推进多个 round
, 影响多个 bucket
)的所有 bucket
.
public class TimeWheel implements Runnable {
private final int tickMs;
private final TimeWheelBucket[] buckets;
private final int interval;
private final DelayQueue<TimeWheelBucket> delayQueue;
private final long startTime;
private boolean running;
public void add(TimerTaskEntry task) {
if (task.getEvent().getScheduleTime() <= System.currentTimeMillis() + tickMs) {
// dispatch, do not add
return;
}
long delayFromStart = task.getEvent().getScheduleTime() - System.currentTimeMillis();
int idxFromStart = (int) ((delay % interval) / tickMs);
int roundFromStart = (int) (delay / interval);
task.setTimestamp(startTime + (long) (interval * roundFromStart) + (long) (tickMs * idxFromStart));
task.setRound(roundFromStart);
// put to bucket
TimeWheelBucket bucket = buckets[idxFromStart];
if (bucket.add(task)) {
if (delayQueue.contains(bucket) && !delayQueue.remove(bucket)) {
// throw exception
}
delayQueue.add(bucket);
}
}
public void run() {
running = true;
TimeWheelBucket bucket;
while (running) {
try {
bucket = delayQueue.take();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
running = false;
}
RoundPollInfo info = bucket.pollLatestRound();
dispatchTasks(info.getTaskEntries());
// reput to delayQueue
delayQueue.add(bucket);
}
}
}
public class TimeWheelBucket implements Delayed {
private final int idx;
private final PriorityQueue<TimerTaskEntry> taskEntries;
private volatile AtomicLong latestTimestamp = new AtomicLong(Long.MAX_VALUE);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public long getDelay(TimeUnit unit) {
lock.readLock().lock();
try {
return unit.convert(latestTimestamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} finally {
lock.readLock().unlock();
}
}
public int compareTo(Object o) {
return (int) (getDelay() - ((TimeWheelBucket) o).getDelay());
}
public boolean add(TimerTaskEntry entry) {
lock.writeLock().lock();
try {
taskEntries.add(entry);
// if added task becomes the latest triggered task, meaning the bucket should be reput to delayQueue
if (Objects.equals(entry, taskEntries.peek())) {
// update latestTimestamp cache
latestTimestamp.set(entry.getTimestamp());
return true;
}
return false;
} finally {
lock.writeLock().unlock();
}
}
public RoundPollInfo pollLatestRound() {
lock.writeLock().lock();
try {
// if no registered task, set timestamp to Long.MAX_VALUE to avoid trigger in delayQueue
if (taskEntries.isEmpty()) {
latestTimestamp.set(Long.MAX_VALUE);
return null;
}
TimerTaskEntry latestTask = taskEntries.poll();
int round = latestTask.getRound();
List<TimerTaskEntry> tasks = new ArrayList<>();
tasks.add(latestTask);
while(!taskEntries.isEmpty() && round == taskEntries.peek().getRound()) {
tasks.add(taskEntries.poll());
}
if (taskEntries.isEmpty()) {
latestTimestamp.set(Long.MAX_VALUE);
}
} finally {
lock.writeLock().unlock();
}
return RoundPollInfo.of(tasks, round, timestamp);
}
}
public class TimerTaskEntry {
private final int round;
private final long timestamp;
private final ScheduleEvent event;
}
public class RoundPollInfo {
private final List<TimerTaskEntry>;
private final int round;
private final long timestamp;
}