Netty工具类HashedWheelTimer源码走读(三)

Stella981
• 阅读 580

接上一篇( http://my.oschina.net/haogrgr/blog/490266 )

8. Worker代码走读. 

//主要负责累加tick, 执行到期任务等.
private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    private long tick;

    @Override
    public void run() {
        //初始化startTime, startTime只是一个起始时间的标记, 任务的deadline是相对这个时间点来的.
        startTime = System.nanoTime();

        //因为nanoTime返回值可能为0, 甚至负数, 所以这时赋值为1, Timer中start方法会判断该值, 直到不为0才跳出循环.
        if (startTime == 0) { 
            startTime = 1;
        }

        //唤醒阻塞在Timer.start()方法上的线程, 表示已经启动完成.
        startTimeInitialized.countDown();

        //只要还是启动状态, 就一直循环
        do {
            //waitForNextTick方法主要是计算下次tick的时间, 然后sleep到下次tick
            //返回值就是System.nanoTime() - startTime, 也就是Timer启动后到这次tick, 所过去的时间
            final long deadline = waitForNextTick();
            if (deadline > 0) {//可能溢出, 所以小于等于0不管
                
                //获取index, 原理见Timer的构造方法注释, 等价于 tick % wheel.length
                int idx = (int) (tick & mask);  
                
                //移除cancel了的task, 具体可以见HashedWheelTimeout.cancel()方法注释
                processCancelledTasks();

                //当前tick对应的wheel
                HashedWheelBucket bucket = wheel[idx];

                //因为添加任务是先加入到timeouts队列中, 而这里就是将任务从队列中取出, 放到对应的bucket中
                transferTimeoutsToBuckets();

                //见上篇HashedWheelBucket.expireTimeouts()方法的注释
                //具体是根据当前的deadline, 判断bucket中的人物是否到期, 到期的任务就执行, 没到期的, 就将人物轮数减一.
                //正常情况下, 一个bucket在一轮中, 只会执行一次expireTimeouts方法.
                bucket.expireTimeouts(deadline);

                //累加tick
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        //返回调用stop()时, 还未处理的任务.
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }

        //加上还没来得及放入bucket中的任务
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }

        //最好移除下cancel了的task
        processCancelledTasks();
    }

    //将Timer.newTimeout()调用放入到timeouts时的任务放入到对应的bucket中
    private void transferTimeoutsToBuckets() {
        //一次tick, 最多放入10w任务, 防止太多了, 造成worker线程在这里停留太久.
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                //全部处理完了, 退出循环
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                //还没加入到bucket中, 就取消了, 继续.
                continue;
            }

            //calculated 表示任务要经过多少个tick
            long calculated = timeout.deadline / tickDuration;

            //设置任务要经过的轮数
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            //如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 于是方法调用完后就会执行.
            final long ticks = Math.max(calculated, tick);
            int stopIndex = (int) (ticks & mask);//同样, 类似于ticks % wheel.length
            
            //这时任务所在的bucket在wheel中的位置就表示, 经过n轮后, 还需要多少次tick才执行.
            HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);//将timeout加入到链表
        }
    }

    //将cancel任务从队列中取出, 并执行cancel操作, 具体可以见HashedWheelTimeout.cancel()方法注释.
    private void processCancelledTasks() {
        for (;;) {
            Runnable task = cancelledTimeouts.poll();
            if (task == null) {
                // all processed
                break;
            }
            try {
                task.run();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while process a cancellation task", t);
                }
            }
        }
    }

    /**
     * calculate goal nanoTime from startTime and current tick number,
     * then wait until that goal has been reached.
     * @return Long.MIN_VALUE if received a shutdown request,
     * current time otherwise (with Long.MIN_VALUE changed by +1)
     */
    //sleep, 直到下次tick到来, 然后返回该次tick和启动时间之间的时长
    private long waitForNextTick() {

        //下次tick的时间点, 用于计算需要sleep的时间
        long deadline = tickDuration * (tick + 1);

        //循环, 直到HashedWheelTimer被stop, 或者到了下个tick
        for (;;) {

            //计算需要sleep的时间, 之所以加9999999后再除10000000, 是因为保证为10毫秒的倍数.
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {//小于等于0, 表示本次tick已经到了, 返回.
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE; //不懂不懂, 我不懂...估计又是nanoTime的问题.
                } else {
                    return currentTime; //返回过去的时间.
                }
            }

            // Check if we run on windows, as if thats the case we will need
            // to round the sleepTime as workaround for a bug that only affect
            // the JVM if it runs on windows.
            //
            // See https://github.com/netty/netty/issues/356
            if (PlatformDependent.isWindows()) {//不多说, 一个字, 屌
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                Thread.sleep(sleepTimeMs);//睡吧
            } catch (InterruptedException ignored) {
                //当调用Timer.stop时, 退出
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

    public Set<Timeout> unprocessedTimeouts() {
        return Collections.unmodifiableSet(unprocessedTimeouts);
    }
}

    终于搞完了, 具体看注释吧, 很详细的注释.

9. 备注. 

   在看代码的时候, 大部分都好, 但是有些代码看的很困惑, 比如说

startTime = System.nanoTime();
if (startTime == 0) {
    // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
    startTime = 1;
}

//startTime 是volatile的, 然后没有其他地方修改startTime, 为什么这里还要判断下是否为0...

    然后我就去群里问了问, 最后定位到是nanoTime的问题, API文档说, 它连负数都可能返回~~~~

    大神(北京-菜鸟多年)贴的nanoTime实现.

jlong os::javaTimeNanos() {
  if (Linux::supports_monotonic_clock()) {
    struct timespec tp;
    int status = Linux::clock_gettime(CLOCK_MONOTONIC, &tp);
    assert(status == 0, "gettime error");
    jlong result = jlong(tp.tv_sec) * (1000 * 1000 * 1000) + jlong(tp.tv_nsec);
    return result;
  } else {
    timeval time;
    int status = gettimeofday(&time, NULL);
    assert(status != -1, "linux error");
    jlong usecs = jlong(time.tv_sec) * (1000 * 1000) + jlong(time.tv_usec);
    return 1000 * usecs;
  }
}

    北京-菜鸟多年 : 看了下 代码  原来这个 clock_gettime函数 可能会发生时间回绕 

    北京-菜鸟多年 : 然后  获得的纳秒 就变成0了

    北京-菜鸟多年 : 不过, 需要很长时间

北京-菜鸟多年 : 时间是递增的  递增到一定地步就溢出了  然后就从0开始

北京-菜鸟多年 : timespec 这里面  秒和纳秒分开存储的

北京-菜鸟多年 : 就是为了延长回绕出现的几率

北京-菜鸟多年 : else那个分支  就是  currentTimeMillis() 的实现

北京-菜鸟多年 : 不过 是  纳秒级别

北京-菜鸟多年 : 和currentTimeMillis算法一样的, 性能要慢些

10. 总结.

    任务里不要有太耗时的操作, 否则会阻塞Worker线程, 导致tick不准.

    Wheel Timer, 确实是很精巧的算法, Netty实现的HashedWheelTimer也是经过大神们极致的优化而来的.

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Netty工具类HashedWheelTimer源码走读(二)
接上一篇( http://my.oschina.net/haogrgr/blog/489320(http://my.oschina.net/haogrgr/blog/489320) )6\.HashedWheelTimeout源码走读.//任务的包装类, 链表结构, 负责保存deadline, 轮数, 等//继承M
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
18小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(