C, Cpp, Cpp11 三种方式实现线程池(源码)

nitesy
• 阅读 365

1. 线程池原理

我们使用线程的时候就去创建一个线程,这样实现起来非常简便 但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用: 执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

线程池是一种多线程处理形式,处理过程中将任务添加到队列 在创建线程后自动启动这些任务。线程池线程都是后台线程。 每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。 如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。 如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。 超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

在各个编程语言的语种中都有线程池的概念,并且很多语言中直接提供了线程池,作为程序猿直接使用就可以了,下面给大家介绍一下线程池的实现原理:

  • 线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:

    1. 任务队列,存储需要处理的任务,由工作的线程来处理这些任务
      • 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
      • 已处理的任务会被从任务队列中删除
      • 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
    2. 工作的线程(任务队列任务的消费者) ,N个
      • 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
      • 工作的线程相当于是任务队列的消费者角色,
      • 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
      • 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
    3. 管理者线程(不处理任务队列中的任务),1个
      • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
        • 当任务过多的时候, 可以适当的创建一些新的工作线程
        • 当任务过少的时候, 可以适当的销毁一些工作的线程

C, Cpp, Cpp11 三种方式实现线程池(源码)


2. C实现线程池

2.1 头文件

#pragma once
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<string.h>
#include<unistd.h>
#define NUMBER 2

typedef struct Task
{
    void (*function)(void* args);
    void* args;
}Task;

typedef struct ThreadPool
{
    //任务队列
    Task* task_queue;
    int queue_capacity;            //容量
    int queue_size;                //当前任务个数
    int queue_front;            //队头->消费用
    int queue_tail;                //队尾->生产用


    pthread_t managerID;                        //管理者线程ID(1个)
    pthread_t* threadID;                        //工作的线程ID(多个,用指针)
    int min_num;                                //最小的线程数量
    int max_num;                                //最大的线程数量
    int busy_num;                                //在忙的线程数量
    int live_num;                                //存活的线程数量
    int exit_num;                                //将要销毁的线程数量
    pthread_mutex_t mutex_pool;                    //线程池的锁
    pthread_mutex_t mutex_busy_num;                //busy_num的锁
    pthread_cond_t producer_wait_consumer;        //生产者等待消费者
    pthread_cond_t consumer_wait_producer;        //消费者等待生产者

    int shutdown;                                //是否销毁线程池,销毁为1,否则为0
}ThreadPool;


//创建线程池并且初始化
ThreadPool* Thread_Pool_Create(int min, int max, int queuesize);

//销毁线程池
int Thread_Pool_Destroy(ThreadPool* pool);

//给线程池添加任务(相当于生产者)
void Thread_Pool_Add(ThreadPool* pool, void(*function)(void*), void* args);

//获取线程池中工作的线程个数
int Get_Busy_Num(ThreadPool* pool);

//获取线程池中存活的个数
int Get_Live_Num(ThreadPool* pool);

//以下为被封装的函数
//工作的线程(消费者线程)任务函数
void* Worker(void* args);

//管理者线程任务函数
void* Manager(void* args);

//单个线程的退出
void Thread_Exit(ThreadPool* pool);

2.2 源文件的定义

#include"threadpool.h"

ThreadPool* Thread_Pool_Create(int min, int max, int queuesize)
{
    //开线程池
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    do
    {
        if (pool == NULL)
        {
            printf("malloc threadpool fail\n");
            break;
        }

        // 开线程ID
        pool->threadID = (pthread_t*)malloc(sizeof(pthread_t) * max);
        if (pool->threadID == NULL)
        {
            printf("malloc threadID fail\n");
            break;
        }
        //初始线程ID都初始化为0
        memset(pool->threadID, 0, sizeof(pthread_t) * max);
        pool->max_num = max;
        pool->min_num = min;
        pool->busy_num = 0;
        pool->live_num = min;
        pool->exit_num = 0;

        //初始化锁和条件变量
        if (pthread_mutex_init(&pool->mutex_pool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutex_busy_num, NULL) != 0 ||
            pthread_cond_init(&pool->producer_wait_consumer, NULL) != 0 ||
            pthread_cond_init(&pool->consumer_wait_producer, NULL) != 0)
        {
            printf("mutex or cond init fail\n");
            break;
        }

        //初始化任务队列
        pool->task_queue = (Task*)malloc(sizeof(Task) * queuesize);
        if (pool->task_queue == NULL)
        {
            printf("malloc task_queue fail...\n");
            break;
        }

        pool->queue_capacity = queuesize;
        pool->queue_size = 0;
        pool->queue_front = 0;
        pool->queue_tail = 0;

        pool->shutdown = 0;

        //创建线程
        pthread_create(&pool->managerID, NULL, Manager, pool);

        for (int i = 0; i < min; ++i)
        {
            pthread_create(&pool->threadID[i], NULL, Worker, pool);
        }
        return pool; //到这里说明成功,返回创建出来的线程池
    } while (0);

    //跳到这里说明失败了,释放资源
    if (pool && pool->threadID)
        free(pool->threadID);
    if (pool)
        free(pool);

    return NULL;
}



