Netty工具类HashedWheelTimer源码走读(一)

Stella981
• 阅读 674

1. 简单介绍.

    A Timer optimized for approximated I/O timeout scheduling. 

    关于Timer的介绍可以看看这篇文章, 写得不错 :  http://novoland.github.io/%E5%B9%B6%E5%8F%91/2014/07/26/%E5%AE%9A%E6%97%B6%E5%99%A8%EF%BC%88Timer%EF%BC%89%E7%9A%84%E5%AE%9E%E7%8E%B0.html

    可以看到, HashedWheelTimer 主要用来高效处理大量定时任务,  且任务对时间精度要求相对不高,  比如链接超时管理等场景, 缺点是,  内存占用相对较高.

2. 简单例子.

    1) 引入最新的Netty 5依赖 (不一定需要Netty5以前的版本里HashedWheelTimer就已存在)

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

    2) 例子代码(LocalTime是Java8的时间类)

import io.netty.util.HashedWheelTimer;
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;

public class Temp {

    public static void main(String[] args) throws Exception {
        //创建Timer, 精度为100毫秒, 
        HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 16);

        System.out.println(LocalTime.now());

        timer.newTimeout((timeout) -> {
            System.out.println(LocalTime.now());
            System.out.println(timeout);
        }, 5, TimeUnit.SECONDS);

        //阻塞main线程
        System.in.read();
    }

}

    可以看到输出:

14:55:13.735
14:55:18.845
HashedWheelTimer$HashedWheelTimeout(deadline: 101009856 ns ago, task: com.haogrgr.test.main.Temp$$Lambda$4/708890004@1b797119)

3. 原理简介.

    1) 原理如图所示.

Netty工具类HashedWheelTimer源码走读(一)

    可以看到, 就像一个时钟一样, 那么如果写代码来实现一个时钟的话, 大概类似于这样:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.math.IntRange;

public class Temp {

    public static void main(String[] args) throws Exception {
        Clock clock = new Clock().start();
        for (int i = 0; i < 5; i++) {
            clock.echo();
            Thread.sleep(500);
        }
    }
}

final class Clock {

    static ScheduledExecutorService updater = Executors.newSingleThreadScheduledExecutor();

    private int tick = 0;
    private int[] wheel = new IntRange(1, 60).toArray();

    Clock start() { //1s累加一次tick, 当到60s时归零
        updater.scheduleAtFixedRate(() -> tick = ++tick % wheel.length, 0, 1, TimeUnit.SECONDS);
        return this;
    }

    void echo() {
        System.out.println("当前时钟 : " + wheel[tick]);
    }
}

    运行输出:

当前时钟 : 1
当前时钟 : 2
当前时钟 : 2
当前时钟 : 3
当前时钟 : 3

    回到主题, 来看看HashedWheelTimer的构造函数参数:

HashedWheelTimer(
    ThreadFactory threadFactory, //类似于Clock中的updater, 负责创建Worker线程.
    long tickDuration,           //时间刻度之间的时长(默认100ms), 通俗的说, 就是多久tick++一次.
    TimeUnit unit,               //tickDuration的单位.
    int ticksPerWheel            //类似于Clock中的wheel的长度(默认512).
):

    除了构造函数参数, 还有一个比较重要的概念, 轮(Round) :  一轮的时长为 tickDuration * ticksPerWheel, 也就是转一圈的时长.

    其中Worker线程为HashedWheelTimer的核心, 主要负责每过tickDuration时间就累加一次tick. 同时, 也负责执行到期的timeout任务, 同时, 也负责添加timeou任务到指定的wheel中.

    当添加Timeout任务的时候, 会根据设置的时间, 来计算出需要等待的时间长度, 根据时间长度, 进而算出要经过多少次tick, 然后根据tick的次数来算出经过多少轮, 最终得出task在wheel中的位置.

    例如, 如果任务设置为在100s后执行. 如果按照默认的HashedWheelTimer配置(tickDuration为100ms, wheel长为512)则:

任务需要经过的tick数为: (100 * 1000) / 100 = 1000次 (等待时长 / tickDuration)
任务需要经过的轮数为  : 1000次 / 512次/轮 = 1轮     (tick总次数 / ticksPerWheel)
任务存放的wheel索引为 : 1000 - 512 = 488            (走完n轮时间后, 还要多少个tick)

