Netty如何实现同一个端口接收TCP和HTTP请求

Stella981
• 阅读 2481

前言

在java的网络编程世界里,Netty的地位可谓是举足轻重,说到基于NIO的网络编程,Netty几乎成为企业的首选,本文不会过多介绍Netty的基本使用等知识,本文着重介绍在Netty中如何实现同一个端口,既能接收TCP请求,也能接收Http请求。

由于一些特殊的原因,我要实现一款消息中间件,暂时称为“企业消息总线”吧。简单描述一下场景,对如果有相同或者类似场景的小伙伴可能有帮助。在企业内部,特别是制造业企业,往往会存在生产管理系统和生产控制系统。管理类系统我们都很熟悉了,但是控制类系统更多的是控制,比如控制机器的运行,接收机器的传感器传递上来的数据,然而管理系统如果要收集生产现场的数据,一般都是通过控制系统上传到管理系统,如果管理系统想要控制机器,则需要下发一些数据到控制系统,因此就存在一个管理系统到控制系统之间的通讯需求。一般的控制系统和管理系统之间都是使用TCP进行通讯,当然现在也有支持使用HTTP直接通信的,具体可以参考如下简图 Netty如何实现同一个端口接收TCP和HTTP请求

今天只讨论使用TCP进行通讯的模式

需求提出

上面简单介绍了管理系统和控制系统之间的通讯模式,我们都知道,在网络编程里面,比较关注的两点就是_通讯协议_和_序列化协议_。通讯协议一般就是指传输层协议TCP/UDP或者应用层协议HTTP等协议。而序列化的就多了,常见的有protocol buffer,json或者自定义的某协议。

上图只是简单的一个机器的控制系统直接和管理系统进行通讯,但是真实场景只是这么简单吗?当然不是了,真实场景往往是管理系统与控制系统是一对多的存在,不同产线不同的设备对应到的控制系统一般都是不同的,因此,如果采用这种模式进行通讯,那么管理系统的小伙伴要抓狂了。而且实际场景里,管理类系统也不只是一套,管理类系统之间也存在通讯的需求,并且大企业内部的管理系统大多还是异构系统,开发语言和运行平台什么的都可能存在较大差异,比如ERP类系统和MES类系统就需要进行通讯,而MES类系统又需要和控制系统进行通讯,为了解决这样一个问题,我们就需要引入一个企业级消息总线。

具体可以参考下图 Netty如何实现同一个端口接收TCP和HTTP请求

通过上图我们可以看到,如果引入了企业级消息总线的话,总线和控制系统之间通过TCP通信,总线和各管理系统通过HTTP通信,这样设计就完全解耦开来了,各个管理系统不需要配置N个控制系统的地址,而控制系统也只管跟总线通讯即可。

端口复用

因为本文不是想介绍企业级消息总线如何设计和实现,上面说了那么多,只想引出一个问题,也就是今天要探讨的问题。因为总线既要支持TCP也要支持HTTP两种协议,那么传统的做法是怎么样的呢?传统的做法可以开两个端口一个监听HTTP服务,一个监听TCP服务,这样子做是没有问题的,但是我们都知道,HTTP协议是基于TCP协议的,能不能只开一个端口,然后同时支持TCP和HTTP两种协议呢?答案肯定是可以的。

下面就是本文的主角Netty登场啦。

在翻阅了很多资料,在netty的example里找到了这么一个类io.netty.example.portunification.PortUnificationServerHandler,有兴趣的朋友可以参考下这个类的实现,我的实现也是参考这个类的。

代码实现

核心的PortUnificationServerHandler的实现如下

/**
 * 统一端口的处理器
 * <p>
 * 使用同一个端口去处理TCP/HTTP协议的请求,因为HTTP的底层协议也是TCP,因此可以在此处理器内部可以通过解析部分数据
 * 来判断请求是TCP请求还是HTTP请求,然使用动态的pipeline切换
 *
 * @author Succy
 * create on 2020/11/19
 */
