一篇帶給你Go并發(fā)編程Singleflight
這一篇文章的內(nèi)容是在 Week05: 評(píng)論系統(tǒng)架構(gòu)設(shè)計(jì) 當(dāng)中的可用性設(shè)計(jì)當(dāng)中提到的,但是這個(gè)屬于 Go 官方擴(kuò)展同步包 (golang.org/x/sync/singleflight) 的一個(gè)庫,為了讓內(nèi)容統(tǒng)一就放到這里了。
SingleFlight
為什么我們需要 SingleFlight(使用場景)?
一般情況下我們在寫一寫對(duì)外的服務(wù)的時(shí)候都會(huì)有一層 cache 作為緩存,用來減少底層數(shù)據(jù)庫的壓力,但是在遇到例如 redis 抖動(dòng)或者其他情況可能會(huì)導(dǎo)致大量的 cache miss 出現(xiàn)。
如下圖所示,可能存在來自桌面端和移動(dòng)端的用戶有 1000 的并發(fā)請(qǐng)求,他們都訪問的獲取文章列表的接口,獲取前 20 條信息,如果這個(gè)時(shí)候我們服務(wù)直接去訪問 redis 出現(xiàn) cache miss 那么我們就會(huì)去請(qǐng)求 1000 次數(shù)據(jù)庫,這時(shí)可能會(huì)給數(shù)據(jù)庫帶來較大的壓力(這里的 1000 只是一個(gè)例子,實(shí)際上可能遠(yuǎn)大于這個(gè)值)導(dǎo)致我們的服務(wù)異?;蛘叱瑫r(shí)。
這時(shí)候就可以使用 singleflight 庫了,直譯過來就是單飛,這個(gè)庫的主要作用就是將一組相同的請(qǐng)求合并成一個(gè)請(qǐng)求,實(shí)際上只會(huì)去請(qǐng)求一次,然后對(duì)所有的請(qǐng)求返回相同的結(jié)果。如下圖所示,使用 singleflight 之后,我們在一個(gè)請(qǐng)求的時(shí)間周期內(nèi)實(shí)際上只會(huì)向底層的數(shù)據(jù)庫發(fā)起一次請(qǐng)求大大減少對(duì)數(shù)據(jù)庫的壓力。
SingleFlight 包怎么用(使用教程)?
函數(shù)簽名
主要是一個(gè) Group 結(jié)構(gòu)體,三個(gè)方法,具體信息看下方注釋
- type Group
 - // Do 執(zhí)行函數(shù), 對(duì)同一個(gè) key 多次調(diào)用的時(shí)候,在第一次調(diào)用沒有執(zhí)行完的時(shí)候
 - // 只會(huì)執(zhí)行一次 fn 其他的調(diào)用會(huì)阻塞住等待這次調(diào)用返回
 - // v, err 是傳入的 fn 的返回值
 - // shared 表示是否真正執(zhí)行了 fn 返回的結(jié)果,還是返回的共享的結(jié)果
 - func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)
 - // DoChan 和 Do 類似,只是 DoChan 返回一個(gè) channel,也就是同步與異步的區(qū)別
 - func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
 - // Forget 用于通知 Group 刪除某個(gè) key 這樣后面繼續(xù)這個(gè) key 的調(diào)用的時(shí)候就不會(huì)在阻塞等待了
 - func (g *Group) Forget(key string)
 
使用示例
接下來我們看看實(shí)際上我們是怎么使用的,先使用一個(gè)普通的例子,這時(shí)一個(gè)獲取文章詳情的函數(shù),我們在函數(shù)里面使用一個(gè) count 模擬不同并發(fā)下的耗時(shí)的不同,并發(fā)越多請(qǐng)求耗時(shí)越多
- func getArticle(id int) (article string, err error) {
 - // 假設(shè)這里會(huì)對(duì)數(shù)據(jù)庫進(jìn)行調(diào)用, 模擬不同并發(fā)下耗時(shí)不同
 - atomic.AddInt32(&count, 1)
 - time.Sleep(time.Duration(count) * time.Millisecond)
 - return fmt.Sprintf("article: %d", id), nil
 - }
 
