偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

微服務(wù)架構(gòu)下的熔斷框架:Hystrix-Go

開(kāi)發(fā) 架構(gòu)
伴隨著微服務(wù)架構(gòu)被宣傳得如火如茶,一些概念也被推到了我們的面前。一提到微服務(wù),就離不開(kāi)這幾個(gè)字:高內(nèi)聚低耦合;微服務(wù)的架構(gòu)設(shè)計(jì)最終目的也就是實(shí)現(xiàn)這幾個(gè)字。

[[421890]]

本文轉(zhuǎn)載自微信公眾號(hào)「Golang夢(mèng)工廠」,作者AsongGo 。轉(zhuǎn)載本文請(qǐng)聯(lián)系Golang夢(mèng)工廠公眾號(hào)。

背景

伴隨著微服務(wù)架構(gòu)被宣傳得如火如茶,一些概念也被推到了我們的面前。一提到微服務(wù),就離不開(kāi)這幾個(gè)字:高內(nèi)聚低耦合;微服務(wù)的架構(gòu)設(shè)計(jì)最終目的也就是實(shí)現(xiàn)這幾個(gè)字。在微服務(wù)架構(gòu)中,微服務(wù)就是完成一個(gè)單一的業(yè)務(wù)功能,每個(gè)微服務(wù)可以獨(dú)立演進(jìn),一個(gè)應(yīng)用可能會(huì)有多個(gè)微服務(wù)組成,微服務(wù)之間的數(shù)據(jù)交可以通過(guò)遠(yuǎn)程調(diào)用來(lái)完成,這樣在一個(gè)微服務(wù)架構(gòu)下就會(huì)形成這樣的依賴關(guān)系:

微服務(wù)A調(diào)用微服務(wù)C、D,微服務(wù)B又依賴微服務(wù)B、E,微服務(wù)D依賴于服務(wù)F,這只是一個(gè)簡(jiǎn)單的小例子,實(shí)際業(yè)務(wù)中服務(wù)之間的依賴關(guān)系比這還復(fù)雜,這樣在調(diào)用鏈路上如果某個(gè)微服務(wù)的調(diào)用響應(yīng)時(shí)間過(guò)長(zhǎng)或者不可用,那么對(duì)上游服務(wù)(按調(diào)用關(guān)系命名)的調(diào)用就會(huì)占用越來(lái)越多的系統(tǒng)資源,進(jìn)而引起系統(tǒng)崩潰,這就是微服務(wù)的雪蹦效應(yīng)。

為了解決微服務(wù)的雪蹦效應(yīng),提出來(lái)使用熔斷機(jī)制為微服務(wù)鏈路提供保護(hù)機(jī)制。熔斷機(jī)制大家應(yīng)該都不陌生,電路的中保險(xiǎn)絲就是一種熔斷機(jī)制,在微服務(wù)中的熔斷機(jī)制是什么樣的呢?

當(dāng)鏈路中的某個(gè)微服務(wù)不可用或者響應(yīng)的時(shí)間太長(zhǎng)時(shí),會(huì)進(jìn)行服務(wù)的降級(jí),進(jìn)而熔斷該節(jié)點(diǎn)微服務(wù)的調(diào)用,快速返回錯(cuò)誤的響應(yīng)信息,當(dāng)檢測(cè)到該節(jié)點(diǎn)微服務(wù)調(diào)用響應(yīng)正常后,恢復(fù)調(diào)用鏈路。

本文我們就介紹一個(gè)開(kāi)源熔斷框架:hystrix-go。

熔斷框架(hystrix-go)

Hystrix是一個(gè)延遲和容錯(cuò)庫(kù),旨在隔離對(duì)遠(yuǎn)程系統(tǒng)、服務(wù)和第三方服務(wù)的訪問(wèn)點(diǎn),停止級(jí)聯(lián)故障并在故障不可避免的復(fù)雜分布式系統(tǒng)中實(shí)現(xiàn)彈性。hystrix-go 旨在允許 Go 程序員輕松構(gòu)建具有與基于 Java 的 Hystrix 庫(kù)類似的執(zhí)行語(yǔ)義的應(yīng)用程序。所以本文就從使用開(kāi)始到源碼分析一下hystrix-go。

快速安裝

  1. go get -u github.com/afex/hystrix-go/hystrix 

快速使用

hystrix-go真的是開(kāi)箱即用,使用還是比較簡(jiǎn)單的,主要分為兩個(gè)步驟:

  • 配置熔斷規(guī)則,否則將使用默認(rèn)配置。可以調(diào)用的方法
  1. func Configure(cmds map[string]CommandConfig)  
  2. func ConfigureCommand(name string, config CommandConfig) 