@Slf4j
public class PortUnificationServerHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Will use the first five bytes to detect a protocol.
        if (byteBuf.readableBytes() < 5) {
            return;
        }
        final int magic1 = byteBuf.getUnsignedByte(byteBuf.readerIndex());
        final int magic2 = byteBuf.getUnsignedByte(byteBuf.readerIndex() + 1);

        // 判断是不是HTTP请求
        if (isHttp(magic1, magic2)) {
            log.info("this is a http msg");
            switchToHttp(channelHandlerContext);
        } else {
            log.info("this is a socket msg");
            // 当成TCP请求处理
            ChannelPipeline p = channelHandlerContext.pipeline();

            ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes());
            p.addLast(new DelimiterBasedFrameDecoder(8192, delimiter))
                    .addLast(new SocketMsgDecoder())
                    .addLast(new SocketMsgHandler());
            // 将自身移除掉
            p.remove(this);
        }
    }

    /**
     * 跳转到http处理
     *
     * @param ctx
     */
    private void switchToHttp(ChannelHandlerContext ctx) {
        ChannelPipeline p = ctx.pipeline();
        p.addLast(new HttpRequestDecoder())
                .addLast(new HttpObjectAggregator(65536))
                .addLast(new HttpResponseEncoder())
                .addLast(new HttpMsgHandler());

        p.remove(this);
    }


    /**
     * 判断请求是否是HTTP请求
     *
     * @param magic1 报文第一个字节
     * @param magic2 报文第二个字节
     * @return
     */
    private boolean isHttp(int magic1, int magic2) {
        return magic1 == 'G' && magic2 == 'E' || // GET
                magic1 == 'P' && magic2 == 'O' || // POST
                magic1 == 'P' && magic2 == 'U' || // PUT
                magic1 == 'H' && magic2 == 'E' || // HEAD
                magic1 == 'O' && magic2 == 'P' || // OPTIONS
                magic1 == 'P' && magic2 == 'A' || // PATCH
                magic1 == 'D' && magic2 == 'E' || // DELETE
                magic1 == 'T' && magic2 == 'R' || // TRACE
                magic1 == 'C' && magic2 == 'O';   // CONNECT
    }

}

针对Socket的消息,需要先解码,解码之后再交由Handler处理,这里也贴一下这两个的代码

/**
 * Socket消息解码器(简化版)
 * @author Succy
 * create on 2020/11/19
 */
public class SocketMsgDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int length = byteBuf.readableBytes();
        byte[] data = new byte[length];
        byteBuf.readBytes(data);
        String msg = new String(data, StandardCharsets.UTF_8);
        list.add(msg);
    }
}

解码之后交由handler处理

/**
 * Socket数据处理器
 * @author Succy
 * create on 2020/11/19
 */
@Slf4j
public class SocketMsgHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("socket msg: {}", msg);
    }
}

其实netty也自带了很多解码器,比如json的,比如protocol buf的,还有其他,只是我这里用的是一个简单的字符串而已,如果说是自定义协议,一般都是需要定义自己的编解码器的。

如果是http请求就相对简单多了,直接使用netty自带的编解码器即可。

/**
 * Http数据处理器
 * @author Succy
 * create on 2020/11/19
 */
@Slf4j
public class HttpMsgHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof FullHttpRequest)) {
            log.warn("不是http请求");
            throw new RuntimeException("该请求不是一个http请求,拒绝处理");
        }

        FullHttpRequest request = (FullHttpRequest) msg;
        ByteBuf content = request.content();
        String body = content.toString(StandardCharsets.UTF_8);
        log.info(body);

        handleResp(ctx, "ok", HttpResponseStatus.OK);
        request.release();
    }

    /**
     * 处理响应
     *
     * @param ctx    通道上下文对象
     * @param data   响应的数据
     * @param status 响应的http状态码
     */
    private void handleResp(ChannelHandlerContext ctx, String data, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
                Unpooled.copiedBuffer(data, CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}

最后则是在一个主类中,将PortUnificationServerHandler加进去,启动测试即可

/**
 * 启动主类
 * @author Succy
 * create on 2020/11/18
 */
@Slf4j
public class NettyServer {

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 做是否支持epoll轮询判断以获取更高性能
        EventLoopGroup boss = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        EventLoopGroup worker = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        try {
            serverBootstrap.group(boss, worker)
                    .channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new PortUnificationServerHandler());
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true);

            ChannelFuture future = serverBootstrap.bind(8088).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            //
            throw new RuntimeException(e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

运行结果截图

Netty如何实现同一个端口接收TCP和HTTP请求

可以看到,此时的Netty服务就可以同时接收TCP和HTTP两种协议的数据了。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这