int Thread_Pool_Destroy(ThreadPool* pool)
{
    //如果线程池本就不存在
    if (pool == NULL)
        return -1;

    //关闭线程池
    pool->shutdown = 1;

    //阻塞并回收管理者线程
    pthread_join(pool->managerID, NULL);

    //唤醒阻塞的消费者进程
    for (int i = 0; i < pool->live_num; ++i)
        pthread_cond_signal(&pool->consumer_wait_producer);

    //释放堆内存
    if (pool->task_queue)
        free(pool->task_queue);
    if (pool->threadID)
        free(pool->threadID);

    //销毁锁和条件变量
    pthread_mutex_destroy(&pool->mutex_pool);
    pthread_mutex_destroy(&pool->mutex_busy_num);
    pthread_cond_destroy(&pool->consumer_wait_producer);
    pthread_cond_destroy(&pool->producer_wait_consumer);

    //销毁线程池
    free(pool);
    pool = NULL;

    return 0;
}



void Thread_Pool_Add(ThreadPool* pool, void(*function)(void*), void* args)
{
    pthread_mutex_lock(&pool->mutex_pool);
    //如果任务队列满了
    while (pool->queue_size == pool->queue_capacity && !pool->shutdown)
    {
        //生产者阻塞,等待消费者进程
        pthread_cond_wait(&pool->producer_wait_consumer, &pool->mutex_pool);
    }

    //如果线程池要关闭
    if (pool->shutdown)
    {
        pthread_mutex_unlock(&pool->mutex_pool);
        return;
    }

    //添加任务
    //队尾添加任务,队尾向后移动,队列任务数加一
    pool->task_queue[pool->queue_tail].function = function;
    pool->task_queue[pool->queue_tail].args = args;
    pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
    pool->queue_size++;

    //提醒消费者消费
    pthread_cond_signal(&pool->consumer_wait_producer);
    pthread_mutex_unlock(&pool->mutex_pool);
}



int Get_Busy_Num(ThreadPool* pool)
{
    pthread_mutex_lock(&pool->mutex_busy_num);
    int busynum = pool->busy_num;
    pthread_mutex_unlock(&pool->mutex_busy_num);
    return busynum;
}



int Get_Live_Num(ThreadPool* pool)
{
    pthread_mutex_lock(&pool->mutex_pool);
    int livenum = pool->live_num;
    pthread_mutex_unlock(&pool->mutex_pool);
    return livenum;
}



