广告位联系
返回顶部
分享到

深入理解Golang channel的应用

Golang 来源:互联网 作者:佚名 发布时间:2022-10-26 22:09:57 人浏览
摘要

channel是用于 goroutine 之间的同步、通信的数据结构 channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安

channel是用于 goroutine 之间的同步、通信的数据结构

channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率

channel的用途包括但不限于以下几点:

  • 协程间通信,同步
  • 定时任务:和timer结合
  • 解耦生产方和消费方,实现阻塞队列
  • 控制并发数

本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑

整体结构

Go channel的数据结构如下所示:

1

2

3

4

5

6

7

8

9

10

11

12

13

type hchan struct {

    qcount   uint           // total data in the queue

    dataqsiz uint           // size of the circular queue

    buf      unsafe.Pointer // points to an array of dataqsiz elements

    elemsize uint16

    closed   uint32

    elemtype *_type // element type

    sendx    uint   // send index

    recvx    uint   // receive index

    recvq    waitq  // list of recv waiters

    sendq    waitq  // list of send waiters

    lock mutex

}

qcount:已经存储了多少个元素

dataqsie:最多存储多少个元素,即缓冲区容量

buf:指向缓冲区的位置,实际上是一个数组

elemsize:每个元素占多大空间

closed:channel能够关闭,这里记录其关闭状态

elemtype:保存数据的类型信息,用于go运行时使用

sendx,recvx:

  • 记录下一个要发送到的位置,下一次从哪里还是接收
  • 这里用数组模拟队列,这两个变量即表示队列的队头,队尾
  • 因此channel的缓冲也被称为环形缓冲区

recvq,sendq:

当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送

lock:channel支持协程间并发访问,因此需要一把锁来保护

创建

创建channel会被编译器编译为调用makechan函数

1

2

3

4

// 无缓冲通道

ch1 := make(chan int)

// 有缓冲通道

ch2 := make(chan int, 10)

会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值

可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

func makechan(t *chantype, size int) *hchan {

   elem := t.elem

     

   // mem:缓冲区大小

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))

   if overflow || mem > maxAlloc-hchanSize || size < 0 {

      panic(plainError( "makechan: size out of range" ))

   }

 

   var c *hchan

   switch {

   // 缓冲区大小为空,只申请hchanSize大小的内存

   case mem == 0:

       c = (*hchan)(mallocgc(hchanSize, nil, true))

       c.buf = c.raceaddr()

   // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存

   case elem.ptrdata == 0:

       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))

       c.buf = add(unsafe.Pointer(c), hchanSize)

   // 否则就是带缓存,且有指针,分配两次内存

   default:

      // Elements contain pointers.

       c = new(hchan)

       c.buf = mallocgc(mem, elem, true)

   }

     

   // 保存元素类型,元素大小,容量

   c.elemsize = uint16(elem.size)

   c.elemtype = elem

   c.dataqsiz = uint(size)

   lockInit(&c.lock, lockRankHchan)

    

   return c

}

发送

执行以下代码时:

1

ch <- 3

