UDT协议实现分析——数据发送控制

Wesley13
• 阅读 678

前文中,我们有看到,数据发送的过程,大体是发送者CUDT将要发送的数据放进它的CSndBuffer m_pSndBuffer,并将它自己添加进它的CSndQueue m_pSndQueue的CSndUList m_pSndUList的堆里,后面CSndQueue m_pSndQueue的worker线程会通过CSndUList::pop()从CSndUList m_pSndUList的堆顶CUDT中获取一个要发送的包来发送,包的获取主要是通过CUDT::packData()来完成,而这个函数正是UDT中包发送的执行中心。

CUDT::packData()

===================

这里就来看一下CUDT::packData()的定义(src/core.cpp):

int CUDT::packData(CPacket& packet, uint64_t& ts) {
    int payload = 0;
    bool probe = false;

    uint64_t entertime;
    CTimer::rdtsc(entertime);

    if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime))
        m_ullTimeDiff += entertime - m_ullTargetTime;

    // Loss retransmission always has higher priority.
    if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0) {
        // protect m_iSndLastDataAck from updating by ACK processing
        CGuard ackguard(m_AckLock);

        int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo);
        if (offset < 0)
            return 0;

        int msglen;

        payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen);

        if (-1 == payload) {
            int32_t seqpair[2];
            seqpair[0] = packet.m_iSeqNo;
            seqpair[1] = CSeqNo::incseq(seqpair[0], msglen);
            sendCtrl(7, &packet.m_iMsgNo, seqpair, 8);

            // only one msg drop request is necessary
            m_pSndLossList->remove(seqpair[1]);

            // skip all dropped packets
            if (CSeqNo::seqcmp(m_iSndCurrSeqNo, CSeqNo::incseq(seqpair[1])) < 0)
                m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]);

            return 0;
        } else if (0 == payload)
            return 0;

        ++m_iTraceRetrans;
        ++m_iRetransTotal;
    } else {
        // If no loss, pack a new packet.

        // check congestion/flow window limit
        int cwnd = (m_iFlowWindowSize < (int) m_dCongestionWindow) ? m_iFlowWindowSize : (int) m_dCongestionWindow;
        if (cwnd >= CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo))) {
            if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo))) {
                m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);
                m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);

                packet.m_iSeqNo = m_iSndCurrSeqNo;

                // every 16 (0xF) packets, a packet pair is sent
                if (0 == (packet.m_iSeqNo & 0xF))
                    probe = true;
            } else {
                m_ullTargetTime = 0;
                m_ullTimeDiff = 0;
                ts = 0;
                return 0;
            }
        } else {
            m_ullTargetTime = 0;
            m_ullTimeDiff = 0;
            ts = 0;
            return 0;
        }
    }

    packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime);
    packet.m_iID = m_PeerID;
    packet.setLength(payload);

    m_pCC->onPktSent(&packet);
    //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp);

    ++m_llTraceSent;
    ++m_llSentTotal;

    if (probe) {
        // sends out probing packet pair
        ts = entertime;
        probe = false;
    } else {
#ifndef NO_BUSY_WAITING
        ts = entertime + m_ullInterval;
#else
        if (m_ullTimeDiff >= m_ullInterval) {
            ts = entertime;
            m_ullTimeDiff -= m_ullInterval;
        } else {
            ts = entertime + m_ullInterval - m_ullTimeDiff;
            m_ullTimeDiff = 0;
        }
#endif
    }

    m_ullTargetTime = ts;

    return payload;
}

在这个函数中,处理了两大类packet的读取,一是丢失的packet,二是正常的顺序传输的包。来看一下这个函数具体的执行过程:

1. 读取当前的时间entertime。

2. 更新m_ullTimeDiff。在UDT中,包发送会有一个随着网络状况调整的一个发送周期,也就是m_ullInterval值。在每一次发送包时,都会根据m_ullInterval值计算下一次包发送的理想时间,并记录在m_ullTargetTime中。而m_ullTimeDiff则被用来记录当前的这次包发送想对于理想的发送时间的延滞值,这个值会被用于计算下一次包发送的理想时间。UDT正是通过这样的修正来尽可能使的包发送周期能够保持在m_ullInterval值附近。

3. 从丢失包列表m_pSndLossList中获取一个丢失的包的SeqNo,并赋值给packet.m_iSeqNo。这个丢失包列表中的包可能是来源于Timer,比如一个包超过了正常时间还没有得到响应,也有可能来源于发送端发回的NACK消息,后面会在来研究这个问题。

4. 前一步中获取的SeqNo大于等于0,这表明存在丢失了需要重传的包,则读取丢失的包的内容:

(1). 计算丢失的包的SeqNo与SndLastDataAck的差值offset。

(2). 检查前一步计算出来的offset值,若小于0,表明发送窗口已经滑过了,则直接返回,否则继续执行。

(3). 根据前面计算的offset值,通过m_pSndBuffer->readData()把数据读入packet中。packet的m_iMsgNo会被更新为packet的MsgNo,msglen也会在packet过期时被更新。

