Netty里面的Boss和Worker【Server篇】

Stella981
• 阅读 1253

#Netty里面的Boss和Worker【Server篇】 最近在总结Dubbo关于Netty通信方面的实现,于是也就借此机会深入体会了一下Netty。一般启动Netty的Server端时都会设置两个ExecutorService对象,我们都习惯用boss,worker两个变量来引用这两个对象,于是从我一开始接触Netty就有了boss和worker的概念。这篇博客将对boss和worker进行介绍,但并不是涉及Netty其他部分介绍。

在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。

<!--lang:java-->
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

上面这段代码是Dubbo用来开启服务的,也是大部分使用Netty进行服务端开发常用的方式启动服务端。首先是设置boss和worker的线程池,以能够让它们在各自的线程池里面异步执行。当调用bootstrap.bind(getBindAddress())的时候最终受理绑定操作的是NioServerSocketPipelineSinkeventSunk方法,看类名和方法签名就应该知道是处理IO事件的。方法eventSunk实现如下:

<!--lang:java-->
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    Channel channel = e.getChannel();
    if (channel instanceof NioServerSocketChannel) {
        handleServerSocket(e);
    } else if (channel instanceof NioSocketChannel) {
        handleAcceptedSocket(e);
    }
}

由于这个时候Server还处于bind阶段,所以channel肯定不是NioSocketChannel,于是就到了方法handleServerSocket里面,最后将会调用bind方法来绑定某个端口启动服务。下面是bind方法实现:

<!--lang:java-->
private void bind(
        NioServerSocketChannel channel, ChannelFuture future,
        SocketAddress localAddress) {

    boolean bound = false;
    boolean bossStarted = false;
    try {
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        bound = true;

        future.setSuccess();
        fireChannelBound(channel, channel.getLocalAddress());

        Executor bossExecutor =
            ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
        DeadLockProofWorker.start(
                bossExecutor,
                new ThreadRenamingRunnable(
                        new Boss(channel),
                        "New I/O server boss #" + id + " (" + channel + ')'));
        bossStarted = true;
    } catch (Throwable t) {
        future.setFailure(t);
        fireExceptionCaught(channel, t);
    } finally {
        if (!bossStarted && bound) {
            close(channel, future);
        }
    }
}

可以看到socket的绑定以及设置异步的future成功,已通知服务启动成功,同时将绑定成功事件通知出去。接下来我看的重点来了,就是bossExecutor,可以看到它是通过NioServerSocketChannelFactory里面去获取的,NioServerSocketChannelFactory里面的boss就是之前我们设置进去的,可以确定我们之前设置boss的异步线程池是在这里被使用了。紧接下来的是启动我们的异步线程池,到这里进入了Boss该做的事情,Boss其实是实现了Runnable接口,从而可以交给boss的线程池运行,接下来的关注点就是Boss的run方法,这里才是Boss做事情的地方。再此之前先看看Boss初始化做了什么事情:

<!--lang:java-->
Boss(NioServerSocketChannel channel) throws IOException {
        this.channel = channel;

        selector = Selector.open();

        boolean registered = false;
        try {
            channel.socket.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
        } finally {
            if (!registered) {
                closeSelector();
            }
        }

        channel.selector = selector;
    }

Boss初始化过程中其实就是将serversocket注册到一个selector里面,从而可以实现NIO的异步IO处理。

<!--lang:java-->
public void run() {
        final Thread currentThread = Thread.currentThread();

        channel.shutdownLock.lock();
        try {
            for (;;) {
                try {
                    if (selector.select(1000) > 0) {
                        selector.selectedKeys().clear();
                    }

                    SocketChannel acceptedSocket = channel.socket.accept();
                    if (acceptedSocket != null) {
                        registerAcceptedChannel(acceptedSocket, currentThread);
                    }
                } catch (SocketTimeoutException e) {
                    // Thrown every second to get ClosedChannelException
                    // raised.
                } catch (CancelledKeyException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedSelectorException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedChannelException e) {
                    // Closed as requested.
                    break;
                } catch (Throwable e) {
                    logger.warn(
                            "Failed to accept a connection.", e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Ignore
                    }
                }
            }
        } finally {
            channel.shutdownLock.unlock();
            closeSelector();
        }
    }

