Dubbo处理TCP拆包粘包问题

Stella981
• 阅读 1035

#Dubbo处理TCP拆包粘包问题

在TCP网络传输工程中,由于TCP包的缓存大小限制,每次请求数据有可能不在一个TCP包里面,或者也可能多个请求的数据在一个TCP包里面。那么如果合理的decode接受的TCP数据很重要,需要考虑TCP拆包和粘包的问题。我们知道在Netty提供了各种Decoder来解决此类问题,比如LineBasedFrameDecoder,LengthFieldBasedFrameDecoder等等,但是这些都是处理一些通用简单的协议栈,并不能处理高度自定义的协议栈。由于dubbo协议是自定义协议栈,并且包含消息头和消息体两部分,而消息头中包含消息类型、协议版本、协议魔数以及payload长度等信息。所以使用Netty自带的处理方案可能无法满足Dubbo解析自身协议的需求,所以需要Dubbo自己来处理,那自己处理,就需要自己处理TCP的拆包和粘包的问题。这里就对Dubbo处理此类问题进行探讨,从而加深自己对它的理解。

##说明 此处所描述的协议是dubbo协议,其他的协议比如http,webservice等协议不是这里讨论范围。并且这里使用的通信框架以Netty来讲解,Mina以及grizzly也不在种类讨论范围。

##NettyCodecAdapter NettyCodecAdapter是对dubbo协议解析的入口,里面包含decoder和encoder两部分,而TCP的拆包和粘包主要是decoder部分,所以encoder这里不进行讨论。在NettyCodecAdapter中的decoder是由InternalDecoder来实现,它的父类是Netty的SimpleChannelUpstreamHandler可以接受所有inbound消息,那么就可以对接受的消息进行decode。这里需要说明一下对于某一个Channel都有一个私有的InternalDecoder对象,并不是和其他的Channel共享,这里就避免了并发问题,所以在InternalDecoder里面可以用单线程的方式去看待,这样就比较容易理解。

###InternalDecoder 每个channel的inbound消息都会发送到InternalDecodermessageReceived方法,而dubbo会先将接受的消息缓存到InternalDecoderbuffer属性中,这个变量很重要,后面会讨论。下面是messageReceived方法中将接受的消息负载到buffer实现。

      private class InternalDecoder extends SimpleChannelUpstreamHandler {

        private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
            com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            Object o = event.getMessage();
            if (! (o instanceof ChannelBuffer)) {
                ctx.sendUpstream(event);
                return;
            }

            ChannelBuffer input = (ChannelBuffer) o;
            int readable = input.readableBytes();
            if (readable <= 0) {
                return;
            }

            com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
            if (buffer.readable()) {
                if (buffer instanceof DynamicChannelBuffer) {
                    buffer.writeBytes(input.toByteBuffer());
                    message = buffer;
                } else {
                    int size = buffer.readableBytes() + input.readableBytes();
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                        size > bufferSize ? size : bufferSize);
                    message.writeBytes(buffer, buffer.readableBytes());
                    message.writeBytes(input.toByteBuffer());
                }
            } else {
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                    input.toByteBuffer());
            }

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            Object msg;
            int saveReaderIndex;

            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());
            } finally {
                if (message.readable()) {
                    message.discardReadBytes();
                    buffer = message;
                } else {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                }
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            ctx.sendUpstream(e);
        }
    }

首先是判断当前decoder对象的buffer中是否有可以读取的消息,如果有则进行合并,并且把对象引用赋予message局部变量,所以message则获取了当前channel的inbound消息。得到inbound消息之后,那么接下来就是对协议的解析了。

     do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());