(4). m_pSndBuffer->readData()返回0,表明读取的packet的数据长度为0。这样的packet没有实际发送的必要,直接返回,无需进行后续的步骤。

(5). m_pSndBuffer->readData()返回-1,表明要读取的packet已经过期,同样没有发送这个数据包本身的必要。

但此时会发送一个DropMsgRequest给数据接收端。

然后将过期的packet从SndLossList中移除出去。

m_iSndCurrSeqNo的值为最近一次发送的packet的SeqNo,这里还会在必要的时候更新m_iSndCurrSeqNo,以跳过所有被丢弃的packets。必要指的是,m_iSndCurrSeqNo的值小于等于要丢弃的这个Msg的最后一个packet的SeqNo。这也就意味着,要丢弃的这个Msg直到过期被丢弃都没有发完。

这个地方有一点比较奇怪,简化来看,seqpair[0] == packet.m_iSeqNo,seqpair[1] == seqpair[0] + msglen,也就是说seqpair[1]的值为要被丢弃Msg的最后一个packet的SeqNo加1,但在判断是否要更新m_iSndCurrSeqNo时,却是拿m_iSndCurrSeqNo和(seqpair[1] + 1),也就是要被丢弃的Msg的最后一个packet的SeqNo加2在比较,而更新也是被设置为这个值。但实际上,将m_iSndCurrSeqNo设置为被丢弃Msg的最后一个packet的SeqNo已经可以跳过整个Msg的发送了,因为下次要用m_iSndCurrSeqNo来获得SeqNo,会先将这个值加1的。这个地方的逻辑疑似存在bug。

返回0,向调用者表明暂时没有数据要发送。

(6). m_pSndBuffer->readData()返回大于0的值,表明有一个丢失的包需要重新发送,则更新m_iTraceRetrans和m_iRetransTotal,这两个值分别表示一次trace重发的总次数,和此UDT Socket总的重发次数,两者的区别在于前者在被读取之后会被重置为0(CUDT::sample()),而后者则不会。此时需要继续执行后面的第6步。

5. 在第3步中读取的SeqNo小于0,表明没有丢失的packet。此时则:

(1). 根据m_iFlowWindowSize和m_dCongestionWindow的值计算cwnd发送窗口的大小。发送窗口大小取这两个值中较小的那个,默认情况下,前者为8192(来自于Handshke消息的m_iFlightFlagSize字段,而m_iFlightFlagSize则根据m_iRcvBufSize和m_iFlightFlagSize得出),后者为16(来自于CC的m_dCWndSize字段,在CUDTCC::init()中该值被初始化为16。)。

(2). 检查发送窗口是否已满。若已满,则将m_ullTargetTime和m_ullTimeDiff重置为0,将ts置为0,然后返回0,向调用者表明没有数据要发送。否则继续执行。

m_iSndLastAck的值为下一次Ack应该确认的packet的SeqNo,CSeqNo::seqlen()计算的是包含两个端点在内的区间的长度。此处对CSeqNo::seqlen()的调用被用来计算,下个packet发送之后,发送窗口中所有的packet的个数。

(3). 读取下一个需要发送的packet,并检查返回的payload值。若payload值为0,表明数据缓冲区中所有的数据都已经发送了,无需再进行实际的发送,则将m_ullTargetTime和m_ullTimeDiff重置为0,将ts置为0,返回0,向调用者表明没有数据要发送。否则继续执行。

(4). 主要是更新m_iSndCurrSeqNo,并设置packet的SeqNo字段。如果SeqNo为16的整数倍,还会设置probe为true。

6. 设置packet的m_iTimeStamp,m_iID,及数据长度。

7. 更新m_llTraceSent和m_llSentTotal,其中前者表示CUDT这次Trace的过程中发送的总的packet数量,这个值会在CUDT::sample()获取trace数据之后被重置为0,而后者则表示发送的总共的packet数量,不会在CUDT::sample()获取trace数据之后被重置。

8. 根据probe的值,更新ts值等。配 合CUDT::packData()的调用者CSndUList::pop()一起看,可知ts是理想中该CUDT下次发送数据的时间点。

probe设置为true,就是表明,当前的这个packet被发送结束之后立即发送下一个packet。即使probe的值不为true,也有可能要立即发送下一个packet,比如延滞时间已经超过了理想的发生周期。存在延滞时间,但该延滞时间又没有超出理想的发送周期的,则下个packet的发送时间具体本次packet的发送时间会小于理想的packet发送周期。

总之这里是希望能够保持packet以接近理想的速率发送。

9. 更新m_ullTargetTime为ts。ts这个下次发包的理想时间点还需要m_ullTargetTime进行记录。

10. 返回payload值,也就是读取的packet的大小。

这里顺便来看下CSeqNo的设计与实现。这个类被用来帮助进行与SeqNo有关的一些计算(src/common.h):

class CSeqNo {
 public:
    inline static int seqcmp(int32_t seq1, int32_t seq2) {
        return (abs(seq1 - seq2) < m_iSeqNoTH) ? (seq1 - seq2) : (seq2 - seq1);
    }

