IT科技

并发编程包之 Errgroup

时间:2010-12-5 17:23:32  作者:数据库   来源:域名  查看:  评论:0
内容摘要:本文转载自微信公众号「Golang来啦」,作者Seekload 。转载本文请联系Golang来啦公众号。四哥水平有限,如果有翻译或理解错误的点,烦请帮忙指出,感谢!这是系列文章的第二篇,第一篇文章点击

本文转载自微信公众号「Golang来啦」,并发编程包作者Seekload 。并发编程包转载本文请联系Golang来啦公众号。并发编程包

四哥水平有限,并发编程包如果有翻译或理解错误的并发编程包点,烦请帮忙指出,并发编程包感谢!

这是并发编程包系列文章的第二篇,第一篇文章点击这里查看。并发编程包

原文如下:

基于 goroutine 和 channel 的并发编程包并发特性,使得 Go 成为了强大的并发编程包并发语言。上一篇文章,并发编程包我们讨论了如何构建 workerPool 来提高程序的并发编程包并发性能,换句话说,并发编程包避免耗尽系统资源。并发编程包但那只是并发编程包一个简单的示例,演示我们应该如何实现。

基于对上一篇文章的学习,在这篇文章里面,我们将构建一个健壮的解决方案,以便在任何其他应用程序里面可以使用该方案。网络上有其他复杂架构的解决方案,比如使用调度器等等。实际上,网站模板我们并不需要这些复杂的设计,仅仅使用一个共享 channel 就可以解决问题。我们一起来看下,该如何构建呢?

代码结构

我们创建了一个通用的 workerPool 包,根据业务所需的并发性使用 worker 来处理任务。一起来看下目录结构:

workerpool ├── pool.go ├── task.go └── worker.go 

workerpool 目录在项目的根目录下。Task 是需要处理单个工作单元;Worker 是一个简单的 worker 函数,用于执行任务;而 Pool 用于创建、管理 workers。

实现

先看下 Task 代码:

// workerpool/task.go package workerpool import (  "fmt" ) type Task struct {   Err  error  Data interface{ }  f    func(interface{ }) error } func NewTask(f func(interface{ }) error, data interface{ }) *Task {   return &Task{ f: f, Data: data} } func process(workerID int, task *Task) {   fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)  task.Err = task.f(task.Data) } 

Task 是一个简单的结构体,保存处理任务所需要的一切数据。创建 task 时,传递了 Data 和待执行函数 f,process() 函数会处理任务。处理任务时,将 Data 作为参数传递给函数 f,并将执行结果保存在 Task.Err 里。

我们来看下 Worker 是如何处理任务的:

// workerpool/worker.go package workerpool import (  "fmt"  "sync" ) // Worker handles all the work type Worker struct {   ID       int  taskChan chan *Task } // NewWorker returns new instance of worker func NewWorker(channel chan *Task, ID int) *Worker {   return &Worker{    ID:       ID,   taskChan: channel,  } } // Start starts the worker func (wr *Worker) Start(wg *sync.WaitGroup) {   fmt.Printf("Starting worker %d\n", wr.ID)  wg.Add(1)  go func() {    defer wg.Done()   for task := range wr.taskChan {     process(wr.ID, task)   }  }() } 

我们创建了一个小巧的 Worker 结构体,包含 worker ID 和 一个保存待处理任务的 channel。亿华云在 Start() 方法里,使用 for range 从 taskChan 读取任务并处理。可以想象的到,多个 worker 可以并发地执行任务。

workerPool

我们通过实现 Task 和 Worker 来处理任务,但是好像还缺点什么东西,谁负责生成这些 worker 并将任务发送给它们?答案是:Worker Pool。

// workerpoo/pool.go package workerpool import (  "fmt"  "sync"  "time" ) // Pool is the worker pool type Pool struct {   Tasks   []*Task  concurrency   int  collector     chan *Task  wg            sync.WaitGroup } // NewPool initializes a new pool with the given tasks and // at the given concurrency. func NewPool(tasks []*Task, concurrency int) *Pool {   return &Pool{    Tasks:       tasks,   concurrency: concurrency,   collector:   make(chan *Task, 1000),  } } // Run runs all work within the pool and blocks until its // finished. func (p *Pool) Run() {   for i := 1; i <= p.concurrency; i++ {    worker := NewWorker(p.collector, i)   worker.Start(&p.wg)  }  for i := range p.Tasks {    p.collector <- p.Tasks[i]  }  close(p.collector)  p.wg.Wait() } 

上面的代码,pool 保存了所有待处理的任务,并且生成与 concurrency 数量一致的 goroutine,用于并发地处理任务。workers 之间共享缓存 channel -- collector。

所以,当我们把这个工作池跑起来时,可以生成满足所需数量的 worker,workers 之间共享 collector channel。接着,使用 for range 读取 tasks,并将读取到的 task 写入 collector 里。我们使用 sync.WaitGroup 实现协程之间的同步。现在我们有了一个很好的解决方案,一起来测试下。

