Netty工具类HashedWheelTimer源码走读(二)

Stella981
• 阅读 587

接上一篇( http://my.oschina.net/haogrgr/blog/489320 )

6. HashedWheelTimeout源码走读.

//任务的包装类, 链表结构, 负责保存deadline, 轮数, 等
//继承MpscLinkedQueueNode, 是因为timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理(并发优化)
private static final class HashedWheelTimeout extends MpscLinkedQueueNode<Timeout>
        implements Timeout {

    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;

    static {
        AtomicIntegerFieldUpdater<HashedWheelTimeout> updater =
                PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimeout.class, "state");
        if (updater == null) {
            updater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
        }
        STATE_UPDATER = updater;
    }

    private final HashedWheelTimer timer; //timer引用
    private final TimerTask task; //要执行的任务引用
    private final long deadline; //Timer启动时间 - 任务执行时间(任务加入时间+任务延迟时间)

    @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
    private volatile int state = ST_INIT;

    //离任务执行还要等待的轮数, 当任务加入到wheel中时计算该值, 并在Worker中, 每过一轮, 该值减一.
    long remainingRounds;

    //双链表, 因为只有Worker这一个线程访问, 所以不需要synchronization / volatile.
    HashedWheelTimeout next;
    HashedWheelTimeout prev;

    //HashedWheelTimeout 所在的 wheel
    HashedWheelBucket bucket;

    HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
        this.timer = timer;
        this.task = task;
        this.deadline = deadline;
    }

    @Override
    public Timer timer() {
        return timer;
    }

    @Override
    public TimerTask task() {
        return task;
    }

    @Override
    public boolean cancel() {
        // only update the state it will be removed from HashedWheelBucket on next tick.
        //这里只修改状态从ST_INIT到ST_CANCELLED
        if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
            return false;
        }

        //如果状态修改成功, 则表示第一次调用cancel方法, 将HashedWheelTimeout从bucked中移除的操作封装,
        //加入到cancelled队列, 等待下一次tick再移除, 跟踪下了源码历史发现之所以这么做, 是为了对GC友好, 以前取消任务要等到下一轮才会被处理,
        //于是, 改成将cancel的任务放在timeous队列里, 然后统一处理, timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理,
        //然而, 后面又发现有锁的问题, 因为timeous这个队列可能被多个线程操作(HashedWheelTimer.newTimeout()), 开始是加锁的, 
        //于是, 将cancel任务另外存一个队列, 这样, 就不需要使用锁了, 具体见:
        //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
        //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
        timer.cancelledTimeouts.add(new Runnable() {
            @Override
            public void run() {
                HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
                if (bucket != null) {
                    bucket.remove(HashedWheelTimeout.this);
                }
            }
        });
        return true;
    }

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);
    }

    public int state() {
        return state;
    }

    @Override
    public boolean isCancelled() {
        return state() == ST_CANCELLED;
    }

    @Override
    public boolean isExpired() {
        return state() == ST_EXPIRED;
    }

    @Override
    public HashedWheelTimeout value() {
        return this;
    }

    //到期, 执行任务
    public void expire() {
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
            return;
        }

        try {
            task.run(this);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
            }
        }
    }
}

    果然是大牛, 各种优化, 看了下源码的提交记录, 截取几段:

    1) https://github.com/netty/netty/commit/1f68479e3cd94deb3172edd3c01aa74f35032b9b   (以前wheel用的HashSet, 改成了数组)

Motivation:
At the moment there are two issues with HashedWheelTimer:
* the memory footprint of it is pretty heavy (250kb fon an empty instance)
* the way how added Timeouts are handled is inefficient in terms of how locks etc are used and so a lot of context-switching / condition can happen.

Modification:
Rewrite HashedWheelTimer to use an optimized bucket implementation to store the submitted Timeouts and a MPSC queue to handover the timeouts.  So volatile writes are reduced to a minimum and also the memory foot-print of the buckets itself is reduced a lot as the bucket uses a double-linked-list. Beside this we use Atomic*FieldUpdater where-ever possible to improve the memory foot-print and performance.

Result:
Lower memory-footprint and better performance

    2) https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1

Motivation:
At the moment the HashedWheelTimer will only remove the cancelled Timeouts once the HashedWheelBucket is processed again. Until this the instance will not be able to be GC'ed as there are still strong referenced to it even if the user not reference it by himself/herself. This can cause to waste a lot of memory even if the Timeout was cancelled before.