    inline static int seqlen(int32_t seq1, int32_t seq2) {
        return (seq1 <= seq2) ? (seq2 - seq1 + 1) : (seq2 - seq1 + m_iMaxSeqNo + 2);
    }

    inline static int seqoff(int32_t seq1, int32_t seq2) {
        if (abs(seq1 - seq2) < m_iSeqNoTH)
            return seq2 - seq1;

        if (seq1 < seq2)
            return seq2 - seq1 - m_iMaxSeqNo - 1;

        return seq2 - seq1 + m_iMaxSeqNo + 1;
    }

    inline static int32_t incseq(int32_t seq) {
        return (seq == m_iMaxSeqNo) ? 0 : seq + 1;
    }

    inline static int32_t decseq(int32_t seq) {
        return (seq == 0) ? m_iMaxSeqNo : seq - 1;
    }

    inline static int32_t incseq(int32_t seq, int32_t inc) {
        return (m_iMaxSeqNo - seq >= inc) ? seq + inc : seq - m_iMaxSeqNo + inc - 1;
    }

 public:
    static const int32_t m_iSeqNoTH;             // threshold for comparing seq. no.
    static const int32_t m_iMaxSeqNo;            // maximum sequence number used in UDT
};

连接发起端在执行CUDT::connect(const sockaddr* serv_addr)时,会计算一个随机的值作为m_iISN,也即是发送的首个数据packet的SeqNo,而在连接建立过程中,这个值会被同步给Peer端的Socket。

这里可以看到,UDT Packet的SeqNo是[0, 0x7FFFFFFF]区间中的一个值。每发送一个packet,m_iSndCurrSeqNo都会被递增。通过class CSeqNo可以看到这个递增的规则,即SeqNo超出0x7FFFFFFF时会被归0。也正是由于0是一个合法的SeqNo,在incseq(int32_t seq, int32_t inc)中,SeqNo超出最大值时的计算里能看到有额外的减1,在seqlen()里,SeqNo超出最大值时的计算里能看到加2。

由seqcmp()和seqoff()这两个函数可见,同一时刻同时有效的两个SeqNo seq1和seq2之间的距离不能超过m_iSeqNoTH 0x3FFFFFFF,若超过则表明一定有一个SeqNo越过了最大值0x7FFFFFFF,也即较小的那个值越过了最大值。

这里还可以再来看一下CSndBuffer::readData():

int CSndBuffer::readData(char** data, int32_t& msgno) {
    // No data to read
    if (m_pCurrBlock == m_pLastBlock)
        return 0;

    *data = m_pCurrBlock->m_pcData;
    int readlen = m_pCurrBlock->m_iLength;
    msgno = m_pCurrBlock->m_iMsgNo;

    m_pCurrBlock = m_pCurrBlock->m_pNext;

    return readlen;
}

int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen) {
    CGuard bufferguard(m_BufLock);

    Block* p = m_pFirstBlock;

    for (int i = 0; i < offset; ++i)
        p = p->m_pNext;

    if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t) p->m_iTTL)) {
        msgno = p->m_iMsgNo & 0x1FFFFFFF;

        msglen = 1;
        p = p->m_pNext;
        bool move = false;
        while (msgno == (p->m_iMsgNo & 0x1FFFFFFF)) {
            if (p == m_pCurrBlock)
                move = true;
            p = p->m_pNext;
            if (move)
                m_pCurrBlock = p;
            msglen++;
        }

        return -1;
    }

    *data = p->m_pcData;
    int readlen = p->m_iLength;
    msgno = p->m_iMsgNo;

    return readlen;
}

CSndBuffer::readData(char** data, int32_t& msgno)读取当前的Block。基本上就是读取Block,然后将指向当前Block的指针m_pCurrBlock向后移一个Block。

而CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen)则是读取距未响应的Block中最旧一块Block offset个单位的Block。在这个函数中,首先是移动到要读取的目标Block,如果要读取的Block已过期,则使m_pCurrBlock跳过该packet所属的Msg的所有Packet,然后返回-1退出。目标Block没有过期,则读取Block后返回数据长度。

总结在CUDT::packData()中对发送过程的控制。

丢失的包具有最高的发送优先级,这也是发送可靠性的保障方法。所有丢失的packet都会被放进SndLossList,这个List中的包可能来源于超时未得到响应,也可能来源于消息接收端发回的NACK。

对于正常的顺序packet发送的控制主要在于两个方面,一是发送窗口的大小,也就是某个时刻已经发送但未得到相应的packet的最大个数,这一点主要由m_dCongestionWindow和m_iFlowWindowSize来表示;二是控制两个包发送的时间间隔,也就是包的发送速率,这一点则主要用m_ullInterval来表示。所有的发送控制机制主要通过影响这几个变量来控制发送过程。

SndLossList

先来看一下CSndLossList这个数据结构。这个Class的定义如下(src/list.h):

class CSndLossList {
 public:
    CSndLossList(int size = 1024);
    ~CSndLossList();

