100 行写一个 go 的协程池 (任务池)

Wesley13
• 阅读 961

前言


go 的 goroutine 提供了一种较线程而言更廉价的方式处理并发场景, go 使用二级线程的模式, 将 goroutine 以 M:N 的形式复用到系统线程上, 节省了 cpu 调度的开销, 也避免了用户级线程(协程)进行系统调用时阻塞整个系统线程的问题。【1】

但 goroutine 太多仍会导致调度性能下降、GC 频繁、内存暴涨, 引发一系列问题。在面临这样的场景时, 限制 goroutine 的数量、重用 goroutine 显然很有价值。

本文正是针对上述情况而提供一种简单的解决方案, 编写一个协程池(任务池)来实现对 goroutine 的管控。

思路

要解决这个问题, 要思考两个问题

  • goroutine 的数量如何限制, goroutine 如何重用

  • 任务如何执行

goroutine 的数量如何限制, goroutine 如何重用

说到限制和重用, 那么最先想到的就是池化。比如 TCP 连接池, 线程池, 都是有效限制、重用资源的最好实践。所以, 我们可以创建一个 goroutine 池, 用来管理 goroutine。

任务如何执行

在使用原生 goroutine 的场景中, 运行一个任务直接启动一个 goroutine 来运行, 在池化的场景而言, 任务也是要在 goroutine 中执行, 但是任务需要任务池来放入 goroutine。

生产者消费者模型

在连接池中, 连接在使用时从池中取出, 用完后放入池中。对于 goroutine 而言, goroutine 通过语言关键字启动, 无法像连接一样操作。那么如何让 goroutine 可以执行任务, 且执行后可以重新用来执行其它任务呢?这里就需要使用生产者消费者模型了:

生产者 --(生产任务)--> 队列 --(消费任务)--> 消费者

用来执行任务的 goroutine 可以作为消费者, 操作任务池的 goroutine 作为生产者, 而队列则可以使用 go 的 buffer channel, 任务池的建模到此结束。

实现

Talk is cheap. Show me the code.

任务的定义

任务要包含需要执行的函数、以及函数要传的参数, 因为参数类型、个数不确定, 这里使用可变参数和空接口的形式

type Task struct {    Handler func(v ...interface{})    Params  []interface{}}

任务池的定义

任务池的定义包括了池的容量 capacity、当前运行的 worker(goroutine)数量 runningWorkers、任务队列(channel)taskC、关闭任务池的 channel closeC 以及任务池的状态 state(运行中或已关闭, 用于安全关闭任务池)

type Pool struct {    capacity       uint64    runningWorkers uint64    state          int64    taskC          chan *Task    closeC         chan bool}

任务池的构造函数:

