接前两篇关于NIO系列的学习文章:核心概念及基本读写 及缓冲区内部实现机制 ,本文继续探讨和学习连网和非阻塞IO相关的内容。
6. 连网和异步IO
1) 概述:
连网是学习异步I/O的很好基础,而异步I/O对于在Java语言中执行任何输入/输出过程的人来说,无疑都是必须具备的知识。NIO中的连网与NIO中的其他任何操作没有什么不同,它依赖通道和缓冲区,而您通常使用InputStream和OutputStream来获得通道。
本节首先介绍异步I/O的基础:它是什么以及它不是什么,然后转向更实用的、程序性的例子。
2) 异步 I/O
异步I/O是一种“没有阻塞地读写数据”的方法。通常,在代码进行read()调用时,代码会阻塞直至有可供读取的数据。同样, write()调用将会阻塞直至数据能够写入。 但异步I/O调用不会阻塞。相反,您可以注册对特定I/O事件的兴趣:如可读的数据的到达、新的套接字连接等等,而在发生这样的事件时,系统将会告诉您。
异步I/O的一个优势在于,它允许您同时根据大量的输入和输出执行I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步I/O,您可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。
我们来看一个基于非阻塞I/O的服务器端的处理流程,它接受网络连接并向它们回响它们可能发送的数据。在这里假设它能同时监听多个端口,并处理来自所有这些端口的连接。下面是其主方法:
private void execute () throws IOException {
// 创建一个新的选择器
Selector selector = Selector.open();
// 打开在每个端口上的监听,并向给定的选择器注册此通道接受客户端连接的I/O事件。
for (int i = 0; i < ports.length; i++) {
// 打开服务器套接字通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 设置此通道为非阻塞模式
ssc.configureBlocking(false);
// 绑定到特定地址
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress(ports[i]);
ss.bind(address);
// 向给定的选择器注册此通道的接受连接事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Going to listen on " + ports[i]);
}
while (true) {
// 这个方法会阻塞,直到至少有一个已注册的事件发生。
// 当一个或者更多的事件发生时,此方法将返回所发生的事件的数量。
int num = selector.select();
// 迭代所有的选择键,以处理特定的I/O事件。
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
SocketChannel sc;
while (iter.hasNext()) {
SelectionKey key = iter.next();
if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
// 接受服务器套接字撒很能够传入的新的连接,并处理接受连接事件。
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
sc = ssc.accept();
// 将新连接的套接字通道设置为非阻塞模式
sc.configureBlocking(false);
// 接受连接后,在此通道上从新注册读取事件,以便接收数据。
sc.register(selector, SelectionKey.OP_READ);
// 删除处理过的选择键
iter.remove();
System.out.println("Got connection from " + sc);
} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
// 处理读取事件,读取套接字通道中发来的数据。
sc = (SocketChannel) key.channel();
// 读取数据
int bytesEchoed = 0;
while (true) {
echoBuffer.clear();
int r = sc.read(echoBuffer);
if (r == -1) {
break;
}
echoBuffer.flip();
sc.write(echoBuffer);
bytesEchoed += r;
}
System.out.println("Echoed " + bytesEchoed + " from " + sc);
// 删除处理过的选择键
iter.remove();
}
}
}
}
下面我们就此例来一步一步的学习异步IO的相关知识。
3) Selectors
Selector是异步I/O中的核心对象。Selector就是您注册对各种I/O事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。所以,我们需要做的第一件事就是创建一个Selector:
Selector selector = Selector.open();
然后,我们将对不同的通道对象调用register()方法,以便注册我们对这些对象中发生的I/O事件的兴趣。register()的第一个参数就是这个Selector对象。
4) 打开一个ServerSocketChannel
在服务端为了接收连接,我们需要一个ServerSocketChannel。 事实上,我们要监听的每一个端口都需要有一个ServerSocketChannel。对于每一个端口,我们打开一个ServerSocketChannel, 如下所示:
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress( ports[i] );
ss.bind( address );
第一行创建一个新的ServerSocketChannel,最后三行将它绑定到给定的端口。第二行将ServerSocketChannel设置为非阻塞的。我们必须对每一个要使用的套接字通道调用这个方法,否则异步I/O就不能工作。
5) 选择键
下一步是将新打开的ServerSocketChannels注册到Selector上。为此我们使用ServerSocketChannel.register()方法,如下所示:
SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );
register()方法的第一个参数总是这个Selector。第二个参数是OP_ACCEPT,这里它指定我们想要监听accept事件,也就是在新的连接建立时所发生的事件。这是适用于ServerSocketChannel的唯一事件类型。
请注意对register()的调用的返回值。SelectionKey代表这个通道在此Selector上的这个注册。当某个Selector通知您某个传入事件时,它是通过提供对应于该事件的SelectionKey来进行的。SelectionKey还可以用于取消通道的注册。
6) 内部循环
现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用 Selectors 的几乎每个程序都像下面这样使用内部循环:
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey)it.next();
// ... 处理I/O事件...
}
首先,我们调用Selector的select()方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时,select()方法将返回所发生的事件的数量。
接下来,我们调用Selector的selectedKeys()方法,它返回发生了事件的SelectionKey对象的一个集合。
我们通过迭代SelectionKeys并依次处理每个SelectionKey来处理事件。对于每一个SelectionKey,您必须确定发生的是什么I/O事件,以及这个事件影响哪些I/O对象。
7) 监听新连接
程序执行到这里,我们仅注册了ServerSocketChannel, 并且仅注册它们“接收”事件。为确认这一点,我们对SelectionKey调用readyOps()方法,并检查发生了什么类型的事件:
if ((key.readyOps() & SelectionKey.OP_ACCEPT)
== SelectionKey.OP_ACCEPT) {
// ...
}
可以肯定地说,readOps()方法告诉我们该事件是新的连接。
8) 接受新的连接
因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心accept()操作会阻塞:
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();
下一步是将新连接的SocketChannel配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将SocketChannel注册到Selector上,如下所示:
sc.configureBlocking( false );
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );
注意我们使用register()的OP_READ参数,将SocketChannel注册用于“读取”而不是“接受”新连接。
9) 删除处理过的SelectionKey
在处理SelectionKey之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的SelectionKey从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的remove()方法来删除处理过的SelectionKey:
it.remove();
现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的I/O事件)了。
10) 传入的I/O
当来自一个套接字的数据到达时,它会触发一个I/O事件。这会导致在主循环中调用Selector.select(),并返回一个或者多个I/O事件。这一次, SelectionKey将被标记为OP_READ事件,如下所示:
} else if ((key.readyOps() & SelectionKey.OP_READ)
== SelectionKey.OP_READ) {
// Read the data
SocketChannel sc = (SocketChannel)key.channel();
// ...
}
与以前一样,我们取得发生I/O事件的通道并处理它。在本例中,由于这是一个echo server,我们只希望从套接字中读取数据并马上将它发送回去。关于这个过程的细节,请参见附件中的源代码 (MultiPortEcho.java)。
11) 回到主循环
每次返回主循环,我们都要调用select的Selector()方法,并取得一组SelectionKey。每个键代表一个I/O事件。我们处理事件,从选定的键集中删除SelectionKey,然后返回主循环的顶部。
说明: 这个程序有点过于简单,因为它的目的只是展示异步I/O所涉及的技术。在现实的应用程序中,您需要通过将通道从Selector中删除来处理关闭的通道。而且您可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责I/O事件处理中的耗时部分会更有意义。
后续: 到此,我们已学习了NIO的核心内容,在下一篇文章中,会介绍NIO提供的一些其他特性,如:缓冲区的分片、包装,分散和聚集、文件锁定、字符集等知识。有兴趣的可以共同学习、讨论。