Swoole源码学习记录(十一)——Worker,Connection

Wesley13
• 阅读 635

Swoole版本:1.7.5-stable

Github地址: https://github.com/LinkedDestiny/swoole-src-analysis

本章将分析Swoole中的三个比较重要的模块,Worker,ReactorProcess和Connection。其中Worker和ReactorProcess其实是对前面三章的一个补充,在前面的章节中为了分析结果的流畅性没有针对这些模块做特定分析,在此做出补充。

Worker模块

首先是Worker模块。Worker在Swoole中为核心工作进程的封装,包括用于处理核心逻辑的worker和用于处理任务的task_worker。在Swoole中使用了结构体swWorker来封装worker进程的相关属性,其声明在swoole.h文件中的727 – 787行,其声明如下:

struct _swWorker
{
         /**
          * worker process
          */
         pid_tpid;
 
         /**
          * worker thread
          */
         pthread_ttid;
 
         swProcessPool*pool;
 
         swMemoryPool*pool_output;
 
         swQueue*queue;
 
         /**
          * redirect stdout to pipe_master
          */
         uint8_tredirect_stdout;
 
         /**
          * worker status, IDLE or BUSY
          */
         uint8_tstatus;
 
         uint8_tipc_mode;
 
         /**
          * redirect stdin to pipe_worker
          */
         uint8_tredirect_stdin;
 
         /**
          * worker id
          */
         uint32_tid;
 
         /**
          * eventfd, process notify
          */
         swPipe*notify;
 
         /**
          * share memory store
          */
         struct
         {
                   uint8_tlock;
                   void*ptr;
         }store;
 
         intpipe_master;
         intpipe_worker;
         intpipe;
         intreactor_id;
         void*ptr;
         void*ptr2;
};

标有注释的变量就不说明了,大家一看就明白。剩下的几个,pool是个进程池,用于分配task_worker,pool_output用于存放task_worker执行结束后的结果,queue是消息队列,ipc_mode是进程间的通讯模式,pipe_master和pipe_worker是管道的fd,分别用于写消息到master进程和从master读取消息,pipe当然就是管道的id,reactor_id是该worker归属的reactor的标志。(ptr和ptr2待补充,实在没找到这俩变量在哪用的)

这里做一点补充,一个Worker中有两个管道,一个管道用于和master进程通信,一个管道用于和Reactor通信。而实际上如果指定了消息队列模式,则通信方式都是通过读写swQueue队列来实现的。这几点在之前的分析中已经有说明了,再此补充说明一下。

swWorker有四个操作函数,这些函数声明在Server.h文件中的545 – 548行,其声明如下:

int swWorker_create(swWorker *worker);
void swWorker_free(swWorker *worker);
void swWorker_signal_init(void);
void swWorker_signal_handler(int signo);

这四个函数声明在Worker.c中,其中swWorker_free只是释放了worker的store内存并且关闭了notify管道(不接收reactor消息),swWorker_signal_init指定了对应信号的回调函数,swWorker_signal_handler规定了对应信号的操作(基本——没内容,看一看就好……),这里只贴出swWorker_create的源码:

void*store = sw_shm_malloc(SwooleG.serv->buffer_output_size);
         if(store == NULL)
         {
                   swWarn("mallocfor worker->store failed.");
                   returnSW_ERR;
         }
 
         swPipe*worker_notify = sw_malloc(sizeof(swPipe));
         if(worker_notify == NULL)
         {
                   swWarn("mallocfor worker->notify failed.");
                   sw_shm_free(store);
                   returnSW_ERR;
         }
 
         /**
          * Create notify pipe
          */
         if(swPipeNotify_auto(worker_notify, 1, 0))
         {
                   sw_shm_free(store);
                   sw_free(worker_notify);
                   returnSW_ERR;
         }

源码解释:创建共享内存store,分配通信管道notify的内存,并调用swPipeNotify_auto创建实际管道(根据内核版本决定使用eventfd管道还是Base管道)。

Connection模块

Connection模块应该是少有的有自己独立的头文件和C文件的模块了……

