Voovan开发指南 (二) Socket客户端开发

Wesley13
• 阅读 782

Voovan 框架介绍

Voovan开源项目启动于2015年,始于自己在使用 Netty 和 Mina 时有较多难以理解的部分,同时在使用过程中遇到对粘包等问题的困扰,后来经过不断的对源码的学习以及对 java 异步通信的深入理解发现 自 java 1.7以后 JDK 提供了更优秀的异步通信模型 AIO,随后决定自己参照 AIO 模型重新造一个轮子。并在开发的过程中对使用到的各类工具方法等做了整理,形成了一个常用并且简单易用的工具包。

  • 可灵活实现Socket通信粘包的支持(代码中包含 HTTP协议,字符串换行,定长报文的粘包实现)。
  • 支持 SSL/TLS 加密通信。
  • 提供线程池依据系统负载情况自动动态调整。
  • 同时支持 NIO 和 AIO 特性。
  • 采用非阻塞方式的异步传输。
  • 事件驱动(Connect、Recive、Sent、Close、Exception),采用回调的方式完成调用。
  • 可灵活的加载过滤器机制。

###步骤介绍###

想要发起一个 Socket 连接仅仅需要一下四个步骤:

  • 实例化一个Socket对象: AioSocket 或 NioSocket 或 UdpSocket,用于连接。
  • 实例化一个消息分割器用来处理粘包问题。
  • 实例化一个过滤器,IoFilter。
  • 实例化一个Socket业务处理句柄: IoHandle。

###Step1: 实现一个Socket连接对象### 实例化Socket连接有两个类可以采取实例化动作.

  • AioSocket: 采用 JDK 的 AIO 模型的异步通信,JDK > 1.7。
  • NioSocket: 采用 JDK 的 NIO 模型的异步通信,JDK > 1.4。
  • UdpSocket: 采用 JDK 的 UDP 模型的异步通信,JDK > 1.4。

下面我们来看看这两个类的构造方法:

public NioSocket(String host,int port,int readTimeout) throws IOException
public AioSocket(String host,int port,int readTimeout) throws IOException
public UdpSocket(String host,int port,int readTimeout) throws IOException

我们可以看到这两个类的构造方法都具有三个参数:

  • host: 服务发布地址。
  • port: 服务发布端口。
  • readTimeout: 读取超时时间

下面我们来实例化一个Socket连接对象:

AioSocket socket = new AioSocket("127.0.0.1",2031,300);

实际使用中如果你想构造一个 Nio 模型的 Socket 连接,请将 AioSocket 替换成 NioSocket 即可。


###Step2: 实现一个消息分割器### 消息分割器是用来处理消息粘包的一个补充类,相对于 Netty 和 Mina 是一个特殊的地方. 注意:消息分割器是工作在过滤器之前的. 消息分割器是在 Socket 连接器接受到消息后对消息的内容进行判断是否是一个完成消息报文,如果是一个完成消息报文则返回给过滤器来处理,如果不是则等待消息报文被完整接受,如果一直接受的消息报文都不完整则一直等待,这个时候我们可以通过超时来控制尝试间接收不到消息的情况,具体参考TimeOutMesssageSplitter分割器的实现,也可以直接实例化这个分割器在你自定义的分割器中通过业务代码来判断是否需要使用超时。

下面我们给第一步实例化的 Socket 连接对象增加一个分割器:

socket.messageSplitter(new LineMessageSplitter());

Voovan 框架已经包括了一些消息分割器的实现在org.voovan.network.messagesplitter包内。

  • BufferLengthSplitter: 消息定长分割器
  • LineMessageSplitter: 换行消息分割器
  • HttpMessageSplitter: Http1.1消息分割器
  • TimeOutMesssageSplitter: 超时消息分割器

自定义一个消息过滤器需要实现MessageSplitte接口,接口的源码:

package org.voovan.network;

public interface MessageSplitter {

    public int canSplite(IoSession session,byte[] buffer);
    
}