// main.go package main import (  "fmt"  "time"  "github.com/Joker666/goworkerpool/workerpool" ) func main() {   var allTask []*workerpool.Task  for i := 1; i <= 100; i++ {    task := workerpool.NewTask(func(data interface{ }) error {     taskID := data.(int)    time.Sleep(100 * time.Millisecond)    fmt.Printf("Task %d processed\n", taskID)    return nil   }, i)   allTask = append(allTask, task)  }  pool := workerpool.NewPool(allTask, 5)  pool.Run() } 

上面的服务器托管代码,创建了 100 个任务并且使用 5 个并发处理这些任务。

输出如下:

Worker 3 processes task 98 Task 92 processed Worker 2 processes task 99 Task 98 processed Worker 5 processes task 100 Task 99 processed Task 100 processed Took ===============> 2.0056295s 

处理 100 个任务花费了 2s,如何我们将并发数提高到 10,我们会看到处理完所有任务只需要大约 1s。

我们通过实现 workerPool 构建了一个健壮的解决方案,具有并发性、错误处理、数据处理等功能。这是个通用的包,不耦合具体的实现。我们可以使用它来解决一些大问题。

进一步扩展:后台处理任务

实际上,我们还可以进一步扩展上面的解决方案,以便 worker 可以在后台等待我们投递新的任务并处理。为此,代码需要做一些修改,Task 结构体保持不变,但是需要小改下 Worker,看下面代码:

// workerpool/worker.go // Worker handles all the work type Worker struct {   ID       int  taskChan chan *Task  quit     chan bool } // NewWorker returns new instance of worker func NewWorker(channel chan *Task, ID int) *Worker {   return &Worker{    ID:       ID,   taskChan: channel,   quit:     make(chan bool),  } } .... // StartBackground starts the worker in background waiting func (wr *Worker) StartBackground() {   fmt.Printf("Starting worker %d\n", wr.ID)  for {    select {    case task := <-wr.taskChan:    process(wr.ID, task)   case <-wr.quit:    return   }  } } // Stop quits the worker func (wr *Worker) Stop() {   fmt.Printf("Closing worker %d\n", wr.ID)  go func() {    wr.quit <- true  }() } 

Worker 结构体新加 quit channel,并且新加了两个方法。StartBackgorund() 在 for 循环里使用 select-case 从 taskChan 队列读取任务并处理,如果从 quit 读取到结束信号就立即返回。Stop() 方法负责往 quit 写入结束信号。

添加完这两个新的方法之后,我们来修改下 Pool:

// workerpool/pool.go type Pool struct {   Tasks   []*Task  Workers []*Worker  concurrency   int  collector     chan *Task  runBackground chan bool  wg            sync.WaitGroup } // AddTask adds a task to the pool func (p *Pool) AddTask(task *Task) {   p.collector <- task } // RunBackground runs the pool in background func (p *Pool) RunBackground() {   go func() {    for {     fmt.Print("⌛ Waiting for tasks to come in ...\n")    time.Sleep(10 * time.Second)   }  }()  for i := 1; i <= p.concurrency; i++ {    worker := NewWorker(p.collector, i)   p.Workers = append(p.Workers, worker)   go worker.StartBackground()  }  for i := range p.Tasks {    p.collector <- p.Tasks[i]  }  p.runBackground = make(chan bool)  <-p.runBackground } // Stop stops background workers func (p *Pool) Stop() {   for i := range p.Workers {    p.Workers[i].Stop()  }  p.runBackground <- true } 

Pool 结构体添加了两个成员:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于维持 pool 存活状态。

添加了三个新的方法,AddTask() 方法用于往 collector 添加任务;RunBackground() 方法衍生出一个无限运行的 goroutine,以便 pool 维持存活状态,因为 runBackground 信道是空,读取空的 channel 会阻塞,所以 pool 能维持运行状态。接着,在协程里面启动 worker;Stop() 方法用于停止 worker,并且给 runBackground 发送停止信号以便结束 RunBackground() 方法。

我们来看下具体是如何工作的。

如果是在现实的业务场景中,pool 将会与 HTTP 服务器一块运行并消耗任务。我们通过 for 无限循环模拟这种这种场景,如果满足某一条件,pool 将会停止。

// main.go ... pool := workerpool.NewPool(allTask, 5) go func() {   for {    taskID := rand.Intn(100) + 20   if taskID%7 == 0 {     pool.Stop()   }   time.Sleep(time.Duration(rand.Intn(5)) * time.Second)   task := workerpool.NewTask(func(data interface{ }) error {     taskID := data.(int)    time.Sleep(100 * time.Millisecond)    fmt.Printf("Task %d processed\n", taskID)    return nil   }, taskID)   pool.AddTask(task)  } }() pool.RunBackground() 

当执行上面的代码时,我们就会看到有随机的 task 被投递到后台运行的 workers,其中某一个 worker 会读取到任务并完成处理。当满足某一条件时,程序便会停止退出。

总结

基于上一篇文章的初步解决方案,这篇文章讨论了通过 workPool 构建一个强大的解决方案。同时,我们进一步扩展了该方案,实现后台运行 pool 并处理投递的任务。

点击【阅读原文】直达代码仓库[1]。

参考资料

[1]代码仓库: https://github.com/Joker666/goworkerpool

via:https://hackernoon.com/concurrency-in-golang-and-workerpool-part-2-l3w31q7

作者:Hasan

copyright © 2025 powered by 益强资讯全景  滇ICP备2023006006号-31sitemap