Go单飞详解(singleflight)
应用场景
singleflight 包提供了一种阻止重复函数调用的机制,主要用在缓存击穿的情况下
某些热点数据的缓存过期或突然失效时,如果有大量的请求访问该数据,在数据写回到缓存前,这些请求都会统一打到DB上,可能会把DB压垮。
一般的方案有:
- 确保热点key不过期
- 热点数据读DB写入到缓存时,加个分布式锁
当然还有一种方式,与2类似,就是sync下的singleflight,直译过来就是单飞,这个库的主要作用就是将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。
使用
singleflight.Group类型提供了三个方法:
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
func (g *Group) Forget(key string)
- Do方法
- 接受一个字符串Key和一个待调用的函数,会返回调用函数的结果和错误。
- 使用Do方法的时候,它会根据提供的Key判断是否去真正调用fn函数。
- 同一个 key,在同一时间只有第一次调用Do方法时才会去执行fn函数,其他并发的请求会等待调用的执行结果。
- DoChan方法:
- 类似Do方法,只不过是一个异步调用。
- 它会返回一个通道,等fn函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
- Forget方法:
- 在SingleFlight中删除一个Key。
- 这样一来,之后这个Key的Do方法调用会执行fn函数,而不是等待前一个未完成的fn 函数的结果。
下面看个简单示例:
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"log"
"sync"
)
var gsf singleflight.Group
func main() {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
// v 获取到的值,err 返回的error,shared 结果是否被共享
v, err, shared := gsf.Do("key", getNum)
if err != nil {
log.Print(err)
return
}
fmt.Printf("获得结果:%v,是否被共享:%v\n", v, shared)
}()
}
wg.Wait()
}
func getNum() (interface{}, error) {
fmt.Println("我被执行了,获取结果10")
return 10, nil
}
/*
output:
我被执行了,获取结果10
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
*/
可以发现getNum只被执行了一次,其他请求则是复用第一次请求的结果。
重点注意:单飞的fn函数执行完之后,就会把该key删除,之后再有相同的key时会再调一遍,可以参考下面的代码:
func main() {
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
if i == 3 {
time.Sleep(time.Second * 3)
}
go func() {
defer wg.Done()
// v 获取到的值,err 返回的error,shared 结果是否被共享
v, err, shared := gsf.Do("key", getNum)
if err != nil {
log.Print(err)
return
}
fmt.Printf("获得结果:%v,是否被共享:%v\n", v, shared)
}()
}
wg.Wait()
}
func getNum() (interface{}, error) {
fmt.Println("我被执行了,获取结果10")
time.Sleep(time.Second * 2)
return 10, nil
}
/*
我被执行了,获取结果10
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
我被执行了,获取结果10
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
获得结果:10,是否被共享:true
*/
可以看到,getNum被调用了两次
源码
type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
// 这些字段在WaitGroup完成之前被写入一次,并且仅在WaitGroup完成之后被读取
val interface{}
err error
// 以前有个forgotten bool,但0.3.0版本是没有了的
// 只用在Shared = c.dups > 0,判断是否共享
dups int
chans []chan<- Result
}
type Result struct {
Val interface{}
Err error
Shared bool
}
singleflight.Group由一个互斥锁sync.Mutex和一个映射表组成,每一个singleflight.call结构体都保存了当前调用对应的信息
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
// 等待fn运行完
c.wg.Wait()
// 检测是fn造成panic,还是内部runtime.Goexit
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}
先加锁,判断g.m[key]
是否存在,不存在则c.wg.Add
并执行fn,存在则c.wg.Wait
完成后处理结果。
其中Forget
方法只是删除了g.m中的key,所以在此之前:正在调用fn的请求还是会执行,c.wg.Wait
的请求也会获得fn的结果
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
// 判断是否正常退出fn函数
normalReturn := false
// 判断函数中是否panic
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
// 两个defer去判断是fn内部panic还是runtime.Goexit
defer func() {
// the given function invoked runtime.Goexit
// fn没有正常退出,且recovered仍为false
// 说明没走到最下面的 if !normalReturn 判断,认为发生了runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
// 删除该key,下次执行时遇到该key重新调fn去
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
// 写到ch中,DoChan方法调用的话会用到
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
// 与Do类似,不再赘述
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
singleflight存在的问题
- 一人超时,全员超时
- 一人错,全员错
- singleflight是一把大锁,在高并发时锁冲突严重,故需要针对性的做些优化
- singleflight不携带context,缺少一些控制机制
参考文章
评论