广告位联系
返回顶部
分享到

Golang errgroup设计及实现原理解析

Golang 来源:互联网 作者:佚名 发布时间:2022-08-29 21:19:45 人浏览
摘要

开篇 继上次学习了信号量 semaphore扩展库的设计思路和实现之后,今天我们继续来看golang.org/x/sync包下的另一个经常被 Golang 开发者使用的大杀器:errgroup。 业务研发中我们经常会遇到需

开篇

继上次学习了信号量 semaphore 扩展库的设计思路和实现之后,今天我们继续来看 golang.org/x/sync 包下的另一个经常被 Golang 开发者使用的大杀器:errgroup。

业务研发中我们经常会遇到需要调用多个下游的场景,比如加载一个商品的详情页,你可能需要访问商品服务,库存服务,券服务,用户服务等,才能从各个数据源获取到所需要的信息,经过一些鉴权逻辑后,组装成前端需要的数据格式下发。

串行调用当然可以,但这样就潜在的给各个调用预设了【顺序】,必须执行完 A,B,C 之后才能执行 D 操作。但如果我们对于顺序并没有强需求,从语义上看两个调用是完全独立可并发的,那么我们就可以让他们并发执行。

这个时候就可以使用 errgroup 来解决问题。一定意义上讲,errgroup 是基于 WaitGroup 在错误传递上进行一些优化而提供出来的能力。它不仅可以支持 context.Context 的相关控制能力,还可以将子任务的 error 向上传递。

errgroup 源码拆解

errgroup 定义在 golang.org/x/sync/errgroup,承载核心能力的结构体是 Group。

Group

1

2

3

4

5

6

7

8

9

10

11

12

13

type token struct{}

// A Group is a collection of goroutines working on subtasks that are part of

// the same overall task.

//

// A zero Group is valid, has no limit on the number of active goroutines,

// and does not cancel on error.

type Group struct {

    cancel func()

    wg sync.WaitGroup

    sem chan token

    errOnce sync.Once

    err     error

}

Group 就是对我们上面提到的一堆子任务执行计划的抽象。每一个子任务都会有自己对应的 goroutine 来执行。

通过这个结构我们也可以看出来,errgroup 底层实现多个 goroutine 调度,等待的能力还是基于 sync.WaitGroup。

WithContext

我们可以调用 errgroup.WithContext 函数来创建一个 Group。

1

2

3

4

5

6

7

8

9

// WithContext returns a new Group and an associated Context derived from ctx.

//

// The derived Context is canceled the first time a function passed to Go

// returns a non-nil error or the first time Wait returns, whichever occurs

// first.

func WithContext(ctx context.Context) (*Group, context.Context) {

    ctx, cancel := context.WithCancel(ctx)

    return &Group{cancel: cancel}, ctx

}

这里可以看到,Group 的 cancel 函数本质就是为了支持 context 的 cancel 能力,初始化的 Group 只有一个 cancel 属性,其他都是默认的。一旦有一个子任务返回错误,或者是 Wait 调用返回,这个新 Context 就会被 cancel。

Wait

本质上和 WaitGroup 的 Wait 方法语义一样,既然我们是个 group task,就需要等待所有任务都执行完,这个语义就由 Wait 方法提供。如果有多个子任务返回错误,它只会返回第一个出现的错误,如果所有的子任务都执行成功,就返回 nil。

1

2

3

4

5

6

7

8

9

// Wait blocks until all function calls from the Go method have returned, then

// returns the first non-nil error (if any) from them.

func (g *Group) Wait() error {

    g.wg.Wait()

    if g.cancel != nil {

        g.cancel()

    }

    return g.err

}

Wait 的实现非常简单。一个前置的 WaitGroup Wait,结束后只做了两件事:

  • 如果对于公共的 Context 有 cancel 函数,就将其 cancel,因为事情办完了;
  • 返回公共的 Group 结构中的 err 给调用方。

Go

Group 的核心能力就在于能够并发执行多个子任务,从调用者的角度,我们只需要传入要执行的函数,签名为:func() error即可,非常通用。如果任务执行成功,就返回 nil,否则就返回 error,并且会 cancel 那个新的 Context。底层的调度逻辑由 Group 的 Go 方法实现:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

// Go calls the given function in a new goroutine.

// It blocks until the new goroutine can be added without the number of

// active goroutines in the group exceeding the configured limit.

//

// The first call to return a non-nil error cancels the group; its error will be

// returned by Wait.

func (g *Group) Go(f func() error) {

    if g.sem != nil {

        g.sem <- token{}

    }

    g.wg.Add(1)

    go func() {

        defer g.done()

        if err := f(); err != nil {

            g.errOnce.Do(func() {

                g.err = err

                if g.cancel != nil {

                    g.cancel()

                }

            })

        }

    }()

}