run方法里面是一个死循环,里面在不间断的等待客户端的连接,如果有客户端的连接,那么将会调用方法registerAcceptedChannel进行后续的处理。

<!--lang:java-->
 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
        try {
            ChannelPipeline pipeline =
                channel.getConfig().getPipelineFactory().getPipeline();
            NioWorker worker = nextWorker();
            worker.register(new NioAcceptedSocketChannel(
                    channel.getFactory(), pipeline, channel,
                    NioServerSocketPipelineSink.this, acceptedSocket,
                    worker, currentThread), null);
        } catch (Exception e) {
            logger.warn(
                    "Failed to initialize an accepted socket.", e);
            try {
                acceptedSocket.close();
            } catch (IOException e2) {
                logger.warn(
                        "Failed to close a partially accepted socket.",
                        e2);
            }
        }
    }

方法registerAcceptedChannel就是将客户端的channle分配给一个worker,而这个worker是通过方法nextWorker获取 NioWorker nextWorker() { return workers[Math.abs( workerIndex.getAndIncrement() % workers.length)]; }

可以看到方法nextWorker是一个让worker里面的客户端channel保持平衡的作用,可能你会疑问这个workers是哪里来的,其实是在上面初始化NioServerSocketChannelFactory的时候,NioServerSocketChannelFactory再去初始化NioServerSocketPipelineSink时候构造出来的,默认情况下workers的数量是我们初始化NioServerSocketChannelFactory设置进去的。可以看到是调用worker的register方法将客户端的channel注册到worker里面的。

<!--lang:java-->
void register(NioSocketChannel channel, ChannelFuture future) {

    boolean server = !(channel instanceof NioClientSocketChannel);
    Runnable registerTask = new RegisterTask(channel, future, server);
    Selector selector;

    synchronized (startStopLock) {
        if (!started) {
            .....
                this.selector = selector = Selector.open();
           .....
                DeadLockProofWorker.start(
                        executor, new ThreadRenamingRunnable(this, threadName));
                success = true;
          .....
        } else {
            selector = this.selector;
        }

        assert selector != null && selector.isOpen();

        started = true;
        boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;
    }

    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

上面对worker有一个started状态的检测,如果没启动,则启动worker,这个额一般都是将第一个客户端的channel注册到worker里面才进行的。由于worker也是实现了Rannable接口,所以启动的主要工作就是让worker在某个线程里面跑起来,并且为这个worker分配一个selector,用来进行监控IO事件。下面便是这个过程实现:

<!--lang:java-->
  DeadLockProofWorker.start(
                        executor, new ThreadRenamingRunnable(this, threadName));
                success = true;

其中的executor便是我们一开始设置的workerExecutor。 worker启动成功之后,接下来要做的便是让worker管理器客户端的channel

<!--lang:java-->
 Runnable registerTask = new RegisterTask(channel, future, server);
    .......
 boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;

worker是将客户端包装成一个RegisterTask,然后放入队列,可见RegisterTask也实现了Runnable接口。那放入队列以后谁去取这个队列里面的数据呢?当然,肯定是worker去取。上面介绍启动worker的时候是让worker在某个线程里面跑起来,并且worker是实现了Rannable方法,于是运行worker的线程肯定是调用worker的run方法。

<!--lang:java-->
 public void run() {
    thread = Thread.currentThread();
    boolean shutdown = false;
    Selector selector = this.selector;
    for (;;) {
         .....
        try {
            SelectorUtil.select(selector);
            .....

            cancelledKeys = 0;
            processRegisterTaskQueue();
            processWriteTaskQueue();
            processSelectedKeys(selector.selectedKeys());
             .....
        } catch (Throwable t) {
             
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
              
            }
        }
    }
}

