Netty 启动流程解析

Stella981
• 阅读 859

戳蓝字「TopCoder」关注我们哦!

Netty 启动流程解析

编者注:Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo、Rocketmq、Hadoop等,针对高性能RPC,一般都是基于Netty来构建,比如sofa-bolt。总之一句话,Java小伙伴们需要且有必要学会使用Netty并理解其实现原理。

关于Netty的入门讲解可参考:Netty 入门,这一篇文章就够了

Netty的启动流程(ServerBootstrap),就是创建NioEventLoopGroup(内部可能包含多个NioEventLoop,每个eventLoop是一个线程,内部包含一个FIFO的taskQueue和Selector)和ServerBootstrap实例,并进行bind的过程(bind流程涉及到channel的创建和注册),之后就可以对外提供服务了。

Netty的启动流程中,涉及到多个操作,比如register、bind、注册对应事件等,为了不影响main线程执行,这些工作以task的形式提交给NioEventLoop,由NioEventLoop来执行这些task,也就是register、bind、注册事件等操作。

NioEventLoop(准确来说是SingleThreadEventExecutor)中包含了private volatile Thread thread,该thread变量的初始化是在new的线程第一次执行run方式时才赋值的,这种形式挺新颖的。

Netty 启动流程解析

Netty启动流程图如下所示:

Netty 启动流程解析

大致了解了Netty启动流程之后,下面就按照Netty启动流程中涉及到的源码来进行分析。

netty启动流程分为server端和client端,不同之处就是前者监听端口,对外提供服务(socket->bind->listen操作),对应类ServerBootstrap;后者主动去连接远端端口(socket->connect),对应类Bootstrap。

server端启动流程