我們使用 singleflight 的時(shí)候就只需要 new(singleflight.Group) 然后調(diào)用一下相對(duì)應(yīng)的 Do 方法就可了,是不是很簡單
- func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
 - v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
 - return getArticle(id)
 - })
 - return v.(string), err
 - }
 
效果測試
光說不練假把式,寫一個(gè)簡單的測試代碼,下面我們啟動(dòng) 1000 個(gè) Goroutine 去并發(fā)調(diào)用這兩個(gè)方法
- var count int32
 - func main() {
 - time.AfterFunc(1*time.Second, func() {
 - atomic.AddInt32(&count, -count)
 - })
 - var (
 - wg sync.WaitGroup
 - now = time.Now()
 - n = 1000
 - sg = &singleflight.Group{}
 - )
 - for i := 0; i < n; i++ {
 - wg.Add(1)
 - go func() {
 - // res, _ := singleflightGetArticle(sg, 1)
 - res, _ := getArticle(1)
 - if res != "article: 1" {
 - panic("err")
 - }
 - wg.Done()
 - }()
 - }
 - wg.Wait()
 - fmt.Printf("同時(shí)發(fā)起 %d 次請(qǐng)求,耗時(shí): %s", n, time.Since(now))
 - }
 
可以看到這個(gè)是調(diào)用 getArticle 方法的耗時(shí),花費(fèi)了 1s 多
- # 直接調(diào)用的請(qǐng)求耗時(shí)
 - ❯ go run ./1.go
 - 同時(shí)發(fā)起 1000 次請(qǐng)求,耗時(shí): 1.0022831s
 
而使用 singleflight 的方法,花費(fèi)了不到 3ms
- # 使用 singleflight 的請(qǐng)求耗時(shí)
 - ❯ go run ./1.go
 - 同時(shí)發(fā)起 1000 次請(qǐng)求,耗時(shí): 2.5119ms
 
當(dāng)然每個(gè)庫都有自己的使用場景,軟件領(lǐng)域里面沒有銀彈,如果我們用的不太好的話甚至可能會(huì)得到適得其反的效果,而多看源碼不僅能夠幫助我們進(jìn)行學(xué)習(xí),也可以盡量少踩坑
它是如何實(shí)現(xiàn)的(源碼分析)?
本文基于 [https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/singleflight](https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/singleflight) 進(jìn)行分析,這個(gè)庫的實(shí)現(xiàn)很簡單,但是功能很強(qiáng)大,還有一些小技巧,非常值得學(xué)習(xí)
Group
- type Group struct {
 - mu sync.Mutex // protects m
 - m map[string]*call // lazily initialized
 - }
 
Group 結(jié)構(gòu)體由一個(gè)互斥鎖和一個(gè) map 組成,可以看到注釋 map 是懶加載的,所以 Group 只要聲明就可以使用,不用進(jìn)行額外的初始化零值就可以直接使用。call 保存了當(dāng)前調(diào)用對(duì)應(yīng)的信息,map 的鍵就是我們調(diào)用 Do 方法傳入的 key
- type call struct {
 - wg sync.WaitGroup
 - // 函數(shù)的返回值,在 wg 返回前只會(huì)寫入一次
 - val interface{}
 - err error
 - // 使用調(diào)用了 Forgot 方法
 - forgotten bool
 - // 統(tǒng)計(jì)調(diào)用次數(shù)以及返回的 channel
 - dups int
 - chans []chan<- Result
 - }
 
