UDT协议实现分析——close过程

Wesley13
• 阅读 644

最后再来看一下close的过程(src/api.cpp):

int CUDTUnited::close(const UDTSOCKET u) {
    CUDTSocket* s = locate(u);
    if (NULL == s)
        throw CUDTException(5, 4, 0);

    CGuard socket_cg(s->m_ControlLock);

    if (s->m_Status == LISTENING) {
        if (s->m_pUDT->m_bBroken)
            return 0;

        s->m_TimeStamp = CTimer::getTime();
        s->m_pUDT->m_bBroken = true;

        // broadcast all "accept" waiting
#ifndef WIN32
        pthread_mutex_lock(&(s->m_AcceptLock));
        pthread_cond_broadcast(&(s->m_AcceptCond));
        pthread_mutex_unlock(&(s->m_AcceptLock));
#else
        SetEvent(s->m_AcceptCond);
#endif

        return 0;
    }

    s->m_pUDT->close();

    // synchronize with garbage collection.
    CGuard manager_cg(m_ControlLock);

    // since "s" is located before m_ControlLock, locate it again in case it became invalid
    map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
    if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))
        return 0;
    s = i->second;

    s->m_Status = CLOSED;

    // a socket will not be immediated removed when it is closed
    // in order to prevent other methods from accessing invalid address
    // a timer is started and the socket will be removed after approximately 1 second
    s->m_TimeStamp = CTimer::getTime();

    m_Sockets.erase(s->m_SocketID);
    m_ClosedSockets.insert(pair<UDTSOCKET, CUDTSocket*>(s->m_SocketID, s));

    CTimer::triggerEvent();

    return 0;
}



int CUDT::close(UDTSOCKET u) {
    try {
        return s_UDTUnited.close(u);
    } catch (CUDTException &e) {
        s_UDTUnited.setError(new CUDTException(e));
        return ERROR;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return ERROR;
    }
}


int close(UDTSOCKET u) {
    return CUDT::close(u);
}

这个API的实现结构并没有什么特别值得关注的地方。直接来看CUDTUnited::close()。在CUDTUnited::close()函数中,主要是分两种情况来处理的:一种是Server端用于接受连接的Listening socket;另一种是常规的用于进行数据收发的socket。

对于第一种情况,可以看到,这里主要是设置了对应的CUDTSocket s的m_TimeStamp为当前时间,并将s->m_pUDT->m_bBroken置为true,然后将等待在accept的线程唤醒就结束了。不管是在CUDTUnited::close()中,还是在被它唤醒的执行CUDTUnited::accept()的线程中,都没有看到有实际做最后的清理的动作,比如被加入队列的为新连接请求创建的UDT Socket的清理,将当前UDT Socket从RcvQueue的listener移除。这些清理的动作只有在UDT的垃圾回收线程里做了。

接着是第二种情况,可以看到首先是执行了s->m_pUDT->close(),不难想像这个close中做的事情一定特别多;做状态的切换,将Socket的状态切换到CLOSED状态;更新对应的CUDTSocket s的m_TimeStamp为当前时间;将UDT Socket从总的打开socket表m_Sockets中移除,并加入到已关闭socket表m_ClosedSockets中,然后trigger一个Timer event并返回。

然后来看CUDT::close()(src/core.cpp):

