广告位联系
返回顶部
分享到

Go并发编程sync.Cond的使用

Golang 来源:互联网 作者:酷站 发布时间:2022-05-03 21:25:53 人浏览
摘要

简介 Go标准库提供Cond原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond通常应用于等待某个条件的一组goroutine,等条件变为true的时候,其中一个goroutine或者所有的goroutine都

简介

Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行。

Cond 是和某个条件相关,这个条件需要一组 goroutine 协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine 都会被阻塞住,只有这一组 goroutine 通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。

这个条件可以是我们自定义的 true/false 逻辑表达式。

但是 Cond 使用的比较少,因为在大部分场景下是可以被 Channel 和 WaitGroup 来替换的。

详细介绍

下面就是 Cond 的数据结构和对外提供的方法,Cond 内部维护了一个等待队列和锁实例。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

type Cond struct {

   noCopy noCopy

 

   // 锁

   L Locker

 

   // 等待队列

   notify  notifyList

   checker copyChecker

}

 

func NeWCond(l Locker) *Cond

func (c *Cond) Broadcast()

func (c *Cond) Signal()

func (c *Cond) Wait()

  • NeWCond: NeWCond 方法需要调用者传入一个 Locker 接口,这个接口就 Lock/UnLock 方法,所以我们可以传入一个 sync.Metex 对象

  • Signal:允许调用者唤醒一个等待当前 Cond 的 goroutine。如果 Cond 等待队列中有一个或者多个等待的 goroutine ,则从等待队列中移除第一个 goroutine 并把它唤醒

  • Broadcast:允许调用者唤醒所有等待当前 Cond 的 goroutine。如果 Cond 等待队列中有一个或者多个等待的 goroutine,则清空所有等待的 goroutine,并全部唤醒

  • Wait:会把调用者放入 Cond 的等待队列中并阻塞,直到被 Signal 或者 Broadcast 的方法从等待队列中移除并唤醒

案例:Redis连接池

可以看一下下面的代码,使用了 Cond 实现一个 Redis 的连接池,最关键的代码就是在链表为空的时候需要调用 Cond 的 Wait 方法,将 gorutine 进行阻塞。然后 goruntine 在使用完连接后,将连接返回池子后,需要通知其他阻塞的 goruntine 来获取连接。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

package main

 

import (

   "container/list"

   "fmt"

   "math/rand"

   "sync"

   "time"

)

 

// 连接池

type Pool struct {

   lock    sync.Mutex // 锁

   clients list.List  // 连接

   cond    *sync.Cond // cond实例

   close   bool       // 是否关闭

}

 

// Redis Client

type Client struct {

   id int32

}

 

// 创建Redis Client

func NewClient() *Client {

   return &Client{

      id: rand.Int31n(100000),

   }

}

 

// 关闭Redis Client

func (this *Client) Close() {

   fmt.Printf("Client:%d 正在关闭", this.id)

}

 

// 创建连接池

func NewPool(maxConnNum int) *Pool {

   pool := new(Pool)

   pool.cond = sync.NewCond(&pool.lock)

 

   // 创建连接

   for i := 0; i < maxConnNum; i++ {

      client := NewClient()

      pool.clients.PushBack(client)

   }

 

   return pool

}

 

// 从池子中获取连接

func (this *Pool) Pull() *Client {

   this.lock.Lock()

   defer this.lock.Unlock()

 

   // 已关闭

   if this.close {

      fmt.Println("Pool is closed")

      return nil

   }

 

   // 如果连接池没有连接 需要阻塞

   for this.clients.Len() <= 0 {

      this.cond.Wait()

   }

 

   // 从链表中取出头节点,删除并返回

   ele := this.clients.Remove(this.clients.Front())

   return ele.(*Client)

}

 

// 将连接放回池子

func (this *Pool) Push(client *Client) {

   this.lock.Lock()

   defer this.lock.Unlock()

 

   if this.close {

      fmt.Println("Pool is closed")

      return

   }

 

   // 向链表尾部插入一个连接

   this.clients.PushBack(client)

 

   // 唤醒一个正在等待的goruntine

   this.cond.Signal()

}

 

// 关闭池子

func (this *Pool) Close() {

   this.lock.Lock()

   defer this.lock.Unlock()

 

   // 关闭连接

   for e := this.clients.Front(); e != nil; e = e.Next() {

      client := e.Value.(*Client)

      client.Close()

   }

 

   // 重置数据

   this.close = true

   this.clients.Init()

}

 

func main() {

 

   var wg sync.WaitGroup

 

   pool := NewPool(3)

   for i := 1; i <= 10; i++ {

      wg.Add(1)

      go func(index int) {

 

         defer wg.Done()

 

         // 获取一个连接

         client := pool.Pull()

 

         fmt.Printf("Time:%s | 【goruntine#%d】获取到client[%d]\n", time.Now().Format("15:04:05"), index, client.id)

         time.Sleep(time.Second * 5)

         fmt.Printf("Time:%s | 【goruntine#%d】使用完毕,将client[%d]放回池子\n", time.Now().Format("15:04:05"), index, client.id)

 

         // 将连接放回池子

         pool.Push(client)

      }(i)

   }

 

   wg.Wait()

}

运行结果:

Time:15:10:25 | 【goruntine#7】获取到client[31847]
Time:15:10:25 | 【goruntine#5】获取到client[27887]
Time:15:10:25 | 【goruntine#10】获取到client[98081]
Time:15:10:30 | 【goruntine#5】使用完毕,将client[27887]放回池子
Time:15:10:30 | 【goruntine#6】获取到client[27887]               
Time:15:10:30 | 【goruntine#10】使用完毕,将client[98081]放回池子
Time:15:10:30 | 【goruntine#7】使用完毕,将client[31847]放回池子 
Time:15:10:30 | 【goruntine#1】获取到client[31847]               
Time:15:10:30 | 【goruntine#9】获取到client[98081]               
Time:15:10:35 | 【goruntine#6】使用完毕,将client[27887]放回池子
Time:15:10:35 | 【goruntine#3】获取到client[27887]              
Time:15:10:35 | 【goruntine#1】使用完毕,将client[31847]放回池子
Time:15:10:35 | 【goruntine#4】获取到client[31847]              
Time:15:10:35 | 【goruntine#9】使用完毕,将client[98081]放回池子
Time:15:10:35 | 【goruntine#2】获取到client[98081]              
Time:15:10:40 | 【goruntine#3】使用完毕,将client[27887]放回池子
Time:15:10:40 | 【goruntine#8】获取到client[27887]              
Time:15:10:40 | 【goruntine#2】使用完毕,将client[98081]放回池子
Time:15:10:40 | 【goruntine#4】使用完毕,将client[31847]放回池子
Time:15:10:45 | 【goruntine#8】使用完毕,将client[27887]放回池子

注意点

  • 在调用 Wait 方法前,需要先加锁,就像我上面例子中 Pull 方法也是先加锁

看一下源码就知道了,因为 Wait 方法的执行逻辑是先将 goruntine 添加到等待队列中,然后释放锁,然后阻塞,等唤醒后,会继续加锁。如果在调用 Wait 前不加锁,但是里面会解锁,执行的时候就会报错。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

//

//    c.L.Lock()

//    for !condition() {

//        c.Wait()

//    }

//    ... make use of condition ...

//    c.L.Unlock()

//

func (c *Cond) Wait() {

   c.checker.check()

    

   // 添加到等待队列

   t := runtime_notifyListAdd(&c.notify)

   c.L.Unlock()

    

   // 阻塞

   runtime_notifyListWait(&c.notify, t)

   c.L.Lock()

}

  • 还是 Wait 方法,在唤醒后需要继续检查 Cond 条件

就拿上面的 redis 连接案例来进行说明吧,我这里是使用了 for 循环来进行检测。如果将 for 循环改成使用 if,也就是只判断一次,会有什么问题?可以停下来先想想

上面说了调用者也可以使用 Broadcast 方法来唤醒 goruntine ,如果使用的是 Broadcast 方法,所有的 goruntine 都会被唤醒,然后大家都去链表中去获取 redis 连接了,就会出现部分 goruntine拿不到连接,

实际上没有那么多连接可以获取,因为每次只会放回一个连接到池子中。

1

2

3

4

5

6

7

8

// 如果连接池没有连接 需要阻塞

for this.clients.Len() <= 0 {

  this.cond.Wait()

}

 

// 获取连接

ele := this.clients.Remove(this.clients.Front())

return ele.(*Client)


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://juejin.cn/post/7093041338836320292
相关文章
  • 基于GORM实现CreateOrUpdate的方法
    CreateOrUpdate 是业务开发中很常见的场景,我们支持用户对某个业务实体进行创建/配置。希望实现的 repository 接口要达到以下两个要求: 如果
  • Golang中的内存逃逸的介绍
    什么是内存逃逸分析 内存逃逸分析是go的编译器在编译期间,根据变量的类型和作用域,确定变量是堆上还是栈上 简单说就是编译器在编译
  • Golang自旋锁的介绍
    自旋锁 获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。 它是为实现保护共享资源而提出的
  • Go语言读写锁RWMutex的源码

    Go语言读写锁RWMutex的源码
    在前面两篇文章中初见 Go Mutex、Go Mutex 源码详解,我们学习了Go语言中的Mutex,它是一把互斥锁,每次只允许一个goroutine进入临界区,可以保
  • Go项目实现优雅关机与平滑重启功能
    什么是优雅关机? 优雅关机就是服务端关机命令发出后不是立即关机,而是等待当前还在处理的请求全部处理完毕后再退出程序,是一种对
  • Go语言操作Excel利器之excelize类库的介绍
    在开发中一些需求需要通过程序操作excel文档,例如导出excel、导入excel、向excel文档中插入图片、表格和图表等信息,使用Excelize就可以方便
  • 利用Go语言快速实现一个极简任务调度系统

    利用Go语言快速实现一个极简任务调度系统
    任务调度(Task Scheduling)是很多软件系统中的重要组成部分,字面上的意思是按照一定要求分配运行一些通常时间较长的脚本或程序。在爬
  • GoLang中的iface 和 eface 的区别介绍

    GoLang中的iface 和 eface 的区别介绍
    GoLang之iface 和 eface 的区别是什么? iface和eface都是 Go 中描述接口的底层结构体,区别在于iface描述的接口包含方法,而eface则是不包含任何方
  • Golang接口使用的教程
    go语言并没有面向对象的相关概念,go语言提到的接口和java、c++等语言提到的接口不同,它不会显示的说明实现了接口,没有继承、子类、
  • go colly 爬虫实现示例介绍
    贡献某CC,go源码爬虫一个,基于colly,效果是根据输入的浏览器cookie及excel必要行列号,从excel中读取公司名称,查询公司法人及电话号码。
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计