这里首先要做的是把当前message的读索引保存到局部变量saveReaderIndex中,用于后面的消息回滚。后面紧接着是对消息的decode,这里的codecDubboCountCodec对象实体,这里需要注意一点,DubboCountCodecdecode每次只会解析出一个完整的dubbo协议栈,带着这个看看decode的实现。

    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        do {
            Object obj = codec.decode(channel, buffer);
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                buffer.readerIndex(save);
                break;
            } else {
                result.addMessage(obj);
                logMessageLength(obj, buffer.readerIndex() - save);
                save = buffer.readerIndex();
            }
        } while (true);
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }

这里暂存了当前buffer的读索引,同样也是为了后面的回滚。可以看到当decode返回的是NEED_MORE_INPUT则表示当前的buffer中数据不足,不能完整解析出一个dubbo协议栈,同时将buffer的读索引回滚到之前暂存的索引并且退出循环,将结果返回。那接下来看看什么时候会返回NEED_MORE_INPUT,最终会定位到在ExchangeCodecdecode方法会解析出协议栈。

    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH 
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i ++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            return super.decode(channel, buffer, readable, header);
        }
        // check length.
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // get data length.
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        if( readable < tt ) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

这个方法开始是对telnet协议进行解析(由于dubbo支持telnet连接,所以这里提供了支持,可以忽略这一部分)。看到会有两个地方返回NEED_MORE_INPUT,一个是当前buffer的可读长度还没有消息头长,说明当前buffer连协议栈的头都不完整,所以需要继续读取inbound数据,另一个是当前buffer包含了完整的消息头,便可以得到payload的长度,发现它的可读的长度,并没有包含整个协议栈的数据,所以也需要继续读取inbound数据。如果上面两个情况都不复核,那么说明当前的buffer至少包含一个dubbo协议栈的数据,那么从当前buffer中读取一个dubbo协议栈的数据,解析出一个dubbo数据,当然这里可能读取完一个dubbo数据之后还会有剩余的数据。

上面对dubbo解析出一个完整的dubbo协议栈过程进行了讨论,但是还没有对TCP的拆包和粘包问题做过多的讨论。下面结合上面内容做一个综合讨论。

我这里对TCP拆包和粘包分别列举一个场景来讨论。

###当反生TCP拆包问题时候 这里假设之前还没有发生过任何数据交互,系统刚刚初始化好,那么这个时候在InternalDecoder里面的buffer属性会是EMPTY_BUFFER。当发生第一次inbound数据的时候,第一次在InternalDecoder里面接收的肯定是dubbo消息头的部分(这个由TCP协议保证),由于发生了拆包情况,那么此时接收的inbound消息可能存在一下几种情况

1、当前inbound消息只包含dubbo协议头的一部分

2、当前inbound消息只包含dubbo的协议头

3、当前inbound消息只包含dubbo消息头和部分payload消息

通过上面的讨论,我们知道发生上面三种情况,都会触发ExchangeCodec返回NEED_MORE_INPUT,由于在DubboCountCodec对余返回NEED_MORE_INPUT会回滚读索引,所以此时的buffer里面的数据可以当作并没有发生过读取操作,并且DubboCountCodec的decode也会返回NEED_MORE_INPUT,在InternalDecoder对于当判断返回NEED_MORE_INPUT,也会进行读索引回滚,并且退出循环,最后会执行finally内容,这里会判断inbound消息是否还有可读的,由于在DubboCountCodec里面进行了读索引回滚,所以次数的buffer里面是完整的inbound消息,等待第二次的inbound消息的到来,当第二次inbound消息过来的时候,再次经过上面的判断。

###当发生TCP粘包的时候 当发生粘包的时候是tcp将一个以上的dubbo协议栈放在一个tcp包中,那么有可能发生下面几种情况

1、当前inbound消息只包含一个dubbo协议栈

2、当前inbound消息包含一个dubbo协议栈,同时包含部分另一个或者多个dubbo协议栈内容

