channel是用于 goroutine 之间的同步、通信的数据结构
channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率
channel的用途包括但不限于以下几点:
- 协程间通信,同步
- 定时任务:和timer结合
- 解耦生产方和消费方,实现阻塞队列
- 控制并发数
本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑
整体结构
Go channel的数据结构如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
|
qcount:已经存储了多少个元素
dataqsie:最多存储多少个元素,即缓冲区容量
buf:指向缓冲区的位置,实际上是一个数组
elemsize:每个元素占多大空间
closed:channel能够关闭,这里记录其关闭状态
elemtype:保存数据的类型信息,用于go运行时使用
sendx,recvx:
- 记录下一个要发送到的位置,下一次从哪里还是接收
- 这里用数组模拟队列,这两个变量即表示队列的队头,队尾
- 因此channel的缓冲也被称为环形缓冲区
recvq,sendq:
当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送
lock:channel支持协程间并发访问,因此需要一把锁来保护
创建
创建channel会被编译器编译为调用makechan函数
1
2
3
4
|
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)
|
会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值
可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// mem:缓冲区大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError( "makechan: size out of range" ))
}
var c *hchan
switch {
// 缓冲区大小为空,只申请hchanSize大小的内存
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
// 元素类型不包含指针,一次性分配hchanSize+mem大小的内存
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
// 否则就是带缓存,且有指针,分配两次内存
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 保存元素类型,元素大小,容量
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
|
发送
执行以下代码时:
编译器会转化为对chansend的调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果channel是空
if c == nil {
// 非阻塞,直接返回
if !block {
return false
}
// 否则阻塞当前协程
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw( "unreachable" )
}
// 非阻塞,没有关闭,且容量满了,无法发送,直接返回
if !block && c.closed == 0 && full(c) {
return false
}
// 加锁
lock(&c.lock)
// 如果已经关闭,无法发送,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError( "send on closed channel" ))
}
// 从接收队列弹出一个协程的包装结构sudog
if sg := c.recvq.dequeue(); sg != nil {
// 如果能弹出,即有等到接收的协程,说明:
// 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
// 将要发送的数据拷贝到该协程的接收指针上
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 缓冲区还有空间
if c.qcount < c.dataqsiz {
// qp:计算要发送到的位置的地址
qp := chanbuf(c, c.sendx)
// 将数据从ep拷贝到qp
typedmemmove(c.elemtype, qp, ep)
// 待发送位置移动
c.sendx++
// 由于是数组模拟队列,sendx到顶了需要归零
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 缓冲区数量++
c.qcount++
unlock(&c.lock)
return true
}
// 往下就是缓冲区无数据,也没有等到接收协程的情况了
// 如果是非阻塞模式,直接返回
if !block {
unlock(&c.lock)
return false
}
// 将当前协程包装成sudog,阻塞到channel上
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 当前协程进入发送等待队列
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 被唤醒后从这里开始执行
KeepAlive(ep)
if mysg != gp.waiting {
throw( "G waiting list is corrupted" )
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
// 被唤醒后发现channel关闭了,panic
if closed {
if c.closed == 0 {
throw( "chansend: spurious wakeup" )
}
panic(plainError( "send on closed channel" ))
}
return true
}
|
整体流程为:
如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回
从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:
- 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
- 将要发送的数据拷贝到该协程的接收指针上,返回
- 这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝
否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回
接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上
将协程阻塞到channel的等待队列时,将其包装成了sudog结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
type sudog struct {
// 协程
g *g
// 前一个,后一个指针
next *sudog
prev *sudog
// 等到发送的数据在哪,等待从哪个位置接收数据
elem unsafe.Pointer
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
// 在哪个channel上等待
c *hchan // channel
}
|
其目的是:
- g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列
- elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝
来看看一些子函数:
1.判断channel是否是满的
1
2
3
4
5
6
7
8
9
|
func full(c *hchan) bool {
// 无缓冲
if c.dataqsiz == 0 {
// 并且没有其他协程在等待
return c.recvq.first == nil
}
// 有缓冲,但容量装满了
return c.qcount == c.dataqsiz
}
|
2.send方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/**
c:要操作的channel
sg:弹出的接收者协程
ep:要发送的数据在的位置
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒该接收者协程
goready(gp, skip+1)
}
|
接收
从channel中接收数据有几种写法:
根据带不带ok,决定用下面哪个方法
1
2
3
4
5
6
7
8
|
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
|
根据接不接收返回值,决定elem是不是nil
最终都会调用chanrecv方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 如果channel为nil,根据参数中是否阻塞来决定是否阻塞
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw( "unreachable" )
}
// 非阻塞,并且channel为空
if !block && empty(c) {
// 如果还没关闭,直接返回
if atomic.Load(&c.closed) == 0 {
return
}
// 否则已经关闭,
// 如果为空,返回该类型的零值
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
lock(&c.lock)
// 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果有发送者正在阻塞,说明:
// 1.无缓冲
// 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
if sg := c.sendq.dequeue(); sg != nil {
// 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 如果缓存区有数据,
if c.qcount > 0 {
// qp为缓冲区中下一次接收的位置
qp := chanbuf(c, c.recvx)
// 将数据从qp拷贝到ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 接下来就是既没有发送者在等待,也缓冲区也没数据
if !block {
unlock(&c.lock)
return false, false
}
// 将当前协程包装成sudog,阻塞到channel中
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 记录接收地址
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 从这里唤醒
if mysg != gp.waiting {
throw( "G waiting list is corrupted" )
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
|
接收流程如为:
如果channel为nil,根据参数中是否阻塞来决定是否阻塞
如果channel已经关闭,且缓冲区没有元素,返回该类型零值
如果有发送者正在阻塞,说明:
- 要么是无缓冲
- 有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
- 将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者
如果缓存区有数据, 则从缓冲区将数据复制到ep,返回
接下来就是既没有发送者在等待,也缓冲区也没数据的情况:
将当前协程包装成sudog,阻塞到channel中
来看其中的子函数recv():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
/**
c:操作的channel
sg:阻塞的发送协程
ep:接收者接收数据的地址
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
// 接下来是有缓冲,且缓冲区满的情况
} else {
// qp为channel缓冲区中,接收者下一次接收的地址
qp := chanbuf(c, c.recvx)
// 将数据从qp拷贝到ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将发送者的数据从sg.elem拷贝到qp
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒发送者
goready(gp, skip+1)
}
|
关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
func closechan(c *hchan) {
// 不能关闭空channel
if c == nil {
panic(plainError( "close of nil channel" ))
}
lock(&c.lock)
// 不能重复关闭
if c.closed != 0 {
unlock(&c.lock)
panic(plainError( "close of closed channel" ))
}
// 修改关闭状态
c.closed = 1
var glist gList
// 释放所有的接收者协程,并为它们赋予零值
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// 释放所有的发送者协程
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
unlock(&c.lock)
// 执行唤醒操作
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
|
关闭的流程比较简单,可以看出:
不能关闭空channel,不能重复关闭channel
先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:
接收者:会收到该类型的零值
这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑
发送者:会被唤醒,然后panic
因此不能在有多个sender的时候贸然关闭channel
|