Do
- func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
 - g.mu.Lock()
 - // 前面提到的懶加載
 - if g.m == nil {
 - g.m = make(map[string]*call)
 - }
 - // 會(huì)先去看 key 是否已經(jīng)存在
 - if c, ok := g.m[key]; ok {
 - // 如果存在就會(huì)解鎖
 - c.dups++
 - g.mu.Unlock()
 - // 然后等待 WaitGroup 執(zhí)行完畢,只要一執(zhí)行完,所有的 wait 都會(huì)被喚醒
 - c.wg.Wait()
 - // 這里區(qū)分 panic 錯(cuò)誤和 runtime 的錯(cuò)誤,避免出現(xiàn)死鎖,后面可以看到為什么這么做
 - if e, ok := c.err.(*panicError); ok {
 - panic(e)
 - } else if c.err == errGoexit {
 - runtime.Goexit()
 - }
 - return c.val, c.err, true
 - }
 - // 如果我們沒有找到這個(gè) key 就 new call
 - c := new(call)
 - // 然后調(diào)用 waitgroup 這里只有第一次調(diào)用會(huì) add 1,其他的都會(huì)調(diào)用 wait 阻塞掉
 - // 所以這要這次調(diào)用返回,所有阻塞的調(diào)用都會(huì)被喚醒
 - c.wg.Add(1)
 - g.m[key] = c
 - g.mu.Unlock()
 - // 然后我們調(diào)用 doCall 去執(zhí)行
 - g.doCall(c, key, fn)
 - return c.val, c.err, c.dups > 0
 - }
 
doCall
這個(gè)方法的實(shí)現(xiàn)有點(diǎn)意思,使用了兩個(gè) defer 巧妙的將 runtime 的錯(cuò)誤和我們傳入 function 的 panic 區(qū)別開來避免了由于傳入的 function panic 導(dǎo)致的死鎖
- func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
 - normalReturn := false
 - recovered := false
 - // 第一個(gè) defer 檢查 runtime 錯(cuò)誤
 - defer func() {
 - }()
 - // 使用一個(gè)匿名函數(shù)來執(zhí)行
 - func() {
 - defer func() {
 - if !normalReturn {
 - // 如果 panic 了我們就 recover 掉,然后 new 一個(gè) panic 的錯(cuò)誤
 - // 后面在上層重新 panic
 - if r := recover(); r != nil {
 - c.err = newPanicError(r)
 - }
 - }
 - }()
 - c.val, c.err = fn()
 - // 如果 fn 沒有 panic 就會(huì)執(zhí)行到這一步,如果 panic 了就不會(huì)執(zhí)行到這一步
 - // 所以可以通過這個(gè)變量來判斷是否 panic 了
 - normalReturn = true
 - }()
 - // 如果 normalReturn 為 false 就表示,我們的 fn panic 了
 - // 如果執(zhí)行到了這一步,也說明我們的 fn recover 住了,不是直接 runtime exit
 - if !normalReturn {
 - recovered = true
 - }
 - }
 
再來看看第一個(gè) defer 中的代碼
- defer func() {
 - // 如果既沒有正常執(zhí)行完畢,又沒有 recover 那就說明需要直接退出了
 - if !normalReturn && !recovered {
 - c.err = errGoexit
 - }
 - c.wg.Done()
 - g.mu.Lock()
 - defer g.mu.Unlock()
 - // 如果已經(jīng) forgot 過了,就不要重復(fù)刪除這個(gè) key 了
 - if !c.forgotten {
 - delete(g.m, key)
 - }
 - if e, ok := c.err.(*panicError); ok {
 - // 如果返回的是 panic 錯(cuò)誤,為了避免 channel 死鎖,我們需要確保這個(gè) panic 無法被恢復(fù)
 - if len(c.chans) > 0 {
 - go panic(e)
 - select {} // Keep this goroutine around so that it will appear in the crash dump.
 - } else {
 - panic(e)
 - }
 - } else if c.err == errGoexit {
 - // 已經(jīng)準(zhǔn)備退出了,也就不用做其他操作了
 - } else {
 - // 正常情況下向 channel 寫入數(shù)據(jù)
 - for _, ch := range c.chans {
 - ch <- Result{c.val, c.err, c.dups > 0}
 - }
 - }
 - }()
 
DoChan
Do chan 和 Do 類似,其實(shí)就是一個(gè)是同步等待,一個(gè)是異步返回,主要實(shí)現(xiàn)上就是,如果調(diào)用 DoChan 會(huì)給 call.chans 添加一個(gè) channel 這樣等第一次調(diào)用執(zhí)行完畢之后就會(huì)循環(huán)向這些 channel 寫入數(shù)據(jù)
- func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 - ch := make(chan Result, 1)
 - g.mu.Lock()
 - if g.m == nil {
 - g.m = make(map[string]*call)
 - }
 - if c, ok := g.m[key]; ok {
 - c.dups++
 - c.chans = append(c.chans, ch)
 - g.mu.Unlock()
 - return ch
 - }
 - c := &call{chans: []chan<- Result{ch}}
 - c.wg.Add(1)
 - g.m[key] = c
 - g.mu.Unlock()
 - go g.doCall(c, key, fn)
 - return ch
 - }
 
