C++ 多线程编程总结

Wesley13
• 阅读 560

在开发C++程序时,一般在吞吐量、并发、实时性上有较高的要求。设计C++程序时,总结起来可以从如下几点提高效率:

  • 并发
  • 异步
  • 缓存

下面将我平常工作中遇到一些问题例举一二,其设计思想无非以上三点。

1任务队列

1.1    以生产者-消费者模型设计任务队列

         生产者-消费者模型是人们非常熟悉的模型,比如在某个服务器程序中,当User数据被逻辑模块修改后,就产生一个更新数据库的任务(produce),投递给IO模块任务队列,IO模块从任务队列中取出任务执行sql操作(consume)。

设计通用的任务队列,示例代码如下:

详细实现可参见:

http://ffown.googlecode.com/svn/trunk/fflib/include/detail/task_queue_impl.h

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

voidtask_queue_t::produce(consttask_t& task_) {

        lock_guard_t lock(m_mutex);

        if(m_tasklist->empty()){//! 条件满足唤醒等待线程

            m_cond.signal();

        }

        m_tasklist->push_back(task_);

    }

int  task_queue_t::comsume(task_t& task_){

        lock_guard_t lock(m_mutex);

        while(m_tasklist->empty())//! 当没有作业时,就等待直到条件满足被唤醒{

            if(false== m_flag){

                return-1;

            }

            m_cond.wait();

        }

        task_ = m_tasklist->front();

        m_tasklist->pop_front();

        return0;

}

1.2    任务队列使用技巧

1.2.1 IO 与 逻辑分离

         比如网络游戏服务器程序中,网络模块收到消息包,投递给逻辑层后立即返回,继续接受下一个消息包。逻辑线程在一个没有io操作的环境下运行,以保障实时性。示例:

?

1

2

3

voidhandle_xx_msg(longuid,constxx_msg_t& msg){

    logic_task_queue->post(boost::bind(&servie_t::proces, uid, msg));

}

注意,此模式下为单任务队列,每个任务队列单线程。

1.2.2  并行流水线

         上面的只是完成了io 和 cpu运算的并行,而cpu中逻辑操作是串行的。在某些场合,cpu逻辑运算部分也可实现并行,如游戏中用户A种菜和B种菜两种操作是完全可以并行的,因 为两个操作没有共享数据。最简单的方式是A、B相关的操作被分配到不同的任务队列中。示例如下:

?

1

2

3

4

voidhandle_xx_msg(longuid,constxx_msg_t& msg) {

  logic_task_queue_array[uid %sizeof(logic_task_queue_array)]->post(

    boost::bind(&servie_t::proces, uid, msg));

}

  注意,此模式下为多任务队列,每个任务队列单线程。

1.2.3 连接池与异步回调

         比如逻辑Service模块需要数据库模块异步载入用户数据,并做后续处理计算。而数据库模块拥有一个固定连接数的连接池,当执行SQL的任务到来时,选择一个空闲的连接,执行SQL,并把SQL 通过回调函数传递给逻辑层。其步骤如下:

  • 预先分配好线程池,每个线程创建一个连接到数据库的连接
  • 为数据库模块创建一个任务队列,所有线程都是这个任务队列的消费者
  • 逻辑层想数据库模块投递sql执行任务,同时传递一个回调函数来接受sql执行结果

示例如下:

?

1

2

3

4

5

6

7

8

voiddb_t:load(longuid_, boost::function<void(user_data_t&) func_){

    //! sql execute, construct user_data_t user

    func_(user)

}

voidprocess_user_data_loaded(user_data_t&){

    //! todo something

}

db_task_queue->post(boost::bind(&db_t:load, uid, func));

注意,此模式下为单任务队列,每个任务队列多线程。

2. 日志

         本文主要讲C++多线程编程,日志系统不是为了提高程序效率,但是在程序调试、运行期排错上,日志是无可替代的工具,相信开发后台程序的朋友都会使用日志。常见的日志使用方式有如下几种:

  • 流式,如logstream << “start servie time[%d]” << time(0) << ” app name[%s]” << app_string.c_str() << endl;
  • Printf 格式如:logtrace(LOG_MODULE, “start servie time[%d] app name[%s]“, time(0), app_string.c_str());

二者各有优缺点,流式是线程安全的,printf格式格式化字符串会更直接,但缺点是线程不安全,如果把app_string.c_str() 换成app_string (std::string),编译被通过,但是运行期会crash(如果运气好每次都crash,运气不好偶尔会crash)。我个人钟爱printf风 格,可以做如下改进:

  • 增加线程安全,利用C++模板的traits机制,可以实现线程安全。示例:

?

1

2

3

4

