最后再来看一下close的过程(src/api.cpp):
int CUDTUnited::close(const UDTSOCKET u) {
CUDTSocket* s = locate(u);
if (NULL == s)
throw CUDTException(5, 4, 0);
CGuard socket_cg(s->m_ControlLock);
if (s->m_Status == LISTENING) {
if (s->m_pUDT->m_bBroken)
return 0;
s->m_TimeStamp = CTimer::getTime();
s->m_pUDT->m_bBroken = true;
// broadcast all "accept" waiting
#ifndef WIN32
pthread_mutex_lock(&(s->m_AcceptLock));
pthread_cond_broadcast(&(s->m_AcceptCond));
pthread_mutex_unlock(&(s->m_AcceptLock));
#else
SetEvent(s->m_AcceptCond);
#endif
return 0;
}
s->m_pUDT->close();
// synchronize with garbage collection.
CGuard manager_cg(m_ControlLock);
// since "s" is located before m_ControlLock, locate it again in case it became invalid
map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))
return 0;
s = i->second;
s->m_Status = CLOSED;
// a socket will not be immediated removed when it is closed
// in order to prevent other methods from accessing invalid address
// a timer is started and the socket will be removed after approximately 1 second
s->m_TimeStamp = CTimer::getTime();
m_Sockets.erase(s->m_SocketID);
m_ClosedSockets.insert(pair<UDTSOCKET, CUDTSocket*>(s->m_SocketID, s));
CTimer::triggerEvent();
return 0;
}
int CUDT::close(UDTSOCKET u) {
try {
return s_UDTUnited.close(u);
} catch (CUDTException &e) {
s_UDTUnited.setError(new CUDTException(e));
return ERROR;
} catch (...) {
s_UDTUnited.setError(new CUDTException(-1, 0, 0));
return ERROR;
}
}
int close(UDTSOCKET u) {
return CUDT::close(u);
}
这个API的实现结构并没有什么特别值得关注的地方。直接来看CUDTUnited::close()。在CUDTUnited::close()函数中,主要是分两种情况来处理的:一种是Server端用于接受连接的Listening socket;另一种是常规的用于进行数据收发的socket。
对于第一种情况,可以看到,这里主要是设置了对应的CUDTSocket s的m_TimeStamp为当前时间,并将s->m_pUDT->m_bBroken置为true,然后将等待在accept的线程唤醒就结束了。不管是在CUDTUnited::close()中,还是在被它唤醒的执行CUDTUnited::accept()的线程中,都没有看到有实际做最后的清理的动作,比如被加入队列的为新连接请求创建的UDT Socket的清理,将当前UDT Socket从RcvQueue的listener移除。这些清理的动作只有在UDT的垃圾回收线程里做了。
接着是第二种情况,可以看到首先是执行了s->m_pUDT->close(),不难想像这个close中做的事情一定特别多;做状态的切换,将Socket的状态切换到CLOSED状态;更新对应的CUDTSocket s的m_TimeStamp为当前时间;将UDT Socket从总的打开socket表m_Sockets中移除,并加入到已关闭socket表m_ClosedSockets中,然后trigger一个Timer event并返回。
然后来看CUDT::close()(src/core.cpp):
void CUDT::close() {
if (!m_bOpened)
return;
if (0 != m_Linger.l_onoff) {
uint64_t entertime = CTimer::getTime();
while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0)
&& (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL)) {
// linger has been checked by previous close() call and has expired
if (m_ullLingerExpiration >= entertime)
break;
if (!m_bSynSending) {
// if this socket enables asynchronous sending, return immediately and let GC to close it later
if (0 == m_ullLingerExpiration)
m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;
return;
}
#ifndef WIN32
timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 1000000;
nanosleep(&ts, NULL);
#else
Sleep(1);
#endif
}
}
// remove this socket from the snd queue
if (m_bConnected)
m_pSndQueue->m_pSndUList->remove(this);
// trigger any pending IO events.
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);
// then remove itself from all epoll monitoring
try {
for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++i)
s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
} catch (...) {
}
if (!m_bOpened)
return;
// Inform the threads handler to stop.
m_bClosing = true;
CGuard cg(m_ConnectionLock);
// Signal the sender and recver if they are waiting for data.
releaseSynch();
if (m_bListening) {
m_bListening = false;
m_pRcvQueue->removeListener(this);
} else if (m_bConnecting) {
m_pRcvQueue->removeConnector(m_SocketID);
}
if (m_bConnected) {
if (!m_bShutdown)
sendCtrl(5);
m_pCC->close();
// Store current connection information.
CInfoBlock ib;
ib.m_iIPversion = m_iIPversion;
CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
ib.m_iRTT = m_iRTT;
ib.m_iBandwidth = m_iBandwidth;
m_pCache->update(&ib);
m_bConnected = false;
}
// waiting all send and recv calls to stop
CGuard sendguard(m_SendLock);
CGuard recvguard(m_RecvLock);
// CLOSED.
m_bOpened = false;
}
可以看到,这个函数中做的事情大体如下:
1. 检查Open状态m_bOpened,若m_bOpened为false,则直接返回,否则继续执行。
2. 等待一段时间,以使在发送缓冲区中还没有发送完成的数据能够可靠地发送完成,当然过了一定时间之后,发送缓冲区中还是存在没有可靠地发送完成的数据,则那些数据会被直接丢弃掉。这里的等待,是一种比较高频率地轮询。
3. 如果当前处于Connected状态,则将当前UDT Socket从发送队列的发送者列表m_pSndUList中移除出去。
4. 这里再一次检查了m_bOpened的值。
5. 将m_bClosing置为true。
如我们前面看到的,连接成功建立之后,UDT Socket会被加入到RcvQueue的数据接收者列表m_pRcvUList和m_pHash中,但这里在关闭UDT Socket时,却没有看到有将当前UDT Socket从那些列表中移除的code。这到底是怎么回事呢?
来看一下CRcvQueue::worker()中的这段code:
CRNode* ul = self->m_pRcvUList->m_pUList;
uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
while ((NULL != ul) && (ul->m_llTimeStamp < ctime)) {
CUDT* u = ul->m_pUDT;
if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) {
u->checkTimers();
self->m_pRcvUList->update(u);
} else {
// the socket must be removed from Hash table first, then RcvUList
self->m_pHash->remove(u->m_SocketID);
self->m_pRcvUList->remove(u);
u->m_pRNode->m_bOnList = false;
}
ul = self->m_pRcvUList->m_pUList;
}
将一个UDT Socket加入 到RcvQueue的数据接收者列表m_pRcvUList和m_pHash中的动作是由RcvQueue的worker线程自动完成的,那么将一个UDT Socket从RcvQueue的数据接收者列表m_pRcvUList和m_pHash中移除的动作自然也是有 RcvQueue的worker线程自动完成的。
在这里可以看到,将UDT Socket的m_bClosing置为true之后,RcvQueue的worker线程自会将UDT Socket从RcvQueue的数据接收者列表m_pRcvUList和m_pHash中移除。
6. 唤醒所有等待在这个UDT Socket的condition上的线程。
7. 处理处于listening状态的socket,主要是将m_bListening置为false,并将当前UDT Socket从RcvQueue的listener移除。
这里倒是看到了对于listener的处理了,但是还是没有看到对于处在队列中,但还没有被accept返回的UDT Socket的处理。
8. 处理处于m_bConnecting状态的socket,主要是将当前UDT Socket从connector列表中移除。
9. 处理处于m_bConnected状态的socket,主要是发送shutdown消息给peer端,并将m_bConnected置为false。
10. 将m_bOpened置为false。
可以看到,7、8、9的处理应该是互斥的。
Done。