Nio服务器和客户端相互通信,服务器接受且能主动推送消息给客服端

Stella981
• 阅读 486

先介绍一下项目需求,主要是服务器接受客服端(电子柜台)传来的心跳信息,服务器也能主动发送信息给客户端

最近看了很多帖子,大多是服务器接受信息,然后被动回应客服端,这里我简单的做了一个管理客户的列表。用于指定发送

(当然就是本地操作成功,还未完全测试,应该还是存在很多Bug,仅供参考!!)

先说说Nio,这里就直接贴大神的链接啦:https://gitbook.cn/books/5b1792ad26a49a55324e782c/index.html

还有就是这里这套学校管理老师的比喻,就很形象,比较适合我这种菜鸟理解:https://www.cnblogs.com/wcyBlog/p/4716676.html

然后就是代码了!!!!!

服务器代码如下:

package com.js.archive.tcp;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.util.HashMap;import java.util.Iterator;public class TcpServer {    //缓冲区长度    private static final int BUF_SIZE = 28000;    //select 方法等待星道准备好的最长时间    private static final int timeout = 3000;    public static Selector selector;    /**     * 发送消息的开关     */    public static boolean signal = false;    /**     * 发送消息的内容     */    public static String message= "";    public static void main(String[] args) throws IOException {        init("192.168.0.166",1006);        new ServerWriteThread();        tcpServer();    }    public static void init(String hostname,Integer port) throws IOException {        if (selector == null) {            synchronized (TcpServer.class) {                if (selector == null) {                    //创建一个选择器                    selector = Selector.open();                    //实例化一个信道                    ServerSocketChannel socketChannel  = ServerSocketChannel.open();                    //将该信道绑定到指定端口                    InetSocketAddress inetSocketAddress = new InetSocketAddress(hostname, port);                    socketChannel .bind(inetSocketAddress);                    System.out.println("服务端套接字"+socketChannel .socket().getLocalSocketAddress());                    //配置信道为非阻塞模式                    socketChannel.configureBlocking(false);                    //将选择器注册到各个信道                    socketChannel.register(selector, SelectionKey.OP_ACCEPT);                }            }        }    }    public static void tcpServer() throws IOException {        //创建一个实现了协议接口的对象        TCPProtocol protocol = new EchoSelectorProtocol(BUF_SIZE);        while (true) {            //一直等待,直至到准备好了io操作            if (selector.select(timeout) == 0) {                //等待TCP receive data:                System.out.println("write...");                continue;            }            //获取准备好的信道所关联的Key集合的iterator实例            Iterator<SelectionKey> keyIterable = selector.selectedKeys().iterator();            //循环取得集合中的每个键值            while (keyIterable.hasNext()) {                SelectionKey key = keyIterable.next();                // 处理事件                if (key.isAcceptable()) {                    protocol.handleAccept(key);                }                //读取信息                 if (key.isReadable()) {                    protocol.handleRead(key);                 }                 //判断并发送信息                if(key.isValid() && key.isWritable()){                    if(signal == true){                        protocol.handleWrite(key);                    }                }                //这里需要手动从键集中移除当前的key                keyIterable.remove();            }            signal = false;        }    }    }

package com.js.archive.tcp;import java.io.IOException;import java.nio.channels.SelectionKey;import java.util.HashMap;public interface TCPProtocol {    /**     * 接收一个SocketChannel的处理     * @param key     * @throws IOException     */    void handleAccept(SelectionKey key) throws IOException;    /**     * 从一个SocketChannel读取信息的处理     * @param key     * @throws IOException     */    void handleRead(SelectionKey key) throws IOException;    /**     * 向一个SocketChannel写入信息的处理     * @param key     * @throws IOException     */    void handleWrite(SelectionKey key) throws IOException;}



package com.js.archive.tcp;import com.js.archive.tcp.messageHandler.HeartMessageHandler;import com.js.archive.tcp.messageHandler.HumitureMessageHandler;import com.js.archive.tcp.messageHandler.RFBoardFailure;import com.js.archive.tcp.messageHandler.TcpMessage;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.ArrayList;import java.util.List;@Slf4jpublic class EchoSelectorProtocol implements TCPProtocol {    private int bufSize;//缓冲区长度    public EchoSelectorProtocol(int bufSize) {        this.bufSize = bufSize;    }    /**     * 服务端信道已经准备好了接收新的客户端连接     */    // 处理连接事件    @Override    public void handleAccept(SelectionKey key) throws IOException {        SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();        channel.configureBlocking(false);        System.out.println("客户端套接字:"+channel.getRemoteAddress());        //将选择器注册到连接到的客户端信道,并指定该信道key值的属性为OP_READ,同时为该信道指定关联的附件        channel.register(key.selector(), SelectionKey.OP_READ);    }    /**     * 客户端信道已经准备好了从信道中读取数据到缓冲区     */    // 处理读事件    @Override    public void handleRead(SelectionKey key) throws IOException {        SocketChannel channel = (SocketChannel) key.channel();        ByteBuffer buff = ByteBuffer.allocate(bufSize);        //如果read()方法返回-1说明客户端关闭了链接,那么客户端已经接收到了与自己发送字节数        long  bytesRead=0;        try{            bytesRead = channel.read(buff);        }catch(IOException e){            key.cancel();            channel.socket().close();            channel.close();            return;        }        if (bytesRead <=0) {            channel.close();        } else if (bytesRead > 0) {            buff.flip();            byte[] array = new byte[buff.remaining()];            buff.get(array);            //todo xurunfei 连帧 $sdfasdf#$dsfdsf#处理            //n内网公网是否都可以            String dataSum = new String(array);            List<String> datalist =getMessagelist(dataSum);            for (String data:datalist                 ) {                if (!data.contains("$") && !data.contains("#")) {                    handlerMessage(data,key);                } else {                    log.error("tcp data format error {}", data);                }            }            //如果缓冲区总读入了数据,则将该信道感兴趣的炒作设置为可读可写//            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);            key.interestOps(SelectionKey.OP_READ| SelectionKey.OP_WRITE);            buff.compact();        }    }    /**     * 客户端信道已经准备好了将数据从缓冲区写入信道     */    @Override    public void handleWrite(SelectionKey key) throws IOException {        String mss=TcpMessage.getCookieByMessage(TcpServer.message);//解析得到传入信息中包含的机器序列号,进行筛选,选择指定的客户端         if(mss.equals((String)key.attachment())){          ByteBuffer buff = ByteBuffer.allocate(bufSize);          buff.clear();  //拼接消息头尾          buff.put(("$"+TcpServer.message+"#").getBytes());          buff.flip();          while(buff.hasRemaining()) {            SocketChannel socketChannel = (SocketChannel) key.channel();            socketChannel.write(buff);        }        buff.compact();    }else {        return;    }        }    /**     * 处理消息逻辑   解析服务端送来的信息     * @param message 消息体     */    public void handlerMessage(String message,SelectionKey key) {        int funCode = TcpMessage.getFunCodeByMessage(message);     //解析客服端信息,得到客户端序列号        String cookie =TcpMessage.getCookieByMessage(message);     //作为唯一标识绑定进Key,方便指定用户发送信息。        key.attach(cookie);        //  13功能:档案ID心跳包,每10秒档案柜向后台发送1个档案ID心跳包        if (funCode == 13) {            TcpMessage tcpMessage = new HeartMessageHandler(message);            log.debug(" 心跳包 message :"+tcpMessage+"      "+((HeartMessageHandler) tcpMessage).getArchivesIdList());        //14功能:温湿度上传        } else if (funCode == 14) {            TcpMessage tcpMessage=new HumitureMessageHandler(message);            log.debug(" 温湿度 message :"+tcpMessage+"       "+((HumitureMessageHandler) tcpMessage).getTemperatuerList());        //  15功能:存取档案变化上传        } else if (funCode == 15) {            log.debug("存取档案变化" );             }    }    /**     * 得到去除消息$和#的messagelist     */    public ArrayList getMessagelist(String data){        ArrayList messagelist = new ArrayList();        String[] strs=data.split("#");        for(int i=0,len=strs.length;i<len;i++){            messagelist.add(strs[i].substring(1,strs[i].length()));        }        return messagelist;    }}

服务器主动发送信息!!!!!

package com.js.archive.tcp;import java.util.Scanner;public class ServerWriteThread implements Runnable {    /**     * 服务器端写线程     */    public ServerWriteThread(){        new Thread(this).start();    }    public void run(){        Scanner s= new Scanner(System.in);        while(true){            System.out.println("输入:");            TcpServer.message = s.next();            TcpServer.signal = true;        }    }}

这里只是利用key的附件功能,简单的实现了客户端信息的保存,简单实现了指定客户端发送消息!

当然也可以选择使用一个Map将key和客户端id进行绑定  

以上仅是本人作为菜鸟的一点小心得。。仅供参考!

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
待兔 待兔
2个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Stella981 Stella981
2年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
8个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这