Golang channel深入挖掘
tbghg

Go 程序员面试笔试宝典》学习整理

什么是CSP

虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

golang主张通过通信来实现内存共享而非通过共享内存来实现通信。这就是 Go 的并发哲学,它依赖 CSP 模型(Communicating Sequential Processes),基于 channel 实现。

CSP 全称是 “Communicating Sequential Processes”(通信顺序进程),是 Tony Hoare 在 1978 年发表在 ACM 的一篇论文。论文里指出一门编程语言应该重视 input 和 output 的原语,尤其是并发编程的代码。

Go 是第一个将 CSP 的这些思想引入,并且发扬光大的语言。仅管内存同步访问控制在某些情况下大有用处,Go 里也有相应的 sync 包支持,但是这在大型程序很容易出错。

大多数的编程语言的并发编程模型是基于线程和内存同步访问控制,Go 的并发编程的模型则用 goroutine 和 channel 来替代。Goroutine 和线程类似,channel 和 mutex (用于内存同步访问控制)类似。

Channel 则天生就可以和其他 channel 组合。我们可以把收集各种子系统结果的 channel 输入到同一个 channel。Channel 还可以和 select, cancel, timeout 结合起来。而 mutex 就没有这些功能。

Go 的并发原则非常优秀,目标就是简单:尽量使用 channel;把 goroutine 当作免费的资源,随便用。

channel底层数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// go 1.9.2
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters

// 保护 hchan 中所有字段
lock mutex
}
  • buf 指向底层循环数组,只有缓冲型的 channel 才有。
  • sendxrecvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。
  • sendqrecvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。
  • waitqsudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:
1
2
3
4
type waitq struct {
first *sudog
last *sudog
}
  • lock 用来保证每个读 channel 或写 channel 的操作都是原子的。

创建channel

1
2
3
4
// var 变量 chan 元素类型
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

通道是引用类型,通道类型的空值是nil,声明的通道后需要使用make函数初始化之后才能使用。

1
2
3
4
5
6
7
// make(chan 元素类型, [缓冲大小])
// 无缓冲通道
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)
// 有缓冲通道
ch7 := make(chan int,7)

最终创建 chan 的函数是 makechan

1
func makechan(t *chantype, size int64) *hchan

从函数原型来看,创建的 chan 是一个指针。所以我们能在函数间直接传递 channel,而不用传递 channel 的指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

func makechan(t *chantype, size int64) *hchan {
elem := t.elem

// 省略了检查 channel size,align 的代码
// ……

var c *hchan
// 如果元素类型不含指针 或者 size 大小为 0(无缓冲类型)
// 只进行一次内存分配
if elem.kind&kindNoPointers != 0 size == 0 {
// 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
// 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
// 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
// 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
// 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
// 因为只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
c.buf = unsafe.Pointer(c)
}
} else {
// 进行两次内存分配操作
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 循环数组长度
c.dataqsiz = uint(size)

// 返回 hchan 指针
return c
}

channel操作

简单使用

通道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用<-符号。

1
2
3
4
5
6
7
8
9
10
// PS: 下面就是个写法示例,不是用来跑的
// 创建
ch := make(chan int)
// 发送
ch <- 10 // 把10发送到ch中
// 接收
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果
// 关闭
close(ch)

常见操作情况总结:

channel nil closed
接收 阻塞 阻塞 正常接收 有值就接着正常读取,没有值返回零值和false
发送 阻塞 正常发送 阻塞 panic
关闭 panic 关闭成功 关闭成功 panic

无缓冲通道

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。
相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
close(c)
}()
for {
if data, ok := <-c; ok {
fmt.Println(data)
} else {
break
}
}
fmt.Println("main结束")
}

有缓冲通道

使用make函数初始化通道的时候为其指定通道的容量

1
2
3
4
5
func main() {
ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
ch <- 10
fmt.Println("发送成功")
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量,当然如果容量等于零仍然会阻塞。

可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,不过一般不会这样去做。

单向通道

限制通道在函数中只能发送或只能接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// chan<- int是一个只能发送的通道,可以发送但是不能接收
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out
}

