接上一篇( http://my.oschina.net/haogrgr/blog/490266 )
8. Worker代码走读.
//主要负责累加tick, 执行到期任务等.
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
@Override
public void run() {
//初始化startTime, startTime只是一个起始时间的标记, 任务的deadline是相对这个时间点来的.
startTime = System.nanoTime();
//因为nanoTime返回值可能为0, 甚至负数, 所以这时赋值为1, Timer中start方法会判断该值, 直到不为0才跳出循环.
if (startTime == 0) {
startTime = 1;
}
//唤醒阻塞在Timer.start()方法上的线程, 表示已经启动完成.
startTimeInitialized.countDown();
//只要还是启动状态, 就一直循环
do {
//waitForNextTick方法主要是计算下次tick的时间, 然后sleep到下次tick
//返回值就是System.nanoTime() - startTime, 也就是Timer启动后到这次tick, 所过去的时间
final long deadline = waitForNextTick();
if (deadline > 0) {//可能溢出, 所以小于等于0不管
//获取index, 原理见Timer的构造方法注释, 等价于 tick % wheel.length
int idx = (int) (tick & mask);
//移除cancel了的task, 具体可以见HashedWheelTimeout.cancel()方法注释
processCancelledTasks();
//当前tick对应的wheel
HashedWheelBucket bucket = wheel[idx];
//因为添加任务是先加入到timeouts队列中, 而这里就是将任务从队列中取出, 放到对应的bucket中
transferTimeoutsToBuckets();
//见上篇HashedWheelBucket.expireTimeouts()方法的注释
//具体是根据当前的deadline, 判断bucket中的人物是否到期, 到期的任务就执行, 没到期的, 就将人物轮数减一.
//正常情况下, 一个bucket在一轮中, 只会执行一次expireTimeouts方法.
bucket.expireTimeouts(deadline);
//累加tick
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
//返回调用stop()时, 还未处理的任务.
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
//加上还没来得及放入bucket中的任务
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
//最好移除下cancel了的task
processCancelledTasks();
}
//将Timer.newTimeout()调用放入到timeouts时的任务放入到对应的bucket中
private void transferTimeoutsToBuckets() {
//一次tick, 最多放入10w任务, 防止太多了, 造成worker线程在这里停留太久.
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
//全部处理完了, 退出循环
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
//还没加入到bucket中, 就取消了, 继续.
continue;
}
//calculated 表示任务要经过多少个tick
long calculated = timeout.deadline / tickDuration;
//设置任务要经过的轮数
timeout.remainingRounds = (calculated - tick) / wheel.length;
//如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 于是方法调用完后就会执行.
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);//同样, 类似于ticks % wheel.length
//这时任务所在的bucket在wheel中的位置就表示, 经过n轮后, 还需要多少次tick才执行.
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);//将timeout加入到链表
}
}
//将cancel任务从队列中取出, 并执行cancel操作, 具体可以见HashedWheelTimeout.cancel()方法注释.
private void processCancelledTasks() {
for (;;) {
Runnable task = cancelledTimeouts.poll();
if (task == null) {
// all processed
break;
}
try {
task.run();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
//sleep, 直到下次tick到来, 然后返回该次tick和启动时间之间的时长
private long waitForNextTick() {
//下次tick的时间点, 用于计算需要sleep的时间
long deadline = tickDuration * (tick + 1);
//循环, 直到HashedWheelTimer被stop, 或者到了下个tick
for (;;) {
//计算需要sleep的时间, 之所以加9999999后再除10000000, 是因为保证为10毫秒的倍数.
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {//小于等于0, 表示本次tick已经到了, 返回.
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE; //不懂不懂, 我不懂...估计又是nanoTime的问题.
} else {
return currentTime; //返回过去的时间.
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {//不多说, 一个字, 屌
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);//睡吧
} catch (InterruptedException ignored) {
//当调用Timer.stop时, 退出
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
public Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
终于搞完了, 具体看注释吧, 很详细的注释.
9. 备注.
在看代码的时候, 大部分都好, 但是有些代码看的很困惑, 比如说
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
//startTime 是volatile的, 然后没有其他地方修改startTime, 为什么这里还要判断下是否为0...
然后我就去群里问了问, 最后定位到是nanoTime的问题, API文档说, 它连负数都可能返回~~~~
大神(北京-菜鸟多年)贴的nanoTime实现.
jlong os::javaTimeNanos() {
if (Linux::supports_monotonic_clock()) {
struct timespec tp;
int status = Linux::clock_gettime(CLOCK_MONOTONIC, &tp);
assert(status == 0, "gettime error");
jlong result = jlong(tp.tv_sec) * (1000 * 1000 * 1000) + jlong(tp.tv_nsec);
return result;
} else {
timeval time;
int status = gettimeofday(&time, NULL);
assert(status != -1, "linux error");
jlong usecs = jlong(time.tv_sec) * (1000 * 1000) + jlong(time.tv_usec);
return 1000 * usecs;
}
}
北京-菜鸟多年 : 看了下 代码 原来这个 clock_gettime函数 可能会发生时间回绕
北京-菜鸟多年 : 然后 获得的纳秒 就变成0了
北京-菜鸟多年 : 不过, 需要很长时间
北京-菜鸟多年 : 时间是递增的 递增到一定地步就溢出了 然后就从0开始
北京-菜鸟多年 : timespec 这里面 秒和纳秒分开存储的
北京-菜鸟多年 : 就是为了延长回绕出现的几率
北京-菜鸟多年 : else那个分支 就是 currentTimeMillis() 的实现
北京-菜鸟多年 : 不过 是 纳秒级别
北京-菜鸟多年 : 和currentTimeMillis算法一样的, 性能要慢些
10. 总结.
任务里不要有太耗时的操作, 否则会阻塞Worker线程, 导致tick不准.
Wheel Timer, 确实是很精巧的算法, Netty实现的HashedWheelTimer也是经过大神们极致的优化而来的.