可以看到run方法里面也是一个死循环,在不断的轮询调用selector的select IO的事件。接下来会调用三个方法processRegisterTaskQueue,processWriteTaskQueueprocessSelectedKeys。通过方法签名就应该知道这个三个方法具体是做什么事情的,第一个是处理上面registerTaskQueue的,并且queue里面对象的run方法,而第二个processWriteTaskQueue是处理写任务的,而processSelectedKeys是处理selector匹配的IO事件。我们先看看registerTaskQueue是做了什么?

<!--lang:java-->
 private void processRegisterTaskQueue() throws IOException {
    for (;;) {
        final Runnable task = registerTaskQueue.poll();
        if (task == null) {
            break;
        }

        task.run();
        cleanUpCancelledKeys();
    }
}

上面介绍过registerTaskQueue里面的元素是RegisterTask。所以需要去看看RegisterTask的run方法实现,其中RegisterTaskNioWorker里面的内部类,所以RegisterTask是可以访问NioWorker的元素信息。

<!--lang:java-->
 public void run() {
        SocketAddress localAddress = channel.getLocalAddress();
        SocketAddress remoteAddress = channel.getRemoteAddress();
        if (localAddress == null || remoteAddress == null) {
            if (future != null) {
                future.setFailure(new ClosedChannelException());
            }
            close(channel, succeededFuture(channel));
            return;
        }

        try {
            if (server) {
                channel.socket.configureBlocking(false);
            }

            synchronized (channel.interestOpsLock) {
                channel.socket.register(
                        selector, channel.getRawInterestOps(), channel);
            }
            if (future != null) {
                channel.setConnected();
                future.setSuccess();
            }
        } catch (IOException e) {
            if (future != null) {
                future.setFailure(e);
            }
            close(channel, succeededFuture(channel));
           ....
        }

        if (!server) {
            if (!((NioClientSocketChannel) channel).boundManually) {
                fireChannelBound(channel, localAddress);
            }
            fireChannelConnected(channel, remoteAddress);
        }
    }

可以看到这里面主要做的事情是将Boss分配给worker的客户端channel和worker的selector关联上,从而worker可以处理该客户端channel的IO事件。

到这里就完成了由Boss接收到一个客户端连接,到分配给某个worker,以及worker是怎么去和客户端的channel关联的,其中由于worker有可能为多个客户端channel服务,所以worker并不会直接和某个channel产生引用,而是将客户端的channel注册在该worker的selector上面,worker的run方法里面通过不断对selector的select轮询,以达到对channel进行处理。接下来看看worker怎么处理selector的io事件的

<!--java:lang-->
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k);
            }
        } catch (CancelledKeyException e) {
            close(k);
        }

        if (cleanUpCancelledKeys()) {
            break; // break the loop to avoid ConcurrentModificationException
        }
    }
}

上面的方法完成的是处理selector产生的io事件,其中如果当前IO时间是读,那么将SelectionKey中的channel流进行读出,并且向上交给Netty的Handler。如果是当前某个channel的写满足条件,则触发writeFromSelectorLoop查看是否有待写出的内容。

对于写数据Netty在worker提供了三种入口

<!--lang:java-->
void writeFromUserCode(final NioSocketChannel channel) {
    if (!channel.isConnected()) {
        cleanUpWriteBuffer(channel);
        return;
    }

    if (scheduleWriteIfNecessary(channel)) {
        return;
    }

    if (channel.writeSuspended) {
        return;
    }

    if (channel.inWriteNowLoop) {
        return;
    }

    write0(channel);
}

void writeFromTaskLoop(final NioSocketChannel ch) {
    if (!ch.writeSuspended) {
        write0(ch);
    }
}