func (g *Group) done() {

    if g.sem != nil {

        <-g.sem

    }

    g.wg.Done()

}

我们重点来分析下 Go 这里发生了什么。

WaitGroup 加 1 用作计数;

启动一个新的 goroutine 执行调用方传入的 f() 函数;

  • 若 err 为 nil 说明执行正常;
  • 若 err 不为 nil,说明执行出错,此时将这个返回的 err 赋值给全局 Group 的变量 err,并将 context cancel 掉。注意,这里的处理在 once 分支中,也就是只有第一个来的错误会被处理。

在 defer 语句中调用 Group 的 done 方法,底层依赖 WaitGroup 的 Done,说明这一个子任务结束。

这里也可以看到,其实所谓 errgroup,我们并不是将所有子任务的 error 拼成一个字符串返回,而是直接在 Go 方法那里将第一个错误返回,底层依赖了 once 的能力。

SetLimit

其实看到这里,你有没有觉得 errgroup 的功能有点鸡肋?底层核心技术都是靠 WaitGroup 完成的,自己只不过是起了个 goroutine 执行方法,err 还只能保留一个。而且 Group 里面的 sem 那个 chan 是用来干什么呢?

这一节我们就来看看,Golang 对 errgroup 能力的一次扩充。

到目前为止,errgroup 是可以做到一开始人们对它的期望的,即并发执行子任务。但问题在于,这里是每一个子任务都开了个goroutine,如果是在高并发的环境里,频繁创建大量goroutine 这样的用法很容易对资源负载产生影响。开发者们提出,希望有办法限制 errgroup 创建的 goroutine 数量,参照这个 proposal: #27837

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

// SetLimit limits the number of active goroutines in this group to at most n.

// A negative value indicates no limit.

//

// Any subsequent call to the Go method will block until it can add an active

// goroutine without exceeding the configured limit.

//

// The limit must not be modified while any goroutines in the group are active.

func (g *Group) SetLimit(n int) {

    if n < 0 {

        g.sem = nil

        return

    }

    if len(g.sem) != 0 {

        panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))

    }

    g.sem = make(chan token, n)

}

SetLimit 的参数 n 就是我们对这个 Group 期望的最大 goroutine 数量,这里其实除去校验逻辑,只干了一件事:g.sem = make(chan token, n),即创建一个长度为 n 的 channel,赋值给 sem。

回忆一下我们在 Go 方法 defer 调用 done 中的表现,是不是清晰了很多?我们来理一下:

首先,Group 结构体就包含了 sem 变量,只作为通信,元素是空结构体,不包含实际意义:

1

2

3

4

5

6

7

type Group struct {

    cancel func()

    wg sync.WaitGroup

    sem chan token

    errOnce sync.Once

    err     error

}

如果你对整个 Group 的 Limit 没有要求,which is fine,你就直接忽略这个 SetLimit,errgroup 的原有能力就可以支持你的诉求。

但是,如果你希望保持 errgroup 的 goroutine 在一个上限之内,请在调用 Go 方法前,先 SetLimit,这样 Group 的 sem 就赋值为一个长度为 n 的 channel。

那么,当你调用 Go 方法时,注意下面的框架代码,此时 g.sem 不为 nil,所以我们会塞一个 token 进去,作为占坑,语义上表示我申请一个 goroutine 用。

1

2

3

4

5

6

7

8

9

10

func (g *Group) Go(f func() error) {

    if g.sem != nil {

        g.sem <- token{}

    }

    g.wg.Add(1)

    go func() {

        defer g.done()

                ...

    }()

}

当然,如果此时 goroutine 数量已经达到上限,这里就会 block 住,直到别的 goroutine 干完活,sem 输出了一个 token之后,才能继续往里面塞。

在每个 goroutine 执行完毕后 defer 的 g.done 方法,则是完成了这一点:

1

2

3

4

5

6

func (g *Group) done() {

    if g.sem != nil {

        <-g.sem

    }

    g.wg.Done()

}

这样就把 sem 的用法串起来了。我们通过创建一个定长的channel来实现对于 goroutine 数量的管控,对于channel实际包含的元素并不关心,所以用一个空结构体省一省空间,这是非常优秀的设计,大家平常也可以参考。

TryGo

TryGo 和 SetLimit 这套体系其实都是不久前欧长坤大佬提交到 errgroup 的能力。

一如既往,所有带 TryXXX的函数,都不会阻塞。 其实办的事情非常简单,和 Go 方法一样,传进来一个 func() error来执行。

和 Go 方法的区别在于,如果判断 limit 已经不够了,此时不再阻塞,而是直接 return false,代表执行失败。其他的部分完全一样。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

