NIO如何多线程操作

Wesley13
• 阅读 712

因为NIO本身是非阻塞的,所以他的消息选择器Selector可以在单线程下连接多台客户端的访问。

为了加强NIO的性能,我们加入多线程的操作,当然NIO并不能简单的把Selector.select()放入Executor.execute(Runnable)的run方法中。

为完成NIO的多线程,我们应该有一个调度类,一个服务类。

调度类的目的是初始化一定数量的线程,以及线程交接。

package com.netty.nionetty.pool; import com.netty.nionetty.NioServerBoss; import com.netty.nionetty.NioServerWorker; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; _/** _ * Created by Administrator on 2018-05-17. */ public class NioSelectorRunnablePool { private final AtomicInteger bossIndex = new AtomicInteger();     //欢迎线程数组 private Boss[] bosses; private final AtomicInteger workerIndex = new AtomicInteger();     //工作线程数组 private Worker[] workers; public NioSelectorRunnablePool(Executor boss,Executor worker) { initBoss(boss,1); initWorker(worker,Runtime.getRuntime().availableProcessors() * 2); }     //初始化1个欢迎线程 private void initBoss(Executor boss,int count) { this.bosses = new NioServerBoss[count]; for (int i = 0;i < bosses.length;i++) { bosses[i] = new NioServerBoss(boss,"boss thread " + (i + 1),this); } }     //初始化2倍计算机核数的工作线程 private void initWorker(Executor worker,int count) { this.workers = new NioServerWorker[count]; for (int i = 0; i < workers.length;i++) { workers[i] = new NioServerWorker(worker,"worker thread" + (i + 1),this); } }     //交接工作线程(从工作线程数组中挑出) public Worker nextWorker() { return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; }     //交接欢迎线程(从欢迎线程数组中挑出) public Boss nextBoss() { return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; } }

另外带一个欢迎线程接口,一个工作线程接口

package com.netty.nionetty.pool; import java.nio.channels.ServerSocketChannel; _/** _ * Created by Administrator on 2018-05-17. */ public interface Boss { void registerAcceptChannelTask(ServerSocketChannel serverChannel); }

package com.netty.nionetty.pool; import java.nio.channels.SocketChannel; _/** _ * Created by Administrator on 2018-05-17. */ public interface Worker { void registerNewChannelTask(SocketChannel channel); }

有两种线程(欢迎线程和工作线程),所以我们有一个抽象线程类

package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.io.IOException; import java.nio.channels.Selector; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; _/** _ * Created by Administrator on 2018-05-17. */ public abstract class AbstractNioSelector implements Runnable {     //线程池 private final Executor executor;     //NIO消息选择器 protected Selector selector; protected final AtomicBoolean wakeUp = new AtomicBoolean();     //线程任务队列 private final Queue taskQueue = new ConcurrentLinkedQueue();     //线程名 private String threadName;     //线程调度器 protected NioSelectorRunnablePool selectorRunnablePool; AbstractNioSelector(Executor executor,String threadName,NioSelectorRunnablePool selectorRunnablePool) { this.executor = executor; this.threadName = threadName; this.selectorRunnablePool = selectorRunnablePool; openSelector(); } private void openSelector() { try { this.selector = Selector.open(); //打开消息选择器 } catch (IOException e) { e.printStackTrace(); }         //把线程放入线程池,开始执行run方法 executor.execute(this); }

public void run() {
    Thread._currentThread_().setName(this.threadName);

while (true) { try { wakeUp.set(false); //把消息选择器的状态定为未唤醒状态 select(selector); //消息选择器选择消息方式 processTaskQueue(); //因为在主程序中绑定端口的时候已经注册了接收通道任务线程,所以这里是读出任务。 process(selector); //任务处理,欢迎线程跟工作线程各不相同 } catch (IOException e) { e.printStackTrace(); } } }     //欢迎线程跟工作线程各自添加不同的线程,再把消息选择器唤醒 protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakeUp.compareAndSet(false,true)) { selector.wakeup(); } }else { taskQueue.remove(task); } } public NioSelectorRunnablePool getSelectorRunnablePool() { return selectorRunnablePool; }

private void processTaskQueue() {
    for (;;) {
        final Runnable task = taskQueue.poll();

if (task == null) { break; } task.run(); } } protected abstract int select(Selector selector) throws IOException; protected abstract void process(Selector selector) throws IOException; }

欢迎线程跟工作线程的具体实现

package com.netty.nionetty; import com.netty.nionetty.pool.Boss; import com.netty.nionetty.pool.NioSelectorRunnablePool; import com.netty.nionetty.pool.Worker; import java.io.IOException; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; _/** _ * Created by Administrator on 2018-05-17. */ public class NioServerBoss extends AbstractNioSelector implements Boss { public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor,threadName,selectorRunnablePool); }     //注册接收任务,会先调用抽象类,把任务线程先添加到任务队列,再注册接收消息类型 public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) { final Selector selector = this.selector; registerTask(new Runnable() { public void run() { try { serverChannel.register(selector,SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); }

@Override

protected int select(Selector selector) throws IOException { return selector.select(); }     //NIO操作,开始接收,接收后再启用工作线程,接收线程依然存在,而且工作线程也不断给到线程池未使用线程     //具体看初始化的时候初始了多少工作线程,但是是几个连接对应一个工作线程。 @Override protected void process(Selector selector) throws IOException { Set selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator i = selectedKeys.iterator();i.hasNext();) { SelectionKey key = i.next(); i.remove(); ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); Worker nextWorker = getSelectorRunnablePool().nextWorker(); nextWorker.registerNewChannelTask(channel); System.out.println("新客户端连接"); } } }

package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import com.netty.nionetty.pool.Worker; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; _/** _ * Created by Administrator on 2018-05-17. */ public class NioServerWorker extends AbstractNioSelector implements Worker { public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor,threadName,selectorRunnablePool); }

public void registerNewChannelTask(final SocketChannel channel) {
    final Selector selector = this.selector;

registerTask(new Runnable() { public void run() { try { channel.register(selector,SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); }

@Override

protected int select(Selector selector) throws IOException { return selector.select(500); }

@Override

protected void process(Selector selector) throws IOException { Set selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } Iterator ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey)ite.next(); ite.remove(); SocketChannel channel = (SocketChannel)key.channel(); int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); try { ret = channel.read(buffer); failure = false; } catch (IOException e) { e.printStackTrace(); } if (ret <= 0 || failure) { key.cancel(); System.out.println("客户端断开连接"); }else { System.out.println("收到数据:" + new String(buffer.array())); ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); channel.write(outBuffer); } } } }

服务类

package com.netty.nionetty; import com.netty.nionetty.pool.Boss; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.ServerSocketChannel; _/** _ * Created by Administrator on 2018-05-17. */ public class ServerBootstap { private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstap(NioSelectorRunnablePool selectorRunnablePool) { this.selectorRunnablePool = selectorRunnablePool; } public void bind(final SocketAddress localAddress) { try { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(localAddress); Boss nextBoss = selectorRunnablePool.nextBoss(); nextBoss.registerAcceptChannelTask(serverChannel); } catch (IOException e) { e.printStackTrace(); } }

}

主程序

package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.net.InetSocketAddress; import java.util.concurrent.Executors; _/** _ * Created by Administrator on 2018-05-17. */ public class Start { public static void main(String[] args) { NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); ServerBootstap bootstrap = new ServerBootstap(nioSelectorRunnablePool); bootstrap.bind(new InetSocketAddress(10101)); System.out.println("start"); } }

其实最主要的就是在线程调度器中,各种线程已经被初始化存在于线程池内存中了,所以后面只是把这些线程拿出来,并注册消息类型,进行处理,这就是NIO的多线程处理了。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这