使用C++11 STL线程库实现一个线程池。处理机制是抢占式的，即所有线程从同一个队列（std::queue）中获取任务并执行（计算字符串的简单HASH值），使用std::mutex和std::condition_variable实现队列访问的并发协调。
#include <iostream>
#include <iomanip>
#include <thread>
#include <mutex>
#include <string>
#include <queue>
#include <condition_variable>
#include <algorithm>
#include <sstream>
using namespace std;
// Guards std::cout so that a line written by one thread is never interleaved
// with a line written by another.
static std::mutex G_lockPrint;

// Writes one result line to std::cout while holding G_lockPrint.
// Line layout: calling thread's id right-aligned in 8 columns, `value`
// right-aligned in 12 columns, a single space, then `str`; the line is
// terminated with std::endl (newline + flush).
void print_message(int value, const std::string& str) {
    const std::lock_guard<std::mutex> printGuard(G_lockPrint);
    std::cout << std::setw(8) << std::right << std::this_thread::get_id()
              << std::setw(12) << std::right << value
              << " " << str << std::endl;
}
#define THREAD_COUNT 10
int main()
{
thread thpool[THREAD_COUNT];
mutex quelock;
condition_variable quecv;
queue<string> strqueue;
volatile bool stop = false;
for(int i = 0; i < THREAD_COUNT; ++i ) {
thpool[i] = thread([&quelock, &quecv, &strqueue, &stop]()
{
string str;
while ( !stop ) {
{
unique_lock<mutex> lock(quelock);
if ( strqueue.empty() ) {
auto ret = quecv.wait_for(lock, chrono::seconds(1));
if ( ret == cv_status::timeout) continue;
}
if ( !strqueue.empty() ) {
str = strqueue.front();
strqueue.pop();
} else {
continue;
}
}
int hash = 0;
for(size_t i = 0; i < str.length(); ++i) {
hash = (hash << 5) - i + str[i];
}
print_message(hash, str);
} // end while
}
);
}
for(int i = 0; i < 100000; ++i) {
stringstream ss;
ss<<"aaaaa_"<<i;
lock_guard<mutex> lock(quelock);
strqueue.push(ss.str());
quecv.notify_one();
}
while (1) {
this_thread::sleep_for(chrono::seconds(1));
lock_guard<mutex> lock(quelock);
if ( strqueue.empty()) break;
}
stop = true;
for(int i = 0; i < THREAD_COUNT; ++i ) thpool[i].join();
cout<<"program exit"<<endl;
return 0;
}