1、TimeClient(客户端)
/**
* @FileName TimeClient.java
* @Description:
*
* @Date 2016年3月1日
* @author Administroter
* @version 1.0
*
*/
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length>0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
//采用默认的值
e.printStackTrace();
}
}
new Thread(new TimeClientHandle("127.0.0.1", port),"TimeClient-001").start();;
}
}
2、TimeClientHandle(客户端)
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @FileName TimeClientHandle.java
* @Description:
*
* @Date 2016年2月29日
* @author Administroter
* @version 1.0
*
*/
public class TimeClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
/**
* Title: <br />
* Description: 构造函数,初始化参数
* @param host
* @param port
*/
public TimeClientHandle(String host,int port) {
//服务器地址(IP)
this.host = host == null ? "127.0.0.1" : host;
//端口号
this.port = port;
try {
//打开多路复用器
selector = Selector.open();
//打开管道
socketChannel = SocketChannel.open();
//设置管道为异步非阻塞的
socketChannel.configureBlocking(false);
} catch (IOException e) {
//正常结束当前的java虚拟机
System.exit(1);
e.printStackTrace();
}
}
public void run() {
try {
//建立与服务器的链接
this.doConnect();
} catch (IOException e) {
System.exit(1);
e.printStackTrace();
}
while(!stop){
try {
//一秒唤醒一次线程
selector.select(1000);
Set<SelectionKey> selectionKey = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKey.iterator();
SelectionKey key = null;
//迭代就绪的管道
while(it.hasNext()){
key = it.next();
it.remove();
try {
//执行读写的操作
this.handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
System.exit(1);
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @Title: handleInput
* @Description:服务器和客户端的读写操作
* @param key
* @throws IOException
* @author Administroter
* @date 2016年3月1日
*/
private void handleInput(SelectionKey key) throws IOException{
//判断是否处于链接状态
if (key.isValid()) {
SocketChannel sc = (SocketChannel)key.channel();
//判断客户端是否链接成功
if (sc.finishConnect()) {
//管道注册到多路复用器,监听网络读操作
sc.register(selector, SelectionKey.OP_READ);
this.doWrite(sc);
}else{
//IO异常,结束进程
System.exit(1);
}
System.out.println("如果"+key.isReadable()+"开始读取服务器返回结果!");
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes>0) {
/**
* 将当前缓冲区的limit设置为position = 0,用于后续对缓冲区的读取操作
* (将缓存字节数组的指针设置为数组的开始序列即数组下标0,这样就可以从buffer开头,对该buffer进行遍历(读取)了)
* 如果这里不进行设置,读取出来的会是空的
*/
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"utf-8");
System.out.println("服务器响应结果:"+body);
this.stop = true;
}else if(readBytes<0){
key.cancel();
sc.close();
}else{
}
}
}
}
/**
* @Title: doConnect
* @Description:链接到服务器
* @throws IOException
* @author Administroter
* @date 2016年3月1日
*/
private void doConnect() throws IOException{
//判断链接成功
if (socketChannel.connect(new InetSocketAddress(host,port))) {
//将管道注册OP_READ到多路复用器上
socketChannel.register(selector, SelectionKey.OP_READ);
//将客户端请求通过管道写出,转到服务器
this.doWrite(socketChannel);
}else{
//将管道注册OP_CONNECT到多路复用器上
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
/**
* @Title: doWrite
* @Description:编写发送服务器的码流
* @param sc
* @throws IOException
* @author Administroter
* @date 2016年3月1日
*/
private void doWrite(SocketChannel sc) throws IOException{
//请求服务器的参数
byte[] request = "我要获取服务器当前时间".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(request.length);
writeBuffer.put(request);
//使得数据从buffer的开头遍历读取
writeBuffer.flip();
//写出是异步的,可能存在半写包的情况
sc.write(writeBuffer);
//判断缓冲区的消息是否全部发送完成
if (!writeBuffer.hasRemaining()) {
System.out.println("缓冲区消息全部发送完成");
}
}
}
3、TimeService(服务端)
/**
* @FileName TimeService.java
* @Description:
*
* @Date 2016年2月29日
* @author Administroter
* @version 1.0
*
*/
public class TimeService {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length>0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
//采用默认的值
e.printStackTrace();
}
}
TimeServerHandle timeServer = new TimeServerHandle(port);
//MultiplexerTimeServer是一个独立的线程,用户轮询多路复用器Selctor,可以处理多个客户的并发接入
new Thread(timeServer,"TimeServer-001").start();
}
}
4、MultiplexerTimeServer(服务端)
/*
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import com.lxf.netty.common.StrKit;
/**
* @FileName MultiplexerTimeServer.java
* @Description: NIO 多路复用类:负责轮询多路复用器Selctor,处理多个客户端的并发接入
*
* @Date 2016年2月29日
* @author Administroter
* @version 1.0
*
*/
public class TimeServerHandle implements Runnable{
//多路复用器
private Selector selector;
//通道
private ServerSocketChannel servChannel;
private volatile boolean stop;
/**
* Title: <br />
* Description: 构造方法,资源初始化,创建Selector,ServerSocketChannel,对Channel和TCP参数进行设置
* @param port 端口号
*/
public TimeServerHandle(int port) {
try {
//打开多路复用器
selector = Selector.open();
//打开管道
servChannel = ServerSocketChannel.open();
//设置管道为非阻塞的
servChannel.configureBlocking(false);
//绑定主机,并设置backlog为1024
servChannel.socket().bind(new InetSocketAddress(port), 1024);
//将管道注册到复用器上,并监听SelectionKey.OP_ACCEPT操作位
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("开启时间服务器,端口号:"+port);
} catch (IOException e) {
//正常结束当前正在运行中的java虚拟机
System.exit(1);
e.printStackTrace();
}
}
public void stop(){
this.stop = true;
}
/**
* 循环遍历多路复用器,当有处于就绪状态channel时,复用器将会返回就绪状态的channel
*/
public void run() {
while(!stop){
try {
//设置休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都会被唤醒一次
selector.select(1000);
//获取selector含有就绪状态的channel集合
Set<SelectionKey> selectedKey = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKey.iterator();
SelectionKey key = null;
//迭代channel集合
while(it.hasNext()){
key = it.next();
it.remove();
try {
//进行网络异步的读写操作
this.handleInput(key);
} catch (Exception e) {
System.out.println(e.getMessage()+"-----------"+key);
if(key != null){
key.cancel();
if (key.channel() != null) {
//关闭Channel
key.channel().close();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
//多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null) {
try {
//关闭多路复用器
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @Title: handleInput
* @Description:网络异步的读写操作
* @param key
* @throws IOException
* @author Administroter
* @date 2016年2月29日
*/
public void handleInput(SelectionKey key)throws IOException{
System.out.println("判断是否链接成功:"+key.isValid());
if(key.isValid()){
//处理新接入的请求消息,根据key判断操作位判断网络事件的类型
if (key.isAcceptable()) {
//接收客户端的请求
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
//创建SocketChannel实例
SocketChannel sc = ssc.accept();
//设置异步非阻塞
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
//读取客户端请求消息
SocketChannel sc = (SocketChannel)key.channel();
//创建ByteBuffer并设置大小为1K(此处大小可以更改,根据客户端发送的码流大小决定)
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//读取请求的码流(前面设置了SocketChannel为异步非阻塞的,所以这里的read是非阻塞的)
int readBytes = sc.read(readBuffer);
//判断读取的字节数(等于0-没有读到字节,属于正常场景,忽略,大于0-读到字节,对字节进行编码或者解码,等于-1-链路已经关闭,需要关闭SocketChannel,释放资源)
if (readBytes > 0) {
/**
* 将当前缓冲区的limit设置为position = 0,用于后续对缓冲区的读取操作
* (将缓存字节数组的指针设置为数组的开始序列即数组下标0,这样就可以从buffer开头,对该buffer进行遍历(读取)了)
* 如果这里不进行设置,读取出来的会是空的
*/
readBuffer.flip();
//创建字节数据
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"utf-8");
System.out.println("接收到的客户端请求的数据:"+body);
//判断客户端指令如果为QUERY TIME ORDER,则将系统当前的时间返回给客户端,此处也就相当于我们的业务逻辑处理部分
Date date = new Date();
String currentTime = StrKit.getDateToString(date);
String temp = "我要获取服务器当前时间".equalsIgnoreCase(body)?currentTime:"指令不一致";
this.doWrite(sc, temp);
}else if(readBytes < 0){
key.cancel();
sc.close();
}else{
}
}
}
}
private void doWrite(SocketChannel channel,String response)throws IOException{
if (response != null && response.trim().length()>0) {
//将服务器响应的字符串编码成字节数组
byte[] bytes = response.getBytes();
//根据字节数组的容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//使得数据从buffer的开头遍历读取
writeBuffer.flip();
//调用SocketChannel管道的write方法将客户端响应的字符串信息发送出去
channel.write(writeBuffer);
}
}
}