    // Functionality:
    //    Insert a seq. no. into the sender loss list.
    // Parameters:
    //    0) [in] seqno1: sequence number starts.
    //    1) [in] seqno2: sequence number ends.
    // Returned value:
    //    number of packets that are not in the list previously.
    int insert(int32_t seqno1, int32_t seqno2);

    // Functionality:
    //    Remove ALL the seq. no. that are not greater than the parameter.
    // Parameters:
    //    0) [in] seqno: sequence number.
    // Returned value:
    //    None.
    void remove(int32_t seqno);

    // Functionality:
    //    Read the loss length.
    // Parameters:
    //    None.
    // Returned value:
    //    The length of the list.
    int getLossLength();

    // Functionality:
    //    Read the first (smallest) loss seq. no. in the list and remove it.
    // Parameters:
    //    None.
    // Returned value:
    //    The seq. no. or -1 if the list is empty.
    int32_t getLostSeq();

 private:
    int32_t* m_piData1;                  // sequence number starts
    int32_t* m_piData2;                  // seqnence number ends
    int* m_piNext;                       // next node in the list

    int m_iHead;                         // first node
    int m_iLength;                       // loss length
    int m_iSize;                         // size of the static array
    int m_iLastInsertPos;                // position of last insert node

    pthread_mutex_t m_ListLock;          // used to synchronize list operation

 private:
    CSndLossList(const CSndLossList&);
    CSndLossList& operator=(const CSndLossList&);
};

这是一个不可复制容器。提供的接口不是很多,配合注释,都没有太多难以理解的地方。这是一个用数组实现的链表。接着来看这个class的构造和析构(src/list.cpp):

CSndLossList::CSndLossList(int size)
        : m_piData1(NULL),
          m_piData2(NULL),
          m_piNext(NULL),
          m_iHead(-1),
          m_iLength(0),
          m_iSize(size),
          m_iLastInsertPos(-1),
          m_ListLock() {
    m_piData1 = new int32_t[m_iSize];
    m_piData2 = new int32_t[m_iSize];
    m_piNext = new int[m_iSize];

    // -1 means there is no data in the node
    for (int i = 0; i < size; ++i) {
        m_piData1[i] = -1;
        m_piData2[i] = -1;
    }

    // sender list needs mutex protection
#ifndef WIN32
    pthread_mutex_init(&m_ListLock, 0);
#else
    m_ListLock = CreateMutex(NULL, false, NULL);
#endif
}

CSndLossList::~CSndLossList() {
    delete[] m_piData1;
    delete[] m_piData2;
    delete[] m_piNext;

#ifndef WIN32
    pthread_mutex_destroy(&m_ListLock);
#else
    CloseHandle(m_ListLock);
#endif
}

CUDT::connect()中创建CSndLossList时,size值为m_iFlowWindowSize * 2,也即8192 × 2 == 16384。如果不用数组,而用常规一点的方法来实现的话,链表的节点定义可能是这样的:

struct Node {
    int32_t m_iStart;
    int32_t m_iEnd;
    Node *m_pNext;
};

然后来看这个class最关键的函数之一CSndLossList::insert()的定义:

