面向对象:
Noncopyable.h | Thread.h |
#ifndef \_\_NONCOPYABLE\_H\_\_ #define \_\_NONCOPYABLE\_H\_\_ #include<iostream> using namespace std; namespace meihao { class Noncopyable { public: Noncopyable(){}; ~Noncopyable(){}; private: Noncopyable(const Noncopyable&); Noncopyable& operator=(const Noncopyable&); }; }; #endif | #ifndef \_\_THREAD\_H\_\_ #define \_\_THREAD\_H\_\_ #include<iostream> #include"Noncopyable.h" #include<pthread.h> using namespace std; namespace meihao { class Thread:private Noncopyable { public: Thread(); void start(); void join(); virtual void run() = 0; virtual ~Thread(); static void\* threadFunc(void\*); private: pthread\_t \_pthId; bool \_isRunning; }; }; #endif |
MutexLock.h | Condition.h |
#ifndef \_\_MUTEXLOCK\_H\_\_ #define \_\_MUTEXLOCK\_H\_\_ #include<iostream> #include"Noncopyable.h" #include<pthread.h> using namespace std; namespace meihao { class MutexLock:public Noncopyable { public: MutexLock(); ~MutexLock(); void lock(); void unlock(); pthread\_mutex\_t\* getMutexPtr(); private: pthread\_mutex\_t \_mutex; }; }; #endif | #ifndef \_\_CONDITION\_H\_\_ #define \_\_CONDITION\_H\_\_ #include<iostream> #include"MutexLock.h" #include"Noncopyable.h" using namespace std; namespace meihao { class Condition:private Noncopyable { public: Condition(MutexLock&); ~Condition(); void wait(); void notify(); void notifyall(); private: pthread\_cond\_t \_cond; MutexLock& \_mutex; }; }; #endif |
Buffer.h | Task.h |
#ifndef \_\_BUFFER\_H\_\_ #define \_\_BUFFER\_H\_\_ #include<iostream> #include"MutexLock.h" #include"Condition.h" #include"Task.h" #include<queue> using namespace std; namespace meihao { typedef Task\* DataType; class Buffer:private Noncopyable { public: Buffer(int); void push(DataType); DataType pop(); void wakeupEmpty(); // 唤醒所有等待队列不为空的线程 bool full(); bool empty(); private: MutexLock \_mutex; Condition \_notFull; Condition \_notEmpty; int \_queSize; queue<DataType> \_que; bool \_flag; }; }; #endif | #ifndef \_\_TASK\_H\_\_ #define \_\_TASK\_H\_\_ #include<iostream> using namespace std; namespace meihao { class Task { public: virtual void process() = 0; // 线程池里面的线程具体要做的任务 }; }; #endif |
Threadpool.h | Threadpoolthread.h |
#ifndef \_\_THREADPOOL\_H\_\_ #define \_\_THREADPOOL\_H\_\_ #include<iostream> #include"Noncopyable.h" #include"Task.h" #include"Buffer.h" #include<vector> #include"Thread.h" using namespace std; namespace meihao { class Threadpool:private Noncopyable { public: Threadpool(int,int); void start(); void stop(); void addTask(Task\*); Task\* getTask(); void threadFunc(); private: int \_threadNum; vector<Thread\*> \_threadsVec; // 存放线程池里面的所有线程 int \_bufSize; Buffer \_buf; // 缓冲区用来存放任务,到时候线程池从里面取走一个任务执行 bool \_isExit; }; }; #endif | #ifndef \_\_THREADPOOLTHREAD\_H\_\_ #define \_\_THREADPOOLTHREAD\_H\_\_ #include<iostream> #include"Thread.h" #include"Threadpool.h" using namespace std; namespace meihao { class Threadpoolthread:public Thread { public: Threadpoolthread(Threadpool& threadpool):\_threadpool(threadpool){} void run() { \_threadpool.threadFunc(); // 继承的线程池线程内部实现的线程执行体方法 // 调用线程池的threadFunc方法,线程池的所有线程都去执行这个方法 } private: Threadpool& \_threadpool; }; }; #endif |
Thread.cpp | MutexLock.cpp |
// ===== Thread.cpp =====
#include "Thread.h"