func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}

// <-chan int是一个只能接收的通道,可以接收但是不能发送
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}

func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}

优雅地关闭channel

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。

有一条广泛流传的关闭 channel 的原则:

don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
(很经典也很重要)

不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时关闭 channel。

比较好理解,向 channel 发送元素的就是 sender,因此 sender 可以决定何时不发送数据,并且关闭 channel。但是如果有多个 sender,某个 sender 同样没法确定其他 sender 的情况,这时也不能贸然关闭 channel。

但是上面所说的并不是最本质的,最本质的原则就只有一条:

don’t close (or send values to) closed channels.

有两个不那么优雅地关闭 channel 的方法:

  1. 使用 defer-recover 机制,放心大胆地关闭 channel 或者向 channel 发送数据。即使发生了 panic,有 defer-recover 在兜底。
  2. 使用 sync.Once 来保证只关闭一次。

根据 sender 和 receiver 的个数,分下面几种情况:

  1. 一个 sender,一个 receiver
  2. 一个 sender, M 个 receiver
  3. N 个 sender,一个 reciver
  4. N 个 sender, M 个 receiver

对于 1,2,只有一个 sender 的情况就不用说了,直接从 sender 端关闭就好了,没有问题。重点关注第 3,4 种情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func main() {
rand.Seed(time.Now().UnixNano())
const Max = 100000 // 生成的随机数的最大值
const NumSenders = 1000 // 开启的sender数

dataCh := make(chan int, 100)
stopCh := make(chan struct{}) // 作为信号channel

// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
select {
case <- stopCh:
// 正常情况下会阻塞,所以不会执行
// 关闭情况下会读取0值,执行return
return
case dataCh <- rand.Intn(Max):
// 发送数据
}
}
}()
}

// the receiver
go func() {
for value := range dataCh {
// 收到了Max-1就不再接收数据了
if value == Max-1 {
fmt.Println("send stop signal to senders.")
close(stopCh)
return
}
fmt.Println(value)
}
}()
select {
case <- time.After(time.Hour):
}
}

上面的代码并没有明确关闭 dataCh,在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭channel,让 gc 代劳。

第四种情况,这里有 M 个 receiver,如果直接还是采取第 3 种解决方案,由 receiver 直接关闭 stopCh 的话,就会重复关闭一个 channel,导致 panic。因此需要增加一个中间人,M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求(这里发送请求后关闭的同样是stopCh,多个sender的情况下是不会由sender主动关闭dataCh的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func main() {
rand.Seed(time.Now().UnixNano())

const Max = 100000 // 生成的随机数的最大值
const NumReceivers = 10 // 开启的receiver数
const NumSenders = 1000 // 开启的sender数

dataCh := make(chan int, 100)
stopCh := make(chan struct{})

// 必须是一个缓冲通道,toStop 就是中间人的角色
// 接收 senders 和 receivers 发送过来的关闭 dataCh 请求。
toStop := make(chan string, 1)

var stoppedBy string

// 中间人,收到toStop后执行一次close(stopCh)就退出
go func() {
stoppedBy = <-toStop
close(stopCh)
}()

// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
return
}
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}

// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
for {
select {
case <- stopCh:
return
case value := <-dataCh:
if value == Max-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
fmt.Println(value)
}
}
}(strconv.Itoa(i))
}

select {
case <- time.After(time.Hour):
}
}

这里将 toStop 声明成了一个 缓冲型的 channel。假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失。因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求,如果中间人所在的 goroutine 没有准备好,那 select 语句就不会选中,直接走 default 选项,什么也不做。这样,第一个关闭 dataCh 的请求就会丢失。