Connection用于存储一个实际的连接(C-to-S),用来存放一些相关的变量,比如连接时长、上一次响应时间之类的,当然另一个好处就在于可以给这个连接加一个SSL控制然后就能实现收发数据的安全加密解密了对不对(好像很厉害的样子)……

首先是swConnection结构体,该结构体封装了所有Connection的属性变量,其声明在Connection.c文件中,声明如下:

typedef struct _swConnection
{
         /**
          * is active
          * system fd must be 0. en: timerfd, signalfd,listen socket
          */
         uint8_tactive;
 
         /**
          * file descript
          */
         intfd;
 
         /**
          * ReactorThread id
          */
         uint16_tfrom_id;
 
         /**
          * from which socket fd
          */
         uint16_tfrom_fd;
 
         /**
          * socket address
          */
         structsockaddr_in addr;
 
         /**
          * link any thing
          */
         void*object;
 
         /**
          * input buffer
          */
         swBuffer*in_buffer;
 
         /**
          * output buffer
          */
         swBuffer*out_buffer;
 
         /**
          * connect time(seconds)
          */
         time_tconnect_time;
 
         /**
          * received time with last data
          */
         time_tlast_time;
 
#ifdef SW_USE_OPENSSL
         SSL*ssl;
         uint32_tssl_state;
#endif
 
} swConnection;

基本上每一个变量都有注释我就不多废话了……不过这里说明一下,swConnection中的fd变量存放的是该连接所对应的描述符,而那个from_fd吧……经过一系列的排查我最终发现……from_fd是Server创建的监听fd。

(大概说一下排查过程,首先找到创建Connection的函数,因为知道Connection大概会在Server处创建,所以在Server.h中找到swServer_connection_new,然后看每个变量的具体赋值,然后发现from_fd是参数中的swDataHead赋予的,于是回追到调用swServer_connection_new的函数swServer_master_onAccept,然后就发现fd来自于accept的返回值,是一个新创建的socket连接,而from_fd是server创建的用于监听的socketfd)

Connection的操作函数分布在两个地方,new和close函数声明在Server.h的476 – 477 行,而其他操作函数都在Connection.h文件中声明(收发数据、操作缓存)。

首先是在Server.h中用于新建和关闭连接的函数,其声明如下:

swConnection* swServer_connection_new(swServer *serv, swDataHead *ev);
int swServer_connection_close(swServer*serv, int fd, int notify);

这两个函数的具体定义在Server.c文件中,swServer_connection_new函数定义在文件最末尾,其核心代码如下:

int conn_fd = ev->fd;
   swConnection* connection = NULL;
 
   SwooleStats->accept_count++;
   sw_atomic_fetch_add(&SwooleStats->connection_num, 1);
 
   if (conn_fd > swServer_get_maxfd(serv))
    {
       swServer_set_maxfd(serv, conn_fd);
#ifdef SW_CONNECTION_LIST_EXPAND
       //新的fd超过了最大fd
        //需要扩容
       if (conn_fd == serv->connection_list_capacity - 1)
       {
           void *new_ptr = sw_shm_realloc(serv->connection_list,sizeof(swConnection)*(serv->connection_list_capacity +SW_CONNECTION_LIST_EXPAND));
           if (new_ptr == NULL)
           {
                swWarn("connection_listrealloc fail");
                return SW_ERR;
           }
           else
           {
               serv->connection_list_capacity += SW_CONNECTION_LIST_EXPAND;
                serv->connection_list =(swConnection *)new_ptr;
           }
       }
#endif
    }
 
   connection = &(serv->connection_list[conn_fd]);
 
   connection->fd = conn_fd;
   connection->from_id = ev->from_id;
   connection->from_fd = ev->from_fd;
   connection->connect_time = SwooleGS->now;
   connection->last_time = SwooleGS->now;
   connection->active= 1; //使此连接激活,必须在最后,保证线程安全

