先介绍一下项目需求,主要是服务器接受客服端(电子柜台)传来的心跳信息,服务器也能主动发送信息给客户端
最近看了很多帖子,大多是服务器接受信息,然后被动回应客服端,这里我简单的做了一个管理客户的列表。用于指定发送
(当然就是本地操作成功,还未完全测试,应该还是存在很多Bug,仅供参考!!)
先说说Nio,这里就直接贴大神的链接啦:https://gitbook.cn/books/5b1792ad26a49a55324e782c/index.html
还有就是这里这套学校管理老师的比喻,就很形象,比较适合我这种菜鸟理解:https://www.cnblogs.com/wcyBlog/p/4716676.html
然后就是代码了!!!!!
服务器代码如下:
package com.js.archive.tcp;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.util.HashMap;import java.util.Iterator;public class TcpServer { //缓冲区长度 private static final int BUF_SIZE = 28000; //select 方法等待星道准备好的最长时间 private static final int timeout = 3000; public static Selector selector; /** * 发送消息的开关 */ public static boolean signal = false; /** * 发送消息的内容 */ public static String message= ""; public static void main(String[] args) throws IOException { init("192.168.0.166",1006); new ServerWriteThread(); tcpServer(); } public static void init(String hostname,Integer port) throws IOException { if (selector == null) { synchronized (TcpServer.class) { if (selector == null) { //创建一个选择器 selector = Selector.open(); //实例化一个信道 ServerSocketChannel socketChannel = ServerSocketChannel.open(); //将该信道绑定到指定端口 InetSocketAddress inetSocketAddress = new InetSocketAddress(hostname, port); socketChannel .bind(inetSocketAddress); System.out.println("服务端套接字"+socketChannel .socket().getLocalSocketAddress()); //配置信道为非阻塞模式 socketChannel.configureBlocking(false); //将选择器注册到各个信道 socketChannel.register(selector, SelectionKey.OP_ACCEPT); } } } } public static void tcpServer() throws IOException { //创建一个实现了协议接口的对象 TCPProtocol protocol = new EchoSelectorProtocol(BUF_SIZE); while (true) { //一直等待,直至到准备好了io操作 if (selector.select(timeout) == 0) { //等待TCP receive data: System.out.println("write..."); continue; } //获取准备好的信道所关联的Key集合的iterator实例 Iterator<SelectionKey> keyIterable = selector.selectedKeys().iterator(); //循环取得集合中的每个键值 while (keyIterable.hasNext()) { SelectionKey key = keyIterable.next(); // 处理事件 if (key.isAcceptable()) { protocol.handleAccept(key); } //读取信息 if (key.isReadable()) { protocol.handleRead(key); } //判断并发送信息 if(key.isValid() && key.isWritable()){ if(signal == true){ protocol.handleWrite(key); } } //这里需要手动从键集中移除当前的key keyIterable.remove(); } signal = false; } } }
package com.js.archive.tcp;import java.io.IOException;import java.nio.channels.SelectionKey;import java.util.HashMap;public interface TCPProtocol { /** * 接收一个SocketChannel的处理 * @param key * @throws IOException */ void handleAccept(SelectionKey key) throws IOException; /** * 从一个SocketChannel读取信息的处理 * @param key * @throws IOException */ void handleRead(SelectionKey key) throws IOException; /** * 向一个SocketChannel写入信息的处理 * @param key * @throws IOException */ void handleWrite(SelectionKey key) throws IOException;}
package com.js.archive.tcp;import com.js.archive.tcp.messageHandler.HeartMessageHandler;import com.js.archive.tcp.messageHandler.HumitureMessageHandler;import com.js.archive.tcp.messageHandler.RFBoardFailure;import com.js.archive.tcp.messageHandler.TcpMessage;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.ArrayList;import java.util.List;@Slf4jpublic class EchoSelectorProtocol implements TCPProtocol { private int bufSize;//缓冲区长度 public EchoSelectorProtocol(int bufSize) { this.bufSize = bufSize; } /** * 服务端信道已经准备好了接收新的客户端连接 */ // 处理连接事件 @Override public void handleAccept(SelectionKey key) throws IOException { SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); channel.configureBlocking(false); System.out.println("客户端套接字:"+channel.getRemoteAddress()); //将选择器注册到连接到的客户端信道,并指定该信道key值的属性为OP_READ,同时为该信道指定关联的附件 channel.register(key.selector(), SelectionKey.OP_READ); } /** * 客户端信道已经准备好了从信道中读取数据到缓冲区 */ // 处理读事件 @Override public void handleRead(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buff = ByteBuffer.allocate(bufSize); //如果read()方法返回-1说明客户端关闭了链接,那么客户端已经接收到了与自己发送字节数 long bytesRead=0; try{ bytesRead = channel.read(buff); }catch(IOException e){ key.cancel(); channel.socket().close(); channel.close(); return; } if (bytesRead <=0) { channel.close(); } else if (bytesRead > 0) { buff.flip(); byte[] array = new byte[buff.remaining()]; buff.get(array); //todo xurunfei 连帧 $sdfasdf#$dsfdsf#处理 //n内网公网是否都可以 String dataSum = new String(array); List<String> datalist =getMessagelist(dataSum); for (String data:datalist ) { if (!data.contains("$") && !data.contains("#")) { handlerMessage(data,key); } else { log.error("tcp data format error {}", data); } } //如果缓冲区总读入了数据,则将该信道感兴趣的炒作设置为可读可写// key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); key.interestOps(SelectionKey.OP_READ| SelectionKey.OP_WRITE); buff.compact(); } } /** * 客户端信道已经准备好了将数据从缓冲区写入信道 */ @Override public void handleWrite(SelectionKey key) throws IOException { String mss=TcpMessage.getCookieByMessage(TcpServer.message);//解析得到传入信息中包含的机器序列号,进行筛选,选择指定的客户端 if(mss.equals((String)key.attachment())){ ByteBuffer buff = ByteBuffer.allocate(bufSize); buff.clear(); //拼接消息头尾 buff.put(("$"+TcpServer.message+"#").getBytes()); buff.flip(); while(buff.hasRemaining()) { SocketChannel socketChannel = (SocketChannel) key.channel(); socketChannel.write(buff); } buff.compact(); }else { return; } } /** * 处理消息逻辑 解析服务端送来的信息 * @param message 消息体 */ public void handlerMessage(String message,SelectionKey key) { int funCode = TcpMessage.getFunCodeByMessage(message); //解析客服端信息,得到客户端序列号 String cookie =TcpMessage.getCookieByMessage(message); //作为唯一标识绑定进Key,方便指定用户发送信息。 key.attach(cookie); // 13功能:档案ID心跳包,每10秒档案柜向后台发送1个档案ID心跳包 if (funCode == 13) { TcpMessage tcpMessage = new HeartMessageHandler(message); log.debug(" 心跳包 message :"+tcpMessage+" "+((HeartMessageHandler) tcpMessage).getArchivesIdList()); //14功能:温湿度上传 } else if (funCode == 14) { TcpMessage tcpMessage=new HumitureMessageHandler(message); log.debug(" 温湿度 message :"+tcpMessage+" "+((HumitureMessageHandler) tcpMessage).getTemperatuerList()); // 15功能:存取档案变化上传 } else if (funCode == 15) { log.debug("存取档案变化" ); } } /** * 得到去除消息$和#的messagelist */ public ArrayList getMessagelist(String data){ ArrayList messagelist = new ArrayList(); String[] strs=data.split("#"); for(int i=0,len=strs.length;i<len;i++){ messagelist.add(strs[i].substring(1,strs[i].length())); } return messagelist; }}
服务器主动发送信息!!!!!
package com.js.archive.tcp;import java.util.Scanner;public class ServerWriteThread implements Runnable { /** * 服务器端写线程 */ public ServerWriteThread(){ new Thread(this).start(); } public void run(){ Scanner s= new Scanner(System.in); while(true){ System.out.println("输入:"); TcpServer.message = s.next(); TcpServer.signal = true; } }}
这里只是利用key的附件功能,简单的实现了客户端信息的保存,简单实现了指定客户端发送消息!
当然也可以选择使用一个Map将key和客户端id进行绑定
以上仅是本人作为菜鸟的一点小心得。。仅供参考!