int CSndLossList::insert(int32_t seqno1, int32_t seqno2) {
    CGuard listguard(m_ListLock);

    if (0 == m_iLength) {
        // insert data into an empty list

        m_iHead = 0;
        m_piData1[m_iHead] = seqno1;
        if (seqno2 != seqno1)
            m_piData2[m_iHead] = seqno2;

        m_piNext[m_iHead] = -1;
        m_iLastInsertPos = m_iHead;

        m_iLength += CSeqNo::seqlen(seqno1, seqno2);

        return m_iLength;
    }

    // otherwise find the position where the data can be inserted
    int origlen = m_iLength;
    int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno1);
    int loc = (m_iHead + offset + m_iSize) % m_iSize;

    if (offset < 0) {
        // Insert data prior to the head pointer

        m_piData1[loc] = seqno1;
        if (seqno2 != seqno1)
            m_piData2[loc] = seqno2;

        // new node becomes head
        m_piNext[loc] = m_iHead;
        m_iHead = loc;
        m_iLastInsertPos = loc;

        m_iLength += CSeqNo::seqlen(seqno1, seqno2);
    } else if (offset > 0) {
        if (seqno1 == m_piData1[loc]) {
            m_iLastInsertPos = loc;

            // first seqno is equivlent, compare the second
            if (-1 == m_piData2[loc]) {
                if (seqno2 != seqno1) {
                    m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                    m_piData2[loc] = seqno2;
                }
            } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7]
                m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else
                // Do nothing if it is already there
                return 0;
        } else {
            // searching the prior node
            int i;
            if ((-1 != m_iLastInsertPos) && (CSeqNo::seqcmp(m_piData1[m_iLastInsertPos], seqno1) < 0))
                i = m_iLastInsertPos;
            else
                i = m_iHead;

            while ((-1 != m_piNext[i]) && (CSeqNo::seqcmp(m_piData1[m_piNext[i]], seqno1) < 0))
                i = m_piNext[i];

            if ((-1 == m_piData2[i]) || (CSeqNo::seqcmp(m_piData2[i], seqno1) < 0)) {
                m_iLastInsertPos = loc;

                // no overlap, create new node
                m_piData1[loc] = seqno1;
                if (seqno2 != seqno1)
                    m_piData2[loc] = seqno2;

                m_piNext[loc] = m_piNext[i];
                m_piNext[i] = loc;

                m_iLength += CSeqNo::seqlen(seqno1, seqno2);
            } else {
                m_iLastInsertPos = i;

                // overlap, coalesce with prior node, insert(3, 7) to [2, 5], ... becomes [2, 7]
                if (CSeqNo::seqcmp(m_piData2[i], seqno2) < 0) {
                    m_iLength += CSeqNo::seqlen(m_piData2[i], seqno2) - 1;
                    m_piData2[i] = seqno2;

                    loc = i;
                } else
                    return 0;
            }
        }
    } else {
        m_iLastInsertPos = m_iHead;

        // insert to head node
        if (seqno2 != seqno1) {
            if (-1 == m_piData2[loc]) {
                m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                m_piData2[loc] = seqno2;
            } else
                return 0;
        } else
            return 0;
    }

    // coalesce with next node. E.g., [3, 7], ..., [6, 9] becomes [3, 9]
    while ((-1 != m_piNext[loc]) && (-1 != m_piData2[loc])) {
        int i = m_piNext[loc];

        if (CSeqNo::seqcmp(m_piData1[i], CSeqNo::incseq(m_piData2[loc])) <= 0) {
            // coalesce if there is overlap
            if (-1 != m_piData2[i]) {
                if (CSeqNo::seqcmp(m_piData2[i], m_piData2[loc]) > 0) {
                    if (CSeqNo::seqcmp(m_piData2[loc], m_piData1[i]) >= 0)
                        m_iLength -= CSeqNo::seqlen(m_piData1[i], m_piData2[loc]);

                    m_piData2[loc] = m_piData2[i];
                } else
                    m_iLength -= CSeqNo::seqlen(m_piData1[i], m_piData2[i]);
            } else {
                if (m_piData1[i] == CSeqNo::incseq(m_piData2[loc]))
                    m_piData2[loc] = m_piData1[i];
                else
                    m_iLength--;
            }

            m_piData1[i] = -1;
            m_piData2[i] = -1;
            m_piNext[loc] = m_piNext[i];
        } else
            break;
    }

    return m_iLength - origlen;
}

1. 这个函数首先处理了最简单的向空链表中插入元素的case。

这种情况下,m_iHead被赋予0值。m_piData1[m_iHead]会被赋值为要插入的这段丢失packet范围的起始SeqNo。

如果起始SeqNo和结束SeqNo的值不同,m_piData2[m_iHead]还会被赋值为结束SeqNo;如果相同,则m_piData2[m_iHead]将仍然保持构造函数中初始化的-1,以表示这段丢失packet范围只有一个元素。

m_piNext[m_iHead]被赋值为-1以表示这是链表中的最后一个元素。m_iLastInsertPos用来记录上一次插入的位置,这里会被赋值为m_iHead。m_iLength表示CSndLossList中记录的丢失packet的总格数,这里会被设置为这段packet的长度。然后返回m_iLength。

可见m_iHead指向链表的头部。m_piData1,m_piData2和m_piNext这三个数组中相同位置的元素共同表示一个链表节点,它们分别表示一个丢失packet范围的起始SeqNo,结束SeqNo和该节点在链表中next节点的位置。

2. 链表中已经有元素了,则将m_iLength保存在origlen中。计算要插入的这段丢失packet的起始SeqNo与链表中原有的头节点的起始SeqNo字段的差值offset。然后计算要插入的这段丢失packet范围的的可能的位置loc,这个可能的位置主要由这段丢失packet范围的起始SeqNo与链表中原有的头节点的起始SeqNo字段的差值决定。

由loc的计算方法可见,数组中的空间是被循环利用的。比如要插入的节点是向CSndLossList中插入的第二个节点,则此时m_iHead仍然为0,而要插入的这个丢失packet范围的起始位置小于原有的头节点的起始SeqNo字段,则新插入的节点将被绕回到数组的尾部。

3. 处理offset小于0的情况。这表示插入的这个丢失packet范围的起始位置小于原有的头节点的起始SeqNo字段值,此时则会在loc位置插入一个新的节点以描述这段丢失packet范围。更新m_iHead和m_iLastInsertPos指向新插入的这个节点。并更新m_iLength以体现新加入的这个丢失packet范围。

向单向链表的头部插入元素总是比较简单。由此我们也看到,这个链表是以节点的起始SeqNo字段值的升序排列的有序链表。

但这个地方貌似没有处理新插入的这个丢失packet范围与原有头节点表示的丢失packet范围存在交叉的情况?没错,是没有处理,这种情况会在处理完所有的插入情况之后再统一来做。

4. 处理offset大于0的情况。这又分为两种情况:

(1). seqno1 == m_piData1[loc],表明新节点的目标插入位置中原有节点保存的丢失Packet范围的起始SeqNo与要插入的这个丢失Packets范围的起始SeqNo相同。则此时会首先更新m_iLastInsertPos为loc,还需要处理这样的几种case,

