微服務(wù)架構(gòu)下的熔斷框架:Hystrix-Go
本文轉(zhuǎn)載自微信公眾號(hào)「Golang夢(mèng)工廠」,作者AsongGo 。轉(zhuǎn)載本文請(qǐng)聯(lián)系Golang夢(mèng)工廠公眾號(hào)。
背景
伴隨著微服務(wù)架構(gòu)被宣傳得如火如茶,一些概念也被推到了我們的面前。一提到微服務(wù),就離不開(kāi)這幾個(gè)字:高內(nèi)聚低耦合;微服務(wù)的架構(gòu)設(shè)計(jì)最終目的也就是實(shí)現(xiàn)這幾個(gè)字。在微服務(wù)架構(gòu)中,微服務(wù)就是完成一個(gè)單一的業(yè)務(wù)功能,每個(gè)微服務(wù)可以獨(dú)立演進(jìn),一個(gè)應(yīng)用可能會(huì)有多個(gè)微服務(wù)組成,微服務(wù)之間的數(shù)據(jù)交可以通過(guò)遠(yuǎn)程調(diào)用來(lái)完成,這樣在一個(gè)微服務(wù)架構(gòu)下就會(huì)形成這樣的依賴關(guān)系:
微服務(wù)A調(diào)用微服務(wù)C、D,微服務(wù)B又依賴微服務(wù)B、E,微服務(wù)D依賴于服務(wù)F,這只是一個(gè)簡(jiǎn)單的小例子,實(shí)際業(yè)務(wù)中服務(wù)之間的依賴關(guān)系比這還復(fù)雜,這樣在調(diào)用鏈路上如果某個(gè)微服務(wù)的調(diào)用響應(yīng)時(shí)間過(guò)長(zhǎng)或者不可用,那么對(duì)上游服務(wù)(按調(diào)用關(guān)系命名)的調(diào)用就會(huì)占用越來(lái)越多的系統(tǒng)資源,進(jìn)而引起系統(tǒng)崩潰,這就是微服務(wù)的雪蹦效應(yīng)。
為了解決微服務(wù)的雪蹦效應(yīng),提出來(lái)使用熔斷機(jī)制為微服務(wù)鏈路提供保護(hù)機(jī)制。熔斷機(jī)制大家應(yīng)該都不陌生,電路的中保險(xiǎn)絲就是一種熔斷機(jī)制,在微服務(wù)中的熔斷機(jī)制是什么樣的呢?
當(dāng)鏈路中的某個(gè)微服務(wù)不可用或者響應(yīng)的時(shí)間太長(zhǎng)時(shí),會(huì)進(jìn)行服務(wù)的降級(jí),進(jìn)而熔斷該節(jié)點(diǎn)微服務(wù)的調(diào)用,快速返回錯(cuò)誤的響應(yīng)信息,當(dāng)檢測(cè)到該節(jié)點(diǎn)微服務(wù)調(diào)用響應(yīng)正常后,恢復(fù)調(diào)用鏈路。
本文我們就介紹一個(gè)開(kāi)源熔斷框架:hystrix-go。
熔斷框架(hystrix-go)
Hystrix是一個(gè)延遲和容錯(cuò)庫(kù),旨在隔離對(duì)遠(yuǎn)程系統(tǒng)、服務(wù)和第三方服務(wù)的訪問(wèn)點(diǎn),停止級(jí)聯(lián)故障并在故障不可避免的復(fù)雜分布式系統(tǒng)中實(shí)現(xiàn)彈性。hystrix-go 旨在允許 Go 程序員輕松構(gòu)建具有與基于 Java 的 Hystrix 庫(kù)類似的執(zhí)行語(yǔ)義的應(yīng)用程序。所以本文就從使用開(kāi)始到源碼分析一下hystrix-go。
快速安裝
- go get -u github.com/afex/hystrix-go/hystrix
快速使用
hystrix-go真的是開(kāi)箱即用,使用還是比較簡(jiǎn)單的,主要分為兩個(gè)步驟:
- 配置熔斷規(guī)則,否則將使用默認(rèn)配置。可以調(diào)用的方法
- func Configure(cmds map[string]CommandConfig)
- func ConfigureCommand(name string, config CommandConfig)
Configure方法內(nèi)部也是調(diào)用的ConfigureCommand方法,就是傳參數(shù)不一樣,根據(jù)自己的代碼風(fēng)格選擇。
- 定義依賴于外部系統(tǒng)的應(yīng)用程序邏輯 - runFunc 和服務(wù)中斷期間執(zhí)行的邏輯代碼 - fallbackFunc,可以調(diào)用的方法:
- func Go(name string, run runFunc, fallback fallbackFunc) // 內(nèi)部調(diào)用Goc方法
- func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)
- func Do(name string, run runFunc, fallback fallbackFunc) // 內(nèi)部調(diào)用的是Doc方法
- func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) // 內(nèi)部調(diào)用Goc方法,處理了異步過(guò)程
Go和Do的區(qū)別在于異步還是同步,Do方法在調(diào)用Doc方法內(nèi)處理了異步過(guò)程,他們最終都是調(diào)用的Goc方法。后面我們進(jìn)行分析。
舉一個(gè)例子:我們?cè)贕in框架上加一個(gè)接口級(jí)的熔斷中間件
- // 代碼已上傳github: 文末查看地址
- var CircuitBreakerName = "api_%s_circuit_breaker"
- func CircuitBreakerWrapper(ctx *gin.Context){
- name := fmt.Sprintf(CircuitBreakerName,ctx.Request.URL)
- hystrix.Do(name, func() error {
- ctx.Next()
- code := ctx.Writer.Status()
- if code != http.StatusOK{
- return errors.New(fmt.Sprintf("status code %d", code))
- }
- return nil
- }, func(err error) error {
- if err != nil{
- // 監(jiān)控上報(bào)(未實(shí)現(xiàn))
- _, _ = io.WriteString(f, fmt.Sprintf("circuitBreaker and err is %s\n",err.Error())) //寫(xiě)入文件(字符串)
- fmt.Printf("circuitBreaker and err is %s\n",err.Error())
- // 返回熔斷錯(cuò)誤
- ctx.JSON(http.StatusServiceUnavailable,gin.H{
- "msg": err.Error(),
- })
- }
- return nil
- })
- }
- func init() {
- hystrix.ConfigureCommand(CircuitBreakerName,hystrix.CommandConfig{
- Timeout: int(3*time.Second), // 執(zhí)行command的超時(shí)時(shí)間為3s
- MaxConcurrentRequests: 10, // command的最大并發(fā)量
- RequestVolumeThreshold: 100, // 統(tǒng)計(jì)窗口10s內(nèi)的請(qǐng)求數(shù)量,達(dá)到這個(gè)請(qǐng)求數(shù)量后才去判斷是否要開(kāi)啟熔斷
- SleepWindow: int(2 * time.Second), // 當(dāng)熔斷器被打開(kāi)后,SleepWindow的時(shí)間就是控制過(guò)多久后去嘗試服務(wù)是否可用了
- ErrorPercentThreshold: 20, // 錯(cuò)誤百分比,請(qǐng)求數(shù)量大于等于RequestVolumeThreshold并且錯(cuò)誤率到達(dá)這個(gè)百分比后就會(huì)啟動(dòng)熔斷
- })
- if checkFileIsExist(filename) { //如果文件存在
- f, errfile = os.OpenFile(filename, os.O_APPEND, 0666) //打開(kāi)文件
- } else {
- f, errfile = os.Create(filename) //創(chuàng)建文件
- }
- }
- func main() {
- defer f.Close()
- hystrixStreamHandler := hystrix.NewStreamHandler()
- hystrixStreamHandler.Start()
- go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
- r := gin.Default()
- r.GET("/api/ping/baidu", func(c *gin.Context) {
- _, err := http.Get("https://www.baidu.com")
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()})
- return
- }
- c.JSON(http.StatusOK, gin.H{"msg": "success"})
- }, CircuitBreakerWrapper)
- r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
- }
- func checkFileIsExist(filename string) bool {
- if _, err := os.Stat(filename); os.IsNotExist(err) {
- return false
- }
- return true
- }
指令:wrk -t100 -c100 -d1s http://127.0.0.1:8080/api/ping/baidu
運(yùn)行結(jié)果:
- circuitBreaker and err is status code 500
- circuitBreaker and err is status code 500
- .....
- circuitBreaker and err is hystrix: max concurrency
- circuitBreaker and err is hystrix: max concurrency
- .....
- circuitBreaker and err is hystrix: circuit open
- circuitBreaker and err is hystrix: circuit open
- .....
對(duì)錯(cuò)誤進(jìn)行分析:
- circuitBreaker and err is status code 500:因?yàn)槲覀冴P(guān)閉了網(wǎng)絡(luò),所以請(qǐng)求是沒(méi)有響應(yīng)的
- circuitBreaker and err is hystrix: max concurrency:我們?cè)O(shè)置的最大并發(fā)量MaxConcurrentRequests是10,我們的壓測(cè)工具使用的是100并發(fā),所有會(huì)觸發(fā)這個(gè)熔斷
- circuitBreaker and err is hystrix: circuit open:我們?cè)O(shè)置熔斷開(kāi)啟的請(qǐng)求數(shù)量RequestVolumeThreshold是100,所以當(dāng)10s內(nèi)的請(qǐng)求數(shù)量大于100時(shí)就會(huì)觸發(fā)熔斷。
簡(jiǎn)單對(duì)上面的例子做一個(gè)解析:
- 添加接口級(jí)的熔斷中間件
- 初始化熔斷相關(guān)配置
- 開(kāi)啟dashboard 可視化hystrix的上報(bào)信息,瀏覽器打開(kāi)http://localhost:81,可以看到如下結(jié)果:
hystrix-go流程分析
本來(lái)想對(duì)源碼進(jìn)行分析,代碼量有點(diǎn)大,所以就針對(duì)流程來(lái)分析,順便看一些核心代碼。
配置熔斷規(guī)則
既然是熔斷,就要有熔斷規(guī)則,我們可以調(diào)用兩個(gè)方法配置熔斷規(guī)則,不會(huì)最終調(diào)用的都是ConfigureCommand,這里沒(méi)有特別的邏輯,如果我們沒(méi)有配置,系統(tǒng)將使用默認(rèn)熔斷規(guī)則:
- var (
- // DefaultTimeout is how long to wait for command to complete, in milliseconds
- DefaultTimeout = 1000
- // DefaultMaxConcurrent is how many commands of the same type can run at the same time
- DefaultMaxConcurrent = 10
- // DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health
- DefaultVolumeThreshold = 20
- // DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery
- DefaultSleepWindow = 5000
- // DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
- DefaultErrorPercentThreshold = 50
- // DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing.
- DefaultLogger = NoopLogger{}
- )
配置規(guī)則如下:
- Timeout:定義執(zhí)行command的超時(shí)時(shí)間,時(shí)間單位是ms,默認(rèn)時(shí)間是1000ms;
- MaxConcurrnetRequests:定義command的最大并發(fā)量,默認(rèn)值是10并發(fā)量;
- SleepWindow:熔斷器被打開(kāi)后使用,在熔斷器被打開(kāi)后,根據(jù)SleepWindow設(shè)置的時(shí)間控制多久后嘗試服務(wù)是否可用,默認(rèn)時(shí)間為5000ms;
- RequestVolumeThreshold:判斷熔斷開(kāi)關(guān)的條件之一,統(tǒng)計(jì)10s(代碼中寫(xiě)死了)內(nèi)請(qǐng)求數(shù)量,達(dá)到這個(gè)請(qǐng)求數(shù)量后再根據(jù)錯(cuò)誤率判斷是否要開(kāi)啟熔斷;
- ErrorPercentThreshold:判斷熔斷開(kāi)關(guān)的條件之一,統(tǒng)計(jì)錯(cuò)誤百分比,請(qǐng)求數(shù)量大于等于RequestVolumeThreshold并且錯(cuò)誤率到達(dá)這個(gè)百分比后就會(huì)啟動(dòng)熔斷 默認(rèn)值是50;
這些規(guī)則根據(jù)command的name進(jìn)行區(qū)分存放到一個(gè)map中。
執(zhí)行command
執(zhí)行command主要可以調(diào)用四個(gè)方法,分別是:
- func Go(name string, run runFunc, fallback fallbackFunc)
- func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)
- func Do(name string, run runFunc, fallback fallbackFunc)
- func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)
Do內(nèi)部調(diào)用的Doc方法,Go內(nèi)部調(diào)用的是Goc方法,在Doc方法內(nèi)部最終調(diào)用的還是Goc方法,只是在Doc方法內(nèi)做了同步邏輯:
- func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
- ..... 省略部分封裝代碼
- var errChan chan error
- if fallback == nil {
- errChan = GoC(ctx, name, r, nil)
- } else {
- errChan = GoC(ctx, name, r, f)
- }
- select {
- case <-done:
- return nil
- case err := <-errChan:
- return err
- }
- }
因?yàn)樗麄冏罱K都是調(diào)用的Goc方法,所以我們執(zhí)行分析Goc方法的內(nèi)部邏輯;代碼有點(diǎn)長(zhǎng),我們分邏輯來(lái)分析:
創(chuàng)建command對(duì)象
- cmd := &command{
- run: run,
- fallback: fallback,
- start: time.Now(),
- errChan: make(chan error, 1),
- finished: make(chan bool, 1),
- }
- // 獲取熔斷器
- circuit, _, err := GetCircuit(name)
- if err != nil {
- cmd.errChan <- err
- return cmd.errChan
- }
介紹一下command的數(shù)據(jù)結(jié)構(gòu):
- type command struct {
- sync.Mutex
- ticket *struct{}
- start time.Time
- errChan chan error
- finished chan bool
- circuit *CircuitBreaker
- run runFuncC
- fallback fallbackFuncC
- runDuration time.Duration
- events []string
- }
字段介紹:
- ticket:用來(lái)做最大并發(fā)量控制,這個(gè)就是一個(gè)令牌
- start:記錄command執(zhí)行的開(kāi)始時(shí)間
- errChan:記錄command執(zhí)行錯(cuò)誤
- finished:標(biāo)志command執(zhí)行結(jié)束,用來(lái)做協(xié)程同步
- circuit:存儲(chǔ)熔斷器相關(guān)信息
- run:應(yīng)用程序
- fallback:應(yīng)用程序執(zhí)行失敗后要執(zhí)行的函數(shù)
- runDuration:記錄command執(zhí)行消耗時(shí)間
- events:events主要是存儲(chǔ)事件類型信息,比如執(zhí)行成功的success,或者失敗的timeout、context_canceled等
上段代碼重點(diǎn)是GetCircuit方法,這一步的目的就是獲取熔斷器,使用動(dòng)態(tài)加載的方式,如果沒(méi)有就創(chuàng)建一個(gè)熔斷器,熔斷器結(jié)構(gòu)如下:
- type CircuitBreaker struct {
- Name string
- open bool
- forceOpen bool
- mutex *sync.RWMutex
- openedOrLastTestedTime int64
- executorPool *executorPool
- metrics *metricExchange
- }
解釋一下這幾個(gè)字段:
- name:熔斷器的名字,其實(shí)就是創(chuàng)建的command名字
- open:判斷熔斷器是否打開(kāi)的標(biāo)志
- forceopen:手動(dòng)觸發(fā)熔斷器的開(kāi)關(guān),單元測(cè)試使用
- mutex:使用讀寫(xiě)鎖保證并發(fā)安全
- openedOrLastTestedTime:記錄上一次打開(kāi)熔斷器的時(shí)間,因?yàn)橐鶕?jù)這個(gè)時(shí)間和SleepWindow時(shí)間來(lái)做恢復(fù)嘗試
- executorPool:用來(lái)做流量控制,因?yàn)槲覀冇幸粋€(gè)最大并發(fā)量控制,就是根據(jù)這個(gè)來(lái)做的流量控制,每次請(qǐng)求都要獲取令牌
metrics:用來(lái)上報(bào)執(zhí)行狀態(tài)的事件,通過(guò)它把執(zhí)行狀態(tài)信息存儲(chǔ)到實(shí)際熔斷器執(zhí)行各個(gè)維度狀態(tài) (成功次數(shù),失敗次數(shù),超時(shí)……) 的數(shù)據(jù)集合中。
后面會(huì)單獨(dú)分析executorPool、metrics的實(shí)現(xiàn)邏輯。
定義令牌相關(guān)的方法和變量
因?yàn)槲覀冇幸粋€(gè)條件是最大并發(fā)控制,采用的是令牌的方式進(jìn)行流量控制,每一個(gè)請(qǐng)求都要獲取一個(gè)令牌,使用完畢要把令牌還回去,先看一下這段代碼:
- ticketCond := sync.NewCond(cmd)
- ticketChecked := false
- // When the caller extracts error from returned errChan, it's assumed that
- // the ticket's been returned to executorPool. Therefore, returnTicket() can
- // not run after cmd.errorWithFallback().
- returnTicket := func() {
- cmd.Lock()
- // Avoid releasing before a ticket is acquired.
- for !ticketChecked {
- ticketCond.Wait()
- }
- cmd.circuit.executorPool.Return(cmd.ticket)
- cmd.Unlock()
- }
使用sync.NewCond創(chuàng)建一個(gè)條件變量,用來(lái)協(xié)調(diào)通知你可以歸還令牌了。
然后定義一個(gè)返回令牌的方法,調(diào)用Return方法歸還令牌。
定義上報(bào)執(zhí)行事件的方法
前面我們也提到了,我們的熔斷器會(huì)上報(bào)執(zhí)行狀態(tài)的事件,通過(guò)它把執(zhí)行狀態(tài)信息存儲(chǔ)到實(shí)際熔斷器執(zhí)行各個(gè)維度狀態(tài) (成功次數(shù),失敗次數(shù),超時(shí)……) 的數(shù)據(jù)集合中。所以要定義一個(gè)上報(bào)的方法:
- reportAllEvent := func() {
- err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
- if err != nil {
- log.Printf(err.Error())
- }
- }
開(kāi)啟協(xié)程一:執(zhí)行應(yīng)用程序邏輯 - runFunc
協(xié)程一的主要目的就是執(zhí)行應(yīng)用程序邏輯:
- go func() {
- defer func() { cmd.finished <- true }() // 標(biāo)志協(xié)程一的command執(zhí)行結(jié)束,同步到協(xié)程二
- // 當(dāng)最近執(zhí)行的并發(fā)數(shù)量超過(guò)閾值并且錯(cuò)誤率很高時(shí),就會(huì)打開(kāi)熔斷器。
- // 如果熔斷器打開(kāi),直接拒絕拒絕請(qǐng)求并返回令牌,當(dāng)感覺(jué)健康狀態(tài)恢復(fù)時(shí),熔斷器將允許新的流量。
- if !cmd.circuit.AllowRequest() {
- cmd.Lock()
- // It's safe for another goroutine to go ahead releasing a nil ticket.
- ticketChecked = true
- ticketCond.Signal() // 通知釋放ticket信號(hào)
- cmd.Unlock()
- // 使用sync.Onece保證只執(zhí)行一次。
- returnOnce.Do(func() {
- // 返還令牌
- returnTicket()
- // 執(zhí)行fallback邏輯
- cmd.errorWithFallback(ctx, ErrCircuitOpen)
- // 上報(bào)狀態(tài)事件
- reportAllEvent()
- })
- return
- }
- // 控制并發(fā)
- cmd.Lock()
- select {
- // 獲取到令牌
- case cmd.ticket = <-circuit.executorPool.Tickets:
- // 發(fā)送釋放令牌信號(hào)
- ticketChecked = true
- ticketCond.Signal()
- cmd.Unlock()
- default:
- // 沒(méi)有令牌可用了, 也就是達(dá)到最大并發(fā)數(shù)量則直接處理fallback邏輯
- ticketChecked = true
- ticketCond.Signal()
- cmd.Unlock()
- returnOnce.Do(func() {
- returnTicket()
- cmd.errorWithFallback(ctx, ErrMaxConcurrency)
- reportAllEvent()
- })
- return
- }
- // 執(zhí)行應(yīng)用程序邏輯
- runStart := time.Now()
- runErr := run(ctx)
- returnOnce.Do(func() {
- defer reportAllEvent() // 狀態(tài)事件上報(bào)
- // 統(tǒng)計(jì)應(yīng)用程序執(zhí)行時(shí)長(zhǎng)
- cmd.runDuration = time.Since(runStart)
- // 返還令牌
- returnTicket()
- // 如果應(yīng)用程序執(zhí)行失敗執(zhí)行fallback函數(shù)
- if runErr != nil {
- cmd.errorWithFallback(ctx, runErr)
- return
- }
- cmd.reportEvent("success")
- })
- }()
總結(jié)一下這個(gè)協(xié)程:
- 判斷熔斷器是否打開(kāi),如果打開(kāi)了熔斷器直接進(jìn)行熔斷,不在進(jìn)行后面的請(qǐng)求
- 運(yùn)行應(yīng)用程序邏輯
開(kāi)啟協(xié)程二:同步協(xié)程一并監(jiān)聽(tīng)錯(cuò)誤
先看代碼:
- go func() {
- // 使用定時(shí)器來(lái)做超時(shí)控制,這個(gè)超時(shí)時(shí)間就是我們配置的,默認(rèn)1000ms
- timer := time.NewTimer(getSettings(name).Timeout)
- defer timer.Stop()
- select {
- // 同步協(xié)程一
- case <-cmd.finished:
- // returnOnce has been executed in another goroutine
- // 是否收到context取消信號(hào)
- case <-ctx.Done():
- returnOnce.Do(func() {
- returnTicket()
- cmd.errorWithFallback(ctx, ctx.Err())
- reportAllEvent()
- })
- return
- // command執(zhí)行超時(shí)了
- case <-timer.C:
- returnOnce.Do(func() {
- returnTicket()
- cmd.errorWithFallback(ctx, ErrTimeout)
- reportAllEvent()
- })
- return
- }
- }()
這個(gè)協(xié)程的邏輯比較清晰明了,目的就是監(jiān)聽(tīng)業(yè)務(wù)執(zhí)行被取消以及超時(shí)。
畫(huà)圖總結(jié)command執(zhí)行流程
上面我們都是通過(guò)代碼來(lái)進(jìn)行分析的,看起來(lái)還是有點(diǎn)亂,最后畫(huà)個(gè)圖總結(jié)一下:
上面我們分析了整個(gè)具體流程,接下來(lái)我們針對(duì)一些核心點(diǎn)就行分析
上報(bào)狀態(tài)事件
hystrix-go為每一個(gè)Command設(shè)置了一個(gè)默認(rèn)統(tǒng)計(jì)控制器,用來(lái)保存熔斷器的所有狀態(tài),包括調(diào)用次數(shù)、失敗次數(shù)、被拒絕次數(shù)等,存儲(chǔ)指標(biāo)結(jié)構(gòu)如下:
- type DefaultMetricCollector struct {
- mutex *sync.RWMutex
- numRequests *rolling.Number
- errors *rolling.Number
- successes *rolling.Number
- failures *rolling.Number
- rejects *rolling.Number
- shortCircuits *rolling.Number
- timeouts *rolling.Number
- contextCanceled *rolling.Number
- contextDeadlineExceeded *rolling.Number
- fallbackSuccesses *rolling.Number
- fallbackFailures *rolling.Number
- totalDuration *rolling.Timing
- runDuration *rolling.Timing
- }
使用rolling.Number結(jié)構(gòu)保存狀態(tài)指標(biāo),使用rolling.Timing保存時(shí)間指標(biāo)。
最終監(jiān)控上報(bào)都依靠metricExchange來(lái)實(shí)現(xiàn),數(shù)據(jù)結(jié)構(gòu)如下:
- type metricExchange struct {
- Name string
- Updates chan *commandExecution
- Mutex *sync.RWMutex
- metricCollectors []metricCollector.MetricCollector
- }
上報(bào)command的信息結(jié)構(gòu):
- type commandExecution struct {
- Types []string `json:"types"` // 區(qū)分事件類型,比如success、failure....
- Start time.Time `json:"start_time"` // command開(kāi)始時(shí)間
- RunDuration time.Duration `json:"run_duration"` // command結(jié)束時(shí)間
- ConcurrencyInUse float64 `json:"concurrency_inuse"` // command 線程池使用率
- }
說(shuō)了這么多,大家還是有點(diǎn)懵,其實(shí)用一個(gè)類圖就能表明他們之間的關(guān)系:
我們可以看到類mertricExchange提供了一個(gè)Monitor方法,這個(gè)方法主要邏輯就是監(jiān)聽(tīng)狀態(tài)事件,然后寫(xiě)入指標(biāo),所以整個(gè)上報(bào)流程就是這個(gè)樣子:
流量控制
hystrix-go對(duì)流量控制采用的是令牌算法,能得到令牌的就可以執(zhí)行后繼的工作,執(zhí)行完后要返還令牌。結(jié)構(gòu)體executorPool就是hystrix-go 流量控制的具體實(shí)現(xiàn)。字段Max就是每秒最大的并發(fā)值。
- type executorPool struct {
- Name string
- Metrics *poolMetrics // 上報(bào)執(zhí)行數(shù)量指標(biāo)
- Max int // 最大并發(fā)數(shù)量
- Tickets chan *struct{} // 代表令牌
- }
這里還有一個(gè)上報(bào)指標(biāo),這個(gè)又單獨(dú)實(shí)現(xiàn)一套方法用來(lái)統(tǒng)計(jì)執(zhí)行數(shù)量,比如執(zhí)行的總數(shù)量、最大并發(fā)數(shù)等,我們依賴畫(huà)一個(gè)類圖來(lái)表示:
上報(bào)執(zhí)行數(shù)量邏輯與上報(bào)狀態(tài)事件的邏輯是一樣的,使用channel進(jìn)行數(shù)據(jù)通信的,上報(bào)與返還令牌都在Return方法中:
- func (p *executorPool) Return(ticket *struct{}) {
- if ticket == nil {
- return
- }
- p.Metrics.Updates <- poolMetricsUpdate{
- activeCount: p.ActiveCount(),
- }
- p.Tickets <- ticket
- }
主要邏輯兩步:
- 上報(bào)當(dāng)前可用的令牌數(shù)
- 返回令牌
熔斷器
我們最后來(lái)分析熔斷器中一個(gè)比較重要的方法:AllowRequest,我們?cè)趫?zhí)行Command是會(huì)根據(jù)這個(gè)方法來(lái)判斷是否可以執(zhí)行command,接下來(lái)我們就來(lái)看一下這個(gè)判斷的主要邏輯:
- func (circuit *CircuitBreaker) AllowRequest() bool {
- return !circuit.IsOpen() || circuit.allowSingleTest()
- }
內(nèi)部就是調(diào)用IsOpen()、allowSingleTest這兩個(gè)方法:
- IsOpen()
- func (circuit *CircuitBreaker) IsOpen() bool {
- circuit.mutex.RLock()
- o := circuit.forceOpen || circuit.open
- circuit.mutex.RUnlock()
- // 熔斷已經(jīng)開(kāi)啟
- if o {
- return true
- }
- // 判斷10s內(nèi)的并發(fā)數(shù)是否超過(guò)設(shè)置的最大并發(fā)數(shù),沒(méi)有超過(guò)時(shí),不需要開(kāi)啟熔斷器
- if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
- return false
- }
- // 此時(shí)10s內(nèi)的并發(fā)數(shù)已經(jīng)超過(guò)設(shè)置的最大并發(fā)數(shù)了,如果此時(shí)系統(tǒng)錯(cuò)誤率超過(guò)了預(yù)設(shè)值,那就開(kāi)啟熔斷器
- if !circuit.metrics.IsHealthy(time.Now()) {
- //
- circuit.setOpen()
- return true
- }
- return false
- }
- allowSingleTest()
先解釋一下為什么要有這個(gè)方法,還記得我們之前設(shè)置了一個(gè)熔斷規(guī)則中的SleepWindow嗎,如果在開(kāi)啟熔斷的情況下,在SleepWindow時(shí)間后進(jìn)行嘗試,這個(gè)方法的目的就是干這個(gè)的:
- func (circuit *CircuitBreaker) allowSingleTest() bool {
- circuit.mutex.RLock()
- defer circuit.mutex.RUnlock()
- // 獲取當(dāng)前時(shí)間戳
- now := time.Now().UnixNano()
- openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
- // 當(dāng)前熔斷器是開(kāi)啟狀態(tài),當(dāng)前的時(shí)間已經(jīng)大于 (上次開(kāi)啟熔斷器的時(shí)間 +SleepWindow 的時(shí)間)
- if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
- // 替換openedOrLastTestedTime
- swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)
- if swapped {
- log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
- }
- return swapped
- }
這里只看到了熔斷器被開(kāi)啟的設(shè)置了,但是沒(méi)有關(guān)閉熔斷器的邏輯,因?yàn)殛P(guān)閉熔斷器的邏輯是在上報(bào)狀態(tài)指標(biāo)的方法ReportEvent內(nèi)實(shí)現(xiàn),我們最后再看一下ReportEvent的實(shí)現(xiàn):
- func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
- if len(eventTypes) == 0 {
- return fmt.Errorf("no event types sent for metrics")
- }
- circuit.mutex.RLock()
- o := circuit.open
- circuit.mutex.RUnlock()
- // 上報(bào)的狀態(tài)事件是success 并且當(dāng)前熔斷器是開(kāi)啟狀態(tài),則說(shuō)明下游服務(wù)正常了,可以關(guān)閉熔斷器了
- if eventTypes[0] == "success" && o {
- circuit.setClose()
- }
- var concurrencyInUse float64
- if circuit.executorPool.Max > 0 {
- concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
- }
- select {
- // 上報(bào)狀態(tài)指標(biāo),與上文的monitor呼應(yīng)
- case circuit.metrics.Updates <- &commandExecution{
- Types: eventTypes,
- Start: start,
- RunDuration: runDuration,
- ConcurrencyInUse: concurrencyInUse,
- }:
- default:
- return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
- }
- return nil
- }
可視化hystrix的上報(bào)信息
通過(guò)上面的分析我們知道hystrix-go上報(bào)了狀態(tài)事件、執(zhí)行數(shù)量事件,那么這些指標(biāo)我們可以怎么查看呢?
設(shè)計(jì)者早就想到了這個(gè)問(wèn)題,所以他們做了一個(gè)dashborad,可以查看hystrix的上報(bào)信息,使用方法只需在服務(wù)啟動(dòng)時(shí)添加如下代碼:
- hystrixStreamHandler := hystrix.NewStreamHandler()
- hystrixStreamHandler.Start()
- go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
然后打開(kāi)瀏覽器:http://127.0.0.1:81/hystrix-dashboard,進(jìn)行觀測(cè)吧。
總結(jié)
故事終于接近尾聲了,一個(gè)熔斷機(jī)制的實(shí)現(xiàn)確實(shí)不簡(jiǎn)單,要考慮的因素也是方方面面,尤其在微服務(wù)架構(gòu)下,熔斷機(jī)制是必不可少的,不僅要在框架層面實(shí)現(xiàn)熔斷機(jī)制,還要根據(jù)具體業(yè)務(wù)場(chǎng)景使用熔斷機(jī)制,這些都是值得我們深思熟慮的。本文介紹的熔斷框架實(shí)現(xiàn)的還是比較完美的,這種優(yōu)秀的設(shè)計(jì)思路值得我們學(xué)習(xí)。
文中代碼已上傳github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/hystrix_demo,歡迎star。