简介
引入NIO的原因
- 因为BIO都是阻塞的IO,为了使Java能支持非阻塞I/O,JDK引入了NIO,可以将NIO理解成是Non-block I/O.(也有书说是new IO)
- BIO编程中,每当有一个新的客户端请求过来时,服务器端必须创建一个新的线程处理新接入的客户端链路,一个线程只能处理一个客户端连接,在并发量大的连接场景下,使用BIO的性能会非常低。
基本概念
BIO是基于字节流和字符流进行操作的,而NIO是基于通道Channel和缓冲区Buffer进行操作的,数据从通道读取到缓冲区中,或者从缓冲区写入到通道中。
NIO的类库位于java.lang.nio下,其中有如下一些基本的概念:
通道Channel:通道的作用与BIO中的流类似,主要不同的地方在于:
- Channel是双向的,支持同时读写操作,而Stream只能是单向的。
- 通道可以异步地读写。
- 通道中的数据总是要先读到一个Buffer,或者从一个Buffer中写入。 Channel又可以分为两大类——用于网络读写的SelectableChannel和用于文件操作的FileChannel。
缓冲区Buffer:在BIO中,可以直接将数据写入或者直接读取到流中,也可以通过装饰类添加缓冲的功能;而在NIO中,所有的数据都是用缓冲区处理的,任何时候使用NIO读取或者写入数据都是通过缓冲区进行的。缓冲区本质上是一块可以读写数据的内存,这块内存被包装成NIO Buffer对象,并提供了一组方法来访问该块内存。
分散/聚集scatter/gather:分散和聚集是用来描述从通道中读取或者写入通道的操作。分散从通道中读取是指读操作时将读取的数据写入多个缓冲区中;聚集写入通道是指写入操作时将多个缓冲区的数据写入到同一个通道中。分散/聚集通常用于需要将传输数据分开处理的场合,如传输一个消息可以将消息头和消息体分散到不同的buffer中。
选择器Selector:selector模型是NIO编程的基础,多路复用器Selector通过不断地轮询已经注册过的通道,检测出就绪的通道集合,从而可以实现一个线程管理多个通道,管理多个网络连接。
常用Channel实现与基本示例
java.nio包中常用的Channel实现类有:
- FileChannel:从文件中读写数据
- DatagramChannel:通过UDP读写网络中的数据
- SocketChannel:通过TCP读写网络中的数据
- ServerSocketChannel:监听新进来的TCP连接,对每一个新进来的连接都会创建一个SocketChannel
FileChannel示例
FileChannel无法设置为非阻塞模式,只能运行在阻塞模式下。使用FileChannel的几个基本步骤包括:打开FileChannel、从FileChannel读写数据、关闭FileChannel。
写入数据到文件
private static final int BUF_SIZE = 1024;
public static void main(String[] args) { // 打开FileChannel需要通过FileIntputStream或FileOutputStream或RandomAccessFile try (FileOutputStream out = new FileOutputStream(new File("d:\test.txt")); FileChannel channel = out.getChannel();) { ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); buffer.put("NIO学习:NIO FileChannel Demo".getBytes());// 先往buffer中写入数据 buffer.flip();// 调转buffer中读写指针position的位置 printBuffer(buffer); //打印buffer中内容 channel.write(buffer); // 将buffer中数据写入channel } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
private static void printBuffer(ByteBuffer buffer) { try { Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer cBuf = decoder.decode(buffer); buffer.flip(); System.out.println(cBuf.toString()); } catch (CharacterCodingException e) { e.printStackTrace(); } }
从文件中读数据
private static final int BUF_SIZE = 1024;
public static void main(String[] args) { //channel关联文件——>通过channel读取数据到buffer try (FileInputStream in = new FileInputStream(new File("d:\test.txt")); FileChannel channel = in.getChannel();) { ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); channel.read(buffer); buffer.flip(); printBuffer(buffer); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
private static void printBuffer(ByteBuffer buffer) { Charset charset = Charset.forName("UTF-8"); CharsetDecoder decoder = charset.newDecoder(); try { CharBuffer buf = decoder.decode(buffer); buffer.flip(); System.out.println(buf.toString()); } catch (CharacterCodingException e) { e.printStackTrace(); } }
**close()**:使用完FileChannel需要关闭channel,为简洁代码可以使用try-with-resources语法。
**position()**:position()方法可以获取FileChannel当前的位置,而position(long pos)可以设置FileChannel当前位置,但是如果将位置设置在文件结束符之后,调用position()将返回-1;调用position(pos)写入数据,则会把文件撑大到当前位置并写入数据,这样会导致磁盘上物理文件中写入的数据间有空隙。
long pos = channle.position(); channel.position(100L);
**size()**:channel.size()方法返回的是channel关联文件的大小。
truncate(long size):truncate()方法用来截取文件,此时文件中指定长度后面的部分将会被删除,如:
channel.truncate(100);// 将会保留前100个字节
**force(boolean flag)**:一般情况下出于性能考虑,操作系统会将数据缓存在内存中,所以无法保证写入到FileChannel中的数据一定会立即写到磁盘上,此时,如果调用force()方法则能强制将channel中的数据立即写入磁盘。
DatagramChannel示例
DatagramChannel是用来收发UDP包的通道,因为UDP是无连接的网络协议,所以DatagramChannel收发的是UDP数据包。
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
// 打开DatagramChannel,监听UDP9999端口
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
ByteBuffer buffer = ByteBuffer.allocate(100);
// 通过channel的recevice方法接收UDP数据包
channel.receive(buffer);
buffer.flip();
printBuffer(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000); // 服务端先启动
new Thread(() -> {
try (DatagramChannel channel = DatagramChannel.open();) {
ByteBuffer buf = ByteBuffer.allocate(100);
buf.clear();
buf.put("DatagramChannel Demo".getBytes());
// 发送数据前注意要把buffer的position置为0
buf.flip();
// 调用send方法发送到指定IP地址的指定端口
channel.send(buf, new InetSocketAddress("localhost", 9999));
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
由于UDP是无连接的,当指定连接的IP地址或域名时并不会创建一个真正的连接,而是锁住了DatagramChannel,只能从锁定的地址收发数据,并且数据传送没有可靠性保证。
SocketChannel&ServerSocketChannel示例
public static void main(String[] args) throws InterruptedException {
Thread server = new Thread(() -> {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.socket().bind(new InetSocketAddress(44593));
SocketChannel socket = channel.accept();
ByteBuffer buffer = ByteBuffer.allocate(1024);
socket.read(buffer);
buffer.flip();
printBuffer(buffer);
} catch (IOException e) {
e.printStackTrace();
}
});
Thread client = new Thread(() -> {
try (SocketChannel channel = SocketChannel.open()) {
channel.socket().connect(new InetSocketAddress("127.0.0.1", 44593));
ByteBuffer buffer = ByteBuffer.allocate(1034);
buffer.put("socket channle demo".getBytes());
buffer.flip();
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
});
//启动顺序不影响结果
server.start();
client.start();
}
Buffer
Buffer作为数据的读写缓冲区,具备读和写两种模式。
public abstract class Buffer {
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
......
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
......
}
capacity、position、limit、mark、flip()、clear()
capacity:buffer的容量,在申请buffer时指定大小,是固定不变的。
limit:buffer可以使用的上限——写模式下表示最多能往buffer中写入数据的边界,初始化时limit等于capacity;调用flip()切换为读模式后limit会等于当前的position;表示能读到的数据边界。当一次读写操作完成后,limit的值可能不会等于capacity,存在内存泄露的情况(这个不知道算不算设计不够友好),避免这种情况要在每一次读写操作完成后执行clear()方法清空buffer。
position:position可以看成是一个读写指针,指示当前读或写的位置,随put/get方法自动更新,当buffer中的数据准备好了,需要从写模式切换为读模式时,需要调用buffer.flip()方法,可以看到flip()方法会将当前写的最后一个位置赋值给limit,然后将position切换为0,即变成从0位置开始读,可以读到limit位置,反之从读模式切换为写模式也是如此。
mark: mark用来标记某个时刻的一个position,通过调用buffer.mark()方法可以记录当前的position,之后能通过buffer.reset()恢复到这个position。mark默认值是-1,并且其值必须小于position,如果调用buffer.position(index)时传入的index比mark小,则会将mark设置为-1使暂存的位置失效。
这4个属性的大小关系是$mark <= position <= limit <= capacity $
flip():flip方法用来切换读写模式:当buffer处于写模式时,每往buffer中写入一个数据position加1,调用flip()切换为读取模式时,会将当前的position赋值给limit,再把position赋值为0,这样就可以从索引为0的位置读取到limit处。当buffer处于读模式时,每往buffer中读出一个数据position-1,直到position等于limit时数据消费完。
clear():每完成一次读写后必须调用clear()方法才能再次使用buffer,否则可能造成内存泄露,因为当读取完数据调用flip()方法时limit不一定等于capacity,会使buffer的可用内存小于申请的内存大小。clear()方法会将position重置为0,将limit重置为capacity,mark重置为-1。
public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(10); buffer.put("abcde".getBytes()); buffer.flip(); while (buffer.hasRemaining()) { System.out.print(buffer.get() + " "); } buffer.flip(); try { buffer.put("abcdef".getBytes()); //没有clear()将抛出BufferOverflowException } catch (Exception e) { e.printStackTrace(); } }
Selector
register
一个selector可以注册多个channel,并且seletor要求channel必须工作在非阻塞模式下,因此FileChannel不能结合selector使用,同时注册的channel需要调用 channel.configureBlocking(flase);
设置为非阻塞通道。
channel声明了一个抽象方法用来注册channel到selector:
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
SelectionKey:SelectionKey包含了许多有用的属性,如interest集合、ready集合、channel对象、selector对象等;通过SelectionKey返回值,可以进行各种操作。
ops:ops是一个int类型数,其实质表示的是事件类型的集合,即channel注册selector时告诉selector其对哪些事件感兴趣。SelectionKey中只定义了四种事件类型,分别用四个常量表示:
- SelectionKey.OP_CONNECT:接受请求操作
- SelectionKey.OP_ACCEPT :连接操作
- SelectionKey.OP_READ:读取操作
- SelectionKey.OP_WRITE:写操作
并且channel也不是四种事件都能注册,不同的channel只能注册validOps()方法中有定义的事件。
SocketChannel
public final int validOps() { return (SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT); }
DataGramchannel
public final int validOps() { return (SelectionKey.OP_READ | SelectionKey.OP_WRITE); }
ServerSocketChannel
public final int validOps() { return SelectionKey.OP_ACCEPT; }
att:att是一个附加的对象,可以不指定,也可以让我们更灵活的将更多的信息附加到SelectionKey上,比如attach一个buffer、attach一个唯一标识等等。
select
注册了selector的channel便能将原本由自己调用accept的工作交由selector来代替。selector通过select()方法根据channel注册时所关联的感兴趣的事件返回准备就绪的channel。此时,原本阻塞在channel.accept()上的操作变成了阻塞在selector.select()上。
当select()返回值大于0时,说明有channel准备就绪了,进一步处理可以按以下步骤进行:
- 调用selectKeys()方法获得就绪通道的键集
Set keys = selector.selectedKeys();
; - 遍历键集并检测每个键对应的channel所属的就绪事件;
- 使用SelectionKey.channel()方法获得具体的channel类型对数据进行处理;
- 处理完毕之后将已处理的键值从键集中移除。
selectNow() & wakeUp()
selectNow()和select()不同之处在于前者不会阻塞当前线程,而是直接返回。
wakeUp()是用来唤醒被select()阻塞的线程的,有的时候select()阻塞的线程,我们不想其一直被阻塞,而是一段时间内如果没有通道就绪就继续执行,那么这个时候可以在另外一个线程里调用selector.wakeUp(),但是这里有个“坑”就是如果当前的selector没有被阻塞在select上,那么下一次调用该selector对象的select方法会被立即唤醒。
简单示例
public class Server {
public static void main(String[] args) {
try {
Selector selector = Selector.open();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(44593));
channel.register(selector, channel.validOps());
while (true) {
while (selector.select() == 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(100);
client.read(buffer);
buffer.flip();
BufferUtil.printBuffer(buffer);
client = (SocketChannel) key.channel();
client.register(selector, SelectionKey.OP_READ);
}
keys.clear();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class Client {
public static void main(String[] args) {
try (Scanner sc = new Scanner(System.in);
Socket socket = new Socket("127.0.0.1", 44593);) {
String input = sc.nextLine();
while (input != null && !"".equals(input.trim())) {
socket.getOutputStream().write(input.getBytes());
input = sc.nextLine();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}