void* Worker(void* args)
{
    ThreadPool* pool = (ThreadPool*)args;

    while (1)
    {
        pthread_mutex_lock(&pool->mutex_pool);

        //如果当前任务队列为空
        while (pool->queue_size == 0 && !pool->shutdown)
        {
            //阻塞消费者线程
            pthread_cond_wait(&pool->consumer_wait_producer,&pool->mutex_pool);

            //如果要销毁线程
            if (pool->exit_num > 0)
            {
                pool->exit_num--;
                //同时如果存活线程大于最小值
                if (pool->live_num > pool->min_num)
                {
                    pool->live_num--;
                    pthread_mutex_unlock(&pool->mutex_pool);
                    Thread_Exit(pool);
                }
            }
        }


        //如果线程池要关闭
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutex_pool);
            Thread_Exit(pool);
        }


        //从任务队列取出任务
        Task task;
        task.function = pool->task_queue[pool->queue_front].function;
        task.args = pool->task_queue[pool->queue_front].args;

        //队头向后移动以下次取任务
        pool->queue_front = (pool->queue_front + 1) % pool->queue_capacity;
        pool->queue_size--;

        //解锁提醒生产者
        pthread_cond_signal(&pool->producer_wait_consumer);
        pthread_mutex_unlock(&pool->mutex_pool);

        //打印,busynum++
        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutex_busy_num);
        pool->busy_num++;
        pthread_mutex_unlock(&pool->mutex_busy_num);

        //执行函数,完成任务,销毁任务队列中这个数据(函数)
        task.function(task.args);
        free(task.args);
        task.args = NULL;

        //打印,busynum--
        printf("thread %ld end working\n", pthread_self());
        pthread_mutex_lock(&pool->mutex_busy_num);
        pool->busy_num--;
        pthread_mutex_unlock(&pool->mutex_busy_num);

        if (pool->shutdown)
            Thread_Exit(pool);
    }
    return NULL;
}



void* Manager(void* args)
{
    ThreadPool* pool = (ThreadPool*)args;
    while (!pool->shutdown)
    {
        //每隔三秒检查一次
        sleep(3);

        //取出线程池中任务的数量和当前的线程数量
        pthread_mutex_lock(&pool->mutex_pool);
        int queue_size = pool->queue_size;
        int live_num = pool->live_num;
        pthread_mutex_unlock(&pool->mutex_pool);

        //取出忙的线程数量
        pthread_mutex_lock(&pool->mutex_busy_num);
        int busy_num = pool->busy_num;
        pthread_mutex_unlock(&pool->mutex_busy_num);

        //添加线程
        //任务个数>存活线程个数 && 存活线程个数<最大线程数(消费者太少了,任务队列消化太慢
        if (queue_size > live_num && live_num < pool->max_num)
        {
            pthread_mutex_lock(&pool->mutex_pool);
            int counter = 0;
            for (int i = 0; i < pool->max_num &&
                             counter < NUMBER && 
                            pool->live_num < pool->max_num; ++i)
            {
                if (pool->threadID[i] == 0)
                {
                    pthread_create(&pool->threadID[i], NULL, Worker, pool);
                    counter++;
                    pool->live_num++;
                }
            }
            pthread_mutex_unlock(&pool->mutex_pool);
        }

        //销毁线程
        //忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
        if (busy_num * 2 < live_num && live_num > pool->min_num)
        {
            pthread_mutex_lock(&pool->mutex_pool);
            pool->exit_num = NUMBER;
            pthread_mutex_unlock(&pool->mutex_pool);

            //让工作的线程自杀
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->consumer_wait_producer);
            }
        }
    }
    return NULL;
}