源码解释:首先将accept_count和connection_num两个计数器加1。如果新的conn_fd超过了serv中连接列表的最大容量,就将连接列表扩容(注意到这里调用的是sw_shm_realloc,也就是connection_list的内存是可共享的)。随后将新的conn_fd加入连接列表中,然后给connection的属性赋值。主要的属性有:连接描述符fd,来自于哪个reactor、来自于哪个监听fd、连接的时间和上一次响应时间、连接的active状态。

然后是swServer_connection_close函数,该函数也定义在Server.c的末尾,让我没想到的是一个close函数也有这么长,以至于我不得不分段进行分析……

conn->active = 0;
         /**
          * Close count
          */
         sw_atomic_fetch_add(&SwooleStats->close_count,1);
         sw_atomic_fetch_sub(&SwooleStats->connection_num,1);
 
         intreactor_id = conn->from_id;
 
         reactor= &(serv->reactor_threads[reactor_id].reactor);

源码解释:首先将conn设置为关闭状态,然后修改两个计数器close_count和connection_num.随后根据conn中的reactor_id从serv的reactor列表中获取对应的reactor。

//释放缓存区占用的内存
   if (serv->open_eof_check)
    {
       if (conn->in_buffer)
       {
           swBuffer_free(conn->in_buffer);
           conn->in_buffer = NULL;
       }
    }
   else if (serv->open_length_check)
    {
       if (conn->object)
       {
           swString_free(conn->object);
       }
    }
   else if (serv->open_http_protocol)
    {
       if (conn->object)
       {
           swHttpRequest *request = (swHttpRequest *) conn->object;
           if (request->state > 0 && request->buffer)
           {
                swTrace("ConnectionClose.free buffer=%p, request=%p\n", request->buffer, request);
                swString_free(request->buffer);
                bzero(request,sizeof(swHttpRequest));
           }
       }
    }
    if(conn->out_buffer != NULL)
    {
       swBuffer_free(conn->out_buffer);
       conn->out_buffer = NULL;
    }
 
   if (conn->in_buffer != NULL)
    {
       swBuffer_free(conn->in_buffer);
       conn->in_buffer = NULL;
    }

源码解释:这里用于释放conn开启的缓存区。如果开启了eof_check,则需要将conn的输入缓存释放(这里放着没有检测到eof的数据);如果打开了length_check,则将object指向的对象释放;如果使用了http协议,则释放object指向的swHttpRequest对象。最后,释放out_buffer和in_buffer。

//通知到worker进程
   if (serv->onClose != NULL && notify == 1)
    {
       //通知worker进程
       notify_ev.from_id = reactor_id;
       notify_ev.fd = fd;
       notify_ev.type = SW_EVENT_CLOSE;
       SwooleG.factory->notify(SwooleG.factory, ¬ify_ev);
    }

源码解释:如果设置了onClose回调并且指定需要通知,则调用factory的notify方法发送通知到worker进程。(所以发现onClose回调异常的童鞋可以通过这里查找bug)

#ifdef SW_USE_OPENSSL
         if(conn->ssl)
         {
                   swSSL_close(conn);
         }
#endif
 
   /**
    * reset maxfd, for connection_list
    */
   if (fd == swServer_get_maxfd(serv))
    {
       SwooleG.lock.lock(&SwooleG.lock);
       int find_max_fd = fd - 1;
       swTrace("set_maxfd=%d|close_fd=%d\n", find_max_fd, fd);
       /**
        * Find the new max_fd
        */
       for (; serv->connection_list[find_max_fd].active == 0 &&find_max_fd > swServer_get_minfd(serv); find_max_fd--);
       swServer_set_maxfd(serv, find_max_fd);
       SwooleG.lock.unlock(&SwooleG.lock);
    }
 
     //关闭此连接,必须放在最前面,以保证线程安全
     returnreactor->del(reactor, fd);

源码解释:如果使用了SSL功能,则需要从SSL中移除该SSL。如果关闭的fd是maxfd,则需要重新设置serv中的连接列表,操作步骤为先锁住全局变量SwooleG,然后从末尾循环便利connection_list直到找到当前最大的fd,最后解锁。最后一步,从reactor中移除监听的fd(谁能给我解释一下这句注释啥意思,很急,在线等)