编译器会转化为对chansend的调用

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

   // 如果channel是空

   if c == nil {

      // 非阻塞,直接返回

      if !block {

         return  false

      }

      // 否则阻塞当前协程

      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)

      throw( "unreachable" )

   }

 

   // 非阻塞,没有关闭,且容量满了,无法发送,直接返回

   if !block && c.closed == 0 && full(c) {

      return  false

   }

 

   // 加锁

   lock(&c.lock)

 

   // 如果已经关闭,无法发送,直接panic

   if c.closed != 0 {

      unlock(&c.lock)

      panic(plainError( "send on closed channel" ))

   }

 

   // 从接收队列弹出一个协程的包装结构sudog

   if sg := c.recvq.dequeue(); sg != nil {

      // 如果能弹出,即有等到接收的协程,说明:

      // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待

      // 将要发送的数据拷贝到该协程的接收指针上

      send(c, sg, ep, func() { unlock(&c.lock) }, 3)

      return  true

}

 

   // 缓冲区还有空间

   if c.qcount < c.dataqsiz {

      // qp:计算要发送到的位置的地址

      qp := chanbuf(c, c.sendx)

      // 将数据从ep拷贝到qp

      typedmemmove(c.elemtype, qp, ep)

      // 待发送位置移动

      c.sendx++

      // 由于是数组模拟队列,sendx到顶了需要归零

      if c.sendx == c.dataqsiz {

         c.sendx = 0

      }

      // 缓冲区数量++

      c.qcount++

      unlock(&c.lock)

      return  true

}

 

   // 往下就是缓冲区无数据,也没有等到接收协程的情况了

    

   // 如果是非阻塞模式,直接返回

   if !block {

      unlock(&c.lock)

      return  false

    }

 

   // 将当前协程包装成sudog,阻塞到channel上

   gp := getg()

   mysg := acquireSudog()

   mysg.releasetime = 0

   if t0 != 0 {

      mysg.releasetime = -1

   }

   

   mysg.elem = ep

   mysg.waitlink = nil

   mysg.g = gp

   mysg.isSelect = false

   mysg.c = c

   gp.waiting = mysg

   gp.param = nil

    

   // 当前协程进入发送等待队列

   c.sendq.enqueue(mysg)

   atomic.Store8(&gp.parkingOnChan, 1)

   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

    

 // 被唤醒后从这里开始执行

    

   KeepAlive(ep)

 

   if mysg != gp.waiting {

      throw( "G waiting list is corrupted" )

   }

   gp.waiting = nil

   gp.activeStackChans = false

   closed := !mysg.success

   gp.param = nil

   if mysg.releasetime > 0 {

      blockevent(mysg.releasetime-t0, 2)

   }

   mysg.c = nil

   releaseSudog(mysg)

   // 被唤醒后发现channel关闭了,panic

   if closed {

      if c.closed == 0 {

         throw( "chansend: spurious wakeup" )

      }

      panic(plainError( "send on closed channel" ))

   }

   return  true

}

整体流程为:

如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回

从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:

  • 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
  • 将要发送的数据拷贝到该协程的接收指针上,返回
  • 这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝

否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回

接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上

将协程阻塞到channel的等待队列时,将其包装成了sudog结构:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

type sudog struct {

   // 协程

   g *g

   // 前一个,后一个指针

   next *sudog

   prev *sudog

   // 等到发送的数据在哪,等待从哪个位置接收数据

   elem unsafe.Pointer

   acquiretime int64

   releasetime int64

   ticket      uint32

   isSelect bool

   success bool

 

   parent   *sudog // semaRoot binary tree

   waitlink *sudog // g.waiting list or semaRoot

   waittail *sudog // semaRoot

   // 在哪个channel上等待

   c        *hchan // channel

}

其目的是:

  • g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列
  • elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝

来看看一些子函数:

1.判断channel是否是满的

1

2

3

4

5

6

7

8

9

func full(c *hchan) bool {

   // 无缓冲

   if c.dataqsiz == 0 {

      // 并且没有其他协程在等待

      return c.recvq.first == nil

   }

   // 有缓冲,但容量装满了

   return c.qcount == c.dataqsiz

}

2.send方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

/**

c:要操作的channel

sg:弹出的接收者协程

ep:要发送的数据在的位置

*/

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {

   // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem

   if sg.elem != nil {

      sendDirect(c.elemtype, sg, ep)

      sg.elem = nil

   }

   gp := sg.g

   unlockf()

   gp.param = unsafe.Pointer(sg)

   sg.success = true

   if sg.releasetime != 0 {

      sg.releasetime = cputicks()

   }

   // 唤醒该接收者协程

   goready(gp, skip+1)

}

接收

从channel中接收数据有几种写法:

  • 带不带ok
  • 接不接收返回值

根据带不带ok,决定用下面哪个方法

1

2

3

4

5

6

7

8

func chanrecv1(c *hchan, elem unsafe.Pointer) {

        chanrecv(c, elem, true)

}

 

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {

        _, received = chanrecv(c, elem, true)

        return

}

根据接不接收返回值,决定elem是不是nil