case 1:原有的范围中只有一个元素,要插入的这个范围有多个元素。

case 2:原有的范围中有多个元素,要插入的这个范围有一个元素。

case 3:原有的范围和要插入的范围都只有一个元素。

case 4:原有的范围和要插入的范围中都有多个元素,但新插入的范围完全包含原有的范围。

case 5:原有的范围和要插入的范围中都有多个元素,但原有的范围完全包含新插入的范围。

case 6:原有的范围和要插入的范围中都有多个元素,且完全相同。

这些case包含的packet范围的相对关系可以用下图来简单表示:

UDT协议实现分析——数据发送控制

代码的具体写法不同,这些case中的一些可能会以不同的方式被合并成一个处理,而有些case则不需要对原有的链表进行任何的调整。

if block中处理的是case1和case3,else-if block中处理的是case 4,else block中处理的是case 2,case 5和case 6。其中case 3,case 2,case 5和case 6都不需要对链表做出调整。

以此来看,在第一个if block的内部,应该再加一个else block来直接返回0会比较好一点。

那段代码的一种等价实现形式:

if (seqno2 != seqno1) {
                if (-1 == m_piData2[loc]) {
                    m_iLength += CSeqNo::seqlen(seqno1, seqno2) - 1;
                    m_piData2[loc] = seqno2;
                } else if (CSeqNo::seqcmp(seqno2, m_piData2[loc]) > 0) {
                    // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7]
                    m_iLength += CSeqNo::seqlen(m_piData2[loc], seqno2) - 1;
                    m_piData2[loc] = seqno2;
                } else {
                    return 0;
                }
            } else {
                // Do nothing if it is already there
                return 0;
            }

(2). seqno1与m_piData1[loc]不相等。这其实主要有两种可能,一是loc位置已经有了其它的节点,但该节点所表示的范围的起始SeqNo与要插入的这个范围的起始SeqNo不同;二是loc位置还没有被插入节点。但第一种可能应该是不会出现的,因而这里实际要处理的也就是loc位置还没有节点插入的情况。

对于这种情况,节点的插入位置完全不是问题,关键的问题是调整链表中一些节点的关系。可以看到这里的处理过程:

查找新插入节点前面的那个节点。该查找过程的开始位置由m_piData1[m_iLastInsertPos]与seqno1的相对大小决定,如果前者较小,则从m_iLastInsertPos开始,否则,从m_iHead开始。这大概主要是想要利用空间局部性原理来提高查找的效率。然后就是通过一个循环找到新插入节点前面的那个节点。

找到的这前面的节点所表示的丢失packet范围与要插入的节点所要表示的范围之间的关系又有这样的几种case:

case 1:前面的节点表示的范围只有一个packet。

case 2:前面的节点表示的范围含有多个packet,但它的结束SeqNo仍然小于要插入的范围的起始SeqNo。

case 3:前面的节点表示的范围含有多个packet,但它的结束SeqNo大于等于要插入的范围的起始SeqNo。

前两个case表明两个范围不相交,而case 3则表明两个范围是相交的。对于前两个case,则在前面的那个节点之后插入一个节点,链表中节点的连接关系做适当的调整即可。对于case 3,则需要将插入的这个范围合并入前面的那个节点。如果前面的那个节点包含的范围完全覆盖了要插入的范围,则什麽都不做,如果不是则需要对结束SeqNo字段做一些调整。

5. offset值等于0,表明要插入的这个范围的起始SeqNo与Head节点表示的范围的起始SeqNo相同,这个过程则与offset大于0时,seqno1 == m_piData1[loc]中的处理基本一致。

6. 从插入新节点的位置开始,合并链表中与新插入的这个丢失packet范围相交,或被包含或紧紧相邻的节点。

7. 返回新加入CSndLossList的丢失packet的总个数。

有这个函数的整个执行过程不难看出,头节点中将包含最老的丢失packets。

看完了插入,自然不能不再来看一下CSndLossList::remove():

