NIO非阻塞的IO

Wesley13
• 阅读 590

NIO非阻塞的IO

1、TimeClient(客户端)

/**
 * @FileName TimeClient.java
 * @Description: 
 *
 * @Date 2016年3月1日 
 * @author Administroter
 * @version 1.0
 * 
 */
public class TimeClient {
 public static void main(String[] args) {
  int port = 8080;
  if (args != null && args.length>0) {
   try {
    port = Integer.valueOf(args[0]);
   } catch (NumberFormatException e) {
    //采用默认的值
    e.printStackTrace();
   }
  }
  new Thread(new TimeClientHandle("127.0.0.1", port),"TimeClient-001").start();;
 }
}

2、TimeClientHandle(客户端)

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * @FileName TimeClientHandle.java
 * @Description: 
 *
 * @Date 2016年2月29日 
 * @author Administroter
 * @version 1.0
 * 
 */
public class TimeClientHandle implements Runnable{
 private String host;
 private int port;
 private Selector selector;
 private SocketChannel socketChannel;
 private volatile boolean stop;
 
 /**
  * Title: <br />
  * Description: 构造函数,初始化参数
  * @param host
  * @param port
  */
 public TimeClientHandle(String host,int port) {
  //服务器地址(IP)
  this.host = host == null ? "127.0.0.1" : host;
  //端口号
  this.port = port;
  try {
   //打开多路复用器
   selector = Selector.open();
   //打开管道
   socketChannel = SocketChannel.open();
   //设置管道为异步非阻塞的
   socketChannel.configureBlocking(false);
  } catch (IOException e) {
   //正常结束当前的java虚拟机
   System.exit(1);
   e.printStackTrace();
  }
 }
 