最终都会调用chanrecv方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞

   if c == nil {

      if !block {

         return

   }

      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)

      throw( "unreachable" )

   }

 

   // 非阻塞,并且channel为空

   if !block && empty(c) {

      // 如果还没关闭,直接返回

   if atomic.Load(&c.closed) == 0 {

      return

   }

      // 否则已经关闭,

      // 如果为空,返回该类型的零值

   if empty(c) {

     if ep != nil {

        typedmemclr(c.elemtype, ep)

     }

     return  true, false

       }

   }

 

   lock(&c.lock)

    

   // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值

   if c.closed != 0 && c.qcount == 0 {

      unlock(&c.lock)

      if ep != nil {

         typedmemclr(c.elemtype, ep)

      }

      return  true, false

}

     

   // 如果有发送者正在阻塞,说明:

   // 1.无缓冲

   // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待

   if sg := c.sendq.dequeue(); sg != nil {

      // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文

      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)

      return  true, true

}

     

   // 如果缓存区有数据,

   if c.qcount > 0 {

      // qp为缓冲区中下一次接收的位置

      qp := chanbuf(c, c.recvx)

      // 将数据从qp拷贝到ep

      if ep != nil {

         typedmemmove(c.elemtype, ep, qp)

      }

      typedmemclr(c.elemtype, qp)

      c.recvx++

      if c.recvx == c.dataqsiz {

         c.recvx = 0

      }

      c.qcount--

      unlock(&c.lock)

      return  true, true

}

 

   // 接下来就是既没有发送者在等待,也缓冲区也没数据

   if !block {

      unlock(&c.lock)

      return  false, false

}

 

   // 将当前协程包装成sudog,阻塞到channel中

   gp := getg()

   mysg := acquireSudog()

   mysg.releasetime = 0

   if t0 != 0 {

      mysg.releasetime = -1

   }

   // 记录接收地址

   mysg.elem = ep

   mysg.waitlink = nil

   gp.waiting = mysg

   mysg.g = gp

   mysg.isSelect = false

   mysg.c = c

   gp.param = nil

   c.recvq.enqueue(mysg)

 

   atomic.Store8(&gp.parkingOnChan, 1)

   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)

 

   // 从这里唤醒

   if mysg != gp.waiting {

      throw( "G waiting list is corrupted" )

   }

   gp.waiting = nil

   gp.activeStackChans = false

   if mysg.releasetime > 0 {

      blockevent(mysg.releasetime-t0, 2)

   }

   success := mysg.success

   gp.param = nil

   mysg.c = nil

   releaseSudog(mysg)

   return  true, success

}

接收流程如为:

如果channel为nil,根据参数中是否阻塞来决定是否阻塞

如果channel已经关闭,且缓冲区没有元素,返回该类型零值

如果有发送者正在阻塞,说明:

  • 要么是无缓冲
  • 有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
  • 将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者

如果缓存区有数据, 则从缓冲区将数据复制到ep,返回

接下来就是既没有发送者在等待,也缓冲区也没数据的情况:

将当前协程包装成sudog,阻塞到channel中

来看其中的子函数recv():

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

/**

c:操作的channel

sg:阻塞的发送协程

ep:接收者接收数据的地址

*/

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {

   // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep

   if c.dataqsiz == 0 {

      if ep != nil {

         recvDirect(c.elemtype, sg, ep)

      }

   // 接下来是有缓冲,且缓冲区满的情况  

   } else {

      // qp为channel缓冲区中,接收者下一次接收的地址

   qp := chanbuf(c, c.recvx)

      // 将数据从qp拷贝到ep

   if ep != nil {

         typedmemmove(c.elemtype, ep, qp)

    }

    // 将发送者的数据从sg.elem拷贝到qp

    typedmemmove(c.elemtype, qp, sg.elem)

    c.recvx++

    if c.recvx == c.dataqsiz {

       c.recvx = 0

    }

    // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx

    c.sendx = c.recvx

}

   sg.elem = nil

   gp := sg.g

   unlockf()

   gp.param = unsafe.Pointer(sg)

   sg.success = true

   if sg.releasetime != 0 {

      sg.releasetime = cputicks()

   }

   // 唤醒发送者

   goready(gp, skip+1)

}