如果,我们把 toStop 的容量声明成 Num(senders) + Num(receivers),那发送 dataCh 请求的部分可以改成更简洁的形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
// toStop部分
toStop := make(chan string, NumReceivers + NumSenders)
// sender部分
value := rand.Intn(Max)
if value == 0 {
toStop <- "sender#" + id
return
}
// receiver部分
if value == Max-1 {
toStop <- "receiver#" + id
return
}

直接向 toStop 发送请求,因为 toStop 容量足够大,所以不用担心阻塞,自然也就不用 select 语句再加一个 default case 来避免阻塞。

Channel引发的泄漏

Channel 可能会引发 goroutine 泄漏。

泄漏的原因是 goroutine 操作 channel 后,处于发送或接收阻塞状态,而 channel 处于满或空的状态,一直得不到改变。
同时,垃圾回收器也不会回收此类资源,进而导致 gouroutine 会一直处于等待队列中。

另外,程序运行过程中,对于一个 channel,如果没有任何 goroutine 引用了,gc 会对其进行回收操作,不会引起内存泄漏。

情境一:select-case 误用导致的内存泄露

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// out:
// NumGoroutine: 2
// 超时
// NumGoroutine: 3

func TestLeakOfMemory(t *testing.T) {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
chanLeakOfMemory()
time.Sleep(time.Second * 3)
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
}

func chanLeakOfMemory() {
errCh := make(chan error) // (1)
go func() { // (5)
time.Sleep(2 * time.Second)
errCh <- errors.New("chan error") // (2)
fmt.Println("finish sending")
}()

var err error
select {
case <-time.After(time.Second): // (3) 大家也经常在这里使用 <-ctx.Done()
fmt.Println("超时")
case err = <-errCh: // (4)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(nil)
}
}
}

根据输出结果(开始有两个 goroutine,结束时有三个 goroutine),我们可以知道,直到测试函数结束前,仍有一个 goroutine 没有退出。原因是由于 (1) 处创建的 errCh 是不含缓存队列的 channel,由于没有发送方往 errCh 发送数据,所以 (4) 处代码一直阻塞。直到 (3) 处超时后,打印“超时”,函数退出,(4) 处代码都未接收成功。而 (2) 处的所在的 goroutine 在“超时”被打印后,才开始发送。由于外部的 goroutine 已经退出了,errCh 没有接收者,导致 (2) 处一直阻塞。因此 (2) 处代码所在的协程一直未退出,造成了内存泄漏。如果代码中有许多类似的代码,或在 for 循环中使用了上述形式的代码,随着时间的增长会造成多个未退出的 gorouting,最终导致程序 OOM。

解决办法:为 channel 增加一个缓存队列,即把 (1) 处代码改为 errCh := make(chan error, 1)

情景二:for-range 误用导致的内存泄露

上述示例中只有一个发送者,且只发送一次,所以增加一个缓存队列即可。但在其他情况下,可能不止有一个发送者(或者不只发送一次),所以这个方案要求,缓存队列的容量需要和发送次数一致。一旦缓存队列容量被用完后,再有发送者发送就会阻塞发送者 goroutine。如果恰好此时接收者退出了,那么仍然至少会有一个 goroutine 无法退出,从而造成内存泄漏。就比如下面的代码。不知道经过上面的讲解,读者是否能够发现其中的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func TestLeakOfMemory2(t *testing.T) {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
chanLeakOfMemory2()
time.Sleep(time.Second * 3) // 等待 goroutine 执行,防止过早输出结果
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
}

func chanLeakOfMemory2() {
ich := make(chan int, 100) // (3)
// sender
go func() {
defer close(ich)
for i := 0; i < 10000; i++ {
ich <- i
time.Sleep(time.Millisecond) // 控制一下,别发太快
}
}()
// receiver
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for i := range ich { // (2)
if ctx.Err() != nil { // (1)
fmt.Println(ctx.Err())
return
}
fmt.Println(i)
}
}()
}

// Output:
// NumGoroutine: 2
// 0
// 1
// ...(省略)...
// 789
// context deadline exceeded
// NumGoroutine: 3