void CUDT::close() {
    if (!m_bOpened)
        return;

    if (0 != m_Linger.l_onoff) {
        uint64_t entertime = CTimer::getTime();

        while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0)
                && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL)) {
            // linger has been checked by previous close() call and has expired
            if (m_ullLingerExpiration >= entertime)
                break;

            if (!m_bSynSending) {
                // if this socket enables asynchronous sending, return immediately and let GC to close it later
                if (0 == m_ullLingerExpiration)
                    m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;

                return;
            }

#ifndef WIN32
            timespec ts;
            ts.tv_sec = 0;
            ts.tv_nsec = 1000000;
            nanosleep(&ts, NULL);
#else
            Sleep(1);
#endif
        }
    }

    // remove this socket from the snd queue
    if (m_bConnected)
        m_pSndQueue->m_pSndUList->remove(this);

    // trigger any pending IO events.
    s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);
    // then remove itself from all epoll monitoring
    try {
        for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++i)
            s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
    } catch (...) {
    }

    if (!m_bOpened)
        return;

    // Inform the threads handler to stop.
    m_bClosing = true;

    CGuard cg(m_ConnectionLock);

    // Signal the sender and recver if they are waiting for data.
    releaseSynch();

    if (m_bListening) {
        m_bListening = false;
        m_pRcvQueue->removeListener(this);
    } else if (m_bConnecting) {
        m_pRcvQueue->removeConnector(m_SocketID);
    }

    if (m_bConnected) {
        if (!m_bShutdown)
            sendCtrl(5);

        m_pCC->close();

        // Store current connection information.
        CInfoBlock ib;
        ib.m_iIPversion = m_iIPversion;
        CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
        ib.m_iRTT = m_iRTT;
        ib.m_iBandwidth = m_iBandwidth;
        m_pCache->update(&ib);

        m_bConnected = false;
    }

    // waiting all send and recv calls to stop
    CGuard sendguard(m_SendLock);
    CGuard recvguard(m_RecvLock);

    // CLOSED.
    m_bOpened = false;
}

可以看到,这个函数中做的事情大体如下:

1. 检查Open状态m_bOpened,若m_bOpened为false,则直接返回,否则继续执行。

2. 等待一段时间,以使在发送缓冲区中还没有发送完成的数据能够可靠地发送完成,当然过了一定时间之后,发送缓冲区中还是存在没有可靠地发送完成的数据,则那些数据会被直接丢弃掉。这里的等待,是一种比较高频率地轮询。

3. 如果当前处于Connected状态,则将当前UDT Socket从发送队列的发送者列表m_pSndUList中移除出去。

4. 这里再一次检查了m_bOpened的值。

5. 将m_bClosing置为true。

如我们前面看到的,连接成功建立之后,UDT Socket会被加入到RcvQueue的数据接收者列表m_pRcvUList和m_pHash中,但这里在关闭UDT Socket时,却没有看到有将当前UDT Socket从那些列表中移除的code。这到底是怎么回事呢?

来看一下CRcvQueue::worker()中的这段code:

CRNode* ul = self->m_pRcvUList->m_pUList;
        uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
        while ((NULL != ul) && (ul->m_llTimeStamp < ctime)) {
            CUDT* u = ul->m_pUDT;

            if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {
                u->checkTimers();
                self->m_pRcvUList->update(u);
            } else {
                // the socket must be removed from Hash table first, then RcvUList
                self->m_pHash->remove(u->m_SocketID);
                self->m_pRcvUList->remove(u);
                u->m_pRNode->m_bOnList = false;
            }

            ul = self->m_pRcvUList->m_pUList;
        }

将一个UDT Socket加入 到RcvQueue的数据接收者列表m_pRcvUList和m_pHash中的动作是由RcvQueue的worker线程自动完成的,那么将一个UDT Socket从RcvQueue的数据接收者列表m_pRcvUList和m_pHash中移除的动作自然也是有 RcvQueue的worker线程自动完成的。

在这里可以看到,将UDT Socket的m_bClosing置为true之后,RcvQueue的worker线程自会将UDT Socket从RcvQueue的数据接收者列表m_pRcvUList和m_pHash中移除。

6. 唤醒所有等待在这个UDT Socket的condition上的线程。

7. 处理处于listening状态的socket,主要是将m_bListening置为false,并将当前UDT Socket从RcvQueue的listener移除。

这里倒是看到了对于listener的处理了,但是还是没有看到对于处在队列中,但还没有被accept返回的UDT Socket的处理。

8. 处理处于m_bConnecting状态的socket,主要是将当前UDT Socket从connector列表中移除。

9. 处理处于m_bConnected状态的socket,主要是发送shutdown消息给peer端,并将m_bConnected置为false。

10. 将m_bOpened置为false。

可以看到,7、8、9的处理应该是互斥的。

Done。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Android蓝牙连接汽车OBD设备
//设备连接public class BluetoothConnect implements Runnable {    private static final UUID CONNECT_UUID  UUID.fromString("0000110100001000800000805F9B34FB");
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这