关闭

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

func closechan(c *hchan) {

   // 不能关闭空channel

   if c == nil {

      panic(plainError( "close of nil channel" ))

   }

 

   lock(&c.lock)

   // 不能重复关闭

   if c.closed != 0 {

      unlock(&c.lock)

      panic(plainError( "close of closed channel" ))

   }

 

   // 修改关闭状态

   c.closed = 1

 

   var glist gList

 

   // 释放所有的接收者协程,并为它们赋予零值

 for {

      sg := c.recvq.dequeue()

      if sg == nil {

         break

      }

      if sg.elem != nil {

         typedmemclr(c.elemtype, sg.elem)

         sg.elem = nil

      }

      if sg.releasetime != 0 {

         sg.releasetime = cputicks()

      }

      gp := sg.g

      gp.param = unsafe.Pointer(sg)

      sg.success = false

      glist.push(gp)

   }

 

   // 释放所有的发送者协程

 for {

      sg := c.sendq.dequeue()

      if sg == nil {

         break

     }

      sg.elem = nil

      if sg.releasetime != 0 {

         sg.releasetime = cputicks()

      }

      gp := sg.g

      gp.param = unsafe.Pointer(sg)

      sg.success = false

      glist.push(gp)

   }

   unlock(&c.lock)

 

   // 执行唤醒操作

 for !glist.empty() {

      gp := glist.pop()

      gp.schedlink = 0

      goready(gp, 3)

   }

}

关闭的流程比较简单,可以看出:

不能关闭空channel,不能重复关闭channel

先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:

接收者:会收到该类型的零值

这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑

发送者:会被唤醒,然后panic

因此不能在有多个sender的时候贸然关闭channel


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://juejin.cn/post/7157663775955353614
相关文章
  • Go语言k8s kubernetes使用leader election实现选举

    Go语言k8s kubernetes使用leader election实现选举
    在kubernetes的世界中,很多组件仅仅需要一个实例在运行,比如controller-manager或第三方的controller,但是为了高可用性,需要组件有多个副本,
  • golang中的defer函数理解
    golang的defer 什么是defer defer的的官方文档:https://golang.org/ref/spec#Defer_statements go语言中defer可以完成延迟功能,当前函数执行完成后再执行defer的
  • Windows系统中搭建Go语言开发环境图文介绍

    Windows系统中搭建Go语言开发环境图文介绍
    本文详细讲述如何在 Windows 系统上搭建 Go语言的开发环境,以供借鉴或参考。文章将介绍Go语言的VSCode、GoLand、Vim三种开发环境,大家可以灵
  • 深入理解Golang channel的应用
    channel是用于 goroutine 之间的同步、通信的数据结构 channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了
  • 基于GORM实现CreateOrUpdate的方法
    CreateOrUpdate 是业务开发中很常见的场景,我们支持用户对某个业务实体进行创建/配置。希望实现的 repository 接口要达到以下两个要求: 如果
  • Golang中的内存逃逸的介绍
    什么是内存逃逸分析 内存逃逸分析是go的编译器在编译期间,根据变量的类型和作用域,确定变量是堆上还是栈上 简单说就是编译器在编译
  • Golang自旋锁的介绍
    自旋锁 获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。 它是为实现保护共享资源而提出的
  • Go语言读写锁RWMutex的源码

    Go语言读写锁RWMutex的源码
    在前面两篇文章中初见 Go Mutex、Go Mutex 源码详解,我们学习了Go语言中的Mutex,它是一把互斥锁,每次只允许一个goroutine进入临界区,可以保
  • Go项目实现优雅关机与平滑重启功能
    什么是优雅关机? 优雅关机就是服务端关机命令发出后不是立即关机,而是等待当前还在处理的请求全部处理完毕后再退出程序,是一种对
  • Go语言操作Excel利器之excelize类库的介绍
    在开发中一些需求需要通过程序操作excel文档,例如导出excel、导入excel、向excel文档中插入图片、表格和图表等信息,使用Excelize就可以方便
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计