Go单飞详解(singleflight)
tbghg

应用场景

singleflight 包提供了一种阻止重复函数调用的机制,主要用在缓存击穿的情况下

某些热点数据的缓存过期或突然失效时,如果有大量的请求访问该数据,在数据写回到缓存前,这些请求都会统一打到DB上,可能会把DB压垮。

一般的方案有:

  1. 确保热点key不过期
  2. 热点数据读DB写入到缓存时,加个分布式锁

当然还有一种方式,与2类似,就是sync下的singleflight,直译过来就是单飞,这个库的主要作用就是将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。

使用

singleflight.Group类型提供了三个方法:

1
2
3
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
func (g *Group) Forget(key string)
  • Do方法
    • 接受一个字符串Key和一个待调用的函数,会返回调用函数的结果和错误。
    • 使用Do方法的时候,它会根据提供的Key判断是否去真正调用fn函数。
    • 同一个 key,在同一时间只有第一次调用Do方法时才会去执行fn函数,其他并发的请求会等待调用的执行结果。
  • DoChan方法:
    • 类似Do方法,只不过是一个异步调用。
    • 它会返回一个通道,等fn函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
  • Forget方法:
    • 在SingleFlight中删除一个Key。
    • 这样一来,之后这个Key的Do方法调用会执行fn函数,而不是等待前一个未完成的fn 函数的结果。

下面看个简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"fmt"
"golang.org/x/sync/singleflight"
"log"
"sync"
)

var gsf singleflight.Group

func main() {
var wg sync.WaitGroup
wg.Add(10)

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// v 获取到的值,err 返回的error,shared 结果是否被共享
v, err, shared := gsf.Do("key", getNum)
if err != nil {
log.Print(err)
return
}
fmt.Printf("获得结果:%v,是否被共享:%v\n", v, shared)
}()
}
wg.Wait()
}

func getNum() (interface{}, error) {
fmt.Println("我被执行了,获取结果10")
return 10, nil
}
/*
output:
我被执行了,获取结果10
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
*/

可以发现getNum只被执行了一次,其他请求则是复用第一次请求的结果。

重点注意:单飞的fn函数执行完之后,就会把该key删除,之后再有相同的key时会再调一遍,可以参考下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func main() {
var wg sync.WaitGroup
wg.Add(10)

for i := 0; i < 10; i++ {
if i == 3 {
time.Sleep(time.Second * 3)
}
go func() {
defer wg.Done()
// v 获取到的值,err 返回的error,shared 结果是否被共享
v, err, shared := gsf.Do("key", getNum)
if err != nil {
log.Print(err)
return
}
fmt.Printf("获得结果:%v,是否被共享:%v\n", v, shared)
}()
}
wg.Wait()
}

func getNum() (interface{}, error) {
fmt.Println("我被执行了,获取结果10")
time.Sleep(time.Second * 2)
return 10, nil
}
/*
我被执行了,获取结果10
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
我被执行了,获取结果10
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
*/

可以看到,getNum被调用了两次

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Group struct {
mu sync.Mutex
m map[string]*call
}

type call struct {
wg sync.WaitGroup

// 这些字段在WaitGroup完成之前被写入一次,并且仅在WaitGroup完成之后被读取
val interface{}
err error

// 以前有个forgotten bool,但0.3.0版本是没有了的

// 只用在Shared = c.dups > 0,判断是否共享
dups int
chans []chan<- Result
}

type Result struct {
Val interface{}
Err error
Shared bool
}

singleflight.Group由一个互斥锁sync.Mutex和一个映射表组成,每一个singleflight.call结构体都保存了当前调用对应的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
// 等待fn运行完
c.wg.Wait()

// 检测是fn造成panic,还是内部runtime.Goexit
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}

func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}

先加锁,判断g.m[key]是否存在,不存在则c.wg.Add并执行fn,存在则c.wg.Wait完成后处理结果。

其中Forget方法只是删除了g.m中的key,所以在此之前:正在调用fn的请求还是会执行,c.wg.Wait的请求也会获得fn的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
// 判断是否正常退出fn函数
normalReturn := false
// 判断函数中是否panic
recovered := false

// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
// 两个defer去判断是fn内部panic还是runtime.Goexit
defer func() {
// the given function invoked runtime.Goexit
// fn没有正常退出,且recovered仍为false
// 说明没走到最下面的 if !normalReturn 判断,认为发生了runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}

g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
// 删除该key,下次执行时遇到该key重新调fn去
if g.m[key] == c {
delete(g.m, key)
}

if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
// 写到ch中,DoChan方法调用的话会用到
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()

func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()

c.val, c.err = fn()
normalReturn = true
}()

if !normalReturn {
recovered = true
}
}

// 与Do类似,不再赘述
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

go g.doCall(c, key, fn)

return ch
}

singleflight存在的问题

  • 一人超时,全员超时
  • 一人错,全员错
  • singleflight是一把大锁,在高并发时锁冲突严重,故需要针对性的做些优化
  • singleflight不携带context,缺少一些控制机制

参考文章

 评论
评论插件加载失败
正在加载评论插件