go语言之使用消费者模式实现协程池管理代码示例

2023-06-01 00:00:00 语言 示例 消费者

通俗来讲产生数据的模块,就形象地称为生产者,而处理数据的模块,就称为消费者


消费者模式实现协程池

频发的协程的开辟与剔除,如果对性能有着很高的要求,

建议优化成固定数目的协程取channel里面取数据进行消费的模式,可以避免协程的创建与注销


代码示例:

package main
import (
    "fmt"
    "strconv"
    "sync"
)
// 任务对象
type task struct {
    Production
    Consumer
}
// 设置消费者数目,也就是work pool大小
func (t *task) setConsumerPoolSize(poolSize int) {
    t.Production.Jobs = make(chan *Job, poolSize*10)
    t.Consumer.WorkPoolNum = poolSize
}
// 任务数据对象
type Job struct {
    Data string
}
func NewTask(handler func(jobs chan *Job) (b bool)) (t *task) {
    t = &task{
        Production: Production{Jobs: make(chan *Job, 100)},
        Consumer:   Consumer{WorkPoolNum: 10, Handler: handler},
    }
    return
}
type Production struct {
    Jobs chan *Job
}
func (c Production) AddData(data *Job) {
    c.Jobs <- data
}
type Consumer struct {
    WorkPoolNum int
    Handler     func(chan *Job) (b bool)
    Wg          sync.WaitGroup
}
// 异步开启多个work去处理任务,但是所有work执行完毕才会退出程序
func (c *Consumer) disposeData(data chan *Job) {
    for i := 0; i <= c.WorkPoolNum; i++ {
        c.Wg.Add(1)
        go func() {
            defer func() {
                c.Wg.Done()
            }()
            c.Handler(data)
        }()
    }
    c.Wg.Wait()
}
func main() {
    // 实现一个用于处理数据的闭包,实现业务代码
    consumerHandler := func(jobs chan *Job) (b bool) {
        for jobs := range jobs {
            fmt.Println(jobs)
        }
        return
    }
    // new一个任务处理对象
    t := NewTask(consumerHandler)
    t.setConsumerPoolSize(500) // 500个协程同时消费
    // 根据自己的业务去生成数据通过AddData方法添加数据到生成channel,这里是100万条数据
    go func() {
        for i := 0; i < 1000000; i++ {
            job := new(Job)
            iStr := strconv.Itoa(i)
            job.Data = "定义任务数据格式" + iStr
            t.AddData(job)
        }
    }()
    // 消费者消费数据
    t.Consumer.disposeData(t.Production.Jobs)


相关文章