C++面向对象实现封装线程池

Wesley13
• 阅读 972
面向对象:
Noncopyable.h Thread.h
#ifndef \_\_NONCOPYABLE\_H\_\_
#define \_\_NONCOPYABLE\_H\_\_
#include<iostream>
using namespace std;
namespace meihao
{
        class Noncopyable
        {
                public:
                        Noncopyable(){};
                        ~Noncopyable(){};
                private:
                        Noncopyable(const Noncopyable&);
                        Noncopyable& operator=(const Noncopyable&);
        };
};
#endif 
#ifndef \_\_THREAD\_H\_\_
#define \_\_THREAD\_H\_\_
#include<iostream>
#include"Noncopyable.h"
#include<pthread.h>
using namespace std;
namespace meihao
{
        class Thread:private Noncopyable
        {
                public:
                        Thread();
                        void start();
                        void join();
                        virtual void run() = 0;
                        virtual ~Thread();
                        static void\* threadFunc(void\*);
                private:
                        pthread\_t \_pthId;
                        bool \_isRunning;
        };
};
#endif
MutexLock.h Condition.h
#ifndef \_\_MUTEXLOCK\_H\_\_
#define \_\_MUTEXLOCK\_H\_\_
#include<iostream>
#include"Noncopyable.h"
#include<pthread.h>
using namespace std;
namespace meihao
{
        class MutexLock:public Noncopyable
        {
                public:
                        MutexLock();
                        ~MutexLock();
                        void lock();
                        void unlock();
                        pthread\_mutex\_t\* getMutexPtr();
                private:
                        pthread\_mutex\_t \_mutex;
        };
};
#endif
#ifndef \_\_CONDITION\_H\_\_
#define \_\_CONDITION\_H\_\_
#include<iostream>
#include"MutexLock.h"
#include"Noncopyable.h"
using namespace std;
namespace meihao
{
        class Condition:private Noncopyable
        {
                public:
                        Condition(MutexLock&);
                        ~Condition();
                        void wait();
                        void notify();
                        void notifyall();
                private:
                        pthread\_cond\_t \_cond;
                        MutexLock& \_mutex;
        };
};
#endif
Buffer.h Task.h
#ifndef \_\_BUFFER\_H\_\_
#define \_\_BUFFER\_H\_\_
#include<iostream>
#include"MutexLock.h"
#include"Condition.h"
#include"Task.h"
#include<queue>
using namespace std;
namespace meihao
{
        typedef Task\* DataType;
        class Buffer:private Noncopyable
        {
                public:
                        Buffer(int);
                        void push(DataType);
                        DataType pop();
                        void wakeupEmpty();  // 唤醒所有等待队列不为空的线程
                        bool full();
                        bool empty();
                private:
                        MutexLock \_mutex;
                        Condition \_notFull;
                        Condition \_notEmpty;
                        int \_queSize;
                        queue<DataType> \_que;
                        bool \_flag;
        };
};
#endif
#ifndef \_\_TASK\_H\_\_
#define \_\_TASK\_H\_\_
#include<iostream>
using namespace std;
namespace meihao
{
        class Task
        {
                public:
                        virtual void process() = 0;  // 线程池里面的线程具体要做的任务
        };
};
#endif
Threadpool.h ThreadpoolThread.h
#ifndef \_\_THREADPOOL\_H\_\_
#define \_\_THREADPOOL\_H\_\_
#include<iostream>
#include"Noncopyable.h"
#include"Task.h"
#include"Buffer.h"
#include<vector>
#include"Thread.h"
using namespace std;
namespace meihao
{
        class Threadpool:private Noncopyable
        {
                public:
                        Threadpool(int,int);
                        void start();
                        void stop();
                        void addTask(Task\*);
                        Task\* getTask();
                        void threadFunc();
                private:
                        int \_threadNum;
                        vector<Thread\*> \_threadsVec;  // 存放线程池里面的所有线程
                        int \_bufSize;
                        Buffer \_buf;  // 缓冲区用来存放任务,到时候线程池从里面取走一个任务执行
                        bool \_isExit;
        };
};
#endif
#ifndef \_\_THREADPOOLTHREAD\_H\_\_
#define \_\_THREADPOOLTHREAD\_H\_\_
#include<iostream>
#include"Thread.h"
#include"Threadpool.h"
using namespace std;
namespace meihao
{
        class Threadpoolthread:public Thread
        {
                public:
                Threadpoolthread(Threadpool& threadpool):\_threadpool(threadpool){}
                void run()
                {
                        \_threadpool.threadFunc();  // 继承的线程池线程内部实现的线程执行体方法
                        // 调用线程池的threadFunc方法,线程池的所有线程都去执行这个方法
                }
                private:
                        Threadpool& \_threadpool;
        };
};
#endif
Thread.cpp MutexLock.cpp
#include"Thread.h"
#include<iostream>
using namespace std;
namespace meihao
{
        Thread::Thread():\_pthId(0),\_isRunning(false)
        {
        }
        void Thread::start()
        {
                pthread\_create(&\_pthId,NULL,&Thread::threadFunc,this);
                \_isRunning = true;
        }
        void Thread::join()
        {
                if(\_isRunning)
                        pthread\_join(\_pthId,NULL);
        }
        Thread::~Thread()
        {
                if(\_isRunning)
                        pthread\_detach(\_pthId);
        }
        void\* Thread::threadFunc(void\* arg)
        {
                Thread\* pthread = static\_cast<Thread\*> (arg);
                if(NULL!=pthread)
                        pthread->run();
        }
};
#include<iostream>
#include"MutexLock.h"
using namespace std;
namespace meihao
{
        MutexLock::MutexLock()
        {
                pthread\_mutex\_init(&\_mutex,NULL);
        }
        MutexLock::~MutexLock()
        {
                pthread\_mutex\_destroy(&\_mutex);
        }
        void MutexLock::lock()
        {
                pthread\_mutex\_lock(&\_mutex);
        }
        void MutexLock::unlock()
        {
                pthread\_mutex\_unlock(&\_mutex);
        }
        pthread\_mutex\_t\* MutexLock::getMutexPtr()
        {
                return &\_mutex;
        }
};
Condition.cpp Buffer.cpp
#include"Condition.h"
#include<iostream>
using namespace std;
namespace meihao
{
        Condition::Condition(MutexLock& mutex):\_mutex(mutex)
        {
                pthread\_cond\_init(&\_cond,NULL);
        }
        Condition::~Condition()
        {
                pthread\_cond\_destroy(&\_cond);
        }
        void Condition::wait()
        {
                pthread\_cond\_wait(&\_cond,\_mutex.getMutexPtr());
        }
        void Condition::notify()
        {
                pthread\_cond\_signal(&\_cond);
        }
        void Condition::notifyall()
        {
                pthread\_cond\_broadcast(&\_cond);
        }
};
#include<iostream>
#include"Buffer.h"
using namespace std;
namespace meihao
{
        Buffer::Buffer(int size):\_queSize(size)
                                                         ,\_mutex()
                                                         ,\_notFull(\_mutex)
                                                         ,\_notEmpty(\_mutex)
                                                         ,\_flag(true)   // 开始,标志位true,表示buffer正常运行
                                                                                        // 后面线程池要结束的时候会修改为false
        {
        }
        bool Buffer::full()
        {
                return \_queSize == \_que.size();
        }
        bool Buffer::empty()
        {
                return \_que.size() == 0;
        }
        void Buffer::push(DataType elem)
        {
                \_mutex.lock();
                while(full())
                {
                        \_notFull.wait();  // 队列满,等待条件变量notFull满足
                        // 放到while里面是为了防止异常唤醒,唤醒的时候再次确认是否真的条件满足
                }
                \_que.push(elem);
                \_notEmpty.notify();
                \_mutex.unlock();
        }
        DataType Buffer::pop()
        {
                \_mutex.lock();
                while(\_flag&&empty())  // flag如果为false表示线程池等待回收线程stop线程池
                {
                        \_notEmpty.wait();
                }
                if(\_flag)
                {
                        DataType tmp = \_que.front();
                        \_que.pop();
                        \_notFull.notify();
                        \_mutex.unlock();
                        return tmp;
                }
                else
                {
                        \_mutex.unlock();
                        return NULL;
                }
        }
        void Buffer::wakeupEmpty()
        {
                \_flag = false;
                \_notEmpty.notifyall();  // 线程池在要关闭的时候会等待所有线程执行完挂在等待条件变量notEmpty上
                //这个时候直接唤醒所有等待的线程
        }
};
test.cpp Threadpool.cpp
#include<iostream>
#include"Threadpool.h"
#include<unistd.h>
using namespace std;
class MyTask:public meihao::Task
{
        public:
                void process()
                {
                        ::srand(time(NULL));
                        int num = ::rand()%100;
                        cout<<"produce a num:"<<num<<endl;
                }
};
int main()
{
        meihao::Threadpool threadpool(4,10);  // 定义一个线程池,里面有4个线程,去处理10个任务
        threadpool.start();
        meihao::Task\* ptask = new MyTask();  // 定义一个具体的任务
        int cnt = 10;
        while(cnt--)
        {
                threadpool.addTask(ptask);  // 线程池中添加任务
                sleep(1);
        }
        threadpool.stop();
        return 0;
}
C++面向对象实现封装线程池
#include"Threadpool.h"
#include<iostream>
#include"Threadpoolthread.h"
#include<unistd.h>
using namespace std;
namespace meihao
{
        Threadpool::Threadpool(int threadNum,int bufSize):\_threadNum(threadNum)
                              ,\_bufSize(bufSize)                                                                    ,\_buf(\_bufSize)                                                                        ,\_isExit(false)
        {
                \_threadsVec.reserve(\_threadNum);
        }
        void Threadpool::start()
        {
                for(int i=0;i<\_threadNum;++i)
                {
                        Thread\* pthread = new Threadpoolthread(\*this);  // 创建指定个数个线程
                        \_threadsVec.push\_back(pthread);
                }
                for(auto& elem:\_threadsVec)
                {
                        elem->start();  // 创造的线程开始启动
                }
        }
        void Threadpool::stop()
        {
                if(!\_isExit)
                {
                        while(!\_buf.empty())  // 如果缓冲池不空,也就是还有线程在执行
                        {
                                sleep(1);  // 等待所有线程执行完
                        }
                        // 线程全部执行完将会都挂在\_buf里面的\_notEmpty的条件变量上
                        \_buf.wakeupEmpty();  // 唤醒所有等待\_notEmpty的线程
                        // 这个函数设置flag = false; 所有等待在Buffer里面的pop函数
                        // 里的线程将全部返回NULL
                        \_isExit = true;
                        for(auto& elem:\_threadsVec)
                        {
                                elem->join();  // 回收所有线程资源
                                delete elem;
                        }
                }
        }
        void Threadpool::addTask(Task\* task)
        {
                \_buf.push(task);
        }
        Task\* Threadpool::getTask()
        {
                return \_buf.pop();
        }
        void Threadpool::threadFunc()
        {
                while(!\_isExit)
                {
                        Task\* task = getTask();
                        if(task)
                        {
                                task->process();
                        }
                }
        }
};
C++面向对象实现封装线程池




点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
AVR 嵌入式单片机芯片的中断系统介绍
<htmlxmlns"http://www.w3.org/1999/xhtml"<head<stylebody,table{fontfamily:微软雅黑;fontsize:13.5pt}table{bordercollapse:collapse;border:solidgray;borderwidth:2px
Wesley13 Wesley13
3年前
LINUX打开文件
<htmlxmlns"http://www.w3.org/1999/xhtml"<head<stylebody,table{fontfamily:微软雅黑;fontsize:10pt}table{bordercollapse:collapse;border:solidgray;borderwidth:2px0
Stella981 Stella981
3年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
LINUX中文件描述符传递
<htmlxmlns"http://www.w3.org/1999/xhtml"<head<stylebody,table{fontfamily:微软雅黑;fontsize:10pt}table{bordercollapse:collapse;border:solidgray;borderwidth:2px0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这