使用增強(qiáng)版 Singleflight 合并事件推送,效果炸裂!
hello,大家好啊,我是小樓。
最近在工作中對(duì) Go 的 singleflight 包做了下增強(qiáng),解決了一個(gè)性能問(wèn)題,這里記錄下,希望對(duì)你也有所幫助。
singleflight 是什么
singleflight 直接翻譯為"單(次)飛(行)",它是對(duì)同一種請(qǐng)求的抑制,保證同一時(shí)刻相同的請(qǐng)求只有一個(gè)在執(zhí)行,且在它執(zhí)行期間的相同請(qǐng)求都會(huì) Hold 直到執(zhí)行完成,這些 Hold 的請(qǐng)求也使用這次執(zhí)行的結(jié)果。
舉個(gè)例子,當(dāng)程序中有讀(如 Redis、MySQL、Http、RPC等)請(qǐng)求,且并發(fā)非常高的情況,使用 singleflight 能得到比較好的效果,它限制了同一時(shí)刻只有一個(gè)請(qǐng)求在執(zhí)行,也就是并發(fā)永遠(yuǎn)為1。
singleflight 的原理
最初 singleflight 出現(xiàn)在 groupcache 項(xiàng)目中,這個(gè)項(xiàng)目也是 Go 團(tuán)隊(duì)所寫,后來(lái)該包被移到 Go 源碼中,在 Go 源碼中的版本經(jīng)過(guò)幾輪迭代,稍微有點(diǎn)復(fù)雜,我們以最原始的源碼來(lái)講解原理,更方便地看清本質(zhì)。
https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go
singleflight 把每次請(qǐng)求定義為 call,每個(gè) call 對(duì)象包含了一個(gè) waitGroup,一個(gè) val,即請(qǐng)求的返回值,一個(gè) err,即請(qǐng)求返回的錯(cuò)誤。
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
再定義全局的 Group,包含一個(gè)互斥鎖 Mutex,一個(gè) key 為 string,value 為 call 的 map。
type Group struct {
mu sync.Mutex
m map[string]*call
}
Group 對(duì)象有一個(gè) Do 方法,其第一個(gè)參數(shù)是 string 類型的 key,這個(gè) key 也就是上面說(shuō)的 map 的 key,相同的 key 標(biāo)志著他們是相同的請(qǐng)求,只有相同的請(qǐng)求會(huì)被抑制;第二個(gè)參數(shù)是一個(gè)函數(shù) fn,這個(gè)函數(shù)是真正要執(zhí)行的函數(shù),例如調(diào)用 MySQL;返回值比較好理解,即最終調(diào)用的返回值和錯(cuò)誤信息。
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
// ①
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
// ②
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
// ③
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
return c.val, c.err
}
將整個(gè)代碼分成三塊:
- ① 懶加載方式初始化 map;
- ② 如果當(dāng)前 key 存在,即相同請(qǐng)求正在調(diào)用中,就等它完成,完成后直接使用它的 value 和 error;
- ③ 如果當(dāng)前 key 不存在,即沒(méi)有相同請(qǐng)求正在調(diào)用中,就創(chuàng)建一個(gè) call 對(duì)象,并把它放進(jìn) map,接著執(zhí)行 fn 函數(shù),當(dāng)函數(shù)執(zhí)行完喚醒 waitGroup,并刪除 map 相應(yīng)的 key,返回 value 和 error。
讀可以抑制,寫呢?
我們通過(guò)上面的介紹能了解,singleflight 能解決并發(fā)讀的問(wèn)題,但我又遇到一個(gè)并發(fā)寫的問(wèn)題。為了能讓大家快速進(jìn)入狀態(tài),先花一點(diǎn)篇幅描述一下遇到的實(shí)際問(wèn)題:
微服務(wù)中的注冊(cè)中心想必大家都有所了解,如果不了解,可以去查查相關(guān)概念,或者翻看我以前的文章,老讀者應(yīng)該能發(fā)現(xiàn)我寫了很多相關(guān)的文章。
服務(wù)提供方在注冊(cè)之后,會(huì)將變更事件推送到消費(fèi)方,推送事件的處理流程是:接收到事件,查詢組裝出最新的數(shù)據(jù),然后推送給訂閱者。存在兩種情況可能會(huì)導(dǎo)致短時(shí)間內(nèi)注冊(cè)請(qǐng)求非常多,推送事件多會(huì)影響整個(gè)注冊(cè)中心的性能:
- 接口級(jí)注冊(cè)(類似 Dubbo),每臺(tái)機(jī)器會(huì)注冊(cè)N多次
- 服務(wù)并發(fā)發(fā)布,例如每次發(fā)布重啟100臺(tái)機(jī)器,那么注冊(cè)的并發(fā)就可能是100
拿到這種問(wèn)題,第一想到的解法是:合并推送。但,怎么合并呢?
是不是每次推送的時(shí)候等一等,等事件都來(lái)了再一把推過(guò)去就可以了?但等多久呢?什么時(shí)候該等呢?粗暴點(diǎn),每秒鐘推送一次,這樣就能將一秒內(nèi)的時(shí)間都聚合,但這會(huì)影響推送的時(shí)效性,顯然不符合我們精益求精的要求。
直接使用 singleflight,能行嗎?
套用上面 singleflight ,在第一個(gè)事件推送過(guò)程中,其他相同的事件被 Hold 住,等第一個(gè)事件推送完成后,這些 Hold 的事件不再執(zhí)行推送直接返回。
稍微想一下就知道這樣是有問(wèn)題的,假設(shè)有三個(gè)事件 A、B、C,分別對(duì)應(yīng)到三個(gè)版本的數(shù)據(jù)A1、B1、C1,A 最先到達(dá),在 A 開(kāi)始推送后但沒(méi)完成時(shí) B、C 事件到達(dá),A 事件觸發(fā)推送了 A1 版本的數(shù)據(jù),B、C 事件在 A 事件推送完成后,直接丟棄,最終推送到消費(fèi)者上的數(shù)據(jù)版本為 A1,但我們肯定期望推送的數(shù)據(jù)版本為 C1,畫個(gè)圖線感受下:
增強(qiáng)一點(diǎn)點(diǎn) ????
假設(shè)有事件 A、B、C、D 先后到達(dá),A 事件仍然先正常執(zhí)行推送,在 A 事件推送的時(shí)候,B、C、D 事件 Hold 住,當(dāng) A 事件推送完成后,B 事件開(kāi)始推送,B 事件將把 A 事件推送時(shí)期積攢的事件都一起推送掉,即 B、C、D 一次性推送完成。
增強(qiáng)代碼參考
增強(qiáng)的定義為 WriteGroup,借用 singleflight 原先的實(shí)現(xiàn),具體代碼就不必解讀了,對(duì)照上面的例子應(yīng)該很好理解。
package singleflight
import (
"sync"
)
type WriteGroup struct {
mu sync.Mutex
wgs map[string]*sync.WaitGroup
group Group
}
func (g *WriteGroup) Do(key string, fn func() error) error {
g.mu.Lock()
if g.wgs == nil {
g.wgs = make(map[string]*sync.WaitGroup)
}
wg, ok := g.wgs[key]
if !ok {
wg = &sync.WaitGroup{}
wg.Add(1)
g.wgs[key] = wg
}
g.mu.Unlock()
if !ok {
err := fn()
g.mu.Lock()
wg.Done()
delete(g.wgs, key)
g.mu.Unlock()
return err
}
wg.Wait()
_, err := g.group.Do(key, func() (interface{}, error) {
return nil, fn()
})
return err
}
效果如何?
理論上,如果沒(méi)有并發(fā),事件和以前一樣推送,沒(méi)有合并,當(dāng)然這也沒(méi)毛病。當(dāng)并發(fā)大于 2 時(shí),開(kāi)始發(fā)揮威力。在實(shí)際的壓測(cè)上,注冊(cè)并發(fā) 1500 時(shí),合并的事件達(dá)到 99.9%,效果相當(dāng)炸裂!