#include <cstring>
#include <iostream>

namespace meihao
{

Thread::Thread()
    : _pthId(0)
    , _isRunning(false)
{
}

/// Launches the thread. Does nothing if the thread is already running.
/// If pthread_create fails the error is reported on stderr and the object
/// stays in the "not running" state, so join() and the destructor will not
/// touch an invalid thread handle.
void Thread::start()
{
    if (_isRunning)
    {
        return;  // a second pthread_create would overwrite the first handle
    }

    const int err = pthread_create(&_pthId, nullptr, &Thread::threadFunc, this);
    if (0 != err)
    {
        std::cerr << "pthread_create failed: " << std::strerror(err) << std::endl;
        return;
    }
    _isRunning = true;
}

/// Waits for the thread to terminate, if it was started.
void Thread::join()
{
    if (_isRunning)
    {
        pthread_join(_pthId, nullptr);
        // The handle is consumed by the join; clearing the flag keeps the
        // destructor from detaching a thread that was already joined.
        _isRunning = false;
    }
}

/// Detaches the thread if it was started and never joined.
Thread::~Thread()
{
    if (_isRunning)
    {
        pthread_detach(_pthId);
    }
}

/// pthread entry point.
/// @param arg the Thread object passed to pthread_create by start()
/// @return always a null pointer; the thread produces no result value
void* Thread::threadFunc(void* arg)
{
    Thread* self = static_cast<Thread*>(arg);
    if (nullptr != self)
    {
        self->run();
    }
    return nullptr;  // a non-void function must return a value
}

} // namespace meihao

// ===== MutexLock.cpp =====
#include <iostream>

#include "MutexLock.h"

namespace meihao
{

/// Initialises the mutex with default attributes.
MutexLock::MutexLock()
{
    pthread_mutex_init(&_mutex, nullptr);
}

MutexLock::~MutexLock()
{
    pthread_mutex_destroy(&_mutex);
}

void MutexLock::lock()
{
    pthread_mutex_lock(&_mutex);
}

void MutexLock::unlock()
{
    pthread_mutex_unlock(&_mutex);
}

/// Exposes the raw mutex so that Condition can pass it to pthread_cond_wait.
pthread_mutex_t* MutexLock::getMutexPtr()
{
    return &_mutex;
}

} // namespace meihao
Condition.cpp | Buffer.cpp |
// ===== Condition.cpp =====
#include "Condition.h"

#include <iostream>

namespace meihao
{

/// Stores a reference to the mutex and initialises the condition variable
/// with default attributes.
Condition::Condition(MutexLock& mutex)
    : _mutex(mutex)
{
    pthread_cond_init(&_cond, nullptr);
}

Condition::~Condition()
{
    pthread_cond_destroy(&_cond);
}

/// Waits on the condition variable with the bound mutex.
void Condition::wait()
{
    pthread_cond_wait(&_cond, _mutex.getMutexPtr());
}

void Condition::notify()
{
    pthread_cond_signal(&_cond);
}

void Condition::notifyall()
{
    pthread_cond_broadcast(&_cond);
}

} // namespace meihao

// ===== Buffer.cpp =====
#include <cstddef>
#include <iostream>

#include "Buffer.h"

namespace
{

/// Locks a MutexLock for the lifetime of the guard, so the mutex is released
/// on every exit path, including an exception thrown while it is held.
class ScopedLock
{
public:
    explicit ScopedLock(meihao::MutexLock& mutex)
        : _mutex(mutex)
    {
        _mutex.lock();
    }

    ~ScopedLock()
    {
        _mutex.unlock();
    }

    ScopedLock(const ScopedLock&) = delete;
    ScopedLock& operator=(const ScopedLock&) = delete;

private:
    meihao::MutexLock& _mutex;
};

} // namespace