void CSndLossList::remove(int32_t seqno) {
    CGuard listguard(m_ListLock);

    if (0 == m_iLength)
        return;

    // Remove all from the head pointer to a node with a larger seq. no. or the list is empty
    int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno);
    int loc = (m_iHead + offset + m_iSize) % m_iSize;

    if (0 == offset) {
        // It is the head. Remove the head and point to the next node
        loc = (loc + 1) % m_iSize;

        if (-1 == m_piData2[m_iHead])
            loc = m_piNext[m_iHead];
        else {
            m_piData1[loc] = CSeqNo::incseq(seqno);
            if (CSeqNo::seqcmp(m_piData2[m_iHead], CSeqNo::incseq(seqno)) > 0)
                m_piData2[loc] = m_piData2[m_iHead];

            m_piData2[m_iHead] = -1;

            m_piNext[loc] = m_piNext[m_iHead];
        }

        m_piData1[m_iHead] = -1;

        if (m_iLastInsertPos == m_iHead)
            m_iLastInsertPos = -1;

        m_iHead = loc;

        m_iLength--;
    } else if (offset > 0) {
        int h = m_iHead;

        if (seqno == m_piData1[loc]) {
            // target node is not empty, remove part/all of the seqno in the node.
            int temp = loc;
            loc = (loc + 1) % m_iSize;

            if (-1 == m_piData2[temp])
                m_iHead = m_piNext[temp];
            else {
                // remove part, e.g., [3, 7] becomes [], [4, 7] after remove(3)
                m_piData1[loc] = CSeqNo::incseq(seqno);
                if (CSeqNo::seqcmp(m_piData2[temp], m_piData1[loc]) > 0)
                    m_piData2[loc] = m_piData2[temp];
                m_iHead = loc;
                m_piNext[loc] = m_piNext[temp];
                m_piNext[temp] = loc;
                m_piData2[temp] = -1;
            }
        } else {
            // target node is empty, check prior node
            int i = m_iHead;
            while ((-1 != m_piNext[i]) && (CSeqNo::seqcmp(m_piData1[m_piNext[i]], seqno) < 0))
                i = m_piNext[i];

            loc = (loc + 1) % m_iSize;

            if (-1 == m_piData2[i])
                m_iHead = m_piNext[i];
            else if (CSeqNo::seqcmp(m_piData2[i], seqno) > 0) {
                // remove part/all seqno in the prior node
                m_piData1[loc] = CSeqNo::incseq(seqno);
                if (CSeqNo::seqcmp(m_piData2[i], m_piData1[loc]) > 0)
                    m_piData2[loc] = m_piData2[i];

                m_piData2[i] = seqno;

                m_piNext[loc] = m_piNext[i];
                m_piNext[i] = loc;

                m_iHead = loc;
            } else
                m_iHead = m_piNext[i];
        }

        // Remove all nodes prior to the new head
        while (h != m_iHead) {
            if (m_piData2[h] != -1) {
                m_iLength -= CSeqNo::seqlen(m_piData1[h], m_piData2[h]);
                m_piData2[h] = -1;
            } else
                m_iLength--;

            m_piData1[h] = -1;

            if (m_iLastInsertPos == h)
                m_iLastInsertPos = -1;

            h = m_piNext[h];
        }
    }
}

先来回忆下CSndLossList的类定义中,对于这个函数的语义的说明:移除所有不大于参数值的SeqNo。这个函数的主要执行过程如下:

1. 检查m_iLength是否为0,若为0,说明CSndLossList还没有加入任何SeqNo,则直接返回,否则继续执行。

2. 计算头节点所表示的丢失packet范围的起始SeqNo与参数seqno的offset,及可能以seqno作为起始SeqNo字段的节点的位置loc。

根据这个函数的语义,我们知道,其实是不需要处理offset小于的case的。如我们前面所了解的,头节点所表示的SeqNo范围是CSndLossList中SeqNo值最小的一个范围,而如果seqno小于这个范围的起始SeqNo的话,则说明不大于seqno的所有SeqNo都已经不存在了。

3. 处理offset == 0的情况。offset == 0,表明seqno是包含于头节点所表示的范围,而且还是这个范围的起始SeqNo。此时又主要分两种情况来处理:

(1). 头节点表示的这个范围只有一个SeqNo。

(2). 头节点表示的范围包含多个SeqNo。

对于情况(1),则头节点将向后滑动一个节点,原来头节点的存放位置会被复位。对于情况(2),为了保证两个节点的相对位置等于节点所表示的丢失packet范围的起始SeqNo的差值这样的一种节点间关系依然成立,需要将头节点保存的位置后移一个位置。对于情况(2),还会再分为两种情况来处理,一是原来的头节点中只包含2个SeqNo,则在移除seqno后只剩下了一个,此时要保持m_piData2[loc]为-1,若包含3个及以上SeqNo的,则要复制原来的头节点的结束SeqNo到新的位置。

其它就是适当地更新m_iLastInsertPos,m_iHead和m_iLength了。这里似乎补一个

m_piNext[m_iHead] = -1;

要更好一点。

4. 处理offset大于0的情况。对于这种情况,比较麻烦的是找到seqno具体包含在哪个节点中。处理过程大致为:

(1). 检查一下,seqno与m_piData1[loc]是否相等,若相等,则要找的节点已经是找到了,且seqno为目标节点表示的丢失packet范围的起始seqno。此时则会再分为两种情况来处理,一是目标节点中只包含一个SeqNo,则使链表头指向目标节点的下一个节点。

二是目标节点中包含多个SeqNo,则需要将seqno排除在目标节点范围之外新建一个节点,将新节点保存在目标节点后面相邻的位置,若目标节点中包含2个节点,则需要设置新节点的结束SeqNo字段为-1,若大于等于3,则复制此字段的值,然后将原来的目标节点改造为只包含seqno的节点,并使它指向新建的这个节点。总之就是将原来的一个节点拆分成了两个节点,一个节点只包含seqno,另一个则包含原来的目标节点中其余的SeqNo。

