#22_BasicTaskScheduler基本任务调度器(二)——Live555源码阅读(一)任务调度相关类
[TOC]
这是Live555源码阅读的第二部分,包括了任务调度相关的三个类。任务调度是Live555源码中很重要的部分。
本文由乌合之众 lym瞎编,欢迎转载 my.oschina.net/oloroso
##SingleStep方法
这是这里最重要的一个方法。每一次调用都是一次真正的处理数据的过程。 前面的延时队列DelayQueue
、处理程序链表HanlerSet
、触发器数组fTriggeredEventHandlers
和fTriggeredEventClientDatas
都是在这里被真正的调度起来的。 这一段的代码很长,过程有点多。要联系了前面讲过的内容来看才能比较好理解。这里要注意的fLastHandledSocketNum
成员的操作。因为其在别的位置都没有修改过,只在这里轮询处理
的时候,如果有处理了fHandlers
中某个节点的时候才会去设置。再一个要思考的是,fHandlers
中的元素是**从何而来的?
**在BasicTaskScheduler
的两个基类中都没有对fHandlers
成员有相关的操作。
这个函数做了三件事情。
获取延时队列头结点的延时剩余时间,作为select操作的超时时间。调用select监控三个集合。 如果select调用成功了,那么就开始轮询HandlerSet对象fHandlers中的节点,有符合条件的就使用其内部保存的函数指针和数据指针以及条件掩码来调用函数。
处理等待触发事件集里面的事件。
处理延时队列中到达延时时间的节点。
void BasicTaskScheduler::SingleStep(unsigned maxDelayTime) { //拷贝三个集合去给select调用做参数 fd_set readSet = fReadSet; // make a copy for this select() call fd_set writeSet = fWriteSet; // ditto fd_set exceptionSet = fExceptionSet; // ditto //获取延时队列头结点的延时剩余时间(作为select超时时间) DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm(); struct timeval tv_timeToDelay; tv_timeToDelay.tv_sec = timeToDelay.seconds(); tv_timeToDelay.tv_usec = timeToDelay.useconds(); // Very large "tv_sec" values cause select() to fail. // Don't make it any larger than 1 million seconds (11.5 days) // 控制在1百万秒以内 const long MAX_TV_SEC = MILLION; if (tv_timeToDelay.tv_sec > MAX_TV_SEC) { tv_timeToDelay.tv_sec = MAX_TV_SEC; } // Also check our "maxDelayTime" parameter (if it's > 0): if (maxDelayTime > 0 && (tv_timeToDelay.tv_sec > (long)maxDelayTime / MILLION || (tv_timeToDelay.tv_sec == (long)maxDelayTime / MILLION && tv_timeToDelay.tv_usec > (long)maxDelayTime%MILLION))) { tv_timeToDelay.tv_sec = maxDelayTime / MILLION; tv_timeToDelay.tv_usec = maxDelayTime%MILLION; } //调用select来监控集合 int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay); //------------------------------------------------------------------------------------------ //select出错返回,处理错误 if (selectResult < 0) { #if defined(__WIN32__) || defined(_WIN32) int err = WSAGetLastError(); // For some unknown reason, select() in Windoze sometimes fails with WSAEINVAL if // it was called with no entries set in "readSet". If this happens, ignore it: if (err == WSAEINVAL && readSet.fd_count == 0) { err = EINTR; // To stop this from happening again, create a dummy socket: int dummySocketNum = socket(AF_INET, SOCK_DGRAM, 0); FD_SET((unsigned)dummySocketNum, &fReadSet); } if (err != EINTR) { #else if (errno != EINTR && errno != EAGAIN) { #endif // Unexpected error - treat this as fatal: #if !defined(_WIN32_WCE) perror("BasicTaskScheduler::SingleStep(): select() fails"); #endif //内部错误,调用abort() internalError(); } } //----------------------------------------------------------------------------- //开始处理 // Call the handler function for one readable socket: HandlerIterator iter(*fHandlers); HandlerDescriptor* handler; // To ensure forward progress through the handlers, begin past the last // socket number that we handled: //注意fLastHandledSocketNum如果不为-1,说明已经调度过某些任务了 if (fLastHandledSocketNum >= 0) { while ((handler = iter.next()) != NULL) { //从链表中找上一次最后调度的处理程序描述对象 if (handler->socketNum == fLastHandledSocketNum) break; } if (handler == NULL) { fLastHandledSocketNum = -1; //没有找到 iter.reset(); // start from the beginning instead 迭代器回到起点 } } //轮询处理 //如果上面最后一个Handle == NULL成立了,那么这里不会进入,iter.next()还是会返回NULL //也就是说上次最后被调度的对象被找到了,这里的循环才会进入 //这是为了提高效率,因为找到了最后一个被调度的元素,那么其之前的元素就都已经被调度过了 while ((handler = iter.next()) != NULL) { int sock = handler->socketNum; // alias 别名 int resultConditionSet = 0; // 结果条件(状态)集合 if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/sanity理智 check/) resultConditionSet |= SOCKET_READABLE; //添加可读属性 if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/sanity check/) resultConditionSet |= SOCKET_WRITABLE; //添加可写属性 if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/sanity check/) resultConditionSet |= SOCKET_EXCEPTION; //添加异常属性 if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) { fLastHandledSocketNum = sock; // Note: we set "fLastHandledSocketNum" before calling the handler, // in case the handler calls "doEventLoop()" reentrantly. //调用相关处理 (*handler->handlerProc)(handler->clientData, resultConditionSet); break; } } //如果没有找到上次最后被调度的对象,并且fLastHandledSocketNum标识存在 if (handler == NULL && fLastHandledSocketNum >= 0) { // We didn't call a handler, but we didn't get to check all of them, // so try again from the beginning: // 我们没有给一个处理程序,但我们没有去检查所有这些,所以试着重新开始: iter.reset(); //回到链表头 //从链表第头开始轮询处理 while ((handler = iter.next()) != NULL) { int sock = handler->socketNum; // alias int resultConditionSet = 0; if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/sanity check/) resultConditionSet |= SOCKET_READABLE; if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/sanity check/) resultConditionSet |= SOCKET_WRITABLE; if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/sanity check/) resultConditionSet |= SOCKET_EXCEPTION; if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) { //设置fLastHandledSocketNum为最后一个被调用的处理程序的标识 fLastHandledSocketNum = sock; // Note: we set "fLastHandledSocketNum" before calling the handler, // in case the handler calls "doEventLoop()" reentrantly. (*handler->handlerProc)(handler->clientData, resultConditionSet); break; } } // 没有一个合适的处理程序被调用 if (handler == NULL) fLastHandledSocketNum = -1;//because we didn't call a handler } //==========================================================================================
// Also handle any newly-triggered event (Note that we do this *after* calling a socket handler, // in case the triggered event handler modifies The set of readable sockets.) // 处理等待触发的事件,这个在fTriggersAwaitingHandling中被标识 if (fTriggersAwaitingHandling != 0) { if (fTriggersAwaitingHandling == fLastUsedTriggerMask) { //只有一个等待触发的事件 // Common-case optimization for a single event trigger: fTriggersAwaitingHandling = 0; if (fTriggeredEventHandlers[fLastUsedTriggerNum] != NULL) { //函数调用 (*fTriggeredEventHandlers[fLastUsedTriggerNum])(fTriggeredEventClientDatas[fLastUsedTriggerNum]); } } else { // 有多个等待触发的事件 // Look for an event trigger that needs handling (making sure that we make forward progress through all possible triggers): unsigned i = fLastUsedTriggerNum; EventTriggerId mask = fLastUsedTriggerMask; do { i = (i + 1) % MAX_NUM_EVENT_TRIGGERS; mask >>= 1; if (mask == 0) mask = 0x80000000; if ((fTriggersAwaitingHandling&mask) != 0) { fTriggersAwaitingHandling &= ~mask; if (fTriggeredEventHandlers[i] != NULL) { (*fTriggeredEventHandlers[i])(fTriggeredEventClientDatas[i]); } fLastUsedTriggerMask = mask; fLastUsedTriggerNum = i; break; } } while (i != fLastUsedTriggerNum); } }
//====================================================================================== // Also handle any delayed event that may have come due. // 处理延时队列中已经到时间的延时任务 fDelayQueue.handleAlarm(); }
##setBackgroundHandling方法(添加后台处理程序)
setBackgroundHandling
方法用于添加或更新一个处理程序到fHandlers
链表。如果conditionSe
t为0
,就将socketNum
标识的节点从fHandlers
中移除。否则若socketNum
标识的节点存在,就更新,否则就添加一个节点。
void BasicTaskScheduler
::setBackgroundHandling(int socketNum, int conditionSet, BackgroundHandlerProc* handlerProc, void* clientData) {
if (socketNum < 0) return; //标识不合法
FD_CLR((unsigned)socketNum, &fReadSet); //不监控此套接口的可读状态
FD_CLR((unsigned)socketNum, &fWriteSet); //写
FD_CLR((unsigned)socketNum, &fExceptionSet);//异常
if (conditionSet == 0) { //不监控任何可操作状态
fHandlers->clearHandler(socketNum); //从链表中移除
if (socketNum + 1 == fMaxNumSockets) { //最大socket数减1,效率提升
--fMaxNumSockets;
}
}
else {
//更新链表,分配处理程序
fHandlers->assignHandler(socketNum, conditionSet, handlerProc, clientData);
if (socketNum + 1 > fMaxNumSockets) {
fMaxNumSockets = socketNum + 1; //更新最大socket数
}
//设置要监控的状态
if (conditionSet&SOCKET_READABLE) FD_SET((unsigned)socketNum, &fReadSet);
if (conditionSet&SOCKET_WRITABLE) FD_SET((unsigned)socketNum, &fWriteSet);
if (conditionSet&SOCKET_EXCEPTION) FD_SET((unsigned)socketNum, &fExceptionSet);
}
}
##moveSocketHandling方法(转移socket处理)
这个方法名不怎么好翻译,有点类似C++11 move
操作。都是转移操作。这里是将原本对oldSocketNum
套接口操作的处理程序转移到去操作newSocketNum
套接口。如果原本oldSocketNum
就不再链表fHandle
r中呢?那就相当于仅仅把对oldSocketNum
的监控给移除了。注意,这里设置了对newSocketNum
的监控,而无论其是否被加入到fHandler
链表。
void BasicTaskScheduler::moveSocketHandling(int oldSocketNum, int newSocketNum) {
if (oldSocketNum < 0 || newSocketNum < 0) return; // sanity check完整性检查
//清理三个集合中对oldSocketNum的监控
if (FD_ISSET(oldSocketNum, &fReadSet)) { FD_CLR((unsigned)oldSocketNum, &fReadSet); FD_SET((unsigned)newSocketNum, &fReadSet); }
if (FD_ISSET(oldSocketNum, &fWriteSet)) { FD_CLR((unsigned)oldSocketNum, &fWriteSet); FD_SET((unsigned)newSocketNum, &fWriteSet); }
if (FD_ISSET(oldSocketNum, &fExceptionSet)) { FD_CLR((unsigned)oldSocketNum, &fExceptionSet); FD_SET((unsigned)newSocketNum, &fExceptionSet); }
//替换socketNum
fHandlers->moveHandler(oldSocketNum, newSocketNum);
if (oldSocketNum + 1 == fMaxNumSockets) {
--fMaxNumSockets;
}
if (newSocketNum + 1 > fMaxNumSockets) {
fMaxNumSockets = newSocketNum + 1;
}
}