namespace meihao
{

/// @param size capacity of the queue
/// The initialiser list follows the declaration order of the members, which
/// is the order in which they are actually constructed.
Buffer::Buffer(int size)
    : _mutex()
    , _notFull(_mutex)
    , _notEmpty(_mutex)
    , _queSize(size)
    , _flag(true)  // true: buffer runs normally; set to false when the pool shuts down
{
}

/// True when the queue holds as many elements as its capacity allows.
/// Does not lock; push() calls it with the mutex already held.
bool Buffer::full()
{
    // Explicit conversion instead of an implicit signed/unsigned comparison.
    return _queSize >= 0 && _que.size() >= static_cast<std::size_t>(_queSize);
}

/// True when the queue holds no element.
/// Does not lock; pop() calls it with the mutex already held.
bool Buffer::empty()
{
    return _que.empty();
}

/// Appends an element, waiting while the queue is full.
void Buffer::push(DataType elem)
{
    ScopedLock guard(_mutex);
    // Re-check in a loop: a wake-up does not guarantee that the condition
    // really holds (spurious wake-ups).
    while (full())
    {
        _notFull.wait();
    }
    _que.push(elem);
    _notEmpty.notify();
}

/// Removes and returns the front element, waiting while the queue is empty.
/// @return the element, or a null pointer once wakeupEmpty() has switched
///         the buffer into shutdown mode
DataType Buffer::pop()
{
    ScopedLock guard(_mutex);
    // _flag == false means the pool is stopping and waits to collect its threads.
    while (_flag && empty())
    {
        _notEmpty.wait();
    }
    if (!_flag)
    {
        return nullptr;
    }
    DataType front = _que.front();
    _que.pop();
    _notFull.notify();
    return front;
}

/// Switches the buffer into shutdown mode and wakes every thread blocked in
/// pop(); those calls then return a null pointer.
void Buffer::wakeupEmpty()
{
    // The flag must change under the same mutex pop() holds while testing it.
    // Otherwise the broadcast can fire between a consumer's test of _flag and
    // its call to wait(), and that consumer would sleep forever.
    ScopedLock guard(_mutex);
    _flag = false;
    _notEmpty.notifyall();
}

} // namespace meihao
test.cpp | Threadpool.cpp |
#include<iostream> #include"Threadpool.h" #include<unistd.h> using namespace std; class MyTask:public meihao::Task { public: void process() { ::srand(time(NULL)); int num = ::rand()%100; cout<<"produce a num:"<<num<<endl; } }; int main() { meihao::Threadpool threadpool(4,10); // 定义一个线程池,里面有4个线程,去处理10个任务 threadpool.start(); meihao::Task\* ptask = new MyTask(); // 定义一个具体的任务 int cnt = 10; while(cnt--) { threadpool.addTask(ptask); // 线程池中添加任务 sleep(1); } threadpool.stop(); return 0; } | #include"Threadpool.h" #include<iostream> #include"Threadpoolthread.h" #include<unistd.h> using namespace std; namespace meihao { Threadpool::Threadpool(int threadNum,int bufSize):\_threadNum(threadNum) ,\_bufSize(bufSize) ,\_buf(\_bufSize) ,\_isExit(false) { \_threadsVec.reserve(\_threadNum); } void Threadpool::start() { for(int i=0;i<\_threadNum;++i) { Thread\* pthread = new Threadpoolthread(\*this); // 创建指定个数个线程 \_threadsVec.push\_back(pthread); } for(auto& elem:\_threadsVec) { elem->start(); // 创造的线程开始启动 } } void Threadpool::stop() { if(!\_isExit) { while(!\_buf.empty()) // 如果缓冲池不空,也就是还有线程在执行 { sleep(1); // 等待所有线程执行完 } // 线程全部执行完将会都挂在\_buf里面的\_notEmpty的条件变量上 \_buf.wakeupEmpty(); // 唤醒所有等待\_notEmpty的线程 // 这个函数设置flag = false; 所有等待在Buffer里面的pop函数 // 里的线程将全部返回NULL \_isExit = true; for(auto& elem:\_threadsVec) { elem->join(); // 回收所有线程资源 delete elem; } } } void Threadpool::addTask(Task\* task) { \_buf.push(task); } Task\* Threadpool::getTask() { return \_buf.pop(); } void Threadpool::threadFunc() { while(!\_isExit) { Task\* task = getTask(); if(task) { task->process(); } } } }; |