我们聪明地使用了 channel 的缓存队列。我们以为我们循环发送,发完之后就会把 channel 关闭。而且我们使用 for range 获取 channel 的值,会一直获取,直到 channel 关闭。但在代码 (1) 处,接收者的 goroutine 中,我们加了一个判断语句。这会让代码 (2) 处的 channel 还没被接收完就退出了接收者 goroutine。尽管代码 (3) 处有缓存,但是因为发送 channel 在 for 循环中,缓存队列很快就会被占满,阻塞在第 101 的位置。所以这种情况我们要使用一个额外的 stop channel 来终结发送者所在的 goroutine。方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func TestLeakOfMemory2(t *testing.T) {
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
chanLeakOfMemory2()
time.Sleep(time.Second * 3) // 等待 goroutine 执行,防止过早输出结果
fmt.Println("NumGoroutine:", runtime.NumGoroutine())
}

func chanLeakOfMemory2() {
ich := make(chan int, 100)
stopCh := make(chan struct{})
// sender
go func() {
defer close(ich)
for i := 0; i < 10000; i++ {
select {
case <-stopCh:
return
case ich <- i:
}
time.Sleep(time.Millisecond) // 控制一下,别发太快
}
}()
// receiver
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for i := range ich {
if ctx.Err() != nil {
fmt.Println(ctx.Err())
close(stopCh)
return
}
fmt.Println(i)
}
}()
}

// Output:
// NumGoroutine: 2
// 0
// 1
// ...(省略)...
// 789
// context deadline exceeded
// NumGoroutine: 2

总之,通常情况下,我们只会遇到这两种 go channel 造成内存泄漏的情况(一个发送者导致的内存泄漏和多个发送者导致的内存泄漏)

不论发送者发送一次还是多次,如果接收者所在 goroutine 能够在接收完 channel 中的数据之后结束,那么就不会造成内存泄漏;或者说接收者能够在发送者停止发送后再结束,就不会造成内存泄露。

如果接收者需要在 channel 关闭之前提前退出,为防止内存泄漏,在发送者与接收者发送次数是一对一时,应设置 channel 缓冲队列为 1;在发送者与接收者的发送次数是多对多时,应使用专门的 stop channel 通知发送者关闭相应 channel

其实归根到底是一方不再对channel操作时没有向另一方发送信号,导致另一方阻塞,也就是说关闭channel的方式不对(并不特指close,包括不再操作由gc处理的情况),所以优雅关闭channel还是很重要的

Channel应用

  1. 停止信号(见优雅关闭channel)
  2. 定时任务(与timer结合)
  3. 实现超时控制
  4. 定期执行某个任务
  5. 解耦生产方和消费方
  6. 控制并发数

定时任务

1
2
3
4
5
select {
case <-time.After(100 * time.Millisecond):
case <-s.stopc:
return false
}

等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。

1
2
3
4
5
6
7
8
9
10
func worker() {
ticker := time.Tick(1 * time.Second)
for {
select {
case <- ticker:
// 执行定时任务
fmt.Println("执行 1s 定时任务")
}
}
}

每隔 1 秒种,执行一次定时任务。

解耦生产方和消费方

服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func main() {
taskCh := make(chan int, 100)
go worker(taskCh)

// 塞任务
for i := 0; i < 10; i++ {
taskCh <- i
}

// 等待 1 小时
select {
case <-time.After(time.Hour):
}
}

func worker(taskCh <-chan int) {
const N = 5
// 启动 5 个工作协程
for i := 0; i < N; i++ {
go func(id int) {
for {
task := <- taskCh
fmt.Printf("finish task: %d by worker %d\n", task, id)
time.Sleep(time.Second)
}
}(i)
}
}

5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。

控制并发数

有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。

1
2
3
4
5
6
7
8
9
10
11
12
13
var limit = make(chan int, 3)

func main() {
// …………
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
// …………
}

参考文章

 评论
评论插件加载失败
正在加载评论插件