TimeWheel

Riicarus大约 4 分钟组件组件时间轮

TimeWheel

基本思路

时间轮首先是一个“轮”, 即环形数组. 就像手表一样, 用 buckets[] 代表刻度, 以 tick 为单位递进增加时间, 触发对应 bucket 上的任务, 以此无限循环.

每圈时间轮会经过 interval = tickMs * bucketSize 的时间间隔, 对于超过时间间隔的任务, 基本有两种实现方式----多层时间轮和单层复用时间轮.

多层时间轮通过增加上级时间轮的方式来应对 delay 超过 interval 的情况. 下层时间轮每经过一圈, 就将上层时间轮推进一个 tick. 如果上层时间轮的任务被触发, 就放入到下层时间轮中, 直到在最底层时间轮被触发.

单层复用时间轮通过为 bucket 中的任务增加 round 字段来存放 delay 超过一个 interval 时间的任务, 时间轮每走过一个 tick, 就将对应 bucket 上的任务的 round 值减一, 当 round == 0 时, 触发对应任务.

相关组件

TimeWheel 在各种 Java 中间件中使用很多, 最具代表性的应该是 Netty 和 Kafka 的时间轮, 分别代表了两个不同的设计思路.

Netty 的时间轮是单层的, 但是增加了通过 hash 计算任务在时间轮中对应 bucketindex 的算法, 优化了添加任务的性能(类似于 HashMap 中使用 mask 的方式).

// buckets.length is any power of 2
int mask = buckets.length;
int index = mask & ((delay % interval) / tickMs);

Kafka 的时间轮是多层的. Kafka 的优化点在于使用 DelayQueue 的方式推进时间轮. 如果用每个 tick 进行 sleep() 的方式进行推进, 可能推进很多个没有注册任何任务的空 bucket, 造成性能浪费. DelayQueue 存储了每个时间轮的 bucket, 这样通过 DelayQueue 进行 take() 的方式, 只会推进到下一个触发任务对应的 bucket, 略过其他的没有注册任务的 bucket.

设计思路

基于刚才的分析, 对于总体时间跨度不大的任务注册调度需求来说, 不需要使用多层时间轮的方式进行设计. 但是我们依然想要基于 DelayQueue 进行 tick 推进的优化, 做出如下设计:

TimeWheeldelayQueue 存储注册了任务的 bucket, 每次阻塞取出下一次会触发的任务所在的 bucket, 取出该 bucket 中所有相同 round 的任务(注意取出后重新放回 delayQueue).

在将 task 放入 bucket 的过程中, 由于一个 bucket 中存放了不同 roundtask, 因此新放入的 task 可能会成为该 round 中最先被触发的. 但是 DelayQueue 基于 PriorityQueue 进行设计, 无法动态更新已经添加到其中的元素, 因此, 需要先取出被修改(下一次被触发时间被更新到更早)的 bucket, 然后重新放进去.

不同于其他的时间轮, TimeWheel 只维护一个启动时间戳, 后续一切的时间计算都基于启动时间戳进行. 这样, 在推进时间轮的时候, 不需要每次去更新推进时间范围内(一次推进是一个 tick 范围, 可能推进多个 round, 影响多个 bucket)的所有 bucket.

public class TimeWheel implements Runnable {
    private final int tickMs;
    private final TimeWheelBucket[] buckets;
    private final int interval;
    private final DelayQueue<TimeWheelBucket> delayQueue;
    private final long startTime;

    private boolean running;

    public void add(TimerTaskEntry task) {
        if (task.getEvent().getScheduleTime() <= System.currentTimeMillis() + tickMs) {
            // dispatch, do not add
            return;
        }

        long delayFromStart = task.getEvent().getScheduleTime() - System.currentTimeMillis();
        int idxFromStart = (int) ((delay % interval) / tickMs);
        int roundFromStart = (int) (delay / interval);
        task.setTimestamp(startTime + (long) (interval * roundFromStart) + (long) (tickMs * idxFromStart));
        task.setRound(roundFromStart);

        // put to bucket
        TimeWheelBucket bucket = buckets[idxFromStart];
        if (bucket.add(task)) {
            if (delayQueue.contains(bucket) && !delayQueue.remove(bucket)) {
                // throw exception
            }

            delayQueue.add(bucket);
        }
    }

    public void run() {
        running = true;
        TimeWheelBucket bucket;
        while (running) {
            try {
                bucket = delayQueue.take();
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }

            RoundPollInfo info = bucket.pollLatestRound();
            dispatchTasks(info.getTaskEntries());

            // reput to delayQueue
            delayQueue.add(bucket);
        }
    }
}
public class TimeWheelBucket implements Delayed {
    private final int idx;
    private final PriorityQueue<TimerTaskEntry> taskEntries;
    private volatile AtomicLong latestTimestamp = new AtomicLong(Long.MAX_VALUE);
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public long getDelay(TimeUnit unit) {
        lock.readLock().lock();
        try {
            return unit.convert(latestTimestamp - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        } finally {
            lock.readLock().unlock();
        }
    }

    public int compareTo(Object o) {
        return (int) (getDelay() - ((TimeWheelBucket) o).getDelay());
    }

    public boolean add(TimerTaskEntry entry) {
        lock.writeLock().lock();

        try {
            taskEntries.add(entry);

            // if added task becomes the latest triggered task, meaning the bucket should be reput to delayQueue
            if (Objects.equals(entry, taskEntries.peek())) {
                // update latestTimestamp cache
                latestTimestamp.set(entry.getTimestamp());
                return true;
            }

            return false;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public RoundPollInfo pollLatestRound() {
        lock.writeLock().lock();

        try {
            // if no registered task, set timestamp to Long.MAX_VALUE to avoid trigger in delayQueue
            if (taskEntries.isEmpty()) {
                latestTimestamp.set(Long.MAX_VALUE);
                return null;
            }

            TimerTaskEntry latestTask = taskEntries.poll();
            int round = latestTask.getRound();
            List<TimerTaskEntry> tasks = new ArrayList<>();
            tasks.add(latestTask);

            while(!taskEntries.isEmpty() && round == taskEntries.peek().getRound()) {
                tasks.add(taskEntries.poll());
            }

            if (taskEntries.isEmpty()) {
                latestTimestamp.set(Long.MAX_VALUE);
            }
        } finally {
            lock.writeLock().unlock();
        }

        return RoundPollInfo.of(tasks, round, timestamp);
    }
}
public class TimerTaskEntry {
    private final int round;
    private final long timestamp;
    private final ScheduleEvent event;
}
public class RoundPollInfo {
    private final List<TimerTaskEntry>;
    private final int round;
    private final long timestamp;
}