22_BasicTaskScheduler基本任务调度器(二)——Live555源码阅读(一)任务调

可莉
• 阅读 856

#22_BasicTaskScheduler基本任务调度器(二)——Live555源码阅读(一)任务调度相关类

[TOC]

这是Live555源码阅读的第二部分,包括了任务调度相关的三个类。任务调度是Live555源码中很重要的部分。

本文由乌合之众 lym瞎编,欢迎转载 my.oschina.net/oloroso

##SingleStep方法

这是这里最重要的一个方法。每一次调用都是一次真正的处理数据的过程。 前面的延时队列DelayQueue、处理程序链表HanlerSet、触发器数组fTriggeredEventHandlersfTriggeredEventClientDatas都是在这里被真正的调度起来的。 这一段的代码很长,过程有点多。要联系了前面讲过的内容来看才能比较好理解。这里要注意的fLastHandledSocketNum成员的操作。因为其在别的位置都没有修改过,只在这里轮询处理的时候,如果有处理了fHandlers中某个节点的时候才会去设置。再一个要思考的是,fHandlers中的元素是**从何而来的?**在BasicTaskScheduler的两个基类中都没有对fHandlers成员有相关的操作。

这个函数做了三件事情。

  1. 获取延时队列头结点的延时剩余时间,作为select操作的超时时间。调用select监控三个集合。 如果select调用成功了,那么就开始轮询HandlerSet对象fHandlers中的节点,有符合条件的就使用其内部保存的函数指针和数据指针以及条件掩码来调用函数。

  2. 处理等待触发事件集里面的事件。

  3. 处理延时队列中到达延时时间的节点。

    void BasicTaskScheduler::SingleStep(unsigned maxDelayTime) { //拷贝三个集合去给select调用做参数 fd_set readSet = fReadSet; // make a copy for this select() call fd_set writeSet = fWriteSet; // ditto fd_set exceptionSet = fExceptionSet; // ditto //获取延时队列头结点的延时剩余时间(作为select超时时间) DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm(); struct timeval tv_timeToDelay; tv_timeToDelay.tv_sec = timeToDelay.seconds(); tv_timeToDelay.tv_usec = timeToDelay.useconds(); // Very large "tv_sec" values cause select() to fail. // Don't make it any larger than 1 million seconds (11.5 days) // 控制在1百万秒以内 const long MAX_TV_SEC = MILLION; if (tv_timeToDelay.tv_sec > MAX_TV_SEC) { tv_timeToDelay.tv_sec = MAX_TV_SEC; } // Also check our "maxDelayTime" parameter (if it's > 0): if (maxDelayTime > 0 && (tv_timeToDelay.tv_sec > (long)maxDelayTime / MILLION || (tv_timeToDelay.tv_sec == (long)maxDelayTime / MILLION && tv_timeToDelay.tv_usec > (long)maxDelayTime%MILLION))) { tv_timeToDelay.tv_sec = maxDelayTime / MILLION; tv_timeToDelay.tv_usec = maxDelayTime%MILLION; } //调用select来监控集合 int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay); //------------------------------------------------------------------------------------------ //select出错返回,处理错误 if (selectResult < 0) { #if defined(__WIN32__) || defined(_WIN32) int err = WSAGetLastError(); // For some unknown reason, select() in Windoze sometimes fails with WSAEINVAL if // it was called with no entries set in "readSet". If this happens, ignore it: if (err == WSAEINVAL && readSet.fd_count == 0) { err = EINTR; // To stop this from happening again, create a dummy socket: int dummySocketNum = socket(AF_INET, SOCK_DGRAM, 0); FD_SET((unsigned)dummySocketNum, &fReadSet); } if (err != EINTR) { #else if (errno != EINTR && errno != EAGAIN) { #endif // Unexpected error - treat this as fatal: #if !defined(_WIN32_WCE) perror("BasicTaskScheduler::SingleStep(): select() fails"); #endif //内部错误,调用abort() internalError(); } } //----------------------------------------------------------------------------- //开始处理 // Call the handler function for one readable socket: HandlerIterator iter(*fHandlers); HandlerDescriptor* handler; // To ensure forward progress through the handlers, begin past the last // socket number that we handled: //注意fLastHandledSocketNum如果不为-1,说明已经调度过某些任务了 if (fLastHandledSocketNum >= 0) { while ((handler = iter.next()) != NULL) { //从链表中找上一次最后调度的处理程序描述对象 if (handler->socketNum == fLastHandledSocketNum) break; } if (handler == NULL) { fLastHandledSocketNum = -1; //没有找到 iter.reset(); // start from the beginning instead 迭代器回到起点 } } //轮询处理 //如果上面最后一个Handle == NULL成立了,那么这里不会进入,iter.next()还是会返回NULL //也就是说上次最后被调度的对象被找到了,这里的循环才会进入 //这是为了提高效率,因为找到了最后一个被调度的元素,那么其之前的元素就都已经被调度过了 while ((handler = iter.next()) != NULL) { int sock = handler->socketNum; // alias 别名 int resultConditionSet = 0; // 结果条件(状态)集合 if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/sanity理智 check/) resultConditionSet |= SOCKET_READABLE; //添加可读属性 if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/sanity check/) resultConditionSet |= SOCKET_WRITABLE; //添加可写属性 if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/sanity check/) resultConditionSet |= SOCKET_EXCEPTION; //添加异常属性 if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) { fLastHandledSocketNum = sock; // Note: we set "fLastHandledSocketNum" before calling the handler, // in case the handler calls "doEventLoop()" reentrantly. //调用相关处理 (*handler->handlerProc)(handler->clientData, resultConditionSet); break; } } //如果没有找到上次最后被调度的对象,并且fLastHandledSocketNum标识存在 if (handler == NULL && fLastHandledSocketNum >= 0) { // We didn't call a handler, but we didn't get to check all of them, // so try again from the beginning: // 我们没有给一个处理程序,但我们没有去检查所有这些,所以试着重新开始: iter.reset(); //回到链表头 //从链表第头开始轮询处理 while ((handler = iter.next()) != NULL) { int sock = handler->socketNum; // alias int resultConditionSet = 0; if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/sanity check/) resultConditionSet |= SOCKET_READABLE; if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/sanity check/) resultConditionSet |= SOCKET_WRITABLE; if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/sanity check/) resultConditionSet |= SOCKET_EXCEPTION; if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) { //设置fLastHandledSocketNum为最后一个被调用的处理程序的标识 fLastHandledSocketNum = sock; // Note: we set "fLastHandledSocketNum" before calling the handler, // in case the handler calls "doEventLoop()" reentrantly. (*handler->handlerProc)(handler->clientData, resultConditionSet); break; } } // 没有一个合适的处理程序被调用 if (handler == NULL) fLastHandledSocketNum = -1;//because we didn't call a handler } //==========================================================================================

    // Also handle any newly-triggered event (Note that we do this *after* calling a socket handler,
    // in case the triggered event handler modifies The set of readable sockets.)
    // 处理等待触发的事件,这个在fTriggersAwaitingHandling中被标识
    if (fTriggersAwaitingHandling != 0) {
        if (fTriggersAwaitingHandling == fLastUsedTriggerMask) {
        //只有一个等待触发的事件
            // Common-case optimization for a single event trigger:
            fTriggersAwaitingHandling = 0;
            if (fTriggeredEventHandlers[fLastUsedTriggerNum] != NULL) {
                //函数调用
                (*fTriggeredEventHandlers[fLastUsedTriggerNum])(fTriggeredEventClientDatas[fLastUsedTriggerNum]);
            }
        }
        else {
            // 有多个等待触发的事件
            // Look for an event trigger that needs handling (making sure that we make forward progress through all possible triggers):
            unsigned i = fLastUsedTriggerNum;
            EventTriggerId mask = fLastUsedTriggerMask;
    
            do {
                i = (i + 1) % MAX_NUM_EVENT_TRIGGERS;
                mask >>= 1;
                if (mask == 0) mask = 0x80000000;
    
                if ((fTriggersAwaitingHandling&mask) != 0) {
                    fTriggersAwaitingHandling &= ~mask;
                    if (fTriggeredEventHandlers[i] != NULL) {
                        (*fTriggeredEventHandlers[i])(fTriggeredEventClientDatas[i]);
                    }
    
                    fLastUsedTriggerMask = mask;
                    fLastUsedTriggerNum = i;
                    break;
                }
            } while (i != fLastUsedTriggerNum);
        }
    }
    

    //====================================================================================== // Also handle any delayed event that may have come due. // 处理延时队列中已经到时间的延时任务 fDelayQueue.handleAlarm(); }


##setBackgroundHandling方法(添加后台处理程序)

setBackgroundHandling方法用于添加或更新一个处理程序到fHandlers链表。如果conditionSet为0,就将socketNum标识的节点从fHandlers中移除。否则若socketNum标识的节点存在,就更新,否则就添加一个节点。

void BasicTaskScheduler
::setBackgroundHandling(int socketNum, int conditionSet, BackgroundHandlerProc* handlerProc, void* clientData) {
    if (socketNum < 0) return;    //标识不合法
    FD_CLR((unsigned)socketNum, &fReadSet);        //不监控此套接口的可读状态
    FD_CLR((unsigned)socketNum, &fWriteSet);    //写
    FD_CLR((unsigned)socketNum, &fExceptionSet);//异常
    if (conditionSet == 0) {    //不监控任何可操作状态
        fHandlers->clearHandler(socketNum);    //从链表中移除
        if (socketNum + 1 == fMaxNumSockets) {    //最大socket数减1,效率提升
            --fMaxNumSockets;
        }
    }
    else {
        //更新链表,分配处理程序
        fHandlers->assignHandler(socketNum, conditionSet, handlerProc, clientData);
        if (socketNum + 1 > fMaxNumSockets) {
            fMaxNumSockets = socketNum + 1;    //更新最大socket数
        }
        //设置要监控的状态
        if (conditionSet&SOCKET_READABLE) FD_SET((unsigned)socketNum, &fReadSet);
        if (conditionSet&SOCKET_WRITABLE) FD_SET((unsigned)socketNum, &fWriteSet);
        if (conditionSet&SOCKET_EXCEPTION) FD_SET((unsigned)socketNum, &fExceptionSet);
    }
}

##moveSocketHandling方法(转移socket处理)

这个方法名不怎么好翻译,有点类似C++11 move操作。都是转移操作。这里是将原本对oldSocketNum套接口操作的处理程序转移到去操作newSocketNum套接口。如果原本oldSocketNum就不再链表fHandler中呢?那就相当于仅仅把对oldSocketNum的监控给移除了。注意,这里设置了对newSocketNum的监控,而无论其是否被加入到fHandler链表。

void BasicTaskScheduler::moveSocketHandling(int oldSocketNum, int newSocketNum) {
    if (oldSocketNum < 0 || newSocketNum < 0) return; // sanity check完整性检查
    //清理三个集合中对oldSocketNum的监控
    if (FD_ISSET(oldSocketNum, &fReadSet)) { FD_CLR((unsigned)oldSocketNum, &fReadSet); FD_SET((unsigned)newSocketNum, &fReadSet); }
    if (FD_ISSET(oldSocketNum, &fWriteSet)) { FD_CLR((unsigned)oldSocketNum, &fWriteSet); FD_SET((unsigned)newSocketNum, &fWriteSet); }
    if (FD_ISSET(oldSocketNum, &fExceptionSet)) { FD_CLR((unsigned)oldSocketNum, &fExceptionSet); FD_SET((unsigned)newSocketNum, &fExceptionSet); }
    //替换socketNum
    fHandlers->moveHandler(oldSocketNum, newSocketNum);

    if (oldSocketNum + 1 == fMaxNumSockets) {
        --fMaxNumSockets;
    }
    if (newSocketNum + 1 > fMaxNumSockets) {
        fMaxNumSockets = newSocketNum + 1;
    }
}
点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
6个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
可莉 可莉
3年前
19_BasicTaskScheduler0 基本任务调度类基类(一)——Live555源码阅读(一
19\_BasicTaskScheduler0基本任务调度类基类(一)——Live555源码阅读(一)任务调度相关类这是Live555源码阅读的第二部分,包括了任务调度相关的三个类。任务调度是Live555源码中很重要的部分。本文由乌
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
可莉 可莉
3年前
21_BasicTaskScheduler基本任务调度器(一)——Live555源码阅读(一)任务调
21\_BasicTaskScheduler基本任务调度器(一)——Live555源码阅读(一)任务调度相关类\TOC\这是Live555源码阅读的第二部分,包括了任务调度相关的三个类。任务调度是Live555源码中很重要的部分。
可莉 可莉
3年前
18 TaskScheduler任务调度器抽象基类——Live555源码阅读(一)任务调度相关类
18TaskScheduler任务调度器抽象基类——Live555源码阅读(一)任务调度相关类这是Live555源码阅读的第二部分,包括了任务调度相关的三个类。任务调度是Live555源码中很重要的部分。本文由乌合之众lym瞎编,欢
可莉 可莉
3年前
20_BasicTaskScheduler0 基本任务调度类基类(二)——Live555源码阅读(一
20\_BasicTaskScheduler0基本任务调度类基类(二)——Live555源码阅读(一)任务调度相关类这是Live555源码阅读的第二部分,包括了任务调度相关的三个类。任务调度是Live555源码中很重要的部分。本文由乌
可莉 可莉
3年前
17 任务调度相关类综述——Live555源码阅读(一)任务调度相关类
17任务调度相关类综述——Live555源码阅读(一)任务调度相关类这是Live555源码阅读的第二部分,包括了任务调度相关的三个类。任务调度是Live555源码中很重要的部分。本文由乌合之众lym瞎编,欢迎转载my.oschi
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这