 public void run() {
  try {
   //建立与服务器的链接
   this.doConnect();
  } catch (IOException e) {
   System.exit(1);
   e.printStackTrace();
  }
  while(!stop){
   try {
    //一秒唤醒一次线程
    selector.select(1000);
    Set<SelectionKey> selectionKey = selector.selectedKeys();
    Iterator<SelectionKey> it = selectionKey.iterator();
    SelectionKey key = null;
    //迭代就绪的管道
    while(it.hasNext()){
     key = it.next();
     it.remove();
     try {
      //执行读写的操作
      this.handleInput(key);
     } catch (Exception e) {
      if (key != null) {
       key.cancel();
       if (key.channel() != null) {
        key.channel().close();
       }
      }
     }
    }
   } catch (IOException e) {
    System.exit(1);
    e.printStackTrace();
   }
  }
  if (selector != null) {
   try {
    selector.close();
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }
 /**
  * @Title: handleInput 
  * @Description:服务器和客户端的读写操作
  * @param key
  * @throws IOException 
  * @author Administroter
  * @date 2016年3月1日
  */
 private void handleInput(SelectionKey key) throws IOException{
  //判断是否处于链接状态
  if (key.isValid()) {
   SocketChannel sc = (SocketChannel)key.channel();
   //判断客户端是否链接成功
   if (sc.finishConnect()) {
    //管道注册到多路复用器,监听网络读操作
    sc.register(selector, SelectionKey.OP_READ);
    this.doWrite(sc);
   }else{
    //IO异常,结束进程
    System.exit(1);
   }
   System.out.println("如果"+key.isReadable()+"开始读取服务器返回结果!");
   if (key.isReadable()) {
    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    int readBytes = sc.read(readBuffer);
    if (readBytes>0) {
     /**
      * 将当前缓冲区的limit设置为position = 0,用于后续对缓冲区的读取操作
      * (将缓存字节数组的指针设置为数组的开始序列即数组下标0,这样就可以从buffer开头,对该buffer进行遍历(读取)了)
      * 如果这里不进行设置,读取出来的会是空的
      */
     readBuffer.flip();
     byte[] bytes = new byte[readBuffer.remaining()];
     readBuffer.get(bytes);
     String body = new String(bytes,"utf-8");
     System.out.println("服务器响应结果:"+body);
     this.stop = true;
    }else if(readBytes<0){
     key.cancel();
     sc.close();
    }else{
     
    }
   }
  }
 }
 /**
  * @Title: doConnect 
  * @Description:链接到服务器
  * @throws IOException 
  * @author Administroter
  * @date 2016年3月1日
  */
 private void doConnect() throws IOException{
  //判断链接成功
  if (socketChannel.connect(new InetSocketAddress(host,port))) {
   //将管道注册OP_READ到多路复用器上
   socketChannel.register(selector, SelectionKey.OP_READ);
   //将客户端请求通过管道写出,转到服务器
   this.doWrite(socketChannel);
  }else{
   //将管道注册OP_CONNECT到多路复用器上
   socketChannel.register(selector, SelectionKey.OP_CONNECT);
  }
 }
 /**
  * @Title: doWrite 
  * @Description:编写发送服务器的码流
  * @param sc
  * @throws IOException 
  * @author Administroter
  * @date 2016年3月1日
  */
 private void doWrite(SocketChannel sc) throws IOException{
  //请求服务器的参数
  byte[] request = "我要获取服务器当前时间".getBytes();
  ByteBuffer writeBuffer = ByteBuffer.allocate(request.length);
  writeBuffer.put(request);
  //使得数据从buffer的开头遍历读取
  writeBuffer.flip();
  //写出是异步的,可能存在半写包的情况
  sc.write(writeBuffer);
  //判断缓冲区的消息是否全部发送完成
  if (!writeBuffer.hasRemaining()) {
   System.out.println("缓冲区消息全部发送完成");
  }
 }

}

3、TimeService(服务端)

/**
 * @FileName TimeService.java
 * @Description: 
 *
 * @Date 2016年2月29日 
 * @author Administroter
 * @version 1.0
 * 
 */
public class TimeService {
 public static void main(String[] args) {
  int port = 8080;
  if (args != null && args.length>0) {
   try {
    port = Integer.valueOf(args[0]);
   } catch (NumberFormatException e) {
    //采用默认的值
    e.printStackTrace();
   }
  }
  TimeServerHandle timeServer = new TimeServerHandle(port);
  //MultiplexerTimeServer是一个独立的线程,用户轮询多路复用器Selctor,可以处理多个客户的并发接入
  new Thread(timeServer,"TimeServer-001").start();
 }
}

4、MultiplexerTimeServer(服务端)

/*

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import com.lxf.netty.common.StrKit;
/**
 * @FileName MultiplexerTimeServer.java
 * @Description: NIO 多路复用类:负责轮询多路复用器Selctor,处理多个客户端的并发接入
 *
 * @Date 2016年2月29日 
 * @author Administroter
 * @version 1.0
 * 
 */
public class TimeServerHandle implements Runnable{
 //多路复用器
 private Selector selector;
 //通道
 private ServerSocketChannel servChannel;
 private volatile boolean stop;
 /**
  * Title: <br />
  * Description: 构造方法,资源初始化,创建Selector,ServerSocketChannel,对Channel和TCP参数进行设置
  * @param port 端口号
  */
 public TimeServerHandle(int port) {
  try {
   //打开多路复用器
   selector = Selector.open();
   //打开管道
   servChannel = ServerSocketChannel.open();
   //设置管道为非阻塞的
   servChannel.configureBlocking(false);
   //绑定主机,并设置backlog为1024
   servChannel.socket().bind(new InetSocketAddress(port), 1024);
   //将管道注册到复用器上,并监听SelectionKey.OP_ACCEPT操作位
   servChannel.register(selector, SelectionKey.OP_ACCEPT);
   System.out.println("开启时间服务器,端口号:"+port);
  } catch (IOException e) {
   //正常结束当前正在运行中的java虚拟机
   System.exit(1);
   e.printStackTrace();
  }
 }
 public void stop(){
  this.stop = true;
 }
 /**
  * 循环遍历多路复用器,当有处于就绪状态channel时,复用器将会返回就绪状态的channel
  */
 public void run() {
  while(!stop){
   try {
    //设置休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都会被唤醒一次
    selector.select(1000);
    //获取selector含有就绪状态的channel集合
    Set<SelectionKey> selectedKey = selector.selectedKeys();
    Iterator<SelectionKey> it = selectedKey.iterator();
    SelectionKey key = null;
    //迭代channel集合
    while(it.hasNext()){
     key = it.next();
     it.remove();
     try {
      //进行网络异步的读写操作
      this.handleInput(key);
     } catch (Exception e) {
      System.out.println(e.getMessage()+"-----------"+key);
      if(key != null){
       key.cancel();
       if (key.channel() != null) {
        //关闭Channel
        key.channel().close();
       }
      }
     }
    }
   } catch (Throwable t) {
    t.printStackTrace();
   }
  }
  //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
  if (selector != null) {
   try {
    //关闭多路复用器
    selector.close();
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }
 /**
  * @Title: handleInput 
  * @Description:网络异步的读写操作
  * @param key
  * @throws IOException 
  * @author Administroter
  * @date 2016年2月29日
  */
 public void handleInput(SelectionKey key)throws IOException{
  System.out.println("判断是否链接成功:"+key.isValid());
  if(key.isValid()){
   //处理新接入的请求消息,根据key判断操作位判断网络事件的类型
   if (key.isAcceptable()) {
    //接收客户端的请求
    ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
    //创建SocketChannel实例
    SocketChannel sc = ssc.accept();
    //设置异步非阻塞
    sc.configureBlocking(false);
    sc.register(selector, SelectionKey.OP_READ);
   }
   if (key.isReadable()) {
    //读取客户端请求消息
    SocketChannel sc = (SocketChannel)key.channel();
    //创建ByteBuffer并设置大小为1K(此处大小可以更改,根据客户端发送的码流大小决定)
    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    //读取请求的码流(前面设置了SocketChannel为异步非阻塞的,所以这里的read是非阻塞的)
    int readBytes = sc.read(readBuffer);
    //判断读取的字节数(等于0-没有读到字节,属于正常场景,忽略,大于0-读到字节,对字节进行编码或者解码,等于-1-链路已经关闭,需要关闭SocketChannel,释放资源)
    if (readBytes > 0) {
     /**
      * 将当前缓冲区的limit设置为position = 0,用于后续对缓冲区的读取操作
      * (将缓存字节数组的指针设置为数组的开始序列即数组下标0,这样就可以从buffer开头,对该buffer进行遍历(读取)了)
      * 如果这里不进行设置,读取出来的会是空的
      */
     readBuffer.flip();
     //创建字节数据
     byte[] bytes = new byte[readBuffer.remaining()];
     readBuffer.get(bytes);
     String body = new String(bytes,"utf-8");
     System.out.println("接收到的客户端请求的数据:"+body);
     //判断客户端指令如果为QUERY TIME ORDER,则将系统当前的时间返回给客户端,此处也就相当于我们的业务逻辑处理部分
     Date date = new Date();
     String currentTime = StrKit.getDateToString(date);
     String temp = "我要获取服务器当前时间".equalsIgnoreCase(body)?currentTime:"指令不一致";
     this.doWrite(sc, temp);
    }else if(readBytes < 0){
     key.cancel();
     sc.close();
    }else{
     
    }
   }
  }
 }
 private void doWrite(SocketChannel channel,String response)throws IOException{
  if (response != null && response.trim().length()>0) {
   //将服务器响应的字符串编码成字节数组
   byte[] bytes = response.getBytes();
   //根据字节数组的容量创建ByteBuffer
   ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
   //将字节数组复制到缓冲区
   writeBuffer.put(bytes);
   //使得数据从buffer的开头遍历读取
   writeBuffer.flip();
   //调用SocketChannel管道的write方法将客户端响应的字符串信息发送出去
   channel.write(writeBuffer);
  }
 }
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这