Configure方法內(nèi)部也是調(diào)用的ConfigureCommand方法,就是傳參數(shù)不一樣,根據(jù)自己的代碼風(fēng)格選擇。

  • 定義依賴于外部系統(tǒng)的應(yīng)用程序邏輯 - runFunc 和服務(wù)中斷期間執(zhí)行的邏輯代碼 - fallbackFunc,可以調(diào)用的方法:
  1. func Go(name string, run runFunc, fallback fallbackFunc) // 內(nèi)部調(diào)用Goc方法 
  2. func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)  
  3. func Do(name string, run runFunc, fallback fallbackFunc) // 內(nèi)部調(diào)用的是Doc方法 
  4. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) // 內(nèi)部調(diào)用Goc方法,處理了異步過(guò)程 

Go和Do的區(qū)別在于異步還是同步,Do方法在調(diào)用Doc方法內(nèi)處理了異步過(guò)程,他們最終都是調(diào)用的Goc方法。后面我們進(jìn)行分析。

舉一個(gè)例子:我們?cè)贕in框架上加一個(gè)接口級(jí)的熔斷中間件

  1. // 代碼已上傳github: 文末查看地址 
  2. var CircuitBreakerName = "api_%s_circuit_breaker" 
  3. func CircuitBreakerWrapper(ctx *gin.Context){ 
  4.  name := fmt.Sprintf(CircuitBreakerName,ctx.Request.URL) 
  5.  hystrix.Do(name, func() error { 
  6.   ctx.Next() 
  7.   code := ctx.Writer.Status() 
  8.   if code != http.StatusOK{ 
  9.    return errors.New(fmt.Sprintf("status code %d", code)) 
  10.   } 
  11.   return nil 
  12.  
  13.  }, func(err error) error { 
  14.   if err != nil{ 
  15.    // 監(jiān)控上報(bào)(未實(shí)現(xiàn)) 
  16.    _, _ = io.WriteString(f, fmt.Sprintf("circuitBreaker and err is %s\n",err.Error())) //寫(xiě)入文件(字符串) 
  17.    fmt.Printf("circuitBreaker and err is %s\n",err.Error()) 
  18.    // 返回熔斷錯(cuò)誤 
  19.    ctx.JSON(http.StatusServiceUnavailable,gin.H{ 
  20.     "msg": err.Error(), 
  21.    }) 
  22.   } 
  23.   return nil 
  24.  }) 
  25.  
  26. func init()  { 
  27.  hystrix.ConfigureCommand(CircuitBreakerName,hystrix.CommandConfig{ 
  28.   Timeout:                int(3*time.Second), // 執(zhí)行command的超時(shí)時(shí)間為3s 
  29.   MaxConcurrentRequests:  10, // command的最大并發(fā)量 
  30.   RequestVolumeThreshold: 100, // 統(tǒng)計(jì)窗口10s內(nèi)的請(qǐng)求數(shù)量,達(dá)到這個(gè)請(qǐng)求數(shù)量后才去判斷是否要開(kāi)啟熔斷 
  31.   SleepWindow:            int(2 * time.Second), // 當(dāng)熔斷器被打開(kāi)后,SleepWindow的時(shí)間就是控制過(guò)多久后去嘗試服務(wù)是否可用了 
  32.   ErrorPercentThreshold:  20, // 錯(cuò)誤百分比,請(qǐng)求數(shù)量大于等于RequestVolumeThreshold并且錯(cuò)誤率到達(dá)這個(gè)百分比后就會(huì)啟動(dòng)熔斷 
  33.  }) 
  34.  if checkFileIsExist(filename) { //如果文件存在 
  35.   f, errfile = os.OpenFile(filename, os.O_APPEND, 0666) //打開(kāi)文件 
  36.  } else { 
  37.   f, errfile = os.Create(filename) //創(chuàng)建文件 
  38.  } 
  39.  
  40.  
  41. func main()  { 
  42.  defer f.Close() 
  43.  hystrixStreamHandler := hystrix.NewStreamHandler() 
  44.  hystrixStreamHandler.Start() 
  45.  go http.ListenAndServe(net.JoinHostPort("""81"), hystrixStreamHandler) 
  46.  r := gin.Default() 
  47.  r.GET("/api/ping/baidu", func(c *gin.Context) { 
  48.   _, err := http.Get("https://www.baidu.com"
  49.   if err != nil { 
  50.    c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()}) 
  51.    return 
  52.   } 
  53.   c.JSON(http.StatusOK, gin.H{"msg""success"}) 
  54.  }, CircuitBreakerWrapper) 
  55.  r.Run()  // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080"
  56.  
  57. func checkFileIsExist(filename string) bool { 
  58.  if _, err := os.Stat(filename); os.IsNotExist(err) { 
  59.   return false 
  60.  } 
  61.  return true 

指令:wrk -t100 -c100 -d1s http://127.0.0.1:8080/api/ping/baidu

運(yùn)行結(jié)果:

  1. circuitBreaker and err is status code 500 
  2. circuitBreaker and err is status code 500 
  3. .....  
  4. circuitBreaker and err is hystrix: max concurrency 
  5. circuitBreaker and err is hystrix: max concurrency 
  6. ..... 
  7. circuitBreaker and err is hystrix: circuit open 
  8. circuitBreaker and err is hystrix: circuit open 
  9. ..... 

對(duì)錯(cuò)誤進(jìn)行分析:

  • circuitBreaker and err is status code 500:因?yàn)槲覀冴P(guān)閉了網(wǎng)絡(luò),所以請(qǐng)求是沒(méi)有響應(yīng)的
  • circuitBreaker and err is hystrix: max concurrency:我們?cè)O(shè)置的最大并發(fā)量MaxConcurrentRequests是10,我們的壓測(cè)工具使用的是100并發(fā),所有會(huì)觸發(fā)這個(gè)熔斷
  • circuitBreaker and err is hystrix: circuit open:我們?cè)O(shè)置熔斷開(kāi)啟的請(qǐng)求數(shù)量RequestVolumeThreshold是100,所以當(dāng)10s內(nèi)的請(qǐng)求數(shù)量大于100時(shí)就會(huì)觸發(fā)熔斷。

簡(jiǎn)單對(duì)上面的例子做一個(gè)解析:

  • 添加接口級(jí)的熔斷中間件
  • 初始化熔斷相關(guān)配置
  • 開(kāi)啟dashboard 可視化hystrix的上報(bào)信息,瀏覽器打開(kāi)http://localhost:81,可以看到如下結(jié)果:

hystrix-go流程分析

本來(lái)想對(duì)源碼進(jìn)行分析,代碼量有點(diǎn)大,所以就針對(duì)流程來(lái)分析,順便看一些核心代碼。

配置熔斷規(guī)則

既然是熔斷,就要有熔斷規(guī)則,我們可以調(diào)用兩個(gè)方法配置熔斷規(guī)則,不會(huì)最終調(diào)用的都是ConfigureCommand,這里沒(méi)有特別的邏輯,如果我們沒(méi)有配置,系統(tǒng)將使用默認(rèn)熔斷規(guī)則:

  1. var ( 
  2.  // DefaultTimeout is how long to wait for command to complete, in milliseconds 
  3.  DefaultTimeout = 1000 
  4.  // DefaultMaxConcurrent is how many commands of the same type can run at the same time 
  5.  DefaultMaxConcurrent = 10 
  6.  // DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health 
  7.  DefaultVolumeThreshold = 20 
  8.  // DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery 
  9.  DefaultSleepWindow = 5000 
  10.  // DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests 
  11.  DefaultErrorPercentThreshold = 50 
  12.  // DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing. 
  13.  DefaultLogger = NoopLogger{} 

配置規(guī)則如下:

  • Timeout:定義執(zhí)行command的超時(shí)時(shí)間,時(shí)間單位是ms,默認(rèn)時(shí)間是1000ms;
  • MaxConcurrnetRequests:定義command的最大并發(fā)量,默認(rèn)值是10并發(fā)量;
  • SleepWindow:熔斷器被打開(kāi)后使用,在熔斷器被打開(kāi)后,根據(jù)SleepWindow設(shè)置的時(shí)間控制多久后嘗試服務(wù)是否可用,默認(rèn)時(shí)間為5000ms;
  • RequestVolumeThreshold:判斷熔斷開(kāi)關(guān)的條件之一,統(tǒng)計(jì)10s(代碼中寫(xiě)死了)內(nèi)請(qǐng)求數(shù)量,達(dá)到這個(gè)請(qǐng)求數(shù)量后再根據(jù)錯(cuò)誤率判斷是否要開(kāi)啟熔斷;
  • ErrorPercentThreshold:判斷熔斷開(kāi)關(guān)的條件之一,統(tǒng)計(jì)錯(cuò)誤百分比,請(qǐng)求數(shù)量大于等于RequestVolumeThreshold并且錯(cuò)誤率到達(dá)這個(gè)百分比后就會(huì)啟動(dòng)熔斷 默認(rèn)值是50;

這些規(guī)則根據(jù)command的name進(jìn)行區(qū)分存放到一個(gè)map中。

執(zhí)行command

執(zhí)行command主要可以調(diào)用四個(gè)方法,分別是:

  1. func Go(name string, run runFunc, fallback fallbackFunc) 
  2. func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)  
  3. func Do(name string, run runFunc, fallback fallbackFunc) 
  4. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) 

Do內(nèi)部調(diào)用的Doc方法,Go內(nèi)部調(diào)用的是Goc方法,在Doc方法內(nèi)部最終調(diào)用的還是Goc方法,只是在Doc方法內(nèi)做了同步邏輯:

  1. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error { 
  2.   ..... 省略部分封裝代碼 
  3.   var errChan chan error 
  4.  if fallback == nil { 
  5.   errChan = GoC(ctx, name, r, nil) 
  6.  } else { 
  7.   errChan = GoC(ctx, name, r, f) 
  8.  } 
  9.  
  10.  select { 
  11.  case <-done: 
  12.   return nil 
  13.  case err := <-errChan: 
  14.   return err 
  15.  } 

因?yàn)樗麄冏罱K都是調(diào)用的Goc方法,所以我們執(zhí)行分析Goc方法的內(nèi)部邏輯;代碼有點(diǎn)長(zhǎng),我們分邏輯來(lái)分析:

創(chuàng)建command對(duì)象

  1. cmd := &command{ 
  2.  run:      run, 
  3.  fallback: fallback, 
  4.  start:    time.Now(), 
  5.  errChan:  make(chan error, 1), 
  6.  finished: make(chan bool, 1), 
  7. // 獲取熔斷器 
  8. circuit, _, err := GetCircuit(name
  9. if err != nil { 
  10.  cmd.errChan <- err 
  11.  return cmd.errChan 

介紹一下command的數(shù)據(jù)結(jié)構(gòu):

  1. type command struct { 
  2.  sync.Mutex 
  3.  
  4.  ticket      *struct{} 
  5.  start       time.Time 
  6.  errChan     chan error 
  7.  finished    chan bool 
  8.  circuit     *CircuitBreaker 
  9.  run         runFuncC 
  10.  fallback    fallbackFuncC 
  11.  runDuration time.Duration 
  12.  events      []string 

字段介紹:

  • ticket:用來(lái)做最大并發(fā)量控制,這個(gè)就是一個(gè)令牌
  • start:記錄command執(zhí)行的開(kāi)始時(shí)間
  • errChan:記錄command執(zhí)行錯(cuò)誤
  • finished:標(biāo)志command執(zhí)行結(jié)束,用來(lái)做協(xié)程同步
  • circuit:存儲(chǔ)熔斷器相關(guān)信息
  • run:應(yīng)用程序
  • fallback:應(yīng)用程序執(zhí)行失敗后要執(zhí)行的函數(shù)
  • runDuration:記錄command執(zhí)行消耗時(shí)間
  • events:events主要是存儲(chǔ)事件類型信息,比如執(zhí)行成功的success,或者失敗的timeout、context_canceled等

上段代碼重點(diǎn)是GetCircuit方法,這一步的目的就是獲取熔斷器,使用動(dòng)態(tài)加載的方式,如果沒(méi)有就創(chuàng)建一個(gè)熔斷器,熔斷器結(jié)構(gòu)如下:

  1. type CircuitBreaker struct { 
  2.  Name                   string 
  3.  open                   bool 
  4.  forceOpen              bool 
  5.  mutex                  *sync.RWMutex 
  6.  openedOrLastTestedTime int64 
  7.  
  8.  executorPool *executorPool 
  9.  metrics      *metricExchange 

解釋一下這幾個(gè)字段:

  • name:熔斷器的名字,其實(shí)就是創(chuàng)建的command名字
  • open:判斷熔斷器是否打開(kāi)的標(biāo)志
  • forceopen:手動(dòng)觸發(fā)熔斷器的開(kāi)關(guān),單元測(cè)試使用
  • mutex:使用讀寫(xiě)鎖保證并發(fā)安全
  • openedOrLastTestedTime:記錄上一次打開(kāi)熔斷器的時(shí)間,因?yàn)橐鶕?jù)這個(gè)時(shí)間和SleepWindow時(shí)間來(lái)做恢復(fù)嘗試
  • executorPool:用來(lái)做流量控制,因?yàn)槲覀冇幸粋€(gè)最大并發(fā)量控制,就是根據(jù)這個(gè)來(lái)做的流量控制,每次請(qǐng)求都要獲取令牌

metrics:用來(lái)上報(bào)執(zhí)行狀態(tài)的事件,通過(guò)它把執(zhí)行狀態(tài)信息存儲(chǔ)到實(shí)際熔斷器執(zhí)行各個(gè)維度狀態(tài) (成功次數(shù),失敗次數(shù),超時(shí)……) 的數(shù)據(jù)集合中。

后面會(huì)單獨(dú)分析executorPool、metrics的實(shí)現(xiàn)邏輯。

定義令牌相關(guān)的方法和變量

因?yàn)槲覀冇幸粋€(gè)條件是最大并發(fā)控制,采用的是令牌的方式進(jìn)行流量控制,每一個(gè)請(qǐng)求都要獲取一個(gè)令牌,使用完畢要把令牌還回去,先看一下這段代碼:

  1. ticketCond := sync.NewCond(cmd) 
  2. ticketChecked := false 
  3. // When the caller extracts error from returned errChan, it's assumed that 
  4. // the ticket's been returned to executorPool. Therefore, returnTicket() can 
  5. // not run after cmd.errorWithFallback(). 
  6. returnTicket := func() { 
  7.  cmd.Lock() 
  8.  // Avoid releasing before a ticket is acquired. 
  9.  for !ticketChecked { 
  10.   ticketCond.Wait() 
  11.  } 
  12.  cmd.circuit.executorPool.Return(cmd.ticket) 
  13.  cmd.Unlock() 

使用sync.NewCond創(chuàng)建一個(gè)條件變量,用來(lái)協(xié)調(diào)通知你可以歸還令牌了。

然后定義一個(gè)返回令牌的方法,調(diào)用Return方法歸還令牌。

定義上報(bào)執(zhí)行事件的方法

前面我們也提到了,我們的熔斷器會(huì)上報(bào)執(zhí)行狀態(tài)的事件,通過(guò)它把執(zhí)行狀態(tài)信息存儲(chǔ)到實(shí)際熔斷器執(zhí)行各個(gè)維度狀態(tài) (成功次數(shù),失敗次數(shù),超時(shí)……) 的數(shù)據(jù)集合中。所以要定義一個(gè)上報(bào)的方法:

  1. reportAllEvent := func() { 
  2.  err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration) 
  3.  if err != nil { 
  4.   log.Printf(err.Error()) 
  5.  } 

開(kāi)啟協(xié)程一:執(zhí)行應(yīng)用程序邏輯 - runFunc

協(xié)程一的主要目的就是執(zhí)行應(yīng)用程序邏輯:

  1. go func() { 
  2.   defer func() { cmd.finished <- true }() // 標(biāo)志協(xié)程一的command執(zhí)行結(jié)束,同步到協(xié)程二 
  3.  
  4.   // 當(dāng)最近執(zhí)行的并發(fā)數(shù)量超過(guò)閾值并且錯(cuò)誤率很高時(shí),就會(huì)打開(kāi)熔斷器。  
  5.    // 如果熔斷器打開(kāi),直接拒絕拒絕請(qǐng)求并返回令牌,當(dāng)感覺(jué)健康狀態(tài)恢復(fù)時(shí),熔斷器將允許新的流量。 
  6.   if !cmd.circuit.AllowRequest() { 
  7.    cmd.Lock() 
  8.    // It's safe for another goroutine to go ahead releasing a nil ticket. 
  9.    ticketChecked = true 
  10.    ticketCond.Signal() // 通知釋放ticket信號(hào) 
  11.    cmd.Unlock() 
  12.       // 使用sync.Onece保證只執(zhí)行一次。 
  13.    returnOnce.Do(func() { 
  14.         // 返還令牌 
  15.     returnTicket() 
  16.         // 執(zhí)行fallback邏輯 
  17.     cmd.errorWithFallback(ctx, ErrCircuitOpen) 
  18.         // 上報(bào)狀態(tài)事件 
  19.     reportAllEvent() 
  20.    }) 
  21.    return 
  22.   } 
  23.    // 控制并發(fā) 
  24.   cmd.Lock() 
  25.   select { 
  26.     // 獲取到令牌 
  27.   case cmd.ticket = <-circuit.executorPool.Tickets: 
  28.       // 發(fā)送釋放令牌信號(hào) 
  29.    ticketChecked = true 
  30.    ticketCond.Signal() 
  31.    cmd.Unlock() 
  32.   default
  33.       // 沒(méi)有令牌可用了, 也就是達(dá)到最大并發(fā)數(shù)量則直接處理fallback邏輯 
  34.    ticketChecked = true 
  35.    ticketCond.Signal() 
  36.    cmd.Unlock() 
  37.    returnOnce.Do(func() { 
  38.     returnTicket() 
  39.     cmd.errorWithFallback(ctx, ErrMaxConcurrency) 
  40.     reportAllEvent() 
  41.    }) 
  42.    return 
  43.   } 
  44.   // 執(zhí)行應(yīng)用程序邏輯 
  45.   runStart := time.Now() 
  46.   runErr := run(ctx) 
  47.   returnOnce.Do(func() { 
  48.    defer reportAllEvent() // 狀態(tài)事件上報(bào) 
  49.       // 統(tǒng)計(jì)應(yīng)用程序執(zhí)行時(shí)長(zhǎng) 
  50.    cmd.runDuration = time.Since(runStart) 
  51.       // 返還令牌 
  52.    returnTicket() 
  53.       // 如果應(yīng)用程序執(zhí)行失敗執(zhí)行fallback函數(shù) 
  54.    if runErr != nil { 
  55.     cmd.errorWithFallback(ctx, runErr) 
  56.     return 
  57.    } 
  58.    cmd.reportEvent("success"
  59.   }) 
  60.  }() 

總結(jié)一下這個(gè)協(xié)程:

  • 判斷熔斷器是否打開(kāi),如果打開(kāi)了熔斷器直接進(jìn)行熔斷,不在進(jìn)行后面的請(qǐng)求
  • 運(yùn)行應(yīng)用程序邏輯

開(kāi)啟協(xié)程二:同步協(xié)程一并監(jiān)聽(tīng)錯(cuò)誤

先看代碼:

  1. go func() { 
  2.     //  使用定時(shí)器來(lái)做超時(shí)控制,這個(gè)超時(shí)時(shí)間就是我們配置的,默認(rèn)1000ms 
  3.   timer := time.NewTimer(getSettings(name).Timeout) 
  4.   defer timer.Stop() 
  5.  
  6.   select { 
  7.       // 同步協(xié)程一 
  8.   case <-cmd.finished: 
  9.    // returnOnce has been executed in another goroutine 
  10.        
  11.     // 是否收到context取消信號(hào) 
  12.   case <-ctx.Done(): 
  13.    returnOnce.Do(func() { 
  14.     returnTicket() 
  15.     cmd.errorWithFallback(ctx, ctx.Err()) 
  16.     reportAllEvent() 
  17.    }) 
  18.    return 
  19.     // command執(zhí)行超時(shí)了 
  20.   case <-timer.C: 
  21.    returnOnce.Do(func() { 
  22.     returnTicket() 
  23.     cmd.errorWithFallback(ctx, ErrTimeout) 
  24.     reportAllEvent() 
  25.    }) 
  26.    return 
  27.   } 
  28.  }() 

這個(gè)協(xié)程的邏輯比較清晰明了,目的就是監(jiān)聽(tīng)業(yè)務(wù)執(zhí)行被取消以及超時(shí)。

畫(huà)圖總結(jié)command執(zhí)行流程

上面我們都是通過(guò)代碼來(lái)進(jìn)行分析的,看起來(lái)還是有點(diǎn)亂,最后畫(huà)個(gè)圖總結(jié)一下:

上面我們分析了整個(gè)具體流程,接下來(lái)我們針對(duì)一些核心點(diǎn)就行分析

上報(bào)狀態(tài)事件

hystrix-go為每一個(gè)Command設(shè)置了一個(gè)默認(rèn)統(tǒng)計(jì)控制器,用來(lái)保存熔斷器的所有狀態(tài),包括調(diào)用次數(shù)、失敗次數(shù)、被拒絕次數(shù)等,存儲(chǔ)指標(biāo)結(jié)構(gòu)如下:

  1. type DefaultMetricCollector struct { 
  2.  mutex *sync.RWMutex 
  3.  
  4.  numRequests *rolling.Number 
  5.  errors      *rolling.Number 
  6.  
  7.  successes               *rolling.Number 
  8.  failures                *rolling.Number 
  9.  rejects                 *rolling.Number 
  10.  shortCircuits           *rolling.Number 
  11.  timeouts                *rolling.Number 
  12.  contextCanceled         *rolling.Number 
  13.  contextDeadlineExceeded *rolling.Number 
  14.  
  15.  fallbackSuccesses *rolling.Number 
  16.  fallbackFailures  *rolling.Number 
  17.  totalDuration     *rolling.Timing 
  18.  runDuration       *rolling.Timing 

使用rolling.Number結(jié)構(gòu)保存狀態(tài)指標(biāo),使用rolling.Timing保存時(shí)間指標(biāo)。

最終監(jiān)控上報(bào)都依靠metricExchange來(lái)實(shí)現(xiàn),數(shù)據(jù)結(jié)構(gòu)如下:

  1. type metricExchange struct { 
  2.  Name    string 
  3.  Updates chan *commandExecution 
  4.  Mutex   *sync.RWMutex 
  5.  
  6.  metricCollectors []metricCollector.MetricCollector 

上報(bào)command的信息結(jié)構(gòu):

  1. type commandExecution struct { 
  2.  Types            []string      `json:"types"` // 區(qū)分事件類型,比如success、failure.... 
  3.  Start            time.Time     `json:"start_time"` // command開(kāi)始時(shí)間 
  4.  RunDuration      time.Duration `json:"run_duration"` // command結(jié)束時(shí)間 
  5.  ConcurrencyInUse float64       `json:"concurrency_inuse"` // command 線程池使用率 

說(shuō)了這么多,大家還是有點(diǎn)懵,其實(shí)用一個(gè)類圖就能表明他們之間的關(guān)系:

我們可以看到類mertricExchange提供了一個(gè)Monitor方法,這個(gè)方法主要邏輯就是監(jiān)聽(tīng)狀態(tài)事件,然后寫(xiě)入指標(biāo),所以整個(gè)上報(bào)流程就是這個(gè)樣子:

流量控制

hystrix-go對(duì)流量控制采用的是令牌算法,能得到令牌的就可以執(zhí)行后繼的工作,執(zhí)行完后要返還令牌。結(jié)構(gòu)體executorPool就是hystrix-go 流量控制的具體實(shí)現(xiàn)。字段Max就是每秒最大的并發(fā)值。

  1. type executorPool struct { 
  2.  Name    string 
  3.  Metrics *poolMetrics // 上報(bào)執(zhí)行數(shù)量指標(biāo) 
  4.  Max     int // 最大并發(fā)數(shù)量 
  5.  Tickets chan *struct{} // 代表令牌 

這里還有一個(gè)上報(bào)指標(biāo),這個(gè)又單獨(dú)實(shí)現(xiàn)一套方法用來(lái)統(tǒng)計(jì)執(zhí)行數(shù)量,比如執(zhí)行的總數(shù)量、最大并發(fā)數(shù)等,我們依賴畫(huà)一個(gè)類圖來(lái)表示:

上報(bào)執(zhí)行數(shù)量邏輯與上報(bào)狀態(tài)事件的邏輯是一樣的,使用channel進(jìn)行數(shù)據(jù)通信的,上報(bào)與返還令牌都在Return方法中:

  1. func (p *executorPool) Return(ticket *struct{}) { 
  2.  if ticket == nil { 
  3.   return 
  4.  } 
  5.  
  6.  p.Metrics.Updates <- poolMetricsUpdate{ 
  7.   activeCount: p.ActiveCount(), 
  8.  } 
  9.  p.Tickets <- ticket 

主要邏輯兩步:

  • 上報(bào)當(dāng)前可用的令牌數(shù)
  • 返回令牌

熔斷器

我們最后來(lái)分析熔斷器中一個(gè)比較重要的方法:AllowRequest,我們?cè)趫?zhí)行Command是會(huì)根據(jù)這個(gè)方法來(lái)判斷是否可以執(zhí)行command,接下來(lái)我們就來(lái)看一下這個(gè)判斷的主要邏輯:

  1. func (circuit *CircuitBreaker) AllowRequest() bool { 
  2.  return !circuit.IsOpen() || circuit.allowSingleTest() 

內(nèi)部就是調(diào)用IsOpen()、allowSingleTest這兩個(gè)方法:

  • IsOpen()
  1. func (circuit *CircuitBreaker) IsOpen() bool { 
  2.  circuit.mutex.RLock() 
  3.  o := circuit.forceOpen || circuit.open 
  4.  circuit.mutex.RUnlock() 
  5.  // 熔斷已經(jīng)開(kāi)啟 
  6.  if o { 
  7.   return true 
  8.  } 
  9.  // 判斷10s內(nèi)的并發(fā)數(shù)是否超過(guò)設(shè)置的最大并發(fā)數(shù),沒(méi)有超過(guò)時(shí),不需要開(kāi)啟熔斷器 
  10.  if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold { 
  11.   return false 
  12.  } 
  13.  // 此時(shí)10s內(nèi)的并發(fā)數(shù)已經(jīng)超過(guò)設(shè)置的最大并發(fā)數(shù)了,如果此時(shí)系統(tǒng)錯(cuò)誤率超過(guò)了預(yù)設(shè)值,那就開(kāi)啟熔斷器 
  14.  if !circuit.metrics.IsHealthy(time.Now()) { 
  15.   //  
  16.   circuit.setOpen() 
  17.   return true 
  18.  } 
  19.  
  20.  return false 
  • allowSingleTest()

先解釋一下為什么要有這個(gè)方法,還記得我們之前設(shè)置了一個(gè)熔斷規(guī)則中的SleepWindow嗎,如果在開(kāi)啟熔斷的情況下,在SleepWindow時(shí)間后進(jìn)行嘗試,這個(gè)方法的目的就是干這個(gè)的:

  1. func (circuit *CircuitBreaker) allowSingleTest() bool { 
  2.  circuit.mutex.RLock() 
  3.  defer circuit.mutex.RUnlock() 
  4.   
  5.   // 獲取當(dāng)前時(shí)間戳 
  6.  now := time.Now().UnixNano() 
  7.  openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime) 
  8.   // 當(dāng)前熔斷器是開(kāi)啟狀態(tài),當(dāng)前的時(shí)間已經(jīng)大于 (上次開(kāi)啟熔斷器的時(shí)間 +SleepWindow 的時(shí)間) 
  9.  if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() { 
  10.     // 替換openedOrLastTestedTime 
  11.   swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now) 
  12.   if swapped { 
  13.    log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name
  14.   } 
  15.   return swapped 
  16.  } 

這里只看到了熔斷器被開(kāi)啟的設(shè)置了,但是沒(méi)有關(guān)閉熔斷器的邏輯,因?yàn)殛P(guān)閉熔斷器的邏輯是在上報(bào)狀態(tài)指標(biāo)的方法ReportEvent內(nèi)實(shí)現(xiàn),我們最后再看一下ReportEvent的實(shí)現(xiàn):

  1. func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error { 
  2.  if len(eventTypes) == 0 { 
  3.   return fmt.Errorf("no event types sent for metrics"
  4.  } 
  5.   
  6.  circuit.mutex.RLock() 
  7.  o := circuit.open 
  8.  circuit.mutex.RUnlock() 
  9.   // 上報(bào)的狀態(tài)事件是success 并且當(dāng)前熔斷器是開(kāi)啟狀態(tài),則說(shuō)明下游服務(wù)正常了,可以關(guān)閉熔斷器了 
  10.  if eventTypes[0] == "success" && o { 
  11.   circuit.setClose() 
  12.  } 
  13.  
  14.  var concurrencyInUse float64 
  15.  if circuit.executorPool.Max > 0 { 
  16.   concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max
  17.  } 
  18.  
  19.  select { 
  20.     // 上報(bào)狀態(tài)指標(biāo),與上文的monitor呼應(yīng) 
  21.  case circuit.metrics.Updates <- &commandExecution{ 
  22.   Types:            eventTypes, 
  23.   Start:            start, 
  24.   RunDuration:      runDuration, 
  25.   ConcurrencyInUse: concurrencyInUse, 
  26.  }: 
  27.  default
  28.   return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)} 
  29.  } 
  30.  
  31.  return nil 

可視化hystrix的上報(bào)信息

通過(guò)上面的分析我們知道hystrix-go上報(bào)了狀態(tài)事件、執(zhí)行數(shù)量事件,那么這些指標(biāo)我們可以怎么查看呢?

設(shè)計(jì)者早就想到了這個(gè)問(wèn)題,所以他們做了一個(gè)dashborad,可以查看hystrix的上報(bào)信息,使用方法只需在服務(wù)啟動(dòng)時(shí)添加如下代碼:

  1. hystrixStreamHandler := hystrix.NewStreamHandler() 
  2. hystrixStreamHandler.Start() 
  3. go http.ListenAndServe(net.JoinHostPort("""81"), hystrixStreamHandler) 

然后打開(kāi)瀏覽器:http://127.0.0.1:81/hystrix-dashboard,進(jìn)行觀測(cè)吧。

總結(jié)

故事終于接近尾聲了,一個(gè)熔斷機(jī)制的實(shí)現(xiàn)確實(shí)不簡(jiǎn)單,要考慮的因素也是方方面面,尤其在微服務(wù)架構(gòu)下,熔斷機(jī)制是必不可少的,不僅要在框架層面實(shí)現(xiàn)熔斷機(jī)制,還要根據(jù)具體業(yè)務(wù)場(chǎng)景使用熔斷機(jī)制,這些都是值得我們深思熟慮的。本文介紹的熔斷框架實(shí)現(xiàn)的還是比較完美的,這種優(yōu)秀的設(shè)計(jì)思路值得我們學(xué)習(xí)。

 

文中代碼已上傳github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/hystrix_demo,歡迎star。

 

責(zé)任編輯:武曉燕 來(lái)源: Golang夢(mèng)工廠
相關(guān)推薦

2020-09-26 10:56:33

服務(wù)器熔斷服務(wù)隔離

2022-01-17 10:55:50

微服務(wù)API網(wǎng)關(guān)

2020-07-28 08:32:57

微服務(wù)API網(wǎng)關(guān)熔斷

2025-03-13 00:55:00

微服務(wù)架構(gòu)系統(tǒng)

2017-07-03 09:50:07

Spring Clou微服務(wù)架構(gòu)

2018-12-06 14:56:46

微服務(wù)隔離熔斷

2024-06-05 06:43:20

2020-11-27 10:50:06

微服務(wù)架構(gòu)框架

2017-07-04 17:35:46

微服務(wù)架構(gòu)Spring Clou

2021-03-05 11:09:46

Go框架微服務(wù)

2025-01-20 00:10:00

Go語(yǔ)言Kratos

2023-12-13 07:19:01

微服務(wù)架構(gòu)Golang

2024-06-27 10:50:01

2017-07-17 15:50:17

微服務(wù)Docker架構(gòu)

2021-06-22 18:00:09

微服務(wù)架構(gòu)系統(tǒng)

2024-04-09 07:27:06

微服務(wù)架構(gòu)YAML

2024-12-23 00:22:55

2025-01-13 00:00:07

Go語(yǔ)言微服務(wù)

2022-05-13 09:05:49

Hystrix熔斷器

2024-12-30 00:38:23

Go語(yǔ)言微服務(wù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)