Java标准IO 与 Java NIO 的简单差异示意:
Java标准IO
Java NIO
API调用
简单
复杂
底层实现
面向流(stream),单向
面向通道(channel),释放CPU、内存压力
成效
同步阻塞
同步非阻塞
数据窥视
阻塞读取,要么足够,要么没有
使用缓冲区(Buffer), 读数据时需要检查是否足够
处理数据的线程数
1:1(一个线程处理一个流)
1:N(选择器(Selector),多路复用,可以一个或几个少量线程管理多个通道)
Java NIO知识体系图:
1、NIO是什么?
Java NIO(New IO)提供一种替代标准Java IO的API(从Java1.4开始),包名为 java.nio.*。
2、NIO提供了什么特性?
Java NIO提供了同步非阻塞IO的特性,使得IO不再依赖stream(流),而借助channel(通道)实现非阻塞式响应,避免线程上下文切换的开销。
3、NIO基本概念
Java NIO由三个核心部分组成:Buffer(缓冲区)、Channel(通道)、Selector(选择器)。
另外的Charset(字符集),提供提供Unicode字符串编码转换到字节序列以及反编码的API。
4、Buffer(缓冲区)
Buffer,顾名思义为缓冲区,实际上是一个线性且有限的数组内存容器。
可以从channel(通道)中读取数据到Buffer,也可以从Buffer写数据到channel(通道)。
实现大体有 ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。
(1)基本属性
容量(capacity):所包含的元素个数。缓存区的容量不能为负,且一旦定义无法变更。
限制(limit):第一个不应该被读取或写入的索引。缓存区的限制不能为负,且不能大于容量(capacity)。
位置(position):下一个要读取或写入的索引。不能为负数,也不能超过限制(limit)。
标记(mark):标记当前位置的索引,暂存。如果定义了标记,则在将位置或限制调整为小于该标记的值时,该标记将被丢弃。
剩余(remaining):当前位置与限制之间的元素数 (limit - position)。
标记、位置、限制和容量值遵守以下:
0《= 标记(mark)《=位置(position)《=限制(limit)《=容量(capacity)
位置(position)、限制(limit)、容量(capacity)在读写模式下的示意图:
capacity限定你可以操作的内存块大小,capacity个byte、long,char等类型。
在读模式下,limit表示你最多可以读取多少数据。在切换读模式时,limit会设置为写模式当前的position。这样,你就可以读取之前写入的所有数据。
在写模式下,limit等于capacity,表示最多可以写入capacity个数据。
(2)Buffer的基本用法
使用Buffer读写数据一般遵循以下四个步骤:
1、创建并分配指定大小的buffer空间
2、写入数据到Buffer
3、调用flip()方法
4、从Buffer中读取数据 (如有必要,需要检查是否足够)
5、调用clear()方法或者compact()方法
示例:
java.nio.ByteBuffer byteBuffer = ByteBuffer.allocate(5);
//清空数据,切换写模式,准备写数据
byteBuffer.clear();
//byteBuffer.put((byte) 1);
//切换读模式,准备读数据
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
byte b = byteBuffer.get();
//操作数据
}
(3)实例化方法
字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则 Java 虚拟机会尽最大努力直接在此缓冲区上执行本机 I/O 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。 使用allocateDirect方法可以一次性分配capacity大小的连续字节空间。通过allocateDirect方法来创建具有连续空间的ByteBuffer对象虽然可以在一定程度上提高效率,在一些操作系统平台上会使效率大幅度提高,而在另一些操作系统平台上,性能会表现得非常差。谨慎使用直接缓冲区,除非你明确了确实有性能提升。
a、非直接缓冲区 - 使用allocate创建
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.allocate(10);
b、直接缓冲区 - 使用allocateDirect创建
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.allocateDirect(10);
c、静态warp方法创建
byte[] bytes = new byte[]{1, 2, 3, 4, 5};
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.wrap(bytes);
(4)从Buffer中读数据
有两种方式从Buffer中读数据:
a、从Buffer中读取数据写到channel(通道)
int bytesWritten = inChannel.write(buf);
b、使用Buffer的get方法
byte aByte = buf.get();
(5)向Buffer写数据
有两种方式向Buffer写数据:
a、从channel(通道)中读取数据写到Buffer
int bytesRead = inChannel.read(buf); //read into buffer.
b、使用Buffer的put方法
buf.put(127);
(6)读写模式切换
Buffer存在以下方法可以切换读写模式:
清除(clear):(position=0,limit=capacity,mark=-1) 位置(position)置为0,限制(limit)设为容量(capacity),并丢弃标记(mark)。
反转(flip): (limit=position,position=0,mark=-1) 限制(limit)置为当前位置(position),然后位置(position)置为0,并丢弃标记(mark)。
重绕(rewind):(position=0,mark=-1) 位置(position)置为0,限制(limit)保持不变,并丢弃标记(mark)。
_压缩(compact):(copy未读数据到前端,position=remaining,limit=capacity,mark=-1)_将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处,然后将缓冲区的位置设置为_remaining_,限制(limit)设为容量(capacity),并丢弃标记(mark)。
因此,从读模式切换写模式,使用清除(clear)。
从写模式切换读模式,使用反转(flip)。
读写模式混用,使用重绕(rewind)。
(7)标记与重置
标记(mark):当前位置设为标记(mark=position)
重置(reset):位置设置为以前标记的位置(position=mark)
(8)共享缓冲区
duplicate - 共享底层缓冲区,内容互相可见,位置、限制和标记互相独立
slice - 共享缓冲区子部分(从共享发生位置起),该部分内容互相可见,位置、限制和标记互相独立
warp - 包装,将byte数组引用为缓存区数组,如果缓存区内容变更,byte数组也相应变更
as视图 :
共享部分或者全部缓冲区,内容互相可见,位置、限制和标记互相独立。
remaining>1 (除以2)
asCharBuffer
asShortBuffer
remaining>2 (除以4)
asIntBuffer
asFloatBuffer
remaining>3 (除以8)
asLongBuffer
asDoubleBuffer
asReadOnlyBuffer(只读)
示例:
public static void main(String[] args) {
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.allocate(31);
printBuffer(byteBuffer1, "byteBuffer1");
/**
* byteBuffer1的remaining>1 (除以2)
*/
CharBuffer charBuffer = byteBuffer1.asCharBuffer();
printBuffer(charBuffer, "charBuffer");
charBuffer.put('a');
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>1 (除以2)
*/
ShortBuffer shortBuffer = byteBuffer1.asShortBuffer();
printBuffer(shortBuffer, "shortBuffer");
shortBuffer.put((short) 3);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>2 (除以4)
*/
IntBuffer intBuffer = byteBuffer1.asIntBuffer();
printBuffer(intBuffer, "intBuffer");
intBuffer.put(4);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>3 (除以8)
*/
LongBuffer longBuffer = byteBuffer1.asLongBuffer();
printBuffer(longBuffer, "longBuffer");
longBuffer.put(120);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>2 (除以4)
*/
FloatBuffer floatBuffer = byteBuffer1.asFloatBuffer();
printBuffer(floatBuffer, "floatBuffer");
floatBuffer.put(9f);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* byteBuffer1的remaining>3 (除以8)
*/
DoubleBuffer doubleBuffer = byteBuffer1.asDoubleBuffer();
printBuffer(doubleBuffer, "doubleBuffer");
doubleBuffer.put(1);
if (byteBuffer1.hasArray())
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
}
private static void printBuffer(Buffer buffer, String name) {
System.out.println((name != null && !name.isEmpty() ? name + " " : "") + "position="
+ buffer.position() + ",limit=" + buffer.limit()
+ ",remaining=" + buffer.remaining() + ",capacity=" + buffer.capacity());
}
(9)字节序(ByteOrder)
在计算机科学领域中,字节序是指存放多字节数据的字节(byte)的顺序,典型的情况是整数在内存中的存放方式和网络传输的传输顺序。
在不同处理器,机器的字节序是可能不一致的,在跨平台处理数据的时候,字节序的调整也就变得有必要了。
public static void main(String[] args) {
String string = "abcde";
java.nio.ByteBuffer byteBuffer1 = ByteBuffer.allocate(10);
System.out.println(byteBuffer1.order());
byteBuffer1.rewind();//位置设置为 0 并丢弃标记
byteBuffer1.order(ByteOrder.BIG_ENDIAN);
byteBuffer1.asCharBuffer().put(string);
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
byteBuffer1.rewind();//位置设置为 0 并丢弃标记
byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);
byteBuffer1.asCharBuffer().put(string);
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* 无效用法1,只更改order,不重新填充数据,存储是不会改变的,只有下次才生效
*/
byteBuffer1.rewind();//位置设置为 0 并丢弃标记
byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
/**
* 无效用法2,填充完数据再改order,存储是不会改变的,只有下次才生效
*/
byteBuffer1.rewind();//位置设置为 0 并丢弃标记
byteBuffer1.asCharBuffer().put(string);
byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);
System.out.println("byteBuffer1 data=" + Arrays.toString(byteBuffer1.array()));
}
(10)其他一些坑的记录
ByteBuffer 的 array()方法与其他的,如CharBuffer不是一个样子处理:
ByteBuffer的array()会把所有容量元素返回,正确做法如下:
/**
* 转化byte数组
*
* @param byteBuffer
* @return
*/
public static byte[] readToBytes(ByteBuffer byteBuffer) {
byteBuffer.flip();
// Retrieve bytes between the position and limit
// (see Putting Bytes into a ByteBuffer)
byte[] bytes = new byte[byteBuffer.remaining()];
// transfer bytes from this buffer into the given destination array
byteBuffer.get(bytes, 0, bytes.length);
byteBuffer.clear();
return bytes;
}
5、Channel(通道)
(1)工作原理
在操作系统知识中,通道指的是独立于CPU的专管I/O的控制器,控制外围I/O设备与内存进行信息交换。在采用通道方式的指令系统中,除了供CPU编程使用的机器指令系统外,还设置另外供通道专用的一组通道指令,用通道指令编制通道程序,读取或存入I/O设备。当需要进行I/O操作时,CPU只需启动通道,然后可以继续执行自身程序,通道则执行通道程序,管理与实现I/O操作,当完成时通道再汇报CPU即可。
如此,通道使得CPU、内存与I/O操作之间达到更高的并行程度。
(2)Channel(通道) vs. Stream(流)
Stream(流),是单向的,不能在一个流中读写混用,要么一路读直到关闭,要么一路写直到关闭。同时流是直接依赖CPU指令,与内存进行I/O操作,CPU指令会一直等I/O操作完成。
而Channel(通道) ,是双向的,可以借助Buffer(缓冲区)在一个通道中读写混用,可以交叉读数据、写数据到通道,而不用在读写操作后立刻关闭。另外还可以在两个通道中直接对接传输。通道不依赖CPU指令,有专用的通道指令,在接收CPU指令,就可以独立与内存完成I/O操作,只有在I/O操作完成后通知CPU,在此期间CPU是不用一直等待。
(3)通道的分类
Java Channel(通道) ,提供了各种I/O实体的连接,主要涵盖文件、网络(TCP、UDP)、管道三个方面。
大体实现:FileChannel、ServerSocketChannel、SocketChannel、DatagramChannel、Pipe.SinkChannel、 Pipe.SourceChannel。
文件通道
网络(套接字)通道
管道
实现
FileChannel
ServerSocketChannel、
SocketChannel、
DatagramChannel
Pipe.SinkChannel、
Pipe.SourceChannel
继承的抽象类
源自AbstractInterruptibleChannel,可中断通道。
源自AbstractSelectableChannel,提供注册、注销、关闭,设置阻塞模式,管理当前选择键集,支持“多路复用”功能。
同左
实例化方法
通过FileInputStream、FileOutputStream、RandomAccessFile获取文件通道
通过Selector选择器注册监听,返回一个表示该通道已向选择器注册的新 SelectionKey 对象。
同左
6、文件通道(FileChannel)
(1)打开FileChannel(实例化)
在使用FileChannel之前,需要先打开它。但是我们无法直接打开一个FileChannel,需要通过FileInputStream、 FileOutputStream、RandomAccessFile获取FileChannel实例。
java.nio.channels.FileChannel channel = null;
FileInputStream fis = null;
try {
fis = new FileInputStream(path);
channel = fis.getChannel();
} finally {
if (channel != null)
channel.close();
if (fis != null)
fis.close();
}
(2)从FileChannel读取数据、向FileChannel写数据
read与write:与其他Channel一样,读写借助Buffer传输,需要注意的是往Channel写数据(即Buffer读数据写入Channel)需要检查数据是否足够。
position:获取、设置当前位置
size:获取此FileChannel的文件的当前大小
force(boolean) :强制将所有对此通道的文件更新写入包含该文件的存储设备中。保证文件及时更新到存储设备,特别是写数据时。
truncate:将此通道的文件截取为给定大小。
public static void main(String[] args) throws IOException {
final String path = "file.txt";
write(path);//写文件
write2(path);//特定位置读写
read(path);//读文件
System.out.println();
truncate(path);//文件截取
read(path);//读文件
}
/**
* FileChannel 读文件
*/
private static void read(String path) throws IOException {
java.nio.channels.FileChannel channel = null;
FileInputStream fis = null;
try {
fis = new FileInputStream(path);
channel = fis.getChannel();
ByteBuffer buffer1 = ByteBuffer.allocate(1024);
// 从入channel读取数据到buffer
buffer1.rewind();
while (channel.read(buffer1) > 0) {
//读取buffer
buffer1.flip();
Charset charset = Charset.defaultCharset();
CharBuffer charBuffer = charset.decode(buffer1);
System.out.print(charBuffer);
}
} finally {
if (channel != null)
channel.close();
if (fis != null)
fis.close();
}
}
/**
* FileChannel 写文件
*/
private static void write(String path) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap("赵客缦胡缨,吴钩霜雪明。银鞍照白马,飒沓如流星。\n".getBytes());
java.nio.channels.FileChannel channel = null;
FileOutputStream fos = null;
try {
fos = new FileOutputStream(path);
channel = fos.getChannel();
//强制刷出到内存
channel.force(true);
// 从buffer读取数据写入channel
buffer.rewind();
channel.write(buffer);
} finally {
if (channel != null)
channel.close();
if (fos != null)
fos.close();
}
}
/**
* FileChannel 特定位置读写
*/
private static void write2(String path) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap("十步杀一人,千里不留行。事了拂衣去,深藏身与名。\n".getBytes());
java.nio.channels.FileChannel channel = null;
RandomAccessFile file = null;
try {
file = new RandomAccessFile(path, "rw");
channel = file.getChannel();
channel.position(channel.size()); //定位到文件末尾
//强制刷出到内存
channel.force(true);
// 从buffer读取数据写入channel
buffer.rewind();
channel.write(buffer);
} finally {
if (channel != null)
channel.close();
if (file != null) {
file.close();// 关闭流
}
}
}
/**
* FileChannel 文件截取
*/
private static void truncate(String path) throws IOException {
java.nio.channels.FileChannel channel = null;
RandomAccessFile file = null;
try {
file = new RandomAccessFile(path, "rw");
channel = file.getChannel();
/**
* 截取文件前36byte
*/
channel.truncate(36);
} finally {
if (channel != null)
channel.close();
if (file != null) {
file.close();// 关闭流
}
}
}
(3)独占锁定
lock() 与 tryLock() - 获取或尝试获取对此通道的文件的独占锁定。
FileLock lock(long position, long size, boolean shared) 与 FileLock tryLock(long position, long size, boolean shared) 获取或尝试获取此通道的文件给定区域上的锁定。 shared的含义:是否使用共享锁,一些不支持共享锁的操作系统,将自动将共享锁改成排它锁。可以通过调用 isShared() 方法来检测获得的是什么类型的锁。
- 共享锁与独占锁的区别:
共享锁: 如果一个线程获得一个文件的共享锁,那么其它线程可以获得同一文件的共享锁或同一文件部分内容的共享锁,但不能获取独占锁。
独占锁: 只有一个读或一个写(读和写都不能同时)。独占锁防止其他程序获得任何类型的锁。
- lock()和tryLock()的区别:
lock()阻塞式的,它要阻塞进程直到锁可以获得,或调用 lock() 的线程中断,或调用 lock() 的通道关闭。__锁定范围可以随着文件的增大而增加。无参lock()默认为独占锁;有参lock(0L, Long.MAX_VALUE, true)为共享锁。
tryLock()非阻塞,当未获得锁时,返回null.
- FileLock是线程安全的
- FileLock的生命周期:在调用FileLock.release(),或者Channel.close(),或者JVM关闭
注意:
- 同__一进程内,在文件锁没有被释放之前,不可以再次获取。即在release()方法调用前,只能lock()或者tryLock()一次。
- 文件锁定以整个 Java 虚拟机来保持。但它们不适用于控制同一虚拟机内多个线程对文件的访问。
- 对于一个只读文件或是只读channel通过任意方式加锁时会报NonWritableChannelException异常
- 对于一个不可读文件或是不可读channel通过任意方式加锁时会报NonReadableChannelException__异常
示例:
---------同一进程 - 读读重叠
两个读线程都是阻塞式获取共享锁。
lock = channel.lock(0L, Long.MAX_VALUE, true);
同一进程,即使是共享锁,同时读并且重叠,一样 文件重叠锁异常【OverlappingFileLockException】
---------不同进程共享锁- 读读
两个进程读线程都是阻塞式获取共享锁。
lock = channel.lock(0L, Long.MAX_VALUE, true);
根据结果,我们看到第二进程读的时候,获取共享锁(18:46:03获取),第一进程的共享锁还没释放(18:46:05释放)。
验证了共享锁允许同时读的特性。
---------不同进程-读写
FileInputStream读进程:lock = channel.lock(0L, Long.MAX_VALUE, true); 阻塞获取共享锁
FileOutputStream写进程:lock = channel.lock(); 阻塞获取独占锁
我们尝试先后不同启动读进程、写进程,发现两者都没有异常,同时都是等待另外一个进程完成并释放锁再获取文件锁。
---------不同进程-写写
类似的,验证了独占锁的特性。
7、网络TCP通道(ServerSocketChannel、SocketChannel)
7.1、ServerSocketChannel基本用法
(1)打开 ServerSocketChannel
通过调用 ServerSocketChannel.open() 方法来打开ServerSocketChannel
ServerSocketChannel channel= ServerSocketChannel.open();
(2)关闭 ServerSocketChannel
通过调用ServerSocketChannel.close() 方法来关闭ServerSocketChannel. 如:
//关闭 ServerSocketChannel
if (channel != null) {
channel.close();
}
(3)将 ServerSocket 绑定到特定地址(IP 地址和端口号)
channel.bind(new InetSocketAddress("127.0.0.1", 9595));
(4)监听新进来的连接
在阻塞模式下,accept()方法会一直阻塞到有新连接 SocketChannel到达。
在while循环中调用 accept()方法. 如:
while (true) {
// 监听新进来的连接
java.nio.channels.SocketChannel socketChannel = channel.accept();
//do something with socketChannel...
}
(5)非阻塞模式
ServerSocketChannel可以设置成非阻塞模式。
在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。
因此,需要检查返回的SocketChannel是否是null.如:
// 设置非阻塞模式,read的时候就不再阻塞
channel.configureBlocking(false);
while (true) {
// 监听新进来的连接
java.nio.channels.SocketChannel socketChannel = channel.accept();
if (socketChannel == null) {
// System.out.println("没有客户端连接");
TimeUnit.SECONDS.sleep(1);
continue;
}
//do something with socketChannel...
}
7.2、SocketChannel基本用法
SocketChannel 与 ServerSocketChannel类似,区别只在于需要指定连接的服务器:
// 打开SocketChannel
SocketChannel channel = SocketChannel.open();
// 设置非阻塞模式,read的时候就不再阻塞
channel.configureBlocking(false);
// tcp连接网络
channel.connect(new InetSocketAddress("127.0.0.1", 9595));
if (channel.finishConnect()) {// 连接服务器成功
//do something
}
7.3、一个简单的模拟TCP接口的Demo
这里为方便学习交流,仅使用基本api,暂时没有使用Selector(选择器)。
服务端:
package io.flysium.nio.c2_channel.socket;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.TimeUnit;
/**
* ServerSocketChannel示例
*
* @author Sven Augustus
*/
public class ServerSocketChannelTest {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
java.nio.channels.ServerSocketChannel channel = null;
try {
// 打开 ServerSocketChannel
channel = ServerSocketChannel.open();
// 设置非阻塞模式,read的时候就不再阻塞
channel.configureBlocking(false);
// 将 ServerSocket 绑定到特定地址(IP 地址和端口号)
channel.bind(new InetSocketAddress("127.0.0.1", 9595));
while (true) {
// 监听新进来的连接
java.nio.channels.SocketChannel socketChannel = channel.accept();
if (socketChannel == null) {
// System.out.println("没有客户端连接");
TimeUnit.SECONDS.sleep(1);
continue;
}
System.out.println("准备读:");
// 读取客户端发送的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
socketChannel.read(buffer);
buffer.flip();// 读取buffer
Object object = ByteBufferUtils.readObject(buffer);
System.out.println(object);
// 往客户端写数据
String serializable = "您好,客户端" + socketChannel.getRemoteAddress();
System.out.println("准备写:" + serializable);
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
socketChannel.write(byteBuffer);
}
} finally {
//关闭 ServerSocketChannel
if (channel != null) {
channel.close();
}
}
}
}
客户端:
package io.flysium.nio.c2_channel.socket;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* SocketChannel示例
*
* @author Sven Augustus
*/
@SuppressWarnings("unused")
public class SocketChannelTest {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
java.nio.channels.SocketChannel channel = null;
try {
// 打开SocketChannel
channel = SocketChannel.open();
// 设置非阻塞模式,read的时候就不再阻塞
channel.configureBlocking(false);
// tcp连接网络
channel.connect(new InetSocketAddress("127.0.0.1", 9595));
if (channel.finishConnect()) {// 连接服务器成功
/**
* 往服务端写数据
*/
String serializable = "您好,ServerSocketChannel。";
System.out.println("准备写:" + serializable);
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
channel.write(byteBuffer);
System.out.println("准备读:");
// 读取服务端发送的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
int numBytesRead = -1;
while ((numBytesRead = channel.read(buffer)) != -1) {
if (numBytesRead == 0) {// 如果没有数据,则稍微等待一下
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
buffer.flip();// 读取buffer
Object object = ByteBufferUtils.readObject(buffer);
System.out.println(object);
buffer.clear(); // 复位,清空
}
} else {
System.out.println("连接失败,服务器拒绝服务");
return;
}
} finally {
// 关闭SocketChannel
if (channel != null) {
channel.close();
}
}
}
}
8、网络UDP通道(DatagramChannel)
(1)TCP、UDP的区别
可以参考以下文章:http://www.cnblogs.com/visily/archive/2013/03/15/2961190.html
我们可以简单了解和总结:
协议
基于
数据模式
资源要求
数据正确性
数据顺序性
适用场景
TCP
连接
流或通道
较多
保证
保证
精算计算的场景
UDP
无连接
数据报
较少
不保证,可能丢包(当然内网环境几乎不存在)
不保证
服务系统内部的通讯
(2)打开 DatagramChannel
通过调用 DatagramChannel.open() 方法来打开DatagramChannel
DatagramChannel channel= DatagramChannel.open();
注意的是DatagramChannel的open()方法只是打开获得通道,但此时尚未连接。尽管DatagramChannel无需建立远端连接,但仍然可以通过isConnect()检测当前的channel是否声明了远端连接地址。
(3)关闭 DatagramChannel
//关闭DatagramChannel
if (channel!= null) {
channel.close();
}
(4)接收数据
通过receive()方法从DatagramChannel接收数据,返回一个SocketAddress对象以指出数据来源。在阻塞模式下,receive()将会阻塞至有数据包到来,非阻塞模式下,如果没有可接受的包则返回null。如果包内的数据大小超过缓冲区容量时,多出的数据会被悄悄抛弃。
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
byteBuffer.clear();
SocketAddress address = channel.receive(byteBuffer);//receive data
非阻塞模式:
while (true) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.clear();
SocketAddress socketAddress = channel.receive(byteBuffer);
if (socketAddress == null) {
// System.out.println("没有客户端连接");
TimeUnit.MILLISECONDS.sleep(1);
continue;
}
//do something with DatagramChannel...
}
(5)发送数据
通过send()方法从DatagramChannel发送数据到指定的SocketAddress对象所描述的地址。在阻塞模式下,调用线程会被阻塞至有数据包被加入传输队列。非阻塞模式下,如果发送内容为空则返回0,否则返回发送的字节数。
请注意send()方法返回的非零值并不表示数据报到达了目的地,仅代表数据报被成功加到本地网络层的传输队列。
ByteBuffer byteBuffer = ByteBuffer.wrap(new String("i 'm client").getBytes());
int bytesSent = channel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 9898));
非阻塞模式:
Serializable serializable = "您好,DatagramChannel。";
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
// 发送数据,以下为简单模拟非阻塞模式重发3次机制
final int TIMES = 3;
int bytesSent = 0;
int sendTime = 1;
while (bytesSent == 0 && sendTime <= TIMES) {
bytesSent = datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 9898));
sendTime++;
}
(6)连接到特定的地址
可以将DatagramChannel“连接”到网络中的特定地址的。由于UDP是无连接的,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,让其只能从特定地址收发数据。当连接后,也可以使用read()和write()方法,就像在用传统的通道一样。只是在数据传送方面没有任何保证。
int bytesRead = channel.read(buf);
int bytesWritten = channel.write(but);
当通道不是已连接状态时调用read()或write()方法,都将产生NotYetConnectedException异常。
(7)一个简单的模拟UDP接口的Demo
这里为方便学习交流,仅使用NIO基本api,暂时没有使用Selector(选择器)。
接收方:
package io.flysium.nio.c2_channel.socket;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.TimeUnit;
/**
* 网络UDP通道(DatagramChannel)测试 --作为服务端
*
* @author Sven Augustus
*/
public class DatagramChannelServerTest {
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
DatagramChannel channel = null;
try {
//打开DatagramChannel
channel = DatagramChannel.open();
//非阻塞模式
channel.configureBlocking(false);
//将 UDP 绑定到特定地址(IP 地址和端口号),作为服务端监听端口
channel.bind(new InetSocketAddress("127.0.0.1", 9898));
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
buffer.clear();
SocketAddress socketAddress = channel.receive(buffer);
if (socketAddress == null) {
// System.out.println("没有客户端连接");
TimeUnit.MILLISECONDS.sleep(1);
continue;
}
System.out.println("准备读:"+ socketAddress);
buffer.flip();//切换读模式
Serializable object = ByteBufferUtils.readObject(buffer);
System.out.println(object);
// 往客户端写数据
String serializable = "您好,客户端" + socketAddress.toString();
System.out.println("准备写:" + serializable);
ByteBuffer byteBuffer =
ByteBufferUtils.writeObject(serializable);
channel.send(byteBuffer,socketAddress);
}
} finally {
//关闭DatagramChannel
if (channel != null) {
channel.close();
}
}
}
}
发送方:
package io.flysium.nio.c2_channel.socket;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.TimeUnit;
/**
* 网络UDP通道(DatagramChannel)测试 --作为客户端,发送数据
*
* @author Sven Augustus
*/
public class DatagramChannelClientTest {
public static void main(String[] args) throws IOException, ClassNotFoundException {
java.nio.channels.DatagramChannel channel = null;
try {
//打开DatagramChannel
channel = DatagramChannel.open();
//非阻塞模式
channel.configureBlocking(false);
// 发送数据将不用提供目的地址而且接收时的源地址也是已知的(这点类似SocketChannel),
// 那么此时可以使用常规的read()和write()方法
channel.connect(new InetSocketAddress("127.0.0.1", 9898));
Serializable serializable = "您好,DatagramChannel。";
System.out.println("准备写:" + serializable);
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
// 发送数据,以下为简单模拟非阻塞模式重发3次机制
final int TIMES = 3;
int bytesSent = 0;
int sendTime = 1;
while (bytesSent == 0 && sendTime <= TIMES) {
//bytesSent = datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 9898));
bytesSent = channel.write(byteBuffer);
sendTime++;
}
if (bytesSent > 0) {
System.out.println("发送成功。");
} else {
System.out.println("发送失败。");
}
byteBuffer.clear();
// 读取服务端发送的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
int numBytesRead = -1;
while ((numBytesRead = channel.read(buffer)) != -1) {
if (numBytesRead == 0) {// 如果没有数据,则稍微等待一下
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
buffer.flip();// 读取buffer
Object object = ByteBufferUtils.readObject(buffer);
System.out.println(object);
buffer.clear(); // 复位,清空
}
} finally {
//关闭DatagramChannel
if (channel != null) {
channel.close();
}
}
}
}
9、管道(Pipe.SinkChannel、 Pipe.SourceChannel)
Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。
以下是Pipe原理的图示:
(1)创建管道
通过Pipe.open()方法打开管道。例如:
Pipe pipe = Pipe.open();
(2)往管道写数据
要向管道写数据,需要访问sink通道。像这样:
Pipe.SinkChannel sinkChannel = pipe.sink();
然后可以调用write方法。
(3)从管道读数据
从读取管道的数据,需要访问source通道,像这样:
Pipe.SourceChannel sourceChannel = pipe.source();
然后可以调用read方法。
(4)简单示例
package io.flysium.nio.c2_channel.pipe;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
/**
* 管道测试
* @author Sven Augustus
*/
public class PipeTest {
static class Input implements Runnable {
private final Pipe pipe;
public Input(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try {
Pipe.SourceChannel sourceChannel = pipe.source();
System.out.println("管道读取准备。");
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int bytesRead = sourceChannel.read(byteBuffer);
byteBuffer.flip();//切换读模式
Serializable serializable =
ByteBufferUtils.readObject(byteBuffer);
System.out.println("管道读取结果:" + serializable);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
static class Output implements Runnable {
private final Pipe pipe;
public Output(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try {
Pipe.SinkChannel sinkChannel = pipe.sink();
System.out.println("管道写出准备。");
Serializable object = "您好啊,Pipe。";
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(object);
while (byteBuffer.hasRemaining()) {
sinkChannel.write(byteBuffer);
}
System.out.println("管道写出完成:"+object);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
Pipe pipe = Pipe.open();
new Thread(new Input(pipe)).start();
new Thread(new Output(pipe)).start();
}
}
10、Selector(选择器)
(1)Selector模式
Selector对象本质上是一个观察者,会监视已注册的各种通道及其事件,当应用select机制后且某通道有事件发生时,会报告该信息。
这是一种网络事件驱动模型。分为三部分:
注册事件:通道将感兴趣的事件注册到Selector上。
select机制:主动应用select机制,当有事件发生时,返回一组SelectionKey(键)。
事件处理:从SelectionKey(键)中获取事件集合、就绪IO集合、注册的通道等信息,进行I/O操作。
(2)创建Selector
Selector selector = Selector.open();
(3)向Selector注册通道感兴趣的事件
为了将Channel和Selector配合使用,必须将channel注册到selector上。如下:
与Selector一起使用时,Channel必须处于非阻塞模式下。
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
register第二个参数为“interest集合”,意思是通道感兴趣的事件集合,亦指定Selector监听该通道什么事件的发生。
事件主要分四类:
SelectionKey.OP_CONNECT
连接就绪,channel成功连接另一个服务器。
SelectionKey.OP_ACCEPT
接收就绪,server channel成功接受到一个连接。
SelectionKey.OP_READ
读就绪,channel通道中有数据可读。
SelectionKey.OP_WRITE
写就绪,channel通道等待写数据。
如果你对不止一件事件感兴趣,可以使用位移操作:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
(4)通过Selector选择通道
应用select机制,可以返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。
下面是select()方法:
- int select() 一直阻塞直到至少有一个通道注册的事件发生了。
- int select(long timeout) 一直阻塞直到至少有一个通道注册的事件发生了,或者已经超时timeout毫秒。
- int selectNow() 不会阻塞,不管有没有通道就绪都立刻返回。
返回值为就绪的通道数。
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。如下所示:
Set selectedKeys = selector.selectedKeys();
每个元素SelectionKey(键),包含
- interest集合 即下一次感兴趣的事件集合,确定了下一次调用某个选择器的选择方法时,将测试哪类操作的准备就绪信息。创建该键时使用给定的值初始化 interest 集合;之后可通过 interestOps(int) 方法对其进行更改。
- ready集合 即通道已经准备就绪的事件的集合。
- Channel 即注册的通道实例。
- Selector对象
- 附加的对象(可选) 即注册时附加的对象。
可以遍历这个已选择的键集合来访问就绪的通道。如下:
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if(key.isAcceptable()) {
// 接收就绪,server channel成功接受到一个连接。
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
//连接就绪,channel成功连接另一个服务器。
// a connection was established with a remote server.
} else if (key.isReadable()) {
//读就绪,channel通道中有数据可读。
// a channel is ready for reading
} else if (key.isWritable()) {
//写就绪,channel通道等待写数据。
// a channel is ready for writing
}
}
每次迭代的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
(5)示例
现在简单模拟一下客户端向服务器循环发送接口请求,请求参数是整数,服务端会计算好(其实是乘以2)返回给客户端。
启动若干个客户端测试。
服务端:
package io.flysium.nio.c3_selector;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* ServerSocketChannel示例,使用Selector模式
*
* @author Sven Augustus
*/
public class ServerSocketChannelTest2 {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
ServerSocketChannel channel = null;
Selector selector = null;
try {
// 打开 ServerSocketChannel
channel = ServerSocketChannel.open();
// 设置非阻塞模式,read的时候就不再阻塞
channel.configureBlocking(false);
// 将 ServerSocket 绑定到特定地址(IP 地址和端口号)
channel.bind(new InetSocketAddress("127.0.0.1", 9595));
// 创建Selector选择器
selector = Selector.open();
// 注册事件,监听客户端连接请求
channel.register(selector, SelectionKey.OP_ACCEPT);
final int timeout = 1000;//超时timeout毫秒
while (true) {
if (selector.select(timeout) == 0) {//无论是否有事件发生,selector每隔timeout被唤醒一次
continue;
}
Set selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {// 接收就绪,server channel成功接受到一个连接。
SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
socketChannel.configureBlocking(false);// 设置非阻塞模式
// 注册读操作 , 以进行下一步的读操作
socketChannel.register(key.selector(), SelectionKey.OP_READ);
} else if (key.isConnectable()) {//连接就绪,channel成功连接另一个服务器。
} else if (key.isReadable()) {//读就绪,channel通道中有数据可读。
SocketChannel socketChannel = (SocketChannel) key.channel();
//System.out.println("准备读:");
// 读取客户端发送的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
int readBytes = socketChannel.read(buffer);
if (readBytes >= 0) {// 非阻塞,立刻读取缓冲区可用字节
buffer.flip();// 读取buffer
Object object = ByteBufferUtils.readObject(buffer);
//System.out.println(object);
//附加参数
key.attach(object);
// 切换写操作 , 以进行下一步的写操作
key.interestOps(SelectionKey.OP_WRITE);
} else if (readBytes < 0) { //客户端连接已经关闭,释放资源
System.out.println("客户端" + socketChannel.socket().getInetAddress()
+ "端口" + socketChannel.socket().getPort() + "断开...");
socketChannel.close();
}
} else if (key.isValid() && key.isWritable()) {//写就绪,channel通道等待写数据。
SocketChannel socketChannel = (SocketChannel) key.channel();
// 计算
Integer integer = Integer.parseInt(String.valueOf(key.attachment()));
String serializable = String.valueOf(integer * 2);
// 往客户端写数据
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
socketChannel.write(byteBuffer);
System.out.println("客户端服务器:" + integer + ",响应:" + serializable);
// 切换读操作 , 以进行下一次的接口请求,即下一次读操作
key.interestOps(SelectionKey.OP_READ);
}
}
}
} finally {
//关闭 ServerSocketChannel
if (channel != null) {
channel.close();
}
if (selector != null) {
selector.close();
}
}
}
}
客户端:
package io.flysium.nio.c3_selector;
import io.flysium.nio.ByteBufferUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* SocketChannel示例,使用Selector模式
*
* @author Sven Augustus
*/
@SuppressWarnings("unused")
public class SocketChannelTest2 {
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
new Thread(new ClientRunnable("A")).start();
new Thread(new ClientRunnable("B")).start();
new Thread(new ClientRunnable("C")).start();
new Thread(new ClientRunnable("D")).start();
}
private static class ClientRunnable implements Runnable {
private final String name;
private ClientRunnable(String name) {
this.name = name;
}
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null;
try {
// 打开SocketChannel
channel = SocketChannel.open();
// 设置非阻塞模式,read的时候就不再阻塞
channel.configureBlocking(false);
// tcp连接网络
channel.connect(new InetSocketAddress("127.0.0.1", 9595));
// 创建Selector选择器
selector = Selector.open();
// 注册事件,监听读/写操作
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
final int timeout = 1000;//超时timeout毫秒
if (channel.finishConnect()) {// 连接服务器成功
while (true) {
if (selector.select(timeout) == 0) {
continue;
}
Set selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isValid() && key.isWritable()) {//写就绪,channel通道等待写数据。
TimeUnit.SECONDS.sleep(3);
SocketChannel socketChannel = (SocketChannel) key.channel();
// 往服务端写数据
String serializable = String.valueOf(new Random().nextInt(1000));
//System.out.println("准备写:" + serializable);
ByteBuffer byteBuffer = ByteBufferUtils.writeObject(serializable);
socketChannel.write(byteBuffer);
//附加参数
key.attach(serializable);
// 切换读操作 , 以进行下一次的接口请求,即下一次读操作
key.interestOps(SelectionKey.OP_READ);
} else if (key.isReadable()) {//读就绪,channel通道中有数据可读。
SocketChannel socketChannel = (SocketChannel) key.channel();
//System.out.println("准备读:");
// 读取服务端发送的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
int readBytes = socketChannel.read(buffer);
if (readBytes >= 0) {// 非阻塞,立刻读取缓冲区可用字节
buffer.flip();// 读取buffer
Object object = ByteBufferUtils.readObject(buffer);
//System.out.println(object);
buffer.clear(); // 复位,清空
Integer integer = Integer.parseInt(String.valueOf(key.attachment()));
System.out.println("线程-" + name
+ ",请求服务器:" + integer + ",响应:" + object);
// 切换写操作 , 以进行下一步的写操作,即接口请求
key.interestOps(SelectionKey.OP_WRITE);
} else if (readBytes < 0) { //客户端连接已经关闭,释放资源
System.out.println("服务端断开...");
}
}
}
}
} else {
System.out.println("连接失败,服务器拒绝服务");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
// 关闭SocketChannel
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
11、发散/汇聚
分散(scatter)从Channel中读取是指在读操作时将读取的数据写入多个buffer中。因此,Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中。
聚集(gather)写入Channel是指在写操作时将多个buffer的数据写入同一个Channel,因此,Channel 将多个Buffer中的数据“聚集(gather)”后发送到Channel。
scatter / gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。
(1)分散
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
buffer首先被插入到数组,然后再将数组作为channel.read() 的输入参数。read()方法按照buffer在数组中的顺序将从channel中读取的数据写入到buffer,当一个buffer被写满后,channel紧接着向另一个buffer中写。分散不适用于动态消息的处理。
(2)聚集
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
//write data into buffers
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
buffers数组是write()方法的入参,write()方法会按照buffer在数组中的顺序,将数据写入到channel,注意只有position和limit之间的数据才会被写入。因此,如果一个buffer的容量为128byte,但是仅仅包含100byte的数据,那么这100byte的数据将被写入到channel中。聚集适用于动态消息的处理。
更多Demo:https://git.oschina.net/svenaugustus/MyJavaIOLab
本文只针对NIO的知识总结,其他IO总结姊妹篇(IO)请参见:
+ Java标准I/O流编程一览笔录: https://my.oschina.net/langxSpirit/blog/830620
@SvenAugustus(https://www.flysium.xyz/)
更多请关注微信公众号【编程不离宗】,专注于分享服务器开发与编程相关的技术干货: