Go单飞详解(singleflight)
tbghg

应用场景

singleflight 包提供了一种阻止重复函数调用的机制,主要用在缓存击穿的情况下

某些热点数据的缓存过期或突然失效时,如果有大量的请求访问该数据,在数据写回到缓存前,这些请求都会统一打到DB上,可能会把DB压垮。

一般的方案有:

  1. 确保热点key不过期
  2. 热点数据读DB写入到缓存时,加个分布式锁

当然还有一种方式,与2类似,就是sync下的singleflight,直译过来就是单飞,这个库的主要作用就是将一组相同的请求合并成一个请求,实际上只会去请求一次,然后对所有的请求返回相同的结果。

使用

singleflight.Group类型提供了三个方法:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
func (g *Group) Forget(key string)
  • Do方法
    • 接受一个字符串Key和一个待调用的函数,会返回调用函数的结果和错误。
    • 使用Do方法的时候,它会根据提供的Key判断是否去真正调用fn函数。
    • 同一个 key,在同一时间只有第一次调用Do方法时才会去执行fn函数,其他并发的请求会等待调用的执行结果。
  • DoChan方法:
    • 类似Do方法,只不过是一个异步调用。
    • 它会返回一个通道,等fn函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
  • Forget方法:
    • 在SingleFlight中删除一个Key。
    • 这样一来,之后这个Key的Do方法调用会执行fn函数,而不是等待前一个未完成的fn 函数的结果。

下面看个简单示例:

package main

import (
    "fmt"
    "golang.org/x/sync/singleflight"
    "log"
    "sync"
)

var gsf singleflight.Group

func main() {
    var wg sync.WaitGroup
    wg.Add(10)

    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            // v 获取到的值,err 返回的error,shared 结果是否被共享
            v, err, shared := gsf.Do("key", getNum)
            if err != nil {
                log.Print(err)
                return
            }
            fmt.Printf("获得结果:%v,是否被共享:%v\n", v, shared)
        }()
    }
    wg.Wait()
}

func getNum() (interface{}, error) {
    fmt.Println("我被执行了,获取结果10")
    return 10, nil
}
/*
output:
    我被执行了,获取结果10
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
*/

可以发现getNum只被执行了一次,其他请求则是复用第一次请求的结果。

重点注意:单飞的fn函数执行完之后,就会把该key删除,之后再有相同的key时会再调一遍,可以参考下面的代码:

func main() {
    var wg sync.WaitGroup
    wg.Add(10)

    for i := 0; i < 10; i++ {
        if i == 3 {
            time.Sleep(time.Second * 3)
        }
        go func() {
            defer wg.Done()
            // v 获取到的值,err 返回的error,shared 结果是否被共享
            v, err, shared := gsf.Do("key", getNum)
            if err != nil {
                log.Print(err)
                return
            }
            fmt.Printf("获得结果:%v,是否被共享:%v\n", v, shared)
        }()
    }
    wg.Wait()
}

func getNum() (interface{}, error) {
    fmt.Println("我被执行了,获取结果10")
    time.Sleep(time.Second * 2)
    return 10, nil
}
/*
    我被执行了,获取结果10
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    我被执行了,获取结果10
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
    获得结果:10,是否被共享:true
*/

可以看到,getNum被调用了两次

源码

type Group struct {
    mu sync.Mutex
    m  map[string]*call
}

type call struct {
    wg sync.WaitGroup

    // 这些字段在WaitGroup完成之前被写入一次,并且仅在WaitGroup完成之后被读取
    val interface{}
    err error
    
    // 以前有个forgotten bool,但0.3.0版本是没有了的
    
    // 只用在Shared = c.dups > 0,判断是否共享
    dups  int
    chans []chan<- Result
}

type Result struct {
    Val    interface{}
    Err    error
    Shared bool
}

singleflight.Group由一个互斥锁sync.Mutex和一个映射表组成,每一个singleflight.call结构体都保存了当前调用对应的信息

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        g.mu.Unlock()
        // 等待fn运行完
        c.wg.Wait()

        // 检测是fn造成panic,还是内部runtime.Goexit
        if e, ok := c.err.(*panicError); ok {
            panic(e)
        } else if c.err == errGoexit {
            runtime.Goexit()
        }
        return c.val, c.err, true
    }
    c := new(call)
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    g.doCall(c, key, fn)
    return c.val, c.err, c.dups > 0
}

func (g *Group) Forget(key string) {
    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()
}

先加锁,判断g.m[key]是否存在,不存在则c.wg.Add并执行fn,存在则c.wg.Wait完成后处理结果。

其中Forget方法只是删除了g.m中的key,所以在此之前:正在调用fn的请求还是会执行,c.wg.Wait的请求也会获得fn的结果

// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    // 判断是否正常退出fn函数
    normalReturn := false
    // 判断函数中是否panic
    recovered := false

    // use double-defer to distinguish panic from runtime.Goexit,
    // more details see https://golang.org/cl/134395
    // 两个defer去判断是fn内部panic还是runtime.Goexit
    defer func() {
        // the given function invoked runtime.Goexit
        // fn没有正常退出,且recovered仍为false
        // 说明没走到最下面的 if !normalReturn 判断,认为发生了runtime.Goexit
        if !normalReturn && !recovered {
            c.err = errGoexit
        }

        g.mu.Lock()
        defer g.mu.Unlock()
        c.wg.Done()
        // 删除该key,下次执行时遇到该key重新调fn去
        if g.m[key] == c {
            delete(g.m, key)
        }

        if e, ok := c.err.(*panicError); ok {
            // In order to prevent the waiting channels from being blocked forever,
            // needs to ensure that this panic cannot be recovered.
            if len(c.chans) > 0 {
                go panic(e)
                select {} // Keep this goroutine around so that it will appear in the crash dump.
            } else {
                panic(e)
            }
        } else if c.err == errGoexit {
            // Already in the process of goexit, no need to call again
        } else {
            // Normal return
            // 写到ch中,DoChan方法调用的话会用到
            for _, ch := range c.chans {
                ch <- Result{c.val, c.err, c.dups > 0}
            }
        }
    }()

    func() {
        defer func() {
            if !normalReturn {
                // Ideally, we would wait to take a stack trace until we've determined
                // whether this is a panic or a runtime.Goexit.
                //
                // Unfortunately, the only way we can distinguish the two is to see
                // whether the recover stopped the goroutine from terminating, and by
                // the time we know that, the part of the stack trace relevant to the
                // panic has been discarded.
                if r := recover(); r != nil {
                    c.err = newPanicError(r)
                }
            }
        }()

        c.val, c.err = fn()
        normalReturn = true
    }()

    if !normalReturn {
        recovered = true
    }
}

// 与Do类似,不再赘述
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()
    
    go g.doCall(c, key, fn)

    return ch
}

singleflight存在的问题

  • 一人超时,全员超时
  • 一人错,全员错
  • singleflight是一把大锁,在高并发时锁冲突严重,故需要针对性的做些优化
  • singleflight不携带context,缺少一些控制机制

参考文章

 评论