Forget
forget 用于手動(dòng)釋放某個(gè) key 下次調(diào)用就不會(huì)阻塞等待了
- func (g *Group) Forget(key string) {
 - g.mu.Lock()
 - if c, ok := g.m[key]; ok {
 - c.forgotten = true
 - }
 - delete(g.m, key)
 - g.mu.Unlock()
 - }
 
有哪些注意事項(xiàng)(避坑指南)?
單飛雖好但也不要濫用哦,還是存在一些坑的
1. 一個(gè)阻塞,全員等待
使用 singleflight 我們比較常見的是直接使用 Do 方法,但是這個(gè)極端情況下會(huì)導(dǎo)致整個(gè)程序 hang 住,如果我們的代碼出點(diǎn)問題,有一個(gè)調(diào)用 hang 住了,那么會(huì)導(dǎo)致所有的請(qǐng)求都 hang 住
還是之前的例子,我們加一個(gè) select 模擬阻塞
- func singleflightGetArticle(sg *singleflight.Group, id int) (string, error) {
 - v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
 - // 模擬出現(xiàn)問題,hang 住
 - select {}
 - return getArticle(id)
 - })
 - return v.(string), err
 - }
 
執(zhí)行就會(huì)發(fā)現(xiàn)死鎖了
- fatal error: all goroutines are asleep - deadlock!
 - goroutine 1 [select (no cases)]:
 
這時(shí)候我們可以使用 DoChan 結(jié)合 select 做超時(shí)控制
- func singleflightGetArticle(ctx context.Context, sg *singleflight.Group, id int) (string, error) {
 - result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) {
 - // 模擬出現(xiàn)問題,hang 住
 - select {}
 - return getArticle(id)
 - })
 - select {
 - case r := <-result:
 - return r.Val.(string), r.Err
 - case <-ctx.Done():
 - return "", ctx.Err()
 - }
 - }
 
調(diào)用的時(shí)候傳入一個(gè)含 超時(shí)的 context 即可,執(zhí)行時(shí)就會(huì)返回超時(shí)錯(cuò)誤
- ❯ go run ./1.go
 - panic: context deadline exceeded
 
2. 一個(gè)出錯(cuò),全部出錯(cuò)
這個(gè)本身不是什么問題,因?yàn)?singleflight 就是這么設(shè)計(jì)的,但是實(shí)際使用的時(shí)候 如果我們一次調(diào)用要 1s,我們的數(shù)據(jù)庫請(qǐng)求或者是 下游服務(wù)可以支撐 10rps 的請(qǐng)求的時(shí)候這會(huì)導(dǎo)致我們的錯(cuò)誤閾提高,因?yàn)閷?shí)際上我們可以一秒內(nèi)嘗試 10 次,但是用了 singleflight 之后只能嘗試一次,只要出錯(cuò)這段時(shí)間內(nèi)的所有請(qǐng)求都會(huì)受影響
這種情況我們可以啟動(dòng)一個(gè) Goroutine 定時(shí) forget 一下,相當(dāng)于將 rps 從 1rps 提高到了 10rps
- go func() {
 - time.Sleep(100 * time.Millisecond)
 - // logging
 - g.Forget(key)
 - }()
 
總結(jié)
這篇文章從使用場景,到使用方法,再到源碼分析和可能存在的坑給大家介紹了 singleflight,希望你能有所收獲,沒事看看官方的代碼還是很有收獲的,這次又學(xué)到了一個(gè)騷操作,用雙重 defer 來避免死鎖,你學(xué)廢了么?
我們下一篇會(huì)開啟一個(gè)新的系列,Go 可用性,敬請(qǐng)期待!
文章博客地址:https://lailin.xyz


















 
 
 







 
 
 
 