var ErrInvalidPoolCap = errors.New("invalid pool cap")const (    RUNNING = 1    STOPED = 0)func NewPool(capacity uint64) (*Pool, error) {    if capacity <= 0 {        return nil, ErrInvalidPoolCap    }    return &Pool{        capacity: capacity,        state:    RUNNING,        // 初始化任务队列, 队列长度为容量        taskC:    make(chan *Task, capacity),        closeC:   make(chan bool),    }, nil}

启动 worker

新建 run() 方法作为启动 worker 的方法:

func (p *Pool) run() {    p.runningWorkers++ // 运行中的任务加一    go func() {        defer func() {            p.runningWorkers-- // worker 结束, 运行中的任务减一        }()        for {            select { // 阻塞等待任务、结束信号到来            case task, ok := <-p.taskC: // 从 channel 中消费任务                if !ok { // 如果 channel 被关闭, 结束 worker 运行                    return                }                // 执行任务                task.Handler(task.Params...)            case <-p.closeC: // 如果收到关闭信号, 结束 worker 运行                return            }        }    }()}

上述代码中, runningWorkers 的加减直接使用了自增运算, 但是考虑到启动多个 worker 时, runningWorkers 就会有数据竞争, 所以我们使用 sync.atomic 包来保证 runningWorkers 的自增操作是原子的。

对 runningWorkers 的操作进行封装:

func (p *Pool) incRunning() { // runningWorkers + 1    atomic.AddUint64(&p.runningWorkers, 1)}func (p *Pool) decRunning() { // runningWorkers - 1    atomic.AddUint64(&p.runningWorkers, ^uint64(0))}func (p *Pool) GetRunningWorkers() uint64 {    return atomic.LoadUint64(&p.runningWorkers)}

打铁乘热, 对于 capacity 的操作也考虑数据竞争, 封装 GetCap() 方法:

func (p *Pool) GetCap() uint64 {    return atomic.LoadUint64(&p.capacity)}

run() 方法改造:

func (p *Pool) run() {    p.incRunning()    go func() {        defer func() {            p.decRunning()        }()        for {            select {            case task, ok := <-p.taskC:                if !ok {                    return                }                task.Handler(task.Params...)            case <-p.closeC:                return            }        }    }()}

生产任务

新建 Put() 方法用来将任务放入池中:

func (p *Pool) Put(task *Task) {    if p.GetRunningWorkers() < p.GetCap() { // 如果任务池满, 则不再创建 worker        // 创建启动一个 worker        p.run()    }    // 将任务推入队列, 等待消费    p.taskC <- task}

任务池安全关闭

当有关闭任务池来节省 goroutine 资源的场景时, 我们需要有一个关闭任务池的方法。

直接销毁 worker 关闭 channel 并不合适, 因为此时可能还有任务在队列中没有被消费掉。要确保所有任务被安全消费后再销毁掉 worker。

首先, 在关闭任务池时, 需要先关闭掉生产任务的入口。改造 Put() 方法:

var ErrPoolAlreadyClosed = errors.New("pool already closed")func (p *Pool) Put(task *Task) error {    if p.state == STOPED { // 如果任务池处于关闭状态, 再 put 任务会返回 ErrPoolAlreadyClosed 错误        return ErrPoolAlreadyClosed    }        if p.GetRunningWorkers() < p.GetCap() {        p.run()    }    p.taskC <- task        return nil}

在 run() 方法中已经对 closeC 进行了监听, 销毁 worker 只需等待任务被消费完后向 closeC 发出信号。Close() 方法如下:

func (p *Pool) Close() {    p.state = STOPED // 设置 state 为已停止    for len(p.taskC) > 0 { // 阻塞等待所有任务被 worker 消费    }    p.closeC <- true // 发送销毁 worker 信号    close(p.taskC) // 关闭任务队列}

panic handler

每个 worker 都是一个 goroutine, 如果 goroutine 中产生了 panic, 会导致整个程序崩溃。为了保证程序的安全进行, 任务池需要对每个 worker 中的 panic 进行 recover 操作, 并提供可订制的 panic handler。

更新任务池定义:

type Pool struct {    capacity       uint64    runningWorkers uint64    state          int64    taskC          chan *Task    closeC         chan bool    PanicHandler   func(interface{})}

更新 run() 方法:

func (p *Pool) run() {    p.incRunning()    go func() {        defer func() {            p.decRunning()            if r := recover(); r != nil { // 恢复 panic                if p.PanicHandler != nil { // 如果设置了 PanicHandler, 调用                    p.PanicHandler(r)                } else { // 默认处理                    log.Printf("Worker panic: %s\n", r)                }            }        }()        for {            select {            case task, ok := <-p.taskC:                if !ok {                    return                }                task.Handler(task.Params...)            case <-p.closeC:                return            }        }    }()}

使用

OK, 我们的任务池就这么简单的写好了, 试试:

func main() {    // 创建任务池    pool, err := NewPool(10)    if err != nil {        panic(err)    }    for i := 0; i < 20; i++ {        // 任务放入池中        pool.Put(&Task{            Handler: func(v ...interface{}) {                fmt.Println(v)            },            Params: []interface{}{i},        })    }    time.Sleep(1e9) // 等待执行}

详细例子见 mortar/examples

benchmark

作为协程池, 性能和内存占用的指标测试肯定是少不了的, 测试数据才是最有说服力的

测试流程

100w 次执行,原子增量操作

测试任务:

var sum int64func demoTask(v ...interface{}) {    for i := 0; i < 100; i++ {        atomic.AddInt64(&sum, 1)    }}

测试方法:

var runTimes = 1000000// 原生 goroutinefunc BenchmarkGoroutineSetTimes(b *testing.B) {    for i := 0; i < runTimes; i++ {        go demoTask()    }}// 使用协程池func BenchmarkPutSetTimes(b *testing.B) {    pool, err := NewPool(20)    if err != nil {        b.Error(err)    }    ctx := context.Background()    task := &Task{        Handler: demoTask,    }    for i := 0; i < runTimes; i++ {        pool.Put(ctx, task)    }}

对比结果

模式

操作时间消耗 ns/op

内存分配大小 B/op

内存分配次数 allocs/op

原生 goroutine (100w goroutine)

1596177880

103815552

240022

任务池开启 20 个 worker 20 goroutine)

1378909099

15312

89

使用任务池和原生 goroutine 性能相近(略好于原生)

使用任务池比直接 goroutine 内存分配节省 7000 倍左右, 内存分配次数减少 2700 倍左右

100 行写一个 go 的协程池 (任务池)

本文分享自微信公众号 - 会呼吸的Coder(BreathCoder)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

点赞
收藏
评论区
推荐文章
风斗 风斗
3年前
Kotlin 协程中,关于 runBlocking, launch ,withContext ,async,doAsync 之间的简单区别
引入大佬的话,Kotlin的协程,本质上是一个线程框架,它可以方便的切换线程的上下文(如主线程切换到子线程/子线程切回主线程)。而平时我们要想在AndroidStudio使用协程,先要在gradle引入协程依赖:implementation"org.jetbrains.kotlinx:kotlinxcoroutinescore:1.3.3"
GO语言协程的理解
以下内容大部分摘自许世伟的《GO语言核心编程》最近面试,在自己的简历上写了简单会一些GO语言。结果被面试官问了GO语言goroutine的原理。自己看倒是看过,时间长了又给忘了。特此写下此文以长记性。协程:协程本质上是一种用户态线程,不需要操作系统来进行抢占式调度,并且在真正的实现中寄存于线程中,因此系统开销极小,可以有效的提高线程任务的并发性,而避
Wesley13 Wesley13
3年前
java线程笔记
线程个人理解:线程是一种运行单元,是进程内的拆分(就像一个房子,里面可以有电视在播放、人在吃饭),不同的线程可以同时干不同的事情。其他:进程:进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。协程:协程是比线程的更小的一种程序的拆分,和
Stella981 Stella981
3年前
Gevent简明教程
1、前述进程线程协程异步并发编程(不是并行)目前有四种方式:多进程、多线程、协程和异步。多进程编程在python中有类似C的os.fork,更高层封装的有multiprocessing标准库多线程编程python中有Thread和threading异步编程在linux下主要有三种实现selec
Stella981 Stella981
3年前
Goroutine(协程)为何能处理大并发?
简单来说:协程十分轻量,可以在一个进程中执行有数以十万计的协程,依旧保持高性能。进程、线程、协程的关系和区别:进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度。线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度(标准线程是的)。协程和线程一样共享堆
Wesley13 Wesley13
3年前
Go 并发
Go并发并发指的是同时处理多个任务的能力。并行指的是并行处理多个任务的能力。并行不一定加快运行速度,因为并行组件之间可能需要互相通信。Go中使用协程,信道来处理并发。协程Go中主要通过协程实现并发。协程是与其他函数或方法一起并发运行的函数或方法,协程可以看作是轻量级线程,但是创建成本更小,我们经常
Stella981 Stella981
3年前
Go实现FastCgi Proxy Client 系列(三)优化篇
墨迹一点个人琐碎最近比较忙,以致于很久都没有写blog了,但是,golang的水平自认为是总算入门了。协程的个人理解网上的说法一般都是协程是轻量级线程。我个人认为协程的好处1.小2.无需在用户态和内核态切换(完全在用户态)3.无需线程上下文切换的开销(因为之上的好处)4.编码简单(原
Stella981 Stella981
3年前
Noark入门之线程模型
0x00单线程多进程单线程与单进程多线程的目的都是想尽可能的利用CPU,减少CPU的空闲时间,特别是多核环境,今天咱不做深度解读,跳过...0x01线程池锁最早的一部分游戏服务器是采用线程池的方式来处理玩家的业务请求,以达最大限度的利用多核优势来提高处理业务能力。但线程池同时也带来了并发问题,为了解决同一玩家多个业务请求不被
Easter79 Easter79
3年前
Swoole2.0内置协程并发测试
Swoole2.0是一个革命性的版本,它内置了协程的支持。与Go语言协程不同,Swoole协程完全不需要开发者添加任何额外的关键词,直接以过去最传统的同步阻塞模式编写代码,底层自动进行协程调度实现异步IO。使并发编程变得非常简单。最新的版本中,内置协程已支持PHP7,同时兼具了性能和并发能力,Swoole的强大超乎想象。本文基于Github最新的Sw
Stella981 Stella981
3年前
Golang学习笔记:goroutine
1.goroutinegoroutine是go语言的并发体。在go语言里面能使用go关键字来实现并发。gofunc()1.1概念介绍goroutine本质上是协程,我刚刚学习的时候就粗略地认为goroutine是线程,直到最近才开始搞明白goroutine的基本概念。<fon