server端启动流程可以理解成创建ServerBootstrap实例的过程,就以下面代码为例进行分析(echo服务):

 1public final class EchoServer { 2    static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); 3 4    public static void main(String[] args) throws Exception { 5        // bossGroup处理connect事件 6        // workerGroup处理read/write事件 7        EventLoopGroup bossGroup = new NioEventLoopGroup(1); 8        EventLoopGroup workerGroup = new NioEventLoopGroup(); 9        EchoServerHandler serverHandler = new EchoServerHandler();10        try {11            ServerBootstrap b = new ServerBootstrap();12            b.group(bossGroup, workerGroup)13             .channel(NioServerSocketChannel.class)14             .option(ChannelOption.SO_BACKLOG, 100)15             .handler(new LoggingHandler(LogLevel.INFO))16             .childHandler(new ChannelInitializer<SocketChannel>() {17                 @Override18                 public void initChannel(SocketChannel ch) throws Exception {19                     // 当连接建立后(register到childWorkerGroup前)初始化channel.pipeline20                     ch.pipeline().addLast(serverHandler);21                 }22             });2324            // Start the server.25            ChannelFuture f = b.bind(PORT).sync();26            // Wait until the server socket is closed.27            f.channel().closeFuture().sync();28        } finally {29            // Shut down all event loops to terminate all threads.30            bossGroup.shutdownGracefully();31            workerGroup.shutdownGracefully();32        }33    }34}3536public class EchoServerHandler extends ChannelInboundHandlerAdapter {37    @Override38    public void channelRead(ChannelHandlerContext ctx, Object msg) {39        ctx.write(msg);40    }4142    @Override43    public void channelReadComplete(ChannelHandlerContext ctx) {44        ctx.flush();45    }4647    @Override48    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {49        // Close the connection when an exception is raised.50        cause.printStackTrace();51        ctx.close();52    }53}

EventLoopGroup创建

EventLoopGroup中可能包含了多个EventLoop,EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理客户端请求和内部任务,内部任务如ServerSocketChannel注册和ServerSocket绑定操作等。关于NioEventLoop,后续专门写一篇文章分析,这里就不再展开,只需知道个大概即可,其架构图如下:

Netty 启动流程解析

EventLoopGroup创建本质就是创建多个NioEventLoop,这里创建NioEventLoop就是初始化一个Reactor,包括selector和taskQueue。主要逻辑如下:

 1protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 2                                            EventExecutorChooserFactory chooserFactory, Object... args) { 3    // 创建NioEventLoop实例 4    children = new EventExecutor[nThreads]; 5    // 初始化NioEventLoop,实际调用的是NioEventLoopGroup.newChild方法 6    for (int i = 0; i < nThreads; i ++) { 7        children[i] = newChild(executor, args); 8    } 910    // 多个NioEventLoop中选择策略11    chooser = chooserFactory.newChooser(children);12}1314NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,15                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {16    // 创建taskQueue17    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);18    // 是不是很熟悉,java nio selector操作19    provider = selectorProvider;20    final SelectorTuple selectorTuple = openSelector();21    selector = selectorTuple.selector;22    unwrappedSelector = selectorTuple.unwrappedSelector;23    selectStrategy = strategy;24}

EventLoopGroup创建OK后,启动的第一步就算完成了,接下来该进行bind、listen操作了。

ServerBootstrap流程

bind操作

bind操作是ServerBootstrap流程重要的一环,bind流程涉及到NioChannel的创建、初始化和注册(到Selector),启动NioEventLoop,之后就可以对外提供服务了。

 1public ChannelFuture bind(SocketAddress localAddress) { 2    validate(); // 参数校验 3    return doBind(localAddress); 4} 5private ChannelFuture doBind(final SocketAddress localAddress) { 6    // 1. 初始化注册操作 7    final ChannelFuture regFuture = initAndRegister(); 8    final Channel channel = regFuture.channel(); 9    if (regFuture.cause() != null) {10        return regFuture;11    }1213    // 2. doBind0操作14    if (regFuture.isDone()) {15        // register已完成,这里直接调用doBind016        ChannelPromise promise = channel.newPromise();17        doBind0(regFuture, channel, localAddress, promise);18        return promise;19    } else {20        // register还未完成,注册listener回调,在回调中调用doBind021        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);22        regFuture.addListener(new ChannelFutureListener() {23            /**24             * channel register完成(注册到Selector并且调用了invokeHandlerAddedIfNeeded)之后,25             * 会调用safeSetSuccess,触发各个ChannelFutureListener,最终会调用到这里的operationComplete方法26             */27            @Override28            public void operationComplete(ChannelFuture future) throws Exception {29                Throwable cause = future.cause();30                if (cause != null) {31                    promise.setFailure(cause);32                } else {33                    promise.registered();34                    doBind0(regFuture, channel, localAddress, promise);35                }36            }37        });38        return promise;39    }40}

这里涉及到2个操作,一个是channel的创建、初始化、注册操作,另一个是bind操作,下面兵分两路,分别来讲。

注意,这里如果main线程执行到regFuture.isDone()时,register还未完成,那么main线程是不会直接调用bind操作的,而是往regFuture上注册一个Listenner,这样channel register完成(注册到Selector并且调用了invokeHandlerAddedIfNeeded)之后,会调用safeSetSuccess,触发各个ChannelFutureListener,最终会调用到这里的operationComplete方法,进而在执行bind操作。

channel初始化、注册操作

 1final ChannelFuture initAndRegister() { 2    Channel channel = null; 3    try { 4        // 1.创建(netty自定义)Channel实例,并初始化 5        // channel为 NioServerSocketChannel 实例,NioServerSocketChannel的父类AbstractNioChannel保存有nio的ServerSocketChannel 6        channel = channelFactory.newChannel(); 7        // 2.初始化channel() 8        init(channel); 9    } catch (Throwable t) {10    }1112    // 3.向Selector注册channel13    ChannelFuture regFuture = config().group().register(channel);14    if (regFuture.cause() != null) {15        if (channel.isRegistered()) {16            channel.close();17        } else {18            channel.unsafe().closeForcibly();19        }20    }2122    return regFuture;23}

这里重点关注下初始化channel流程,主要操作是设置channel属性、设置channel.pipeline的ChannelInitializer,注意,ChannelInitializer是在channel注册到selector之后被回调的。

 1/** 2 * 初始channel属性,也就是ChannelOption对应socket的各种属性。 3 * 比如 SO_KEEPALIVE SO_RCVBUF ... 可以与Linux中的setsockopt函数对应起来。 4 * 最后将ServerBootstrapAcceptor添加到对应channel的ChannelPipeline中。 5 */ 6@Override 7void init(Channel channel) throws Exception { 8    final Map<ChannelOption<?>, Object> options = options0(); 9    synchronized (options) {10        setChannelOptions(channel, options, logger);11    }1213    ChannelPipeline p = channel.pipeline();14    // 获取childGroup和childHandler,传递给ServerBootstrapAcceptor15    final EventLoopGroup currentChildGroup = childGroup;16    final ChannelHandler currentChildHandler = childHandler;17    final Entry<ChannelOption<?>, Object>[] currentChildOptions;18    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;19    synchronized (childOptions) {20        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));21    }22    synchronized (childAttrs) {23        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));24    }2526    p.addLast(new ChannelInitializer<Channel>() {27        /**28         * 在register0中,将channel注册到Selector之后,会调用invokeHandlerAddedIfNeeded,29         * 进而调用到这里的initChannel方法30         */31        @Override32        public void initChannel(final Channel ch) throws Exception {33            final ChannelPipeline pipeline = ch.pipeline();34            ChannelHandler handler = config.handler();35            if (handler != null) {36                pipeline.addLast(handler);37            }3839            // 这里注册一个添加ServerBootstrapAcceptor的任务40            ch.eventLoop().execute(new Runnable() {41                @Override42                public void run() {43                    // 添加ServerBootstrapAcceptor44                    pipeline.addLast(new ServerBootstrapAcceptor(45                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));46                }47            });48        }49    });50}

channel初始化之后就该将其注册到selector,即下面的register流程:

 1public ChannelFuture register(Channel channel) { 2    // next()挑选一个EventLoop,默认轮询选择某个NioEventLoop 3    return next().register(channel); 4} 5public ChannelFuture register(final ChannelPromise promise) { 6    promise.channel().unsafe().register(this, promise); 7    return promise; 8} 9// AbstractChannel10public final void register(EventLoop eventLoop, final ChannelPromise promise) {11    AbstractChannel.this.eventLoop = eventLoop;1213    // 直接执行register0或者以任务方式提交执行14    // 启动时,首先执行到这里的是main线程,所以是以任务的方式来提交执行的。15    // 也就是说,该任务是NioEventLoop第一次执行的任务,即调用register016    if (eventLoop.inEventLoop()) {17        register0(promise);18    } else {19        // 往NioEventLoop中(任务队列)添加任务时,如果NioEventLoop线程还未启动,则启动该线程20        eventLoop.execute(new Runnable() {21            @Override22            public void run() {23                register0(promise);24            }25        });26    }27}

register操作

register操作之后伴随着多个回调及listener的触发:

 1// AbstractChannel$AbstractUnsafe 2private void register0(ChannelPromise promise) { 3    boolean firstRegistration = neverRegistered; 4    // 这里调用的是AbstractNioChannel.doRegister 5    // 这里将channel注册上去,并没有关注对应的事件(read/write事件) 6    doRegister(); 7    neverRegistered = false; 8    registered = true; 910    // 调用handlerAdd事件,这里就会调用initChannel方法,设置channel.pipeline,也就是添加 ServerBootstrapAcceptor11    pipeline.invokeHandlerAddedIfNeeded();1213    // 调用operationComplete回调14    safeSetSuccess(promise);15    // 回调fireChannelRegistered16    pipeline.fireChannelRegistered();17    // Only fire a channelActive if the channel has never been registered. This prevents firing18    // multiple channel actives if the channel is deregistered and re-registered.19    if (isActive()) {20        if (firstRegistration) {21            // 回调fireChannelActive22            pipeline.fireChannelActive();23        } else if (config().isAutoRead()) {24            beginRead();25        }26    }27}

上面代码中的initChannel回调也就是设置对外监听channel的channelHanlder为ServerBootstrapAcceptor;operationComplete回调也就是触发ChannelFutureListener.operationComplete,这里会进行后续的doBind操作。

 1// AbstractBootstrap 2private static void doBind0( 3        final ChannelFuture regFuture, final Channel channel, 4        final SocketAddress localAddress, final ChannelPromise promise) { 5    // doBind0向EventLoop任务队列中添加一个bind任务来完成后续操作。 6    channel.eventLoop().execute(new Runnable() { 7        @Override 8        public void run() { 9            if (regFuture.isSuccess()) {10                // bind操作11                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);12            }13        }14    });15}

bind操作

在回顾上面的bind操作代码,bind操作是在register之后进行的,因为register0是由NioEventLoop执行的,所以main线程需要先判断下future是否完成,如果完成直接进行doBind即可,否则添加listener回调进行doBind。

Netty 启动流程解析

bind操作及后续初始化操作(channelActive回调、设置监听事件)

 1public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { 2    boolean wasActive = isActive(); 3    try { 4        // 调用底层bind操作 5        doBind(localAddress); 6    } catch (Throwable t) { 7        safeSetFailure(promise, t); 8        closeIfClosed(); 9        return;10    }1112    if (!wasActive && isActive()) {13        invokeLater(new Runnable() {14            @Override15            public void run() {16                pipeline.fireChannelActive();17            }18        });19    }20    safeSetSuccess(promise);21}2223// 最后底层bind逻辑bind入参包括了backlog,也就是底层会进行listen操作24// DefaultChannelPipeline.headContext -> NioMessageUnsafe -> NioServerSocketChannel25protected void doBind(SocketAddress localAddress) throws Exception {26    if (PlatformDependent.javaVersion() >= 7) {27        javaChannel().bind(localAddress, config.getBacklog());28    } else {29        javaChannel().socket().bind(localAddress, config.getBacklog());30    }31}3233public void channelActive(ChannelHandlerContext ctx) throws Exception {34    // 回调fireChannelActive35    ctx.fireChannelActive();3637    // 设置selectKey监听事件,对于监听端口就是SelectionKey.OP_ACCEPT,对于新建连接就是SelectionKey.OP_READ38    readIfIsAutoRead();39}

到这里为止整个netty启动流程就基本接近尾声,可以对外提供服务了。

推荐阅读

欢迎小伙伴 关注【TopCoder】 阅读更多精彩好文。

Netty 启动流程解析

本文分享自微信公众号 - TopCoder(gh_12e4a74a5c9c)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Netty 入门,这一篇文章就够了
Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo、Rocketmq、Hadoop等,针对高性能RPC,一般都是基于Netty来构建,比如sockbolt。总之一句话,Java小伙伴们需要且有必要学会使用Netty并理解其实现原理。netty旨在为可维护的高性能、高可扩展
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这