Modification:
Add a new queue which holds CancelTasks that will be processed on each tick to remove cancelled Timeouts. Because all of this is done only by the WorkerThread there is no need for synchronization and only one extra object creation is needed when cancel() is executed. For addTimeout(...) no new overhead is introduced.

Result:
Less memory usage for cancelled Timeouts.

    3) https://github.com/netty/netty/commit/44ea769f537bf16b833d03db844b1f3067b3acd7

Motivation:
Due some race-condition while handling canellation of TimerTasks it was possibleto corrupt the linked-list structure that is represent by HashedWheelBucket and so produce a NPE.

Modification:
Fix the problem by adding another MpscLinkedQueue which holds the cancellation tasks and process them on each tick. This allows to use no synchronization / locking at all while introduce a latency of max 1 tick before the TimerTask can be GC'ed.

Result:
No more NPE

    回到主题, 代码并不复杂, 开始看的时候, 发现继承了MpscLinkedQueueNode, 但是又没有地方用到, 后面看了下, 发现MpscLinkedQueue对其有特殊处理.

    可以看到HashedWheelTimeout就是对Timeout任务的包装, 链表结构方便加入wheel, 记录deadline, remainingRounds, state等信息, 

7. HashedWheelBucket 源码走读. 

//用来存放HashedWheelTimeout, 结构有点像linked-list, 方便移除操作.
private static final class HashedWheelBucket {

    //链表结构
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;

    //添加HashedWheelTimeout, 链表操作, 不多说~~~
    public void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }

    //当tick到该wheel的时候, Worker会调用这个方法, 根据deadline来判断任务是否过期(remainingRounds是否为0), 
    //任务到期就执行, 没到期, 就timeout.remainingRounds--, 因为走到这里, 表示改wheel里的任务又过了一轮了.
    public void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        //遍历链表
        while (timeout != null) {
            boolean remove = false;
            if (timeout.remainingRounds <= 0) {//任务已到执行点
                if (timeout.deadline <= deadline) {
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
                remove = true;
            } else if (timeout.isCancelled()) {
                remove = true;
            } else {//没到期, 剩余轮数减一
                timeout.remainingRounds --;
            }

            //先保存next, 因为移除后, 再获取timeout.next会为空.
            HashedWheelTimeout next = timeout.next;
            if (remove) {//当以到期, 或者被取消, 就将timeou从链表中移除
                remove(timeout);
            }
            timeout = next;
        }
    }

    //链表移除, 不多说
    public void remove(HashedWheelTimeout timeout) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.prev != null) {
            timeout.prev.next = next;
        }
        if (timeout.next != null) {
            timeout.next.prev = timeout.prev;
        }

        if (timeout == head) {
            if (timeout == tail) {
                tail = null;
                head = null;
            } else {
                head = next;
            }
        } else if (timeout == tail) {
            tail = timeout.prev;
        }
        timeout.prev = null;
        timeout.next = null;
        timeout.bucket = null;
    }

    //Clear this bucket and return all not expired / cancelled {@link Timeout}s.
    public void clearTimeouts(Set<Timeout> set) {
        for (;;) {
            HashedWheelTimeout timeout = pollTimeout();
            if (timeout == null) {
                return;
            }
            if (timeout.isExpired() || timeout.isCancelled()) {
                continue;
            }
            set.add(timeout);
        }
    }

    //链表的poll
    private HashedWheelTimeout pollTimeout() {
        HashedWheelTimeout head = this.head;
        if (head == null) {
            return null;
        }
        HashedWheelTimeout next = head.next;
        if (next == null) {
            tail = this.head =  null;
        } else {
            this.head = next;
            next.prev = null;
        }

        head.next = null;
        head.prev = null;
        head.bucket = null;
        return head;
    }
}

    可以看到, 代码也不复杂, 主要是提供一个类似于LinkedList的容器, 用来存放HashedWheelTimeout, 并提供expireTimeouts(long deadline) 方法来处理该wheel中的任务. 具体处理看注释.

    字数限制... 接第二篇..., 还剩最后的Worker的代码.

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Netty工具类HashedWheelTimer源码走读(三)
接上一篇(http://my.oschina.net/haogrgr/blog/490266(http://my.oschina.net/haogrgr/blog/490266) )8. Worker代码走读. //主要负责累加tick, 执行到期任务等.private final class Worker imple
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这
美凌格栋栋酱 美凌格栋栋酱
14小时前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(