NIO和IO
NIO的四个关键数据类型
- Buffer:它包含数据且用于读写的线性表结构,还提供一个特殊类用于内存映射的I/O操作。
- Charset:提供Unicode字符串映射到字节序列以及逆映射的操作。
- Channels:包含socket,file和pip三种,是一种双向交通的通道。
- Selectors:将多元异步I/O操作集中到一个或多个线程中(类似于linux的select函数)
传统IO
服务端:
ServerSocket server = new ServerSocket(1000);
Socket conn = server.accept();
InputStream in = conn.getInputStream();
InputStreamReader reader = new BufferedReader(reader);
Request request = new Request();
while(!request.isComplete()){
String line = reader.readLine();
request.addLine(line);
}
上述操作有两个问题:
- BufferedReader类的readLine() 方法在其缓冲未满时,会一直阻塞,只有一定的数据填满缓冲或者client关闭连接,此方法才能返回。
- BufferedReader会产生大量的垃圾需要GC。BufferedReader需要创建缓冲区来从client读取数据,但是同样创建了一些字符串存储这些数据。
BufferedReader默认有$2^{13}$次方字符的缓冲大小。
类似的,在处理写操作时,一次写入一个字符,效率很低,也需要使用缓冲写,但是这也会插死你横更多的垃圾。
在传统的I/O中要使用大量的线程,通常使用线程池实现来处理请求。 但是即使这样,还是会有很多时间阻塞在I/O上,没有有效的利用CPU
NIO
Buffer
传统的I/O使用String来操作,浪费资源,新I/O通过使用Buffer读写数据避免浪费。
Buffer对象是线性的,有序的数据集合,他根据其类别只包含唯一的数据类型。
java.nio.Buffer
: 类描述java.nio.ByteBuffer
:字节类型。可以从ReadableByteChannel
中读,在WritableByteChannel
中写java.nio.CharBuffer
:字符类型,不能写入通道java.nio.DoubleBuffer
:double类型,不能写入通道java.nio.FloatBuffer
:float类型java.nio.IntBuffer
:int类型java.nio.LongBuffer
:long类型java.nio.ShortBuffer
:short类型
可以使用allocate(int capacity)
方法或者allocateDirect(int capacity)
方法分配一个Buffer。
特别的,可以通过调用FileChannel.map(int mode, long position, int size)
创建MappedByteBuffer
。
Direct Buffer 在内存中分配一段连续的块并使用本地访问方法读写数据。non direct Buffer用过java中的数组读写数据。
有时间必须使用非直接的缓冲,例如使用任何wrap方法(如ButeBuffer.wrap(byte[]))在java数据自出上创建buffer。
字符编码
向ByteBuffer
中存放数据涉及两个问题:字节的顺序和字符转换。ByteBuffer
内部通过ByteOrder
类处理了字节顺序问题,但是并未解决字符转换的问题。ByteBuffer没有提供方法读写String。
java.nio.charset.Charset
处理字符转换的问题。通过构造CharsetEncoder和CharsetDecoder将字符序列转为字节和逆转换。
通道
java.io
类中没有一个类可以读写Buffer类型,nio提供Channel读写Buffer。channel可以认为是一种连接,可以使到特定的设备,程序或者是网络。 channel类的等级结构如下: Channel(interface)->ReadableByteChannel(interface)->ScatteringByteChannel(interface) Channel(interface)->WritableByteChannel(interface)->GatherByteChannel(interface)
ByteChannel(interface)继承自: ReadableByteChannel(interface) WritableByteChannel(interface)
GatherByteChannel可以一次将多个Buffer中的数据写入通道,相反的ScatteringByteChannel可以一次将数据从通道中读入多个buffer中。还可以设置通道使其为阻塞或非阻塞I/O操作服务。
为了使通道与传统I/O兼容,Channel提供了静态的Stream或Reader。
Selector
在过去的阻塞I/O中,我们一般知道什么时候可以向stream中读或写,因为方法调用直到stream准备好时返回。但是使用非阻塞通道,我们需要一些方法来知道什么时候通道准备好了。在NIO包中,设计Selector就是为了这个目的。
SelectableChannel
可以注册特定的事件,而不是在事件发生时通知应用,通道跟踪事件。然后,当应用调用Selector上的任意一个selection方法时,它查看注册了的通道看是否有任何感兴趣的事件发生。
并不是所有的通道都支持所有的操作。SelectionKey
类定义了所有可能的操作位,将要用两次。
- 当应用调用
SelectableChannel.register(Selector sel,int op)
方法注册通道时,它将所需操作作为第二个参数传递到方法中。 - 一旦
SelectionKey
被选中了,SelectionKey
的readyOps()
方法返回所有通道支持操作的位数的和。SelectableChannel
的validOps
方法返回每个通道允许的操作。
注册通道不支持的操作将引发
IllegalArgumentException
异常.
SelectableChannel
子类支持的操作:
ServerSocketChannel OP_ACCEPT
SocketChannel OP_CONNECT, OP_READ, OP_WRITE
DatagramChannel OP_READ, OP_WRITE
Pipe.SourceChannel OP_READ
Pipe.SinkChannel OP_WRITE
例子
- 简单网页内容下载
- 简单加法服务器和客户端
- 非阻塞加法服务器
简单网页下载
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class WebDownload {
private final static Charset charset = Charset.forName("UTF-8");
private SocketChannel clientChannel;
public void download() {
connect();
sendRequest();
readResponse();
}
//发送GET请求到CSDN的文档中心
private void sendRequest() {
//使用channel.write方法,它需要CharByte类型的参数,使用
//Charset.encode(String)方法转换字符串。
try {
clientChannel.write(charset.encode("GET / HTTP/1.1\r\n\r\n"));
} catch (IOException e) {
e.printStackTrace();
}
}
private void readResponse(){
ByteBuffer buff = ByteBuffer.allocate(1024);//创建1024字节的缓冲
try {
// -1 if the channel has reached end-of-stream
while(clientChannel.read(buff)!=-1){
buff.flip();//flip方法在读缓冲区字节操作之前调用。
System.out.println(charset.decode(buff));
buff.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private boolean connect() {
InetSocketAddress socketAddr = new InetSocketAddress("www.baidu.com", 80);
try {
clientChannel = SocketChannel.open();
clientChannel.connect(socketAddr);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
new WebDownload().download();
}
}
简单加法服务器和客户端
server端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class AddServer {
private ServerSocketChannel server = null;
private SocketChannel client = null;
private ByteBuffer buff = ByteBuffer.allocate(8);
private IntBuffer intBuff = buff.asIntBuffer();
public void connect(){
try {
server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(80));
System.out.println("channel open!");
} catch (IOException e) {
e.printStackTrace();
}
}
public void waitForConnection(){
try {
client = server.accept();
if(client!=null){
System.out.println("client connect!");
processRequest();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void processRequest(){
buff.clear();
try {
client.read(buff);
int result = intBuff.get(0)+intBuff.get(1);
buff.flip();
buff.clear();
intBuff.put(0, result);
client.write(buff);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run(){
this.connect();
this.waitForConnection();
this.processRequest();
}
/**
* [@param](http://my.oschina.net/u/2303379) args
*/
public static void main(String[] args) {
new AddServer().run();
}
}
client 端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.SocketChannel;
public class AddClient {
private SocketChannel client = null;
private ByteBuffer buff = ByteBuffer.allocate(8);
private IntBuffer intBuff = buff.asIntBuffer();
public void connect() {
try {
client = SocketChannel.open();
client.connect(new InetSocketAddress("localhost", 80));
} catch (IOException e) {
e.printStackTrace();
}
}
public void request(int a, int b) {
buff.clear();
intBuff.put(0, a);
intBuff.put(1, b);
try {
client.write(buff);
System.out.println("send request :" + a + "+" + b);
} catch (IOException e) {
e.printStackTrace();
}
}
public int getresult() {
buff.clear();
int result = 0;
try {
client.read(buff);
result = buff.getInt(0);
} catch (IOException e) {
e.printStackTrace();
}
finally{
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return result;
}
public int start(int a, int b){
this.connect();
this.request(a, b);
return this.getresult();
}
/**
* [@param](http://my.oschina.net/u/2303379) args
*/
public static void main(String[] args) {
System.out.println(new AddClient().start(123, 345));
}
}
非阻塞加法服务器
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
public class AddServer {
private ServerSocketChannel server = null;
private SocketChannel client = null;
private ByteBuffer buff = ByteBuffer.allocate(8);
private IntBuffer intBuff = buff.asIntBuffer();
public void connect(){
try {
server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(80));
server.configureBlocking(false);
System.out.println("channel open!");
} catch (IOException e) {
e.printStackTrace();
}
}
public void waitForConnection(){
Selector acceptSelector;
try {
acceptSelector = SelectorProvider.provider().openSelector();
SelectionKey acceptKey = server.register(acceptSelector, SelectionKey.OP_ACCEPT);
int keyadded = 0;
while( (keyadded = acceptSelector.select()) > 0){
Set readyKeys = acceptSelector.selectedKeys();
Iterator it = readyKeys.iterator();
while(it.hasNext()){
SelectionKey sk = (SelectionKey)it.next();
it.remove();
ServerSocketChannel nextReaedy = (ServerSocketChannel)sk.channel();
client = nextReaedy.accept();
processRequest();
}
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
public void processRequest(){
buff.clear();
try {
client.read(buff);
int result = intBuff.get(0)+intBuff.get(1);
buff.flip();
buff.clear();
intBuff.put(0, result);
client.write(buff);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run(){
this.connect();
this.waitForConnection();
this.processRequest();
}
/**
* [@param](http://my.oschina.net/u/2303379) args
*/
public static void main(String[] args) {
new AddServer().run();
}
}
非阻塞的加法服务器首先通过SelectorProvider
工厂方法建立选择器
acceptSelector = SelectorProvider.provider().openSelector();
然后在ServerSocketChannel
上注册选择器和对应的事件。
SelectionKey acceptKey = server.register(acceptSelector, SelectionKey.OP_ACCEPT);
通过选择器获取当前是否有client连接到server:
acceptSelector.select()>0
然后将有client链接到server,获取成功连接的迭代器。遍历已经准备好的连接,分别处理请求。