在Connection.h中一共声明了10个操作函数,其中三个内联函数。据我目测,实际上原来应该没有swConnection_recv和swConnection_send这两个函数,应该是有了SSL特性后专门添加上的,因为这俩函数就只是判断了一下有没有开启SSL特性,如果开启了并且conn设置了ssl,则调用ssl的安全读写方法。swConnection_error函数也不具体分析了,只是根据errcode的值返回对应的SW_*。

另外7个操作函数声明如下:

int swConnection_send_blocking(int fd, void*data, int length, int timeout);
int swConnection_buffer_send(swConnection*conn);
 
swString *swConnection_get_string_buffer(swConnection *conn);
void swConnection_clear_string_buffer(swConnection *conn);
volatile swBuffer_trunk*swConnection_get_out_buffer(swConnection *conn, uint32_t type);
volatile swBuffer_trunk*swConnection_get_in_buffer(swConnection *conn);
int swConnection_sendfile(swConnection*conn, char *filename);

可以看到基本上都是用于发送数据、文件以及操作缓存区的函数,下面一个个上分析。

首先是swConnection_send_blocking函数,该函数是一个阻塞式的发送函数,其源码如下:

while (writen > 0)
    {
       if (swSocket_wait(fd, timeout, SW_EVENT_WRITE) < 0)
       {
           return SW_ERR;
       }
       else
       {
           n = send(fd, data, writen, MSG_NOSIGNAL | MSG_DONTWAIT);
           if (n < 0)
           {
                swWarn("send() failed.Error: %s[%d]", strerror(errno), errno);
                return SW_ERR;
           }
           else
           {
                writen -= n;
                continue;
           }
       }
    }

源码解释:首先调用swSocket_wait函数(调用poll函数监听socket直到socket满足监听条件)监听到fd可写,随后调用send函数发送数据;循环写入直到所有数据都写入fd。

这里我补充说明一下swoole的Buffer.c,不作详细分析,只说明其功能。Buffer是个链表式结构,每个链表节点是一个trunk,trunk用于存放具体的数据。

下面分析swConnection_buffer_send函数,这个函数是将conn中的输出缓存区中的数据发出,核心源码如下:

swBuffer *buffer= conn->out_buffer;
   swBuffer_trunk *trunk = swBuffer_get_trunk(buffer);
   sendn = trunk->length - trunk->offset;
 
   if (sendn == 0)
    {
       swBuffer_pop_trunk(buffer, trunk);
       return SW_CONTINUE;
    }
   ret = swConnection_send(conn, trunk->store.ptr + trunk->offset,sendn, 0);
   //printf("BufferOut:reactor=%d|sendn=%d|ret=%d|trunk->offset=%d|trunk_len=%d\n",reactor->id, sendn, ret, trunk->offset, trunk->length);
   if (ret < 0)
    {
       switch (swConnection_error(errno))
       {
       case SW_ERROR:
           swWarn("send to fd[%d] failed. Error: %s[%d]", conn->fd,strerror(errno), errno);
           return SW_OK;
       case SW_CLOSE:
           return SW_CLOSE;
       case SW_WAIT:
           return SW_WAIT;
       default:
           return SW_CONTINUE;
       }
    }
   //trunk full send
   else if (ret == sendn || sendn == 0)
    {
       swBuffer_pop_trunk(buffer, trunk);
    }
   else
    {
       trunk->offset += ret;
    }

源码解释:首先从conn中的输出缓存中拿到首部的trunk,并计算该trunk还有多少数据需要发送。如果已经发送完了,弹出该trunk,继续发送下一个trunk。如果没有发送完,调用swConnection_send方法发送剩余数据。随后从trunk中移除已经发送的数据,并再次判定是否发送结束。

swConnection中有个void* object变量,这个指针可以用来指向一些奇奇怪怪的东西,比如一个swString对象,于是就有了两个操作函数swConnection_get_string_buffer和swConnection_clear_string_buffer,这俩函数不贴代码了,一个new一个free而已。

接着分析两个操作buffer的函数。首先是swConnection_get_in_buffer,该函数的作用是创建一个新的trunk加入到in_buffer中,并返回这个trunk的指针,其核心源码如下:

if (conn->in_buffer == NULL)
    {
       buffer = swBuffer_new(SW_BUFFER_SIZE);
       //buffer create failed
       if (buffer == NULL)
       {
           return NULL;
       }
       //new trunk
       trunk = swBuffer_new_trunk(buffer, SW_TRUNK_DATA,buffer->trunk_size);
       if (trunk == NULL)
       {
           sw_free(buffer);
           return NULL;
       }
       conn->in_buffer = buffer;
    }
   else
    {
       buffer = conn->in_buffer;
       trunk = buffer->tail;
       if (trunk == NULL || trunk->length == buffer->trunk_size)
       {
           trunk = swBuffer_new_trunk(buffer, SW_TRUNK_DATA,buffer->trunk_size);
       }
    }

源码解释:如果in_buffe为空,则创建一个swBuffer,并在这个buffer中创建一个新的trunk;否则,直接在in_buffer中创建trunk并添加到buffer末尾。

swConnection_get_out_buffer用于从out_buffer中拿到一个trunk。核心源码如下:

if (conn->out_buffer == NULL)
    {
       conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
       if (conn->out_buffer == NULL)
       {
           return NULL;
       }
    }
   if (type == SW_TRUNK_SENDFILE)
    {
       trunk = swBuffer_new_trunk(conn->out_buffer, SW_TRUNK_SENDFILE, 0);
    }
    else
    {
       trunk = swBuffer_get_trunk(conn->out_buffer);
       if (trunk == NULL)
       {
           trunk = swBuffer_new_trunk(conn->out_buffer, SW_TRUNK_DATA,conn->out_buffer->trunk_size);
       }
    }

源码解释:如果out_buffer为空,尝试创建,如果创建失败,返回NULL。然后判定type类型,如果为SENDFILEl类型,则直接创建type类型的trunk;如果不是,则尝试从buffer中获取首部的trunk,如果该trunk为空,则创建新的trunk。

这里重点分析一下swConnection_sendfile函数,该函数用于在out_buffer中添加一个用于sendfile的trunk。其核心源码如下:

if(conn->out_buffer == NULL)
    {
       conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
       if (conn->out_buffer == NULL)
       {
           return SW_ERR;
       }
    }
 
   swBuffer_trunk *trunk = swBuffer_new_trunk(conn->out_buffer,SW_TRUNK_SENDFILE, 0);
   if (trunk == NULL)
    {
       swWarn("get out_buffer trunk failed.");
       return SW_ERR;
    }
   swTask_sendfile *task = sw_malloc(sizeof(swTask_sendfile));
   if (task == NULL)
    {
       swWarn("malloc for swTask_sendfile failed.");
       //TODO: 回收这里的内存
       return SW_ERR;
    }
   bzero(task, sizeof(swTask_sendfile));
 
   task->filename = strdup(filename);
   int file_fd = open(filename, O_RDONLY);
   if (file_fd < 0)
    {
       swWarn("open file[%s] failed. Error: %s[%d]",task->filename, strerror(errno), errno);
       return SW_ERR;
    }
   struct stat file_stat;
   if (fstat(file_fd, &file_stat) < 0)
    {
       swWarn("swoole_async_readfile: fstat failed. Error: %s[%d]",strerror(errno), errno);
       return SW_ERR;
    }
 
   task->filesize = file_stat.st_size;
   task->fd = file_fd;
   trunk->store.ptr= (void *)task;

源码解释:先判定out_buffer是否存在,如果不存在则创建,随后在out_buffer中创建一个类型为sendfile的trunk。创建一个sendfile的task(swTask_sendfile),指定文件名以及文件的描述符fd,通过fstat函数获取文件的大小。随后将task添加到trunk的存储空间中。

以上就是Connection.c模块的相关分析。这章文字不多,代码不少,各位读者见谅……

(原计划分析ReactorProcess模块,但是发现这里需要和ProcessPool一起分析,因此单独开一章。)

版权声明:本文为博主原创文章,未经博主允许不得转载。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
3年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这