(2). seqno与m_piData1[loc]不相等的情况,则需要先找到起始SeqNo字段小于seqno的SeqNo值最大的那个节点。这又可以分为3种case来处理,

case 1:找到的节点只包含一个SeqNo,此时则使链表头指向找到的节点的next节点。

case 2:找到的节点包含多个SeqNo,且seqno小于找到的节点的结束SeqNo字段,此时则需要将找到的节点裂为链表中的两个节点,一个包含的范围为[原节点的起始SeqNo,seqno],另一个包含的范围为[seqno + 1, 原节点的结束SeqNo]。同时链表头应该指向后者。

case 3:找到的节点包含多个SeqNo,且seqno大于等于找到的节点的结束SeqNo字段,此时则同样使链表头指向找到的节点的next节点。

5. 移除新找到的头节点之前所有的节点。适当地更新m_iLength等。

这个地方移除的操作看起来好罗嗦。必须要计算出offset值,而不是简单的用比较符号,是由SeqNo的递增规则决定的。但对于offset大于等于0的情况,则可以使用统一的过程来处理:找到起始SeqNo字段不大于seqno的 SeqNo值最大的那个节点,然后根据seqno与这个节点描述的范围的4种关系,即seqno等于找到的节点的起始SeqNo值,seqno大于起始SeqNo值但小于结束SeqNo值,seqno等于结束SeqNo值,seqno大于结束SeqNo值,及找到的节点所描述的范围的大小,来分类处理即可。

还有我们前面在CUDT::packData()中看到的CSndLossList::getLostSeq():

int32_t CSndLossList::getLostSeq() {
    if (0 == m_iLength)
        return -1;

    CGuard listguard(m_ListLock);

    if (0 == m_iLength)
        return -1;

    if (m_iLastInsertPos == m_iHead)
        m_iLastInsertPos = -1;

    // return the first loss seq. no.
    int32_t seqno = m_piData1[m_iHead];

    // head moves to the next node
    if (-1 == m_piData2[m_iHead]) {
        //[3, -1] becomes [], and head moves to next node in the list
        m_piData1[m_iHead] = -1;
        m_iHead = m_piNext[m_iHead];
    } else {
        // shift to next node, e.g., [3, 7] becomes [], [4, 7]
        int loc = (m_iHead + 1) % m_iSize;

        m_piData1[loc] = CSeqNo::incseq(seqno);
        if (CSeqNo::seqcmp(m_piData2[m_iHead], m_piData1[loc]) > 0)
            m_piData2[loc] = m_piData2[m_iHead];

        m_piData1[m_iHead] = -1;
        m_piData2[m_iHead] = -1;

        m_piNext[loc] = m_piNext[m_iHead];
        m_iHead = loc;
    }

    m_iLength--;

    return seqno;
}

这个函数读取首个丢失packet的SeqNo。

这个函数会取链表头节点中最小的SeqNo,返回给调用者,然后将这个SeqNo从链表头节点中移除。移除的时候又可以分为2种情况:1. 头节点描述的packet范围只包含一个SeqNo,2. 包含多个SeqNo。

对于情况1,则将链表头节点向后移动一个节点,然后移除原来的头节点。对于情况2,则将原来的头节点分裂为两个节点,一个只包含首个丢失packet的SeqNo,另一个包含原来头节点中其余的SeqNo,令链表头节点指向后一个节点,并移除前一个节点。

可见,通过CSndLossList::getLostSeq()返回给调用者的Packet是会被从CSndLossList中移除出去的。

最后可以再来看一下,经过了这些操作的蹂躏之后,这个链表可能的样子。比如有4段丢失的packet,其SeqNo范围及被插入的顺序为[8, 9],[3, 5],[12,12],[15, 19],假设缓冲区大小为20,则看起来可能为:

UDT协议实现分析——数据发送控制

UDT中,丢失packet列表大体如此。

Done。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
UDT协议实现分析——数据的接收
看了UDT中数据发送的部分之后,我们转换一个角度,来看一下接收端发生的故事。如我们前面在UDT协议实现分析——连接的建立(http://my.oschina.net/wolfcs/blog/505253)一文中看到的那样,CUDT在connect()的后半场,会通过调用CRcvQueue::removeConnector()把它自己从它的CCha
Stella981 Stella981
3年前
SpringBoot整合Redis乱码原因及解决方案
问题描述:springboot使用springdataredis存储数据时乱码rediskey/value出现\\xAC\\xED\\x00\\x05t\\x00\\x05问题分析:查看RedisTemplate类!(https://oscimg.oschina.net/oscnet/0a85565fa
Easter79 Easter79
3年前
SpringBoot整合Redis乱码原因及解决方案
问题描述:springboot使用springdataredis存储数据时乱码rediskey/value出现\\xAC\\xED\\x00\\x05t\\x00\\x05问题分析:查看RedisTemplate类!(https://oscimg.oschina.net/oscnet/0a85565fa
Stella981 Stella981
3年前
Android蓝牙连接汽车OBD设备
//设备连接public class BluetoothConnect implements Runnable {    private static final UUID CONNECT_UUID  UUID.fromString("0000110100001000800000805F9B34FB");
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这