void Thread_Exit(ThreadPool* pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->max_num; ++i)
    {
        if (pool->threadID[i] == tid)
        {
            pool->threadID[i] = 0;
            printf("thread %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

2.3 测试部分

#include "threadpool.h"

void Task_Test(void* args)
{
    int num = *(int*)args;
    printf("thread %ld is working,number = %d\n", pthread_self(), num);
    sleep(1);
    return;
}

int main()
{
    //创建线程池
    ThreadPool* pool = Thread_Pool_Create(3, 10, 100);
    for (int i = 0; i < 100; ++i)
    {
        int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        Thread_Pool_Add(pool, Task_Test, num);
    }

    sleep(30);

    Thread_Pool_Destroy(pool);
    return 0;
}

3. C++实现线程池

3.1 头文件

#define _CRT_SECURE_NO_WARNINGS
#pragma once
#include<iostream>
#include<string.h>
#include<string>
#include<pthread.h>
#include<stdlib.h>
#include<queue>
#include<unistd.h>
using namespace std;


using callback = void(*)(void*);
//任务的结构体
template<typename T>
struct Task
{
    Task()
    {
        function = nullptr;
        args = nullptr;
    }
    Task(callback fun, void* args)
    {
        function = fun;
        this -> args = (T*)args;
    }
    callback function;
    T* args;
};

//任务队列
template<typename T>
class TaskQueue
{
public:
    TaskQueue()
    {
        pthread_mutex_init(&mutex,NULL);
    }

    ~TaskQueue()
    {
        pthread_mutex_destroy(&mutex);
    }

    //添加任务
    void AddTask(Task<T> task)
    {
        pthread_mutex_lock(&mutex);
        queue.push(task);
        pthread_mutex_unlock(&mutex);
    }
    void AddTask(callback fun, void* args)
    {
        pthread_mutex_lock(&mutex);
        Task<T> task(fun,args); 
        queue.push(task);
        pthread_mutex_unlock(&mutex);
    }

    //取出一个任务
    Task<T> TakeTask()
    {
        Task<T> task;
        pthread_mutex_lock(&mutex);
        if (queue.size() > 0)
        {
            task = queue.front();
            queue.pop();
        }
        pthread_mutex_unlock(&mutex);
        return task;
    }

    //获取当前队列中的任务个数
    inline int GetTaskNum()
    {
        return queue.size();
    }
private:
    pthread_mutex_t mutex; //互斥锁
    std::queue<Task<T>> queue;
};


//线程池
template<typename T>
class ThreadPool
{
public:
    ThreadPool(int min , int max)
    {
        //实例化任务队列
        taskqueue = new TaskQueue<T>;

        //初始化线程池
        min_num = min;
        max_num = max;
        busy_num = 0;
        live_num = min;

        //根据线程最大上限,给线程数组分配内存
        threadID = new pthread_t[max];
        if (threadID == nullptr)
        {
            cout << "new threadID fail" << endl;
        }

        //初始化线程ID
        memset(threadID, 0, sizeof(pthread_t) * max);

        //初始化互斥锁和条件变量
        if (pthread_mutex_init(&mutex_pool, NULL) != 0 ||
            pthread_cond_init(&notempty, NULL) != 0)
        {
            cout << "mutex or cond init fail" << endl;
        }

        //创建线程
        for (size_t i = 0; i < min; ++i)
        {
            pthread_create(&threadID[i], NULL, Work, this);
            cout << "create thread ID :" << to_string(threadID[i]) << endl;
        }
        pthread_create(&managerID, NULL, Manage, this);

    }

    ~ThreadPool()
    {
        shutdown = true;
        //销毁管理者进程
        pthread_join(managerID, NULL);

        //唤醒消费者
        for (int i = 0; i < live_num; ++i)
        {
            pthread_cond_signal(&notempty);
        }

        if (taskqueue)
        {
            delete taskqueue;
        }

        if (threadID)
        {
            delete[] threadID;
        }

        pthread_mutex_destroy(&mutex_pool);
        pthread_cond_destroy(&notempty);
    }

    //添加任务
    void Add_Task(Task<T> task)
    {
        if (shutdown)
            return;

        //添加任务,不需加锁,队列中有
        taskqueue->AddTask(task);

        //唤醒消费者
        pthread_cond_signal(&notempty);
    }

    //获取忙线程个数
    int Get_Busy_Num()
    {
        int busynum = 0;
        pthread_mutex_lock(&mutex_pool);
        busynum = busy_num;
        pthread_mutex_unlock(&mutex_pool);
        return busynum;
    }

    //获取存活线程个数
    int Get_Live_Num()
    {
        int livenum = 0;
        pthread_mutex_lock(&mutex_pool);     
        livenum = live_num;             
        pthread_mutex_unlock(&mutex_pool);     
        return livenum;                         
    }

private:
    //工作的线程任务函数
    static void* Work(void* args)
    {
        ThreadPool* pool = static_cast<ThreadPool*>(args);

        while (true)
        {
            //访问任务队列加锁
            pthread_mutex_lock(&pool->mutex_pool);
            //判断任务队列是否为空,空了就堵塞
            while (pool->taskqueue->GetTaskNum() == 0 && !pool->shutdown)
            {
                cout << "thread :" << to_string(pthread_self()) << " waiting..." << endl;
                pthread_cond_wait(&pool->notempty, &pool->mutex_pool);

                //解除后 判断是否要销毁进程
                if (pool->exit_num > 0)
                {
                    pool->exit_num--;
                    if (pool->live_num > pool->min_num)
                    {
                        pool->live_num--;
                        pthread_mutex_unlock(&pool->mutex_pool);
                        pool->Thread_Exit();
                    }
                }
            }

            //判断线程池是否要关闭了
            if (pool->shutdown)
            {
                pthread_mutex_unlock(&pool->mutex_pool);
                pool->Thread_Exit();
            }

            //从任务队列取出任务
            Task<T> task = pool->taskqueue->TakeTask();
            pool->busy_num++;
            pthread_mutex_unlock(&pool->mutex_pool);

            cout << "thread :" << to_string(pthread_self()) << " start working..." << endl;
            task.function(task.args);

            delete task.args;
            task.args = nullptr;

            //任务结束
            cout << "thread :" << to_string(pthread_self()) << " end working..." << endl;
            pthread_mutex_lock(&pool->mutex_pool);
            pool->busy_num--;
            pthread_mutex_unlock(&pool->mutex_pool);

        }
        return nullptr;
    }

    //管理者线程任务函数
    static void* Manage(void* args)
    {
        ThreadPool* pool = static_cast<ThreadPool*>(args);
        while (!pool->shutdown)
        {
            //5秒检测一次
            sleep(5);
            pthread_mutex_lock(&pool->mutex_pool);
            int livenum = pool->live_num;
            int busynum = pool->busy_num;
            int queuesize = pool->taskqueue->GetTaskNum();
            pthread_mutex_unlock(&pool->mutex_pool);

            const int NUMBER = 2;
            //创建
            if (queuesize > livenum && livenum < pool->max_num)
            {
                pthread_mutex_lock(&pool->mutex_pool);
                int num = 0;
                for (int i = 0; i < pool->max_num && 
                                    num < NUMBER && 
                    pool->live_num < pool->max_num ; ++i)
                {
                    if (pool->threadID[i] == 0)
                    {
                        pthread_create(&pool->threadID[i], NULL, Work, pool);
                        num++;
                        pool->live_num++;
                    }
                }
                pthread_mutex_unlock(&pool->mutex_pool);
            }

            //销毁
            if (busynum * 2 < livenum && livenum > pool->min_num)
            {
                pthread_mutex_lock(&pool->mutex_pool);
                pool->exit_num = NUMBER;
                pthread_mutex_unlock(&pool->mutex_pool);
                for (int i = 0; i < NUMBER; ++i)
                {
                    pthread_cond_signal(&pool->notempty);
                }
            }
        }
        return nullptr;
    }

    void Thread_Exit()
    {
        pthread_t tid = pthread_self();
        for (int i = 0; i < max_num; ++i)
        {
            if (threadID[i] == tid)
            {
                cout << "thread :" << to_string(pthread_self()) << "exiting" << endl;
                threadID[i] = 0;
                break;
            }
        }
        pthread_exit(NULL);
    }
private:
    pthread_mutex_t mutex_pool;
    pthread_cond_t notempty;
    pthread_t* threadID;
    pthread_t managerID;
    TaskQueue<T>* taskqueue;
    int min_num;
    int max_num;
    int busy_num;
    int live_num;
    int exit_num;
    bool shutdown = false;

};

3.2 测试部分

#include"ThreadPool.h"

void Task_Test(void* args)
{
    int num = *(int*)args;
    cout<<"thread :" << pthread_self() << " is working " << "number =" << num <<endl;
    sleep(1);
    return;
}

int main()
{
    //创建线程池
    ThreadPool<int> pool(3, 10);
    for (int i = 0; i < 100; ++i)
    {
        int* num = new int(i+100);
        pool.Add_Task(Task<int>(Task_Test,num));
    }
    sleep(40);
    return 0;
}


以上只是基于C修改出对应于C++的代码

并且以上代码存在一个问题 输出的结果有时会因为线程原因出现混乱 可以通过加锁来解决,但锁的数量超过1就容易导致死锁问题,所以暂且搁置


4. C++11实现线程池

并非原创,摘于此处

4.1 头文件

#pragma once
#include<queue>
#include<thread>
#include<condition_variable>
#include<atomic>
#include<stdexcept>
#include<future>
#include<vector>
#include<functional>

namespace std
{
    #define THREADPOOL_MAX_NUM 16
    class threadpool
    {
        unsigned short _initsize;
        using Task = function<void()>;
        vector<thread> _pool;
        queue<Task> _tasks;
        mutex _lock;
        mutex _lockGrow;
        condition_variable _task_cv;
        atomic<bool> _run{true};
        atomic<int> _spa_trd_num{0};

    public:
        inline threadpool(unsigned short size = 4)
        {
            _initsize = size;
            Add_Thread(size);
        }
        inline ~threadpool()
        {
            _run = false;
            _task_cv.notify_all();
            for (thread& thread : _pool)
            {
                if (thread.joinable())
                    thread.join();
            }
        }

        template<typename F,typename... Args>
        auto commit(F&& f, Args&& ...args) -> future<decltype(f(args...)) >
        {
            if (!_run)
                throw runtime_error{"commit auto stop"};
            using RetType = decltype(f(args...));
            auto task = make_shared<packaged_task<RetType()>>(bind(forward<F>(f), forward<Args>(args)...));
            future<RetType> future = task->get_future();
            {
                lock_guard<mutex> lock{_lock};
                _tasks.emplace([task]() {(*task)(); });
            }
            if (_spa_trd_num < 1 && _pool.size() < THREADPOOL_MAX_NUM)
                Add_Thread(1);
            _task_cv.notify_one();
            return future;
        }

        template<typename F>
        void commit2(F&& f)
        {
            if (!_run)
                return;
            {
                lock_guard<mutex> lock{_lock};
                _tasks.emplace(forward<F>(f));
            }
            if (_spa_trd_num < 1 && _pool.size() < THREADPOOL_MAX_NUM)
                Add_Thread(1);
            _task_cv.notify_one();
        }

        int idlCount() { return _spa_trd_num; }
        int thrCount() { return _pool.size(); }

    private:
        void Add_Thread(unsigned short size)
        {
            if (!_run)
                throw runtime_error{"Add_Thread stop"};
            unique_lock<mutex> lockgrow{_lockGrow};
            for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
            {
                _pool.emplace_back([this]
                {
                    while (true)
                    {
                        Task task;
                        {
                            unique_lock<mutex> lock{_lock};
                            _task_cv.wait(lock, [this] {return !_run || !_tasks.empty(); });
                            if (!_run && _tasks.empty())
                                return;
                            _spa_trd_num--;
                            task = move(_tasks.front());
                            _tasks.pop();
                        }
                        task();
                        if (_spa_trd_num > 0 && _pool.size() > _initsize)
                            return;
                        {
                            unique_lock<mutex> lock{_lock};
                            _spa_trd_num++;
                        }
                    }
                });
                {
                    unique_lock<mutex> lock{_lock};
                    _spa_trd_num++;
                }
            }    
        }
    };
}

要使用pthread依赖库


4.2 测试部分

#include"ThreadPool.hpp"
#include<iostream>

void fun1(int slp)
{
    printf("fun1  %ld\n", std::this_thread::get_id());
    if (slp > 0)
    {
        printf("fun1 sleep %ld  =========  %ld\n", slp, std::this_thread::get_id());
        std::this_thread::sleep_for(std::chrono::milliseconds(slp));
    }
}

struct gfun
{
    int operator()(int n)
    {
        printf("gfun  %ld\n", n, std::this_thread::get_id());
        return 42;
    }
};

class A
{
public:
    static int Afun(int n = 0) //函数必须是 static 的才能直接使用线程池
    {
        std::cout << n << "Afun  " << std::this_thread::get_id() << std::endl;
        return n;
    }

    static std::string Bfun(int n, std::string str, char c) 
    {
        std::cout << n << "Bfun   " << str.c_str() << "  " << (int)c << "  " << std::this_thread::get_id() << std::endl;
        return str;
    }
};

int main()
try {
    std::threadpool executor{ 50 };
    std::future<void> ff = executor.commit(fun1, 0);
    std::future<int> fg = executor.commit(gfun{}, 0);
    //std::future<int> gg = executor.commit(A::Afun, 9999); //IDE提示错误,但可以编译运行
    std::future<std::string> gh = executor.commit(A::Bfun, 9998, "mult args", 123);
    std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, fh !  " << std::this_thread::get_id() << std::endl; return "hello,fh ret !\n"; });

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << fg.get() << "  " << fh.get().c_str() << "  " << std::this_thread::get_id() << std::endl;

    std::cout << " =======  fun1,55 ========= " << std::this_thread::get_id() << std::endl;
    executor.commit(fun1, 55).get();    //调用.get()获取返回值会等待线程执行完

    std::threadpool pool(4);
    std::vector< std::future<int> > results;

    for (int i = 0; i < 8; ++i)
    {
        results.emplace_back(
            pool.commit([i] {
                std::cout << "hello " << i << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(3));
                std::cout << "world " << i << std::endl;
                return i * i;
                })
        );
    }
    std::this_thread::sleep_for(std::chrono::seconds(15));
    for (auto&& result : results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;
    return 0;
}
catch (std::exception& e)
{
    std::cout << "some error " << std::this_thread::get_id() << e.what() << std::endl;
}
  • 测试结果 C, Cpp, Cpp11 三种方式实现线程池(源码)
点赞
收藏
评论区
推荐文章
Wesley13 Wesley13
3年前
java各种面试问题
二、Java多线程相关线程池的原理,为什么要创建线程池?创建线程池的方式;线程的生命周期,什么时候会出现僵死进程;说说线程安全问题,什么实现线程安全,如何实现线程安全;创建线程池有哪几个核心参数?如何合理配置线程池的大小?volatile、ThreadLocal的使用场景和原理;
Wesley13 Wesley13
3年前
java 代码实现使用Druid 链接池获取数据库链接
因为原先使用的c3p0链接池,时常出现:APPARENTDEADLOCK!!!Creatingemergencythreadsforunassignedpendingtasks,以及出现线程死锁的情况导致服务器经常需要重启,很是头疼。所以考虑使用Druid链接池来代替原先的c3p0.AlibabaDruid中文文档(https:/
Wesley13 Wesley13
3年前
java线程
1.进程和线程的区别是什么?进程是执行着的应用程序,而线程是进程内部的一个执行序列。一个进程可以有多个线程。线程又叫轻量级进程。2.创建线程有几种不同方式?你喜欢哪种?为什么?有三种方式可以用来创建线程:继承Thread类实现Runnable接口应用程序可以使用Executor框架来创建线程池实现Runnabl
Stella981 Stella981
3年前
Executor线程池
线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。_0_|_1_线程实现方式Thread、Runnable、Callable//实现Runnable接口的类将被Thread执行,表示一个基本任务p
Wesley13 Wesley13
3年前
C++面向对象实现封装线程池
<htmlxmlns"http://www.w3.org/1999/xhtml"<head<stylebody,table{fontfamily:微软雅黑;fontsize:13.5pt}table{bordercollapse:collapse;border:solidgray;borderwidth:2px
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
可莉 可莉
3年前
12、线程池ThreadPool
1、总共有5种线程池:1、单个线程ExecutorServicepool1Executors.newSingleThreadExecutor();!(https://oscimg.oschina.net/oscnet/3367924c291d0b7c6d59666588e
Stella981 Stella981
3年前
Noark入门之线程模型
0x00单线程多进程单线程与单进程多线程的目的都是想尽可能的利用CPU,减少CPU的空闲时间,特别是多核环境,今天咱不做深度解读,跳过...0x01线程池锁最早的一部分游戏服务器是采用线程池的方式来处理玩家的业务请求,以达最大限度的利用多核优势来提高处理业务能力。但线程池同时也带来了并发问题,为了解决同一玩家多个业务请求不被
Wesley13 Wesley13
3年前
C++11 STL线程库实现一个简单的线程池
使用C11STL线程库实现一个线程池。处理机制是抢占式的,即所有线程从一个队列(std::queue)中获取任务执行(计算字符串简单HASH值),使用std::mutex和std::conditional\_variable实现队列访问并发协调。include<iostreaminclude<ioma
Wesley13 Wesley13
3年前
JAVA面试专题二:线程
文章目录       1、创建线程有几种不同的方式?你喜欢哪一种?为什么?       2、什么是线程池?为什么要使用它?       3、线程池的有几种实现方式       4、Runnable接口和Callable接口的区别       5、start()方法和run()方法的区别       
nitesy
nitesy
Lv1
技术交流(吹水)群: q976036684
文章
6
粉丝
1
获赞
2