// TryGo calls the given function in a new goroutine only if the number of

// active goroutines in the group is currently below the configured limit.

//

// The return value reports whether the goroutine was started.

func (g *Group) TryGo(f func() error) bool {

    if g.sem != nil {

        select {

        case g.sem <- token{}:

            // Note: this allows barging iff channels in general allow barging.

        default:

            return false

        }

    }

    g.wg.Add(1)

    go func() {

        defer g.done()

        if err := f(); err != nil {

            g.errOnce.Do(func() {

                g.err = err

                if g.cancel != nil {

                    g.cancel()

                }

            })

        }

    }()

    return true

}

使用方法

这里我们先看一个最常见的用法,针对一组任务,每一个都单独起 goroutine 执行,最后获取 Wait 返回的 err 作为整个 Group 的 err。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

package main

import (

    "errors"

    "fmt"

    "time"

    "golang.org/x/sync/errgroup"

)

func main() {

    var g errgroup.Group

    // 启动第一个子任务,它执行成功

    g.Go(func() error {

        time.Sleep(5 * time.Second)

        fmt.Println("exec #1")

        return nil

    })

    // 启动第二个子任务,它执行失败

    g.Go(func() error {

        time.Sleep(10 * time.Second)

        fmt.Println("exec #2")

        return errors.New("failed to exec #2")

    })

    // 启动第三个子任务,它执行成功

    g.Go(func() error {

        time.Sleep(15 * time.Second)

        fmt.Println("exec #3")

        return nil

    })

    // 等待三个任务都完成

    if err := g.Wait(); err == nil {

        fmt.Println("Successfully exec all")

    } else {

        fmt.Println("failed:", err)

    }

}

你会发现,最后 err 打印出来就是第二个子任务的 err。

当然,上面这个 case 是我们正好只有一个报错,但是,如果实际有多个任务都挂了呢?

从完备性来考虑,有没有什么办法能够将多个任务的错误都返回呢?

这一点其实 errgroup 库并没有提供非常好的支持,需要开发者自行做一些改造。因为 Group 中只有一个 err 变量,我们不可能基于 Group 来实现这一点。

通常情况下,我们会创建一个 slice 来存储 f() 执行的 err。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

package main

import (

    "errors"

    "fmt"

    "time"

    "golang.org/x/sync/errgroup"

)

func main() {

    var g errgroup.Group

    var result = make([]error, 3)

    // 启动第一个子任务,它执行成功

    g.Go(func() error {

        time.Sleep(5 * time.Second)

        fmt.Println("exec #1")

        result[0] = nil // 保存成功或者失败的结果

        return nil

    })

    // 启动第二个子任务,它执行失败

    g.Go(func() error {

        time.Sleep(10 * time.Second)

        fmt.Println("exec #2")

        result[1] = errors.New("failed to exec #2") // 保存成功或者失败的结果

        return result[1]

    })

    // 启动第三个子任务,它执行成功

    g.Go(func() error {

        time.Sleep(15 * time.Second)

        fmt.Println("exec #3")

        result[2] = nil // 保存成功或者失败的结果

        return nil

    })

    if err := g.Wait(); err == nil {

        fmt.Printf("Successfully exec all. result: %v\n", result)

    } else {

        fmt.Printf("failed: %v\n", result)

    }

}

可以看到,我们声明了一个 result slice,长度为 3。这里不用担心并发问题,因为每个 goroutine 读写的位置是确定唯一的。

本质上可以理解为,我们把 f() 返回的 err 不仅仅给了 Group 一份,还自己存了一份,当出错的时候,Wait 返回的错误我们不一定真的用,而是直接用自己错的这一个 error slice。

Go 官方文档中的利用 errgroup 实现 pipeline 的示例也很有参考意义:由一个子任务遍历文件夹下的文件,然后把遍历出的文件交给 20 个 goroutine,让这些 goroutine 并行计算文件的 md5。

这里贴出来简化代码学习一下.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

package main

import (

    "context"

    "crypto/md5"

    "fmt"

    "io/ioutil"

    "log"

    "os"

    "path/filepath"

    "golang.org/x/sync/errgroup"

)

// Pipeline demonstrates the use of a Group to implement a multi-stage

// pipeline: a version of the MD5All function with bounded parallelism from

// https://blog.golang.org/pipelines.

func main() {

    m, err := MD5All(context.Background(), ".")

    if err != nil {

        log.Fatal(err)

    }

    for k, sum := range m {

        fmt.Printf("%s:\t%x\n", k, sum)

    }

}

type result struct {

    path string

    sum  [md5.Size]byte

}

// MD5All reads all the files in the file tree rooted at root and returns a map

// from file path to the MD5 sum of the file's contents. If the directory walk

