前言
在java的网络编程世界里,Netty的地位可谓是举足轻重,说到基于NIO的网络编程,Netty几乎成为企业的首选,本文不会过多介绍Netty的基本使用等知识,本文着重介绍在Netty中如何实现同一个端口,既能接收TCP请求,也能接收Http请求。
由于一些特殊的原因,我要实现一款消息中间件,暂时称为“企业消息总线”吧。简单描述一下场景,对如果有相同或者类似场景的小伙伴可能有帮助。在企业内部,特别是制造业企业,往往会存在生产管理系统和生产控制系统。管理类系统我们都很熟悉了,但是控制类系统更多的是控制,比如控制机器的运行,接收机器的传感器传递上来的数据,然而管理系统如果要收集生产现场的数据,一般都是通过控制系统上传到管理系统,如果管理系统想要控制机器,则需要下发一些数据到控制系统,因此就存在一个管理系统到控制系统之间的通讯需求。一般的控制系统和管理系统之间都是使用TCP进行通讯,当然现在也有支持使用HTTP直接通信的,具体可以参考如下简图
今天只讨论使用TCP进行通讯的模式
需求提出
上面简单介绍了管理系统和控制系统之间的通讯模式,我们都知道,在网络编程里面,比较关注的两点就是_通讯协议_和_序列化协议_。通讯协议一般就是指传输层协议TCP/UDP或者应用层协议HTTP等协议。而序列化的就多了,常见的有protocol buffer,json或者自定义的某协议。
上图只是简单的一个机器的控制系统直接和管理系统进行通讯,但是真实场景只是这么简单吗?当然不是了,真实场景往往是管理系统与控制系统是一对多的存在,不同产线不同的设备对应到的控制系统一般都是不同的,因此,如果采用这种模式进行通讯,那么管理系统的小伙伴要抓狂了。而且实际场景里,管理类系统也不只是一套,管理类系统之间也存在通讯的需求,并且大企业内部的管理系统大多还是异构系统,开发语言和运行平台什么的都可能存在较大差异,比如ERP类系统和MES类系统就需要进行通讯,而MES类系统又需要和控制系统进行通讯,为了解决这样一个问题,我们就需要引入一个企业级消息总线。
具体可以参考下图
通过上图我们可以看到,如果引入了企业级消息总线的话,总线和控制系统之间通过TCP通信,总线和各管理系统通过HTTP通信,这样设计就完全解耦开来了,各个管理系统不需要配置N个控制系统的地址,而控制系统也只管跟总线通讯即可。
端口复用
因为本文不是想介绍企业级消息总线如何设计和实现,上面说了那么多,只想引出一个问题,也就是今天要探讨的问题。因为总线既要支持TCP也要支持HTTP两种协议,那么传统的做法是怎么样的呢?传统的做法可以开两个端口一个监听HTTP服务,一个监听TCP服务,这样子做是没有问题的,但是我们都知道,HTTP协议是基于TCP协议的,能不能只开一个端口,然后同时支持TCP和HTTP两种协议呢?答案肯定是可以的。
下面就是本文的主角Netty登场啦。
在翻阅了很多资料,在netty的example里找到了这么一个类io.netty.example.portunification.PortUnificationServerHandler
,有兴趣的朋友可以参考下这个类的实现,我的实现也是参考这个类的。
代码实现
核心的PortUnificationServerHandler
的实现如下
/**
* 统一端口的处理器
* <p>
* 使用同一个端口去处理TCP/HTTP协议的请求,因为HTTP的底层协议也是TCP,因此可以在此处理器内部可以通过解析部分数据
* 来判断请求是TCP请求还是HTTP请求,然使用动态的pipeline切换
*
* @author Succy
* create on 2020/11/19
*/
@Slf4j
public class PortUnificationServerHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// Will use the first five bytes to detect a protocol.
if (byteBuf.readableBytes() < 5) {
return;
}
final int magic1 = byteBuf.getUnsignedByte(byteBuf.readerIndex());
final int magic2 = byteBuf.getUnsignedByte(byteBuf.readerIndex() + 1);
// 判断是不是HTTP请求
if (isHttp(magic1, magic2)) {
log.info("this is a http msg");
switchToHttp(channelHandlerContext);
} else {
log.info("this is a socket msg");
// 当成TCP请求处理
ChannelPipeline p = channelHandlerContext.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes());
p.addLast(new DelimiterBasedFrameDecoder(8192, delimiter))
.addLast(new SocketMsgDecoder())
.addLast(new SocketMsgHandler());
// 将自身移除掉
p.remove(this);
}
}
/**
* 跳转到http处理
*
* @param ctx
*/
private void switchToHttp(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast(new HttpRequestDecoder())
.addLast(new HttpObjectAggregator(65536))
.addLast(new HttpResponseEncoder())
.addLast(new HttpMsgHandler());
p.remove(this);
}
/**
* 判断请求是否是HTTP请求
*
* @param magic1 报文第一个字节
* @param magic2 报文第二个字节
* @return
*/
private boolean isHttp(int magic1, int magic2) {
return magic1 == 'G' && magic2 == 'E' || // GET
magic1 == 'P' && magic2 == 'O' || // POST
magic1 == 'P' && magic2 == 'U' || // PUT
magic1 == 'H' && magic2 == 'E' || // HEAD
magic1 == 'O' && magic2 == 'P' || // OPTIONS
magic1 == 'P' && magic2 == 'A' || // PATCH
magic1 == 'D' && magic2 == 'E' || // DELETE
magic1 == 'T' && magic2 == 'R' || // TRACE
magic1 == 'C' && magic2 == 'O'; // CONNECT
}
}
针对Socket的消息,需要先解码,解码之后再交由Handler处理,这里也贴一下这两个的代码
/**
* Socket消息解码器(简化版)
* @author Succy
* create on 2020/11/19
*/
public class SocketMsgDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int length = byteBuf.readableBytes();
byte[] data = new byte[length];
byteBuf.readBytes(data);
String msg = new String(data, StandardCharsets.UTF_8);
list.add(msg);
}
}
解码之后交由handler处理
/**
* Socket数据处理器
* @author Succy
* create on 2020/11/19
*/
@Slf4j
public class SocketMsgHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("socket msg: {}", msg);
}
}
其实netty也自带了很多解码器,比如json的,比如protocol buf的,还有其他,只是我这里用的是一个简单的字符串而已,如果说是自定义协议,一般都是需要定义自己的编解码器的。
如果是http请求就相对简单多了,直接使用netty自带的编解码器即可。
/**
* Http数据处理器
* @author Succy
* create on 2020/11/19
*/
@Slf4j
public class HttpMsgHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof FullHttpRequest)) {
log.warn("不是http请求");
throw new RuntimeException("该请求不是一个http请求,拒绝处理");
}
FullHttpRequest request = (FullHttpRequest) msg;
ByteBuf content = request.content();
String body = content.toString(StandardCharsets.UTF_8);
log.info(body);
handleResp(ctx, "ok", HttpResponseStatus.OK);
request.release();
}
/**
* 处理响应
*
* @param ctx 通道上下文对象
* @param data 响应的数据
* @param status 响应的http状态码
*/
private void handleResp(ChannelHandlerContext ctx, String data, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
Unpooled.copiedBuffer(data, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
最后则是在一个主类中,将PortUnificationServerHandler
加进去,启动测试即可
/**
* 启动主类
* @author Succy
* create on 2020/11/18
*/
@Slf4j
public class NettyServer {
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 做是否支持epoll轮询判断以获取更高性能
EventLoopGroup boss = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
EventLoopGroup worker = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
try {
serverBootstrap.group(boss, worker)
.channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new PortUnificationServerHandler());
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
ChannelFuture future = serverBootstrap.bind(8088).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
//
throw new RuntimeException(e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
运行结果截图
可以看到,此时的Netty服务就可以同时接收TCP和HTTP两种协议的数据了。