所以这里任务需要经过一轮后, 还要等待488次tick, 才会执行, 进而任务存放的wheel位置也就是488.

    到这里, 大概原理已经介绍完了, 接下来看源码吧.

4. 主要成员.

    1) HashedWheelTimer, 对外的类, 主要负责启动Worker线程, 添加任务等. 

    2) Worker, 内部负责添加任务, 累加tick, 执行任务等. 

    3) HashedWheelTimeout, 任务的包装类, 链表结构, 负责保存deadline, 轮数, 等.

    4) HashedWheelBucket, wheel数组元素, 负责存放HashedWheelTimeout链表.

5. HashedWheelTimer源码走读.

   下面是HashedWheelTimer的代码, 去掉了一些非关键代码.

public class HashedWheelTimer implements Timer {

    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER;
    static {
        AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater = PlatformDependent
                .newAtomicIntegerFieldUpdater(HashedWheelTimer.class, "workerState");
        if (workerStateUpdater == null) {
            workerStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
        }
        WORKER_STATE_UPDATER = workerStateUpdater;
    }

    private final Worker worker = new Worker();
    private final Thread workerThread;

    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;
    @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
    private volatile int workerState = WORKER_STATE_INIT; // 0 - init, 1 - started, 2 - shut down

    private final long tickDuration;
    private final HashedWheelBucket[] wheel;
    private final int mask;
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    private final Queue<Runnable> cancelledTimeouts = PlatformDependent.newMpscQueue();

    private volatile long startTime;

    //创建Timer
    public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
        //校验参数, 代码略

        //创建wheel数组, 和HashMap的entry数组长度类似, 为2的次方.
        wheel = createWheel(ticksPerWheel);

        //用于计算任务存放wheel的索引
        //因为wheel长度为2的次方, 则, 如果长度为16(10000), mask就为15(1111)
        //那么, 通过    n & mask 就可以实现 类似于 n % mask, 而 & 更高效........
        mask = wheel.length - 1;

        //tickDuration 不能大于 Long.MAX_VALUE / wheel.length, 也就是一轮的时间不能大于Long.MAX_VALUE 纳秒
        this.tickDuration = unit.toNanos(tickDuration);

        //创建worker线程
        workerThread = threadFactory.newThread(worker);
    }

    //创建wheel数组
    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        //参数校验, 略

        //2的次方
        ticksPerWheel = 1;
        while (ticksPerWheel < ticksPerWheel) {
            ticksPerWheel <<= 1;
        }

        //初始化
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

    //启动Timer, 不需要显示调用, 调用newTimeout时, 会自动调用该方法
    public void start() {
        //初始为WORKER_STATE_INIT, cas修改为WORKER_STATE_STARTED, 并启动worker线程
        switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
        }

        //等待worker启动, 并初始化startTime完成
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

    //停止Timer
    public Set<Timeout> stop() {
        //worker线程不能调用stop方法, 也就是我们添加的Task中不能调用stop方法.
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from "
                    + TimerTask.class.getSimpleName());
        }

        //cas修改状态为shutdown, 如果修改失败, 则当前状态只可能是WORKER_STATE_INIT和WORKER_STATE_SHUTDOWN
        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);//总是设置为WORKER_STATE_SHUTDOWN
            return Collections.emptySet();//状态为0和2时, 是没有遗留任务的.
        }

        //中断worker线程, worker线程中会轮询Timer状态的.
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt();
            try {
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }

        //恢复中断标志
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        //返回未处理的任务
        return worker.unprocessedTimeouts();
    }

    //添加定时任务, delay为延迟时间
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        start();//未启动, 则启动

        //任务先添加到timeouts队列中, 等待下一个tick时, 再添加到对应的wheel中去.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }
}

    注释的比较详细了, 代码也比较简单, 就不多说了, 看注释吧.

    添加任务时, 并不是直接将人物添加到wheel中, 而是先放入队列, 再等待Worker线程在下一次tick时, 将人物放入wheel中.

    AtomicIntegerFieldUpdater是JUC的类, Netty会判断, 当存在Unsafe时, 会使用Netty自己利用Unsafe实现的UnsafeAtomicIntegerFieldUpdater.

    字数限制... 接第二篇...

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这