// fails or any read operation fails, MD5All returns an error.

func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {

    // ctx is canceled when g.Wait() returns. When this version of MD5All returns

    // - even in case of error! - we know that all of the goroutines have finished

    // and the memory they were using can be garbage-collected.

    g, ctx := errgroup.WithContext(ctx)

    paths := make(chan string)

    g.Go(func() error {

        defer close(paths)

        return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {

            if err != nil {

                return err

            }

            if !info.Mode().IsRegular() {

                return nil

            }

            select {

            case paths <- path:

            case <-ctx.Done():

                return ctx.Err()

            }

            return nil

        })

    })

    // Start a fixed number of goroutines to read and digest files.

    c := make(chan result)

    const numDigesters = 20

    for i := 0; i < numDigesters; i++ {

        g.Go(func() error {

            for path := range paths {

                data, err := ioutil.ReadFile(path)

                if err != nil {

                    return err

                }

                select {

                case c <- result{path, md5.Sum(data)}:

                case <-ctx.Done():

                    return ctx.Err()

                }

            }

            return nil

        })

    }

    go func() {

        g.Wait()

        close(c)

    }()

    m := make(map[string][md5.Size]byte)

    for r := range c {

        m[r.path] = r.sum

    }

    // Check whether any of the goroutines failed. Since g is accumulating the

    // errors, we don't need to send them (or check for them) in the individual

    // results sent on the channel.

    if err := g.Wait(); err != nil {

        return nil, err

    }

    return m, nil

}

其实本质上还是 channel发挥了至关重要的作用,这里建议大家有时间尽量看看源文章:pkg.go.dev/golang.org/…

对于用 errgroup 实现 pipeline 模式有很大帮助。

结束语

今天我们学习了 errgroup 的源码已经新增的 SetLimit 能力,其实看过了 sync 相关的这些库,整体用到的能力其实大差不差,很多设计思想都是非常精巧的。比如 errgroup 中利用定长 channel 实现控制 goroutine 数量,空结构体节省内存空间。

并且 sync 包的实现一般都还是非常简洁的,比如 once,singleflight,semaphore 等。建议大家有空的话自己过一遍,对并发和设计模式的理解会更上一个台阶。

errgroup 本身并不复杂,业界也有很多封装实现,大家可以对照源码再思考一下还有什么可以改进的地方。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://juejin.cn/post/7126565797870174222
相关文章
  • 基于GORM实现CreateOrUpdate的方法
    CreateOrUpdate 是业务开发中很常见的场景,我们支持用户对某个业务实体进行创建/配置。希望实现的 repository 接口要达到以下两个要求: 如果
  • Golang中的内存逃逸的介绍
    什么是内存逃逸分析 内存逃逸分析是go的编译器在编译期间,根据变量的类型和作用域,确定变量是堆上还是栈上 简单说就是编译器在编译
  • Golang自旋锁的介绍
    自旋锁 获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。 它是为实现保护共享资源而提出的
  • Go语言读写锁RWMutex的源码

    Go语言读写锁RWMutex的源码
    在前面两篇文章中初见 Go Mutex、Go Mutex 源码详解,我们学习了Go语言中的Mutex,它是一把互斥锁,每次只允许一个goroutine进入临界区,可以保
  • Go项目实现优雅关机与平滑重启功能
    什么是优雅关机? 优雅关机就是服务端关机命令发出后不是立即关机,而是等待当前还在处理的请求全部处理完毕后再退出程序,是一种对
  • Go语言操作Excel利器之excelize类库的介绍
    在开发中一些需求需要通过程序操作excel文档,例如导出excel、导入excel、向excel文档中插入图片、表格和图表等信息,使用Excelize就可以方便
  • 利用Go语言快速实现一个极简任务调度系统

    利用Go语言快速实现一个极简任务调度系统
    任务调度(Task Scheduling)是很多软件系统中的重要组成部分,字面上的意思是按照一定要求分配运行一些通常时间较长的脚本或程序。在爬
  • GoLang中的iface 和 eface 的区别介绍

    GoLang中的iface 和 eface 的区别介绍
    GoLang之iface 和 eface 的区别是什么? iface和eface都是 Go 中描述接口的底层结构体,区别在于iface描述的接口包含方法,而eface则是不包含任何方
  • Golang接口使用的教程
    go语言并没有面向对象的相关概念,go语言提到的接口和java、c++等语言提到的接口不同,它不会显示的说明实现了接口,没有继承、子类、
  • go colly 爬虫实现示例介绍
    贡献某CC,go源码爬虫一个,基于colly,效果是根据输入的浏览器cookie及excel必要行列号,从excel中读取公司名称,查询公司法人及电话号码。
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计