因为NIO本身是非阻塞的,所以它的消息选择器Selector可以在单线程下处理多个客户端的连接。
为了提升NIO的性能,我们加入多线程的操作。当然,NIO并不能简单地把Selector.select()放入Executor.execute(Runnable)的run方法中。
为完成NIO的多线程,我们应该有一个调度类,一个服务类。
调度类的目的是初始化一定数量的线程,以及线程交接。
package com.netty.nionetty.pool;

import com.netty.nionetty.NioServerBoss;
import com.netty.nionetty.NioServerWorker;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2018-05-17.
 *
 * <p>Scheduler that builds a fixed set of boss (accept) and worker selector
 * runnables in its constructor and hands them out in round-robin order.
 */
public class NioSelectorRunnablePool {
    // Round-robin cursor into the boss array.
    private final AtomicInteger bossIndex = new AtomicInteger();
    // Boss (accept) thread array.
    private Boss[] bosses;
    // Round-robin cursor into the worker array.
    private final AtomicInteger workerIndex = new AtomicInteger();
    // Worker thread array.
    private Worker[] workers;

    /**
     * Creates one boss and {@code availableProcessors() * 2} workers.
     *
     * @param boss   executor passed to every {@link NioServerBoss}
     * @param worker executor passed to every {@link NioServerWorker}
     */
    public NioSelectorRunnablePool(Executor boss, Executor worker) {
        initBoss(boss, 1);
        initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
    }

    // Fills the boss array with {@code count} bosses (the constructor asks for 1).
    private void initBoss(Executor boss, int count) {
        this.bosses = new NioServerBoss[count];
        for (int i = 0; i < bosses.length; i++) {
            bosses[i] = new NioServerBoss(boss, "boss thread " + (i + 1), this);
        }
    }

    // Fills the worker array with {@code count} workers (the constructor asks for 2x the core count).
    private void initWorker(Executor worker, int count) {
        this.workers = new NioServerWorker[count];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new NioServerWorker(worker, "worker thread" + (i + 1), this);
        }
    }

    /**
     * Hands out the next worker from the worker array.
     *
     * <p>The remainder is taken before {@code Math.abs}, so the index stays in
     * range even after the counter overflows to a negative value.
     *
     * @return the worker at the current round-robin position
     */
    public Worker nextWorker() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }

    /**
     * Hands out the next boss from the boss array, using the same indexing
     * scheme as {@link #nextWorker()}.
     *
     * @return the boss at the current round-robin position
     */
    public Boss nextBoss() {
        return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
    }
}
另外带一个欢迎线程接口,一个工作线程接口
package com.netty.nionetty.pool;

import java.nio.channels.ServerSocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 *
 * <p>Contract of a boss (accept) thread.
 */
