Voovan 框架介绍
Voovan开源项目启动于2015年,始于自己在使用 Netty 和 Mina 时有较多难以理解的部分,同时在使用过程中遇到对粘包等问题的困扰,后来经过不断的对源码的学习以及对 java 异步通信的深入理解发现 自 java 1.7以后 JDK 提供了更优秀的异步通信模型 AIO,随后决定自己参照 AIO 模型重新造一个轮子。并在开发的过程中对使用到的各类工具方法等做了整理,形成了一个常用并且简单易用的工具包。
- 可灵活实现Socket通信粘包的支持(代码中包含 HTTP协议,字符串换行,定长报文的粘包实现)。
- 支持 SSL/TLS 加密通信。
- 提供线程池依据系统负载情况自动动态调整。
- 同时支持 NIO 和 AIO 特性。
- 采用非阻塞方式的异步传输。
- 事件驱动(Connect、Recive、Sent、Close、Exception),采用回调的方式完成调用。
- 可灵活的加载过滤器机制。
###步骤介绍###
想要发起一个 Socket 连接仅仅需要一下四个步骤:
- 实例化一个Socket对象: AioSocket 或 NioSocket 或 UdpSocket,用于连接。
- 实例化一个消息分割器用来处理粘包问题。
- 实例化一个过滤器,IoFilter。
- 实例化一个Socket业务处理句柄: IoHandle。
###Step1: 实现一个Socket连接对象### 实例化Socket连接有两个类可以采取实例化动作.
- AioSocket: 采用 JDK 的 AIO 模型的异步通信,JDK > 1.7。
- NioSocket: 采用 JDK 的 NIO 模型的异步通信,JDK > 1.4。
- UdpSocket: 采用 JDK 的 UDP 模型的异步通信,JDK > 1.4。
下面我们来看看这两个类的构造方法:
public NioSocket(String host,int port,int readTimeout) throws IOException
public AioSocket(String host,int port,int readTimeout) throws IOException
public UdpSocket(String host,int port,int readTimeout) throws IOException
我们可以看到这两个类的构造方法都具有三个参数:
- host: 服务发布地址。
- port: 服务发布端口。
- readTimeout: 读取超时时间
下面我们来实例化一个Socket连接对象:
AioSocket socket = new AioSocket("127.0.0.1",2031,300);
实际使用中如果你想构造一个 Nio 模型的 Socket 连接,请将 AioSocket 替换成 NioSocket 即可。
###Step2: 实现一个消息分割器### 消息分割器是用来处理消息粘包的一个补充类,相对于 Netty 和 Mina 是一个特殊的地方. 注意:消息分割器是工作在过滤器之前的. 消息分割器是在 Socket 连接器接受到消息后对消息的内容进行判断是否是一个完成消息报文,如果是一个完成消息报文则返回给过滤器来处理,如果不是则等待消息报文被完整接受,如果一直接受的消息报文都不完整则一直等待,这个时候我们可以通过超时来控制尝试间接收不到消息的情况,具体参考TimeOutMesssageSplitter分割器的实现,也可以直接实例化这个分割器在你自定义的分割器中通过业务代码来判断是否需要使用超时。
下面我们给第一步实例化的 Socket 连接对象增加一个分割器:
socket.messageSplitter(new LineMessageSplitter());
Voovan 框架已经包括了一些消息分割器的实现在org.voovan.network.messagesplitter包内。
BufferLengthSplitter
: 消息定长分割器LineMessageSplitter
: 换行消息分割器HttpMessageSplitter
: Http1.1消息分割器TimeOutMesssageSplitter
: 超时消息分割器
自定义一个消息过滤器需要实现MessageSplitte接口,接口的源码:
package org.voovan.network;
public interface MessageSplitter {
public int canSplite(IoSession session,byte[] buffer);
}
通过源码我们可以发现,如果想实现一个消息分割器我们需要实现一个canSplite方法:
canSplite 方法
: 判断消息是否可分割。 这两个方法有两个相同的参数:IoSession参数
: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。buffer参数
: Socket 接收到的所有字节。 ***`返回对象:如果消息是可分割的则返回 true,buffer 参数中的字节将会交给过滤器来处理,如果返回 false 则继续等待 Socket 接受新的字节。
下面我们给出框架实现的TimeOutMesssageSplitter的源码以供参考:
public class TimeOutMesssageSplitter implements MessageSplitter {
private long initTime;
public TimeOutMesssageSplitter(){
initTime = -1;
}
@Override
public int canSplite(IoSession session, byte[] buffer) {
int timeOut = session.sockContext().getReadTimeout();
long currentTime = System.currentTimeMillis();
if(initTime==-1){
initTime = currentTime;
}
if(currentTime-initTime >= timeOut){
return byteBuffer.limit();
}else{
return -1;
}
}
}
###Step3: 实现一个过滤器### 过滤器可以在 Socket 通信中对传递的字节流进行解码和编码操作,比如:我们传递的报文是 JSON 数据格式,那么我们可以通过实现一个过滤器在发送一个对象作为消息时将对象转换成 JSON 字符串通过 Socket 发送,同时在Socket接受到消息后将接收到的 JSON 字符传转换成对象。
我们可以定义多个过滤器形成一个过滤器链,这样可以提高部分过滤器的复用性. 在第一步实例化好的Socket连接对象中调用增加过滤器方法可以向 Socket 连接对象增加过滤器。 增加的过滤器在过滤器链中是有先后顺序的,例如:在使用 add 方法加入的过滤器则在过滤器的最后一个.在解码的过程中过滤器的方法 decode 时是按照加入的从第一个到最后一个的顺序调用的.在编码的过程中过滤器方法 encode 是按照最后一个到第一个的顺序调用的。
下面我们给第一步实例化的 Socket 连接对象增加一个过滤器:
socket.filterChain().add(new StringFilter());
其中我们通过socket.filterChain()获取过滤器链,然后通过过滤器链的 add 方法增加一个名为StringFilter的过滤器。
Voovan 框架已经包括了一个过滤器的实现: StringFilter过滤器,用于将字节流转换成字符串。
如果我们要根据自己的需求定义一个自定义过滤器,那么我们的过滤器实现一个 IoFilter 接口. 下面我们给出 IoFilter 接口的源码:
package org.voovan.network;
import org.voovan.network.exception.IoFilterException;
public interface IoFilter {
public Object decode(IoSession session,Object object) throws IoFilterException;
public Object encode(IoSession session,Object object)throws IoFilterException;
}
通过源码我们可以发现,如果想实现一个过滤器我们需要实现两个过滤器方法:
decode 方法
: 过滤器解码函数,接收事件(onRecive)前调用encode 方法
: 过滤器编码函数,发送事件(onSend)前调用 这两个方法有两个相同的参数:IoSession参数
: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等.object参数
: 上一个过滤器的处理结果,如果只有一个过滤器则是业务代码中发送数据对象. ***返回对象 ***
过滤器处理过的返回结果,被下一个过滤器调用,如果是最后一个过滤器那么这个结果则会传入Socket业务处理句柄的 onRecive 方法。
下面我们给出框架实现的StringFilter的源码以供参考:
public class StringFilter implements IoFilter {
@Override
public Object encode(IoSession session,Object object) {
if(object instanceof String){
String sourceString = TObject.cast(object);
return ByteBuffer.wrap(sourceString.getBytes());
}
return object;
}
@Override
public Object decode(IoSession session,Object object) {
if(object instanceof ByteBuffer){
return TByteBuffer.toString((ByteBuffer)object);
}
return object;
}
}
###Step4: 实现一个Socket业务处理句柄### 定义Socket 业务处理句柄需要实现IoHandler接口. 下面我们给出 IoFilter 接口的源码:
package org.voovan.network;
public interface IoHandler {
public Object onConnect(IoSession session);
public void onDisconnect(IoSession session);
public Object onReceive(IoSession session,Object obj);
public void onSent(IoSession session,Object obj);
public void onException(IoSession session,Exception e);
}
下面我们对5个方法做逐个说明:
public Object onConnect(IoSession session);
当Socket 连接成功后会回调这个方法。 IoSession参数
: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。 ***返回值
:***返回一个对象,这个对象将会由 Socket 进行发送,如果返回 null 则不发送任何数据。
public void onDisconnect(IoSession session);
当Socket 连接断开后会回调这个方法。 IoSession参数
: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。
public Object onReceive(IoSession session,Object obj);
当Socket 接受到数据,并且经过消息分割器分割后再经过过滤器的decode方法处理后的数据。 IoSession参数
: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等。 obj参数
: 接受的数据,这个数据是经过消息分割器和过滤器处理后的数据。 ***返回值
:***返回一个对象,这个对象将会由 Socket 进行发送,如果返回 null 则不发送任何数据。
public void onSent(IoSession session,Object obj);
当Socket 发送成功后会回调这个方法. IoSession参数
: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等. obj参数
: 发送的数据,这个数据是经过过滤器处理后的数据。
public void onException(IoSession session,Exception e);
当Socket 处理过程中发生异常则回调这个方法。 IoSession参数
: 当前 Socket 的对话对象,可以保存会话变量,获取 Socket 上下文对象等等. e参数
: Exception 对象描述这个异常。
使用 session.close() 来关闭 socket 连接。
下面我们给第一步实例化的 Socket 连接对象增加一个业务处理句柄:
socket.handler(new ClientHandlerTest());
下面我们给出一个实现的样例:
public class ClientHandlerTest implements IoHandler {
@Override
public Object onConnect(IoSession session) {
Logger.simple("onConnect");
session.setAttribute("key", "attribute value");
String msg = new String("test message\r\n");
return msg;
}
@Override
public void onDisconnect(IoSession session) {
Logger.simple("onDisconnect");
}
@Override
public Object onReceive(IoSession session, Object obj) {
//+"["+session.remoteAddress()+":"+session.remotePort()+"]"
Logger.simple("Client onRecive: "+obj.toString());
Logger.simple("Attribute onRecive: "+session.getAttribute("key"));
session.close();
return obj;
}
@Override
public void onException(IoSession session, Exception e) {
Logger.simple("Client Exception");
Logger.error(e);
session.close();
}
@Override
public void onSent(IoSession session, Object obj) {
ByteBuffer sad = (ByteBuffer)obj;
sad = (ByteBuffer)sad.rewind();
Logger.simple("Client onSent: "+new String(sad.array()));
}
}
###Step5: 启动socket### 完整的服务实例:
public class AioSocketTest {
public static void main(String[] args) throws Exception {
AioSocket socket = new AioSocket("127.0.0.1",2031,300);
socket.handler(new ClientHandlerTest());
socket.filterChain().add(new StringFilter());
socket.messageSplitter(new LineMessageSplitter());
socket.start();
Logger.simple("Terminate");
}
}
你可能发现我们的过滤器、分割器、业务处理句柄没有按照我们上面的顺序来设置,是的这个设置顺序是没有要求的,只要在 start()方法被调用前设置都可以生效。