void writeFromSelectorLoop(final SelectionKey k) {
    NioSocketChannel ch = (NioSocketChannel) k.attachment();
    ch.writeSuspended = false;
    write0(ch);
}

其中writeFromUserCode是提供外部直接写出的,writeFromTaskLoop是在worker的run方法调用processWriteTaskQueue时候会触发。

点赞
收藏
评论区
推荐文章
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
3年前
Reactor模式的.net版本简单实现
    近期在学习DotNetty,遇到不少的问题。由于dotnetty是次netty的.net版本的实现。导致在网上叙述dotnetty的原理,以及实现技巧方面的东西较少,这还是十分恼人的。在此建议学习和使用Dotnetty的和位小伙伴,真心阅读下netty的相关书籍,如《netty权威指南》。    闲话少说,进入正题。netty的性能之所以能够
Stella981 Stella981
3年前
Netty源码分析(二):服务端启动
上一篇粗略的介绍了一下netty,本篇将详细介绍Netty的服务器的启动过程。ServerBootstrap看过上篇事例的人,可以知道ServerBootstrap是Netty服务端启动中扮演着一个重要的角色。它是Netty提供的一个服务端引导类,继承自AbstractBootstrap。Serv
Stella981 Stella981
3年前
Netty创建服务器与客户端
Netty创建Server服务端Netty创建全部都是实现自AbstractBootstrap。客户端的是Bootstrap,服务端的则是ServerBootstrap。创建一个HelloServerpackageorg.examp
Stella981 Stella981
3年前
Netty之大名鼎鼎的EventLoop
EventLoopGroup与Reactor:前面的章节中我们已经知道了,一个Netty程序启动时,至少要指定一个EventLoopGroup(如果使用到的是NIO,通常是指NioEventLoopGroup),那么,这个NioEventLoopGroup在Netty中到底扮演着什么角色呢?我们知道,Netty是Reactor模型的
Stella981 Stella981
3年前
Netty在Dubbo中的线程名称
在项目中,我们会使用RocketMQ和Dubbo.前者用于发送或消费消息,后者用于两个模块之间的接口调用.RocketMQ和Dubbo在它们的底层都使用Netty作为网络通信的框架.那么今天我们就来看一下,在Dubbo中,使用的Netty线程名称叫什么?环境和流程如下1.启动zookeeper2.一个简单的Dubbo提供者,并启动它
Wesley13 Wesley13
3年前
Netty4.0学习笔记系列之一:Server与Client的通讯
本文是学习Netty的第一篇文章,主要对Netty的Server和Client间的通讯机制进行验证。Server与Client建立连接后,会执行以下的步骤:1、Client向Server发送消息:Areyouok?2、Server接收客户端发送的消息,并打印出来。3、Server端向客户端发送消息:Iamok!4、Client接收
Stella981 Stella981
3年前
Netty之粘包问题解决
最近接到一项新的任务,其中涉及到用netty解决粘包问题,该问题解决得很顺利。下面是一些心得体会。我们知道,netty当中有boss线程和worker线程,通常是1对多的关系,可以理解为boss接到客户的请求之后,分配给其中一个worker去处理,如果客户过多,可能会出现一个worker服务多个客户的情况。这是背景。按照我的理解,粘包问题解决的关键在
Stella981 Stella981
3年前
NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战
前言本文将演示一个iOS客户端程序,通过UDP协议与两个典型的NIO框架服务端,实现跨平台双向通信的完整Demo。服务端将分别用MINA2和Netty4进行实现,而通信时服务端你只需选其一就行了。同时用MINA2和Netty4分别实现服务端的目的,是因为很多人都在纠结到底是用MINA还是Netty来实现高并发的Java网络通信服务端
Stella981 Stella981
3年前
Netty 学习笔记(1)
服务端启动流程packagecom.example.netty;importcom.example.netty.handler.HelloServerHandler;importio.netty.bootstrap.ServerBootstrap;importio.netty.cha