通过源码我们可以发现,如果想实现一个消息分割器我们需要实现一个canSplite方法:

  • canSplite 方法: 判断消息是否可分割。 这两个方法有两个相同的参数:
  • IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。
  • buffer参数: Socket 接收到的所有字节。 ***`返回对象:如果消息是可分割的则返回 true,buffer 参数中的字节将会交给过滤器来处理,如果返回 false 则继续等待 Socket 接受新的字节。

下面我们给出框架实现的TimeOutMesssageSplitter的源码以供参考:

public class TimeOutMesssageSplitter implements MessageSplitter {

   private long initTime;
   
   public TimeOutMesssageSplitter(){
       initTime = -1;
   }
   
   @Override
   public int canSplite(IoSession session, byte[] buffer) {
        int timeOut = session.sockContext().getReadTimeout();
       long currentTime = System.currentTimeMillis();
       if(initTime==-1){
               initTime = currentTime;
       } 
    
       if(currentTime-initTime >= timeOut){
               return byteBuffer.limit();
       }else{
               return -1;
       }
   }

}

###Step3: 实现一个过滤器### 过滤器可以在 Socket 通信中对传递的字节流进行解码和编码操作,比如:我们传递的报文是 JSON 数据格式,那么我们可以通过实现一个过滤器在发送一个对象作为消息时将对象转换成 JSON 字符串通过 Socket 发送,同时在Socket接受到消息后将接收到的 JSON 字符传转换成对象。

我们可以定义多个过滤器形成一个过滤器链,这样可以提高部分过滤器的复用性. 在第一步实例化好的Socket连接对象中调用增加过滤器方法可以向 Socket 连接对象增加过滤器。 增加的过滤器在过滤器链中是有先后顺序的,例如:在使用 add 方法加入的过滤器则在过滤器的最后一个.在解码的过程中过滤器的方法 decode 时是按照加入的从第一个到最后一个的顺序调用的.在编码的过程中过滤器方法 encode 是按照最后一个到第一个的顺序调用的。

下面我们给第一步实例化的 Socket 连接对象增加一个过滤器:

socket.filterChain().add(new StringFilter());

其中我们通过socket.filterChain()获取过滤器链,然后通过过滤器链的 add 方法增加一个名为StringFilter的过滤器。

Voovan 框架已经包括了一个过滤器的实现: StringFilter过滤器,用于将字节流转换成字符串。

如果我们要根据自己的需求定义一个自定义过滤器,那么我们的过滤器实现一个 IoFilter 接口. 下面我们给出 IoFilter 接口的源码:

package org.voovan.network;

import org.voovan.network.exception.IoFilterException;

public interface IoFilter {

public Object decode(IoSession session,Object object) throws IoFilterException;

public Object encode(IoSession session,Object object)throws IoFilterException;
}

通过源码我们可以发现,如果想实现一个过滤器我们需要实现两个过滤器方法:

  • decode 方法: 过滤器解码函数,接收事件(onRecive)前调用
  • encode 方法: 过滤器编码函数,发送事件(onSend)前调用 这两个方法有两个相同的参数:
  • IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等.
  • object参数: 上一个过滤器的处理结果,如果只有一个过滤器则是业务代码中发送数据对象. ***返回对象 *** 过滤器处理过的返回结果,被下一个过滤器调用,如果是最后一个过滤器那么这个结果则会传入Socket业务处理句柄的 onRecive 方法。

下面我们给出框架实现的StringFilter的源码以供参考:

public class StringFilter implements IoFilter {

    @Override
    public Object encode(IoSession session,Object object) {
        if(object instanceof String){
            String sourceString = TObject.cast(object);
            return ByteBuffer.wrap(sourceString.getBytes());
        }
        return object;
    }

    @Override
    public Object decode(IoSession session,Object object) {
        if(object instanceof ByteBuffer){
            return TByteBuffer.toString((ByteBuffer)object);
        }
        return object;
    }
}

###Step4: 实现一个Socket业务处理句柄### 定义Socket 业务处理句柄需要实现IoHandler接口. 下面我们给出 IoFilter 接口的源码:

package org.voovan.network;

public interface IoHandler {
    public Object onConnect(IoSession session);
    
    public void onDisconnect(IoSession session);
    
    public Object onReceive(IoSession session,Object obj);
    
    public void onSent(IoSession session,Object obj);    
    
    public void onException(IoSession session,Exception e);
}

下面我们对5个方法做逐个说明:

 public Object onConnect(IoSession session);

当Socket 连接成功后会回调这个方法。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。 ***返回值:***返回一个对象,这个对象将会由 Socket 进行发送,如果返回 null 则不发送任何数据。

 public void onDisconnect(IoSession session);

当Socket 连接断开后会回调这个方法。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。

 public Object onReceive(IoSession session,Object obj);

当Socket 接受到数据,并且经过消息分割器分割后再经过过滤器的decode方法处理后的数据。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。 obj参数: 接受的数据,这个数据是经过消息分割器和过滤器处理后的数据。 ***返回值:***返回一个对象,这个对象将会由 Socket 进行发送,如果返回 null 则不发送任何数据。

 public void onSent(IoSession session,Object obj);

当Socket 发送成功后会回调这个方法. IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等. obj参数: 发送的数据,这个数据是经过过滤器处理后的数据。

 public void onException(IoSession session,Exception e);

当Socket 处理过程中发生异常则回调这个方法。 IoSession参数: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等. e参数: Exception 对象描述这个异常。

使用 session.close() 来关闭 socket 连接。

下面我们给第一步实例化的 Socket 连接对象增加一个业务处理句柄:

socket.handler(new ClientHandlerTest());

下面我们给出一个实现的样例:

public class ClientHandlerTest implements IoHandler {

   @Override
   public Object onConnect(IoSession session) {
       Logger.simple("onConnect");
       session.setAttribute("key", "attribute value");
       String msg = new String("test message\r\n");
       return msg;
   }

   @Override
   public void onDisconnect(IoSession session) {
       Logger.simple("onDisconnect");
   }

   @Override
   public Object onReceive(IoSession session, Object obj) {
       //+"["+session.remoteAddress()+":"+session.remotePort()+"]"
       Logger.simple("Client onRecive: "+obj.toString());
       Logger.simple("Attribute onRecive: "+session.getAttribute("key"));
       session.close();
       return obj;
   }

   @Override
   public void onException(IoSession session, Exception e) {
       Logger.simple("Client Exception");
       Logger.error(e);
       session.close();
   }

   @Override
   public void onSent(IoSession session, Object obj) {
       ByteBuffer sad = (ByteBuffer)obj;
       sad = (ByteBuffer)sad.rewind();
       Logger.simple("Client onSent: "+new String(sad.array()));
   }
}

###Step5: 启动socket### 完整的服务实例:

public class AioSocketTest {
    
    public static void main(String[] args) throws Exception {
        AioSocket socket = new AioSocket("127.0.0.1",2031,300);
        socket.handler(new ClientHandlerTest());
        socket.filterChain().add(new StringFilter());
        socket.messageSplitter(new LineMessageSplitter());
        socket.start();
        Logger.simple("Terminate");
    }
}

你可能发现我们的过滤器、分割器、业务处理句柄没有按照我们上面的顺序来设置,是的这个设置顺序是没有要求的,只要在 start()方法被调用前设置都可以生效。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
Voovan开发指南 (一) Socket服务端开发
Voovan框架介绍Voovan开源项目启动于2015年,始于自己在使用Netty和Mina时有较多难以理解的部分,同时在使用过程中遇到对粘包等问题的困扰,后来经过不断的对源码的学习以及对java异步通信的深入理解发现自java1.7以后JDK提供了更优秀的异步通信模型AIO,随后决定自己参照AIO模型重新造一个轮子。并
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这