public interface Boss {
    /**
     * Asks this boss to take charge of the given server channel.
     *
     * @param serverChannel the listening channel to hand to this boss
     */
    void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.netty.nionetty.pool;

import java.nio.channels.SocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 *
 * <p>Contract of a worker thread.
 */
public interface Worker {
    /**
     * Asks this worker to take charge of the given client channel.
     *
     * @param channel the client channel to hand to this worker
     */
    void registerNewChannelTask(SocketChannel channel);
}
有两种线程(欢迎线程和工作线程),所以我们有一个抽象线程类
package com.netty.nionetty;
import com.netty.nionetty.pool.NioSelectorRunnablePool;
import java.io.IOException; import java.nio.channels.Selector; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean;
_/**
_ * Created by Administrator on 2018-05-17. */ public abstract class AbstractNioSelector implements Runnable {
//线程池
private final Executor executor;
//NIO消息选择器
protected Selector selector;
protected final AtomicBoolean wakeUp = new AtomicBoolean();
//线程任务队列
private final Queue
public void run() {
Thread._currentThread_().setName(this.threadName);
while (true) { try { wakeUp.set(false); //把消息选择器的状态定为未唤醒状态 select(selector); //消息选择器选择消息方式 processTaskQueue(); //因为在主程序中绑定端口的时候已经注册了接收通道任务线程,所以这里是读出任务。 process(selector); //任务处理,欢迎线程跟工作线程各不相同 } catch (IOException e) { e.printStackTrace(); } } } //欢迎线程跟工作线程各自添加不同的线程,再把消息选择器唤醒 protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakeUp.compareAndSet(false,true)) { selector.wakeup(); } }else { taskQueue.remove(task); } } public NioSelectorRunnablePool getSelectorRunnablePool() { return selectorRunnablePool; }
/**
 * Drains the task queue, running each task in the order {@code poll()}
 * returns it, and stops as soon as the queue reports empty.
 */
private void processTaskQueue() {
    Runnable pending;
    while ((pending = taskQueue.poll()) != null) {
        pending.run();
    }
}

/**
 * Performs the selection step of the loop.
 *
 * @param selector the selector to select on
 * @return the value reported by the underlying select call
 * @throws IOException if the selection fails
 */
protected abstract int select(Selector selector) throws IOException;

/**
 * Handles whatever the selection step produced; implemented differently by
 * the boss and the worker.
 *
 * @param selector the selector that was just selected on
 * @throws IOException if handling fails
 */
protected abstract void process(Selector selector) throws IOException;
}
欢迎线程跟工作线程的具体实现
package com.netty.nionetty; import com.netty.nionetty.pool.Boss; import com.netty.nionetty.pool.NioSelectorRunnablePool; import com.netty.nionetty.pool.Worker; import java.io.IOException; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; _/** _ * Created by Administrator on 2018-05-17. */ public class NioServerBoss extends AbstractNioSelector implements Boss { public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor,threadName,selectorRunnablePool); } //注册接收任务,会先调用抽象类,把任务线程先添加到任务队列,再注册接收消息类型 public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) { final Selector selector = this.selector; registerTask(new Runnable() { public void run() { try { serverChannel.register(selector,SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); }
/**
 * Selects with no timeout, delegating to {@link Selector#select()}.
 *
 * @param selector the selector to select on
 * @return the number of keys reported by {@code select()}
 * @throws IOException if the selection fails
 */
@Override
protected int select(Selector selector) throws IOException {
    final int readyKeys = selector.select();
    return readyKeys;
}
//NIO操作,开始接收,接收后再启用工作线程,接收线程依然存在,而且工作线程也不断给到线程池未使用线程
//具体看初始化的时候初始了多少工作线程,但是是几个连接对应一个工作线程。
@Override
protected void process(Selector selector) throws IOException {
Set
package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import com.netty.nionetty.pool.Worker; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; _/** _ * Created by Administrator on 2018-05-17. */ public class NioServerWorker extends AbstractNioSelector implements Worker { public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor,threadName,selectorRunnablePool); }
/**
 * Queues a task that registers {@code channel} with this worker's selector
 * for {@code OP_READ}. The task is handed to {@code registerTask}, so the
 * registration runs when the queued task is executed.
 *
 * <p>A {@link ClosedChannelException} raised by the registration is printed
 * and otherwise ignored.
 *
 * @param channel the client channel to start reading from
 */
public void registerNewChannelTask(final SocketChannel channel) {
    // Capture the selector now so the queued task uses the same instance.
    final Selector workerSelector = this.selector;
    final Runnable registerForRead = new Runnable() {
        public void run() {
            try {
                channel.register(workerSelector, SelectionKey.OP_READ);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        }
    };
    registerTask(registerForRead);
}
/**
 * Selects with a 500 ms timeout, delegating to {@link Selector#select(long)}.
 *
 * @param selector the selector to select on
 * @return the number of keys reported by {@code select(long)}
 * @throws IOException if the selection fails
 */
@Override
protected int select(Selector selector) throws IOException {
    final long timeoutMillis = 500;
    return selector.select(timeoutMillis);
}
@Override
protected void process(Selector selector) throws IOException {
Set
服务类
package com.netty.nionetty;

import com.netty.nionetty.pool.Boss;
import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

/**
 * Created by Administrator on 2018-05-17.
 *
 * <p>Service class: opens a listening channel and hands it to a boss taken
 * from the selector pool.
 */
public class ServerBootstap {
    private final NioSelectorRunnablePool selectorRunnablePool;

    /**
     * @param selectorRunnablePool pool the boss is taken from on {@link #bind}
     */
    public ServerBootstap(NioSelectorRunnablePool selectorRunnablePool) {
        this.selectorRunnablePool = selectorRunnablePool;
    }

    /**
     * Opens a non-blocking server channel, binds it to {@code localAddress},
     * and passes it to the next boss of the pool for accept registration.
     *
     * <p>An {@link IOException} during open/configure/bind is printed, as
     * before; the channel opened so far is now closed as well, so a failed
     * bind no longer leaks it.
     *
     * @param localAddress the address to listen on
     */
    public void bind(final SocketAddress localAddress) {
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(localAddress);
            Boss nextBoss = selectorRunnablePool.nextBoss();
            nextBoss.registerAcceptChannelTask(serverChannel);
        } catch (IOException e) {
            e.printStackTrace();
            closeAfterFailure(serverChannel);
        }
    }

    // Closes a channel left over from a failed bind; a failure to close is only reported.
    private static void closeAfterFailure(ServerSocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException closeError) {
            closeError.printStackTrace();
        }
    }
}
主程序
package com.netty.nionetty;

import com.netty.nionetty.pool.NioSelectorRunnablePool;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2018-05-17.
 *
 * <p>Entry point: builds the selector pool, binds the server to port 10101,
 * and prints "start".
 */
public class Start {
    /**
     * Creates the pool with two separate cached thread pools (one for the
     * boss, one for the workers), binds the bootstrap to port 10101, then
     * prints {@code start}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioSelectorRunnablePool nioSelectorRunnablePool =
                new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        ServerBootstap bootstrap = new ServerBootstap(nioSelectorRunnablePool);
        bootstrap.bind(new InetSocketAddress(10101));
        System.out.println("start");
    }
}
其实最主要的一点是:在线程调度器中,各种线程在初始化时就已经创建好并保存在数组里了,所以后面只是把这些线程取出来,注册消息类型并进行处理,这就是NIO的多线程处理了。