5

template

voidlogtrace(constchar* module,constchar* fmt, ARG1 arg1){

    boost::format s(fmt);

    f % arg1;

}

  这样,除了标准类型+std::string 传入其他类型将编译不能通过。这里只列举了一个参数的例子,可以重载该版本支持更多参数,如果你愿意,可以支持9个参数或更多。

  • 为日志增加颜色,在printf中加入控制字符,可以再屏幕终端上显示颜色,Linux下示例:printf(“33[32;49;1m [DONE] 33[39;49;0m")

更多颜色方案参见:

http://hi.baidu.com/jiemnij/blog/item/d95df8c28ac2815cb219a80e.html

  • 每个线程启动时,都应该用日志打印该线程负责什么功能。这样,程序跑起来的时候通过top –H – p pid 可以得知那个功能使用cpu的多少。实际上,我的每行日志都会打印线程id,此线程id非pthread_id,而其实是线程对应的系统分配的进程id号。

3. 性能监控

         尽管已经有很多工具可以分析c++程序运行性能,但是其大部分还是运行在程序debug阶段。我们需要一种手段在debug和release阶段都能监控程序,一方面得知程序瓶颈之所在,一方面尽早发现哪些组件在运行期出现了异常。

         通常都是使用gettimeofday 来计算某个函数开销,可以精确到微妙。可以利用C++的确定性析构,非常方便的实现获取函数开销的小工具,示例如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

structprofiler{

    profiler(constchar* func_name){

        gettimeofday(&tv, NULL);

    }

    ~profiler(){

        structtimeval tv2;

        gettimeofday(&tv2, NULL);

        longcost = (tv.tv_sec - tv.tv_sec) * 1000000 + (tv.tv_usec - tv.tv_usec);

        //! post to some manager

    }

    structtimeval tv;

};

#define PROFILER() profiler(__FUNCTION__)

Cost 应该被投递到性能统计管理器中,该管理器定时讲性能统计数据输出到文件中。

4 Lambda 编程

使用foreach 代替迭代器

         很多编程语言已经内建了foreach,但是c++还没有。所以建议自己在需要遍历容器的地方编写foreach函数。习惯函数式编程的人应该会非常钟情使用foreach,使用foreach的好处多多少少有些,如:

http://www.cnblogs.com/chsword/archive/2007/09/28/910011.html

         但主要是编程哲学上层面的。

示例:

?

1

2

3

4

5

voiduser_mgr_t::foreach(boost::function<void(user_t&)> func_){

    for(iterator it = m_users.begin(); it != m_users.end() ++it){

        func_(it->second);

    }

}

比如要实现dump 接口,不需要重写关于迭代器的代码

?

1

2

3

4

5

6

7

8

voiduser_mgr_t:dump(){

    structlambda {

        staticvoidprint(user_t& user){

            //! print(tostring(user);

        }

    };

    this->foreach(lambda::print);

}

实际上,上面的代码变通的生成了匿名函数,如果是c++ 11 标准的编译器,本可以写的更简洁一些:

?

1

this->foreach([](user_t& user) {} );

但是我大部分时间编写的程序都要运行在centos 上,你知道吗它的gcc版本是gcc 4.1.2, 所以大部分时间我都是用变通的方式使用lambda函数。

Lambda 函数结合任务队列实现异步

         常见的使用任务队列实现异步的代码如下:

?

1

2

3

4

5

6

7

voidservice_t:async_update_user(longuid){

    task_queue->post(boost::bind(&service_t:sync_update_user_impl,this, uid));

}

voidservice_t:sync_update_user_impl(longuid){

    user_t& user = get_user(uid);

    user.update()

}

这样做的缺点是,一个接口要响应的写两遍函数,如果一个函数的参数变了,那么另一个参数也要跟着改动。并且代码也不是很美观。使用lambda可以让异步看起来更直观,仿佛就是在接口函数中立刻完成一样。示例代码:

?

1

2

3

4

5

6

7

8

9

voidservice_t:async_update_user(longuid){

    structlambda {

        staticvoidupdate_user_impl(service_t* servie,longuid){

            user_t& user = servie->get_user(uid);

            user.update();

        }

    };

    task_queue->post(boost::bind(&lambda:update_user_impl,this, uid));

}

这样当要改动该接口时,直接在该接口内修改代码,非常直观。

5. 奇技淫巧

利用shared_ptr 实现map/reduce

Map/reduce的语义是先将任务划分为多个任务,投递到多个worker中并发执行,其产生的结果经reduce汇总后生成最终的结果。 Shared_ptr的语义是什么呢?当最后一个shared_ptr析构时,将会调用托管对象的析构函数。语义和map/reduce过程非常相近。我 们只需自己实现讲请求划分多个任务即可。示例过程如下:

  • 定义请求托管对象,加入我们需要在10个文件中搜索“oh nice”字符串出现的次数,定义托管结构体如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

structreducer{

    voidset_result(intindex,longresult) {

        m_result[index] = result;

    }

    ~reducer(){

        longtotal = 0;

        for(inti = 0; i <sizeof(m_result); ++i){

            total += m_result[i];

        }

        //! post total to somewhere

    }

    longm_result[10];

};

  • 定义执行任务的 worker

?

1

2

3

voidworker_t:exe(intindex_, shared_ptr ret) {

  ret->set_result(index, 100);

}

  • 将任务分割后,投递给不同的worker

?

1

2

3

4

5

shared_ptr ret(newreducer());

for(inti = 0; i < 10; ++i)

{

    task_queue[i]->post(boost::bind(&worker_t:exe, i, ret));

}

转载自 MySQLOPS  原文链接

点赞
收藏
评论区
推荐文章
捉虫大师 捉虫大师
3年前
一种极致性能的缓冲队列
本文已收录https://github.com/lkxiaolou/lkxiaolou欢迎star。背景在多线程下的生产者消费者模型中,需求满足如下情况:对生产者生产投递数据的性能要求非常高多个生产者,单个(多个也可以,本文只介绍单个的情况)消费者当消费者跟不上生产者速度时,可容忍少部分数据丢失生产者是单条单条地生产数据举个日志采集的例子,日志在不同的
Stella981 Stella981
3年前
Celery分布式任务队列的认识和基本操作
一、简单认识  Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。它的特点有:简单:熟悉了它的流程后,配置使用简单;高可用
Stella981 Stella981
3年前
MQ对比之RabbitMQ & Redis
消息队列选择:RabbitMQ&RedisRabbitMQRabbitMQ是一个由erlang开发的AMQP(AdvancedMessageQueue)的开源实现的产品,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message
Stella981 Stella981
3年前
Linux系统 Centos7 环境基于Docker部署Rocketmq服务
消息队列基本概述MQ,MessageQueue,基于TCP协议构建的简单协议,区别于具体的通信协议。基于通信协议定义和抽象的更高层次的通信模型,一般都是生产者和消费者模型,又或者说服务端和客户端模型。生产者/消费者模型:一般通过定义生产者和消费者实现消息通信从而屏
Stella981 Stella981
3年前
Gevent简明教程
1、前述进程线程协程异步并发编程(不是并行)目前有四种方式:多进程、多线程、协程和异步。多进程编程在python中有类似C的os.fork,更高层封装的有multiprocessing标准库多线程编程python中有Thread和threading异步编程在linux下主要有三种实现selec
Wesley13 Wesley13
3年前
3.rabbitmq
rabbitmq发布订阅模式模型组成一个消费者Producer,一个交换机Exchange,多个消息队列Queue,多个消费者Consumer一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送
Wesley13 Wesley13
3年前
ActiveMQ简述,使用
官网地址:http://activemq.apache.org/参考文章:http://my.oschina.net/nk2011/blog/366395JMS支持两种消息发送和接收模型。一种称为P2P(PonittoPoint)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的
Wesley13 Wesley13
3年前
Java并发系列9
今天要讲的BlockingQueue可谓是大名鼎鼎,在并发编程中比较常见的一个类。BlockingQueue顾名思义是表示一个阻塞队列,注意这两个词:阻塞和队列。可以拿我们熟悉的生产者消费者队列来举例,一条流水线上,A生产零件,B组装零件,A就是生产者,B是消费者。如果A生成的太快,则零件堆积,A需要休息一会儿等待B把零件消费完;如果A生产的太
Easter79 Easter79
3年前
SpringMVC中配置RabbitMQ
        RabbitMQ是工作在amqp协议(advancedmessagequeueprotocal,高级消息队列协议)上的一个消息中间件。它通过一个生产者消费者模型来处理应用中产生的消息。        除了生产者和消费者,此模型中另外一个重要的概念叫“工作队列”,也称为“任务队列”(TaskQueue),任务队列背后的核心想法是避免
Wesley13 Wesley13
3年前
Java并发(五)任务间使用管道进行通信
通过I/O在线程间进行通信通常很有用。提供线程功能的类库以“管道”的形式对线程间的I/O提供了支持。它们在JavaI/O类库中的对应物就是PipedWriter(允许任务向管道写)和PipedReader(允许不同的任务从同一个管道中读取)。这个模型可以看做是“生产者消费者”问题的变体,这里的管道就是一个封装好的解决方案。管道基本上是一个阻塞队列,