戳蓝字「TopCoder」关注我们哦!
编者注:Netty是Java领域有名的开源网络库,特点是高性能和高扩展性,因此很多流行的框架都是基于它来构建的,比如我们熟知的Dubbo、Rocketmq、Hadoop等,针对高性能RPC,一般都是基于Netty来构建,比如sofa-bolt。总之一句话,Java小伙伴们需要且有必要学会使用Netty并理解其实现原理。
关于Netty的入门讲解可参考:Netty 入门,这一篇文章就够了
Netty的启动流程(ServerBootstrap
),就是创建NioEventLoopGroup
(内部可能包含多个NioEventLoop,每个eventLoop是一个线程,内部包含一个FIFO的taskQueue和Selector)和ServerBootstrap实例,并进行bind的过程(bind流程涉及到channel的创建和注册),之后就可以对外提供服务了。
Netty的启动流程中,涉及到多个操作,比如register、bind、注册对应事件等,为了不影响main线程执行,这些工作以task的形式提交给NioEventLoop,由NioEventLoop来执行这些task,也就是register、bind、注册事件等操作。
NioEventLoop(准确来说是SingleThreadEventExecutor
)中包含了private volatile Thread thread
,该thread变量的初始化是在new的线程第一次执行run方式时才赋值的,这种形式挺新颖的。
Netty启动流程图如下所示:
大致了解了Netty启动流程之后,下面就按照Netty启动流程中涉及到的源码来进行分析。
netty启动流程分为server端和client端,不同之处就是前者监听端口,对外提供服务(socket->bind->listen操作),对应类ServerBootstrap;后者主动去连接远端端口(socket->connect),对应类Bootstrap。
server端启动流程
server端启动流程可以理解成创建ServerBootstrap实例的过程,就以下面代码为例进行分析(echo服务):
1public final class EchoServer { 2 static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); 3 4 public static void main(String[] args) throws Exception { 5 // bossGroup处理connect事件 6 // workerGroup处理read/write事件 7 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 8 EventLoopGroup workerGroup = new NioEventLoopGroup(); 9 EchoServerHandler serverHandler = new EchoServerHandler();10 try {11 ServerBootstrap b = new ServerBootstrap();12 b.group(bossGroup, workerGroup)13 .channel(NioServerSocketChannel.class)14 .option(ChannelOption.SO_BACKLOG, 100)15 .handler(new LoggingHandler(LogLevel.INFO))16 .childHandler(new ChannelInitializer<SocketChannel>() {17 @Override18 public void initChannel(SocketChannel ch) throws Exception {19 // 当连接建立后(register到childWorkerGroup前)初始化channel.pipeline20 ch.pipeline().addLast(serverHandler);21 }22 });2324 // Start the server.25 ChannelFuture f = b.bind(PORT).sync();26 // Wait until the server socket is closed.27 f.channel().closeFuture().sync();28 } finally {29 // Shut down all event loops to terminate all threads.30 bossGroup.shutdownGracefully();31 workerGroup.shutdownGracefully();32 }33 }34}3536public class EchoServerHandler extends ChannelInboundHandlerAdapter {37 @Override38 public void channelRead(ChannelHandlerContext ctx, Object msg) {39 ctx.write(msg);40 }4142 @Override43 public void channelReadComplete(ChannelHandlerContext ctx) {44 ctx.flush();45 }4647 @Override48 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {49 // Close the connection when an exception is raised.50 cause.printStackTrace();51 ctx.close();52 }53}
EventLoopGroup创建
EventLoopGroup中可能包含了多个EventLoop,EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理客户端请求和内部任务,内部任务如ServerSocketChannel注册和ServerSocket绑定操作等。关于NioEventLoop,后续专门写一篇文章分析,这里就不再展开,只需知道个大概即可,其架构图如下:
EventLoopGroup创建本质就是创建多个NioEventLoop,这里创建NioEventLoop就是初始化一个Reactor,包括selector和taskQueue。主要逻辑如下:
1protected MultithreadEventExecutorGroup(int nThreads, Executor executor, 2 EventExecutorChooserFactory chooserFactory, Object... args) { 3 // 创建NioEventLoop实例 4 children = new EventExecutor[nThreads]; 5 // 初始化NioEventLoop,实际调用的是NioEventLoopGroup.newChild方法 6 for (int i = 0; i < nThreads; i ++) { 7 children[i] = newChild(executor, args); 8 } 910 // 多个NioEventLoop中选择策略11 chooser = chooserFactory.newChooser(children);12}1314NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,15 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {16 // 创建taskQueue17 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);18 // 是不是很熟悉,java nio selector操作19 provider = selectorProvider;20 final SelectorTuple selectorTuple = openSelector();21 selector = selectorTuple.selector;22 unwrappedSelector = selectorTuple.unwrappedSelector;23 selectStrategy = strategy;24}
EventLoopGroup创建OK后,启动的第一步就算完成了,接下来该进行bind、listen操作了。
ServerBootstrap流程
bind操作
bind操作是ServerBootstrap流程重要的一环,bind流程涉及到NioChannel的创建、初始化和注册(到Selector),启动NioEventLoop,之后就可以对外提供服务了。
1public ChannelFuture bind(SocketAddress localAddress) { 2 validate(); // 参数校验 3 return doBind(localAddress); 4} 5private ChannelFuture doBind(final SocketAddress localAddress) { 6 // 1. 初始化注册操作 7 final ChannelFuture regFuture = initAndRegister(); 8 final Channel channel = regFuture.channel(); 9 if (regFuture.cause() != null) {10 return regFuture;11 }1213 // 2. doBind0操作14 if (regFuture.isDone()) {15 // register已完成,这里直接调用doBind016 ChannelPromise promise = channel.newPromise();17 doBind0(regFuture, channel, localAddress, promise);18 return promise;19 } else {20 // register还未完成,注册listener回调,在回调中调用doBind021 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);22 regFuture.addListener(new ChannelFutureListener() {23 /**24 * channel register完成(注册到Selector并且调用了invokeHandlerAddedIfNeeded)之后,25 * 会调用safeSetSuccess,触发各个ChannelFutureListener,最终会调用到这里的operationComplete方法26 */27 @Override28 public void operationComplete(ChannelFuture future) throws Exception {29 Throwable cause = future.cause();30 if (cause != null) {31 promise.setFailure(cause);32 } else {33 promise.registered();34 doBind0(regFuture, channel, localAddress, promise);35 }36 }37 });38 return promise;39 }40}
这里涉及到2个操作,一个是channel的创建、初始化、注册操作,另一个是bind操作,下面兵分两路,分别来讲。
注意,这里如果main线程执行到regFuture.isDone()时,register还未完成,那么main线程是不会直接调用bind操作的,而是往regFuture上注册一个Listenner,这样channel register完成(注册到Selector并且调用了invokeHandlerAddedIfNeeded)之后,会调用safeSetSuccess,触发各个ChannelFutureListener,最终会调用到这里的operationComplete方法,进而在执行bind操作。
channel初始化、注册操作
1final ChannelFuture initAndRegister() { 2 Channel channel = null; 3 try { 4 // 1.创建(netty自定义)Channel实例,并初始化 5 // channel为 NioServerSocketChannel 实例,NioServerSocketChannel的父类AbstractNioChannel保存有nio的ServerSocketChannel 6 channel = channelFactory.newChannel(); 7 // 2.初始化channel() 8 init(channel); 9 } catch (Throwable t) {10 }1112 // 3.向Selector注册channel13 ChannelFuture regFuture = config().group().register(channel);14 if (regFuture.cause() != null) {15 if (channel.isRegistered()) {16 channel.close();17 } else {18 channel.unsafe().closeForcibly();19 }20 }2122 return regFuture;23}
这里重点关注下初始化channel流程,主要操作是设置channel属性、设置channel.pipeline的ChannelInitializer,注意,ChannelInitializer是在channel注册到selector之后被回调的。
1/** 2 * 初始channel属性,也就是ChannelOption对应socket的各种属性。 3 * 比如 SO_KEEPALIVE SO_RCVBUF ... 可以与Linux中的setsockopt函数对应起来。 4 * 最后将ServerBootstrapAcceptor添加到对应channel的ChannelPipeline中。 5 */ 6@Override 7void init(Channel channel) throws Exception { 8 final Map<ChannelOption<?>, Object> options = options0(); 9 synchronized (options) {10 setChannelOptions(channel, options, logger);11 }1213 ChannelPipeline p = channel.pipeline();14 // 获取childGroup和childHandler,传递给ServerBootstrapAcceptor15 final EventLoopGroup currentChildGroup = childGroup;16 final ChannelHandler currentChildHandler = childHandler;17 final Entry<ChannelOption<?>, Object>[] currentChildOptions;18 final Entry<AttributeKey<?>, Object>[] currentChildAttrs;19 synchronized (childOptions) {20 currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));21 }22 synchronized (childAttrs) {23 currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));24 }2526 p.addLast(new ChannelInitializer<Channel>() {27 /**28 * 在register0中,将channel注册到Selector之后,会调用invokeHandlerAddedIfNeeded,29 * 进而调用到这里的initChannel方法30 */31 @Override32 public void initChannel(final Channel ch) throws Exception {33 final ChannelPipeline pipeline = ch.pipeline();34 ChannelHandler handler = config.handler();35 if (handler != null) {36 pipeline.addLast(handler);37 }3839 // 这里注册一个添加ServerBootstrapAcceptor的任务40 ch.eventLoop().execute(new Runnable() {41 @Override42 public void run() {43 // 添加ServerBootstrapAcceptor44 pipeline.addLast(new ServerBootstrapAcceptor(45 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));46 }47 });48 }49 });50}
channel初始化之后就该将其注册到selector,即下面的register流程:
1public ChannelFuture register(Channel channel) { 2 // next()挑选一个EventLoop,默认轮询选择某个NioEventLoop 3 return next().register(channel); 4} 5public ChannelFuture register(final ChannelPromise promise) { 6 promise.channel().unsafe().register(this, promise); 7 return promise; 8} 9// AbstractChannel10public final void register(EventLoop eventLoop, final ChannelPromise promise) {11 AbstractChannel.this.eventLoop = eventLoop;1213 // 直接执行register0或者以任务方式提交执行14 // 启动时,首先执行到这里的是main线程,所以是以任务的方式来提交执行的。15 // 也就是说,该任务是NioEventLoop第一次执行的任务,即调用register016 if (eventLoop.inEventLoop()) {17 register0(promise);18 } else {19 // 往NioEventLoop中(任务队列)添加任务时,如果NioEventLoop线程还未启动,则启动该线程20 eventLoop.execute(new Runnable() {21 @Override22 public void run() {23 register0(promise);24 }25 });26 }27}
register操作
register操作之后伴随着多个回调及listener的触发:
1// AbstractChannel$AbstractUnsafe 2private void register0(ChannelPromise promise) { 3 boolean firstRegistration = neverRegistered; 4 // 这里调用的是AbstractNioChannel.doRegister 5 // 这里将channel注册上去,并没有关注对应的事件(read/write事件) 6 doRegister(); 7 neverRegistered = false; 8 registered = true; 910 // 调用handlerAdd事件,这里就会调用initChannel方法,设置channel.pipeline,也就是添加 ServerBootstrapAcceptor11 pipeline.invokeHandlerAddedIfNeeded();1213 // 调用operationComplete回调14 safeSetSuccess(promise);15 // 回调fireChannelRegistered16 pipeline.fireChannelRegistered();17 // Only fire a channelActive if the channel has never been registered. This prevents firing18 // multiple channel actives if the channel is deregistered and re-registered.19 if (isActive()) {20 if (firstRegistration) {21 // 回调fireChannelActive22 pipeline.fireChannelActive();23 } else if (config().isAutoRead()) {24 beginRead();25 }26 }27}
上面代码中的initChannel回调也就是设置对外监听channel的channelHanlder为ServerBootstrapAcceptor;operationComplete回调也就是触发ChannelFutureListener.operationComplete
,这里会进行后续的doBind操作。
1// AbstractBootstrap 2private static void doBind0( 3 final ChannelFuture regFuture, final Channel channel, 4 final SocketAddress localAddress, final ChannelPromise promise) { 5 // doBind0向EventLoop任务队列中添加一个bind任务来完成后续操作。 6 channel.eventLoop().execute(new Runnable() { 7 @Override 8 public void run() { 9 if (regFuture.isSuccess()) {10 // bind操作11 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);12 }13 }14 });15}
bind操作
在回顾上面的bind操作代码,bind操作是在register之后进行的,因为register0是由NioEventLoop执行的,所以main线程需要先判断下future是否完成,如果完成直接进行doBind即可,否则添加listener回调进行doBind。
bind操作及后续初始化操作(channelActive回调、设置监听事件)
1public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { 2 boolean wasActive = isActive(); 3 try { 4 // 调用底层bind操作 5 doBind(localAddress); 6 } catch (Throwable t) { 7 safeSetFailure(promise, t); 8 closeIfClosed(); 9 return;10 }1112 if (!wasActive && isActive()) {13 invokeLater(new Runnable() {14 @Override15 public void run() {16 pipeline.fireChannelActive();17 }18 });19 }20 safeSetSuccess(promise);21}2223// 最后底层bind逻辑bind入参包括了backlog,也就是底层会进行listen操作24// DefaultChannelPipeline.headContext -> NioMessageUnsafe -> NioServerSocketChannel25protected void doBind(SocketAddress localAddress) throws Exception {26 if (PlatformDependent.javaVersion() >= 7) {27 javaChannel().bind(localAddress, config.getBacklog());28 } else {29 javaChannel().socket().bind(localAddress, config.getBacklog());30 }31}3233public void channelActive(ChannelHandlerContext ctx) throws Exception {34 // 回调fireChannelActive35 ctx.fireChannelActive();3637 // 设置selectKey监听事件,对于监听端口就是SelectionKey.OP_ACCEPT,对于新建连接就是SelectionKey.OP_READ38 readIfIsAutoRead();39}
到这里为止整个netty启动流程就基本接近尾声,可以对外提供服务了。
推荐阅读
欢迎小伙伴 关注【TopCoder】 阅读更多精彩好文。
本文分享自微信公众号 - TopCoder(gh_12e4a74a5c9c)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。