如果发生只包含一个协议栈,那么当前buffer通过ExchangeCodec解析协议之后,当前的buffer的readeIndex位置应该是 buffer尾部,那么在返回到InternalDecodermessage的方法readable返回的是false,那么就会对buffer重新赋予EMPTY_BUFFER实体,而针对包含一个以上的dubbo协议栈,当然也会解析出其中一个dubbo协议栈,但是经过ExchangeCodec解析之后,message的readIndex不在message尾部,所以messagereadable方法返回的是true。那么则会继续遍历message,读取下面的信息。最终要么message刚好整数倍包含完整的dubbo协议栈,要不ExchangeCodec返回NEED_MORE_INPUT,最后将未读完的数据缓存到buffer中,等待下次inbound事件,将buffer中的消息合并到下次的inbound消息中,种类又回到了拆包的问题上。

##总结

dubbo在处理tcp的粘包和拆包时是借助InternalDecoderbuffer缓存对象来缓存不完整的dubbo协议栈数据,等待下次inbound事件,合并进去。所以说在dubbo中解决TCP拆包和粘包的时候是通过buffer变量来解决的。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
3年前
tcp粘包与udp丢包的原因
tcp粘包与udp丢包的原因一,什么是tcp粘包与udp丢包TCP是面向流的, 流要说明就像河水一样, 只要有水, 就会一直流向低处, 不会间断. TCP为了提高传输效率, 发送数据的时候, 并不是直接发送数据到网路, 而是先暂存到系统缓冲, 超过时间或者缓冲满了, 才把缓冲区的内容发送
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
xiguaapp xiguaapp
3年前
tcp的三次握手四次挥手
tcp的三次握手流程:在tcp/ip协议中,tcp协议提供可靠的连接服务,采用三次握手建立一个连接。第一次握手:建立连接时,客户端发送SYN包【synj】到服务器,并进入SYN_SEND状态,等待服务器确认;第二次握手:服务器收到SYN包,必须确认客户的SYN(ackj1),同时自己也发送一个SYN包(syn
Stella981 Stella981
3年前
Netty使用解码器Decoder解决TCP粘包和拆包问题
解码器Decoder和ChannelHandler的关系netty的解码器通常是继承自ByteToMessageDecoder,而它又是继承自ChannelInboundHandlerAdapter,其实也是一种ChannelHandler和我们自定义的ChannelHandler一样都是来处理进
Wesley13 Wesley13
3年前
TCP协议粘包问题详解
TCP协议粘包问题详解前言在本章节中,我们将探讨TCP协议基于流式传输的最大一个问题,即粘包问题。本章主要介绍TCP粘包的原理与其三种解决粘包的方案。并且还会介绍为什么UDP协议不会产生粘包。基于TCP协议的socket实现远程命令输入我们准备做一个可以在Clie
Easter79 Easter79
3年前
TCP协议与Wireshark实验
目录TCP协议TCP报文段结构字段解析标志字段捕获从计算机到远程服务器的批量TCP传输跟踪包的初步观察TCPBasicsTCP拥塞控制参考资料TCP协议TCP协议给使用者提供了两种服务,分别是面向连接的服务
Stella981 Stella981
3年前
Netty中粘包和拆包的解决方案
粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。TCP粘包和拆包TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包
Stella981 Stella981
3年前
Netty解决TCP粘包和拆包问题的四种方案
 在RPC框架中,TCP粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接。由于微服务往对方发送信息的时候,所有的请求都是使用的同一个连接,这样就会产生粘包和拆包的问题。本文首先会对TCP粘包和拆包问题进行描述,然后介绍其常用的解决方案,最后会对Netty提供的几种解决方案进
Wesley13 Wesley13
3年前
34.TCP取样器
阅读文本大概需要3分钟。1、TCP取样器的作用   TCP取样器作用就是通过TCP/IP协议来连接服务器,然后发送数据和接收数据。2、TCP取样器详解!(https://oscimg.oschina.net/oscnet/32a9b19ba1db00f321d22a0f33bcfb68a0d.png)TCPClien