偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

RPC框架編寫實踐-RPC常見限流方法的實現(xiàn)

開發(fā) 架構(gòu)
在微服務(wù)中, 雖然服務(wù)間的調(diào)用都是可信的, 但是服務(wù)端也需要堤防一些流量, 防止被意外的流量擊垮, 而通過限流可以防止問題的發(fā)生。此外, 使用不同的限流規(guī)則還能根據(jù)系統(tǒng)間不同服務(wù)的請求進行限制, 解決某個函數(shù)被頻繁調(diào)用而拖垮整個系統(tǒng)的問題。

前記

在微服務(wù)中, 雖然服務(wù)間的調(diào)用都是可信的, 但是服務(wù)端也需要堤防一些流量, 防止被意外的流量擊垮, 而通過限流可以防止問題的發(fā)生。此外, 使用不同的限流規(guī)則還能根據(jù)系統(tǒng)間不同服務(wù)的請求進行限制, 解決某個函數(shù)被頻繁調(diào)用而拖垮整個系統(tǒng)的問題。

NOTE: 雖然本文是在編寫RPC框架有感而發(fā), 但是也適用于常見的Web服務(wù)等有流量進出的場景。

最新修訂見閱讀原文

1 限流的簡介

1.1 限流的作用和場景

對于后端服務(wù)來說, 他們提供的服務(wù)都有一個極限的QPS(除代碼邏輯外,也跟機器配置有關(guān)), 當服務(wù)端的壓力超過這個極限值的時候, 服務(wù)端的響應(yīng)性能就會快速的下降, 然后無法提供服務(wù), 所以服務(wù)端需要一個類似于可以限制請求數(shù)的功能, 使服務(wù)端能犧牲掉部分請求, 保證還能處理一定量的請求, 防止服務(wù)端出現(xiàn)壓力瓶頸,無法處理所有請求。

不過這個功能還需要盡量的智能, 在設(shè)計時可以根據(jù)流量場景不同來做有差別的限制, 使其在不影響其它請求的情況下, 實現(xiàn)部分請求的網(wǎng)絡(luò)流量整形, 達到減少系統(tǒng)資源消耗的效果, 常見的幾種需要做差別限制的場景如下:

場景 可能造成的影響 限流的作用
總體的API有大量的并發(fā)調(diào)用, 導致系統(tǒng)QPS超過設(shè)計值 機器可能會扛不住, 造成系統(tǒng)崩潰 減少進入業(yè)務(wù)的流量, 保證QPS被限制在某個合理值, 其它請求會被丟棄
某個API耗時比較長, 其它API的QPS位于合理范圍內(nèi) 由于API耗時較長, 該API的調(diào)用次數(shù)變多的情況下, 會明顯消耗系統(tǒng)資源, 同時也可能造成數(shù)據(jù)競爭的情況 針對性的限制耗時API, 防止該API引起系統(tǒng)崩潰
總體API的QPS位于合理范圍內(nèi), 但是有部分參數(shù)會引起較大的系統(tǒng)資源消耗 比如某個篩選參數(shù)造成查全表的情況, 此時可能造成數(shù)據(jù)庫處理能力下降,進而造成后端服務(wù)無響應(yīng) 針對性的根據(jù)耗時API的參數(shù)進行限制限制, 防止該API引起系統(tǒng)崩潰
總體API的QPS位于合理范圍內(nèi), 但某個API的某個參數(shù)被大多數(shù)人調(diào)用, 導致整個API無法提供服務(wù), 比如微博的話題功能, 如果有個爆炸性話題, 這個話題就會成為熱點參數(shù) 造成整個API無法使用, 嚴重時會造成整個服務(wù)不可用 通過對熱點參數(shù)的限制, 保證其它功能能正常使用

1.2 限流的組件

通過上述場景可以看到, 在這些場景中限流的作用是差不多的, 一般只涉及到兩個維度:

  • 時間:對某個時間窗口進行限流
  • 資源:針對某個API或者某個API的參數(shù)進行限流,達到保護后方對應(yīng)的資源。

限流可以保證在某段時間內(nèi)的某個資源的請求數(shù)量不會超過設(shè)計值, 達到保護系統(tǒng)的作用, 不過不同場景主要差別是限制的資源維度不一樣, 資源維度的變化從總體服務(wù)到某個API到某個API的某個參數(shù), 資源維度越來越細, 而這個資源維度區(qū)分也就是我們要實現(xiàn)限流的第一步--流量匹配, 只要流量匹配了, 限流系統(tǒng)就可以開始工作了, 一般的限流系統(tǒng)流程圖如下(其中他服務(wù)核心代表微服務(wù)核心):

限流

流程圖中第一步是規(guī)則匹配, 它會通過一個函數(shù)把流量提取出來, 當做Key, 這個Key等于某個資源, 然后判斷這個Key是否匹配到規(guī)則, 如果命中規(guī)則就開始執(zhí)行規(guī)則并結(jié)合這段規(guī)則和限流算法來判斷該流量是否限流, 如果限流就丟棄或者等待, 如果沒被限流, 就直接放行。

此外, 流程圖的最下層有一個很大的Backend, 它可以用來存儲規(guī)則以及存儲一些限流相關(guān)的計算變量。其中,限流相關(guān)的計算變量都是跟時間相關(guān)的, 且每次都要進行讀寫, 最好的情況是放在內(nèi)存之中,不過它不能跟請求綁定在一起, 因為跟當前請求的生命周期不一樣, 不能在發(fā)送請求結(jié)束后就把變量回收了, 這些變量也需要有個容器可以存儲, 供不同的請求讀寫, 但是在一個集群服務(wù)中, 每個機器都只存儲自己的計算變量則會導致多臺機器沒辦法共享數(shù)據(jù)而造成限流失敗。

比如針對某個用戶可以調(diào)用某個API的規(guī)則是一秒內(nèi)可以請求十次, 目前有十臺機器, 他們不會互相共享自己的限流計算變量, 那么在最壞的情況下, 用戶可以在1秒內(nèi)訪問100次請求而不被限流, 這樣是達不了限流的效果的, 所以限流必定是一個中心化的應(yīng)用。目前兩個比較主流的限流方案分別是網(wǎng)關(guān)限流和中間件限流, 網(wǎng)關(guān)限流場景下所有入站流量都會經(jīng)過網(wǎng)關(guān)這個單體, 然后由網(wǎng)關(guān)決定是否放行;而中間件限流則是把計算變量都存在某個中間件存儲中, 然后每個服務(wù)的限流組件都可以從中間件實時寫入和讀取數(shù)據(jù), 其中最常用的中間件是Redis, 因為Redis的速度快, 能讓限流組件很快的判斷是否需要限流, 對機器的性能開銷占比也不是很多, 同時Redis支持的數(shù)據(jù)結(jié)構(gòu)和功能非常的多, 我們可以很容易的基于它來實現(xiàn)不同的限流算法。

至于限流的規(guī)則, 由于它只要寫入一次, 后面都是以讀為主, 所以在網(wǎng)關(guān)場景下都存在于內(nèi)存之中, 但在中間件場景下規(guī)則都是存在一個集中式存儲中, 如Etcd, 然后每個服務(wù)會同步集中式存儲的規(guī)則, 并寫入到自己的內(nèi)存中。

在實際的落地要選擇網(wǎng)關(guān)限流還是中間件限流主要還是取決于是服務(wù)的應(yīng)用場景, 比如接口外層都有加一層網(wǎng)關(guān), 那采用網(wǎng)關(guān)限流即可, 如果是內(nèi)部服務(wù)或者該服務(wù)的通信協(xié)議是自定義的, 則采用中間件方式, 有比較強的自定義性。

在使用Redis下的某些情況下(取決于搭建方式), 有可能造成數(shù)據(jù)不準的情況, 但是限流的頻率是允許有些許誤差的, 比如限流的規(guī)則是1秒可以訪問100次, 但在某些時候只實現(xiàn)了1秒訪問110次也是沒太大關(guān)系的。

1.3 限流算法

上面所說的都是一些簡單的概念, 而限流的核心是在于限流算法的實現(xiàn), 常見的限流算法有以下幾種(由于大多數(shù)都限流backend默認是Redis, 所以以可以在Redis運行的lua代碼示例):

1.3.1 固定窗口

固定窗口的原理比較簡單,就是將時間切分成若干個時間片,每個時間片內(nèi)固定處理若干個請求。比如限流規(guī)則是10秒內(nèi)最多處理5個請求, 那么就會有一個容器來統(tǒng)計這10秒內(nèi)的請求數(shù), 如果容器的統(tǒng)計數(shù)量大于5, 那么后續(xù)的請求都會被拒絕, 然后每隔10秒重置這個容器的統(tǒng)計。這種實現(xiàn)非常簡單, 但不是非常嚴謹, 假如限制規(guī)則是1秒限制100個, 但在最壞的情況下, 在第一個窗口的0.5秒后到第二個窗口的0.5秒前的這個時間點共計會放行200個請求, 所以固定窗口只適用于一些要求不嚴格的場景, 通過下圖的左圖可以看到限流的流程, 通過下圖的右圖可以看到整個限流曲線不平滑。

固定窗口

固定窗口的實現(xiàn)很簡單, 在Redis中的lua代碼如下:

  1. -- keys為傳入的命令, 其中keys[1]為限流的key  
  2. -- argv為傳入的參數(shù), ARGV[1]為窗口限制量, ARGV[2]為窗口時間  
  3. local count 
  4. local limit = ARGV[1] 
  5. count = redis.call("incr",KEYS[1]) 
  6. if tonumber(count) == 1 then 
  7.     -- 返回1代表是第一個, 該key剛被創(chuàng)建, 需要設(shè)置過期時間 
  8.     redis.call("expire",KEYS[1], ARGV[2]]) 
  9. end 
  10. return count<limit  

1.3.2 滑動窗口

滑動窗口是固定窗口的改進方法, 他是通過增加窗口數(shù)量使限流算法更順滑, 本身從一個窗口變?yōu)橐粋€先進先出的隊列, 隊列的內(nèi)容是更加精細的窗口,比如原來是10秒一個窗口, 現(xiàn)在會改為1秒一個窗口, 然后每隔一秒鐘滑動一個窗口。只寫入最新的窗口而讀取判斷時都是取最近10個窗口, 這樣就可以通過減小粒度來讓限流算法更加精細, 可以看到波動幅度會變小(取決于精細程度):

滑動窗口的實現(xiàn)也是很簡單的, 具體見:RPC框架編寫實踐--服務(wù)治理的基石, 在Redis可以采用Zset數(shù)據(jù)結(jié)構(gòu)進行實現(xiàn), 這里就不做代碼示例了?;瑒哟翱谑菭奚欢ǖ膬?nèi)存來讓限流變得平滑,窗口數(shù)量越多, 限流速率越精細, 占用的內(nèi)存就越大, 同時獲取數(shù)據(jù)時都是獲取一批窗口的數(shù)據(jù), 相比于固定窗口來說,它的時間復雜度也會跟著變多(O(k))。

1.3.3 漏桶

漏桶的出現(xiàn)可以完美的解決參差不齊的速率限制問題, 漏桶算法的核心原理是進入漏桶的請求量不限制, 但能漏出去的速率請求是恒定的, 這樣就能完美的控制請求的速率, 如果桶滿了, 在漏桶里的請求就會溢出去, 達到丟棄請求的目的, 如下圖, 整個請求的速率都是很平滑的, 沒有多少毛尖:

從圖中可以看到, 漏桶的原理很像一個FIFO隊列, 然后有個定時器會以恒定的速率把請求取出來, 使用Python代碼實現(xiàn)如下:

  1. import asyncio 
  2.  
  3. # 假設(shè)容量只有10 
  4. import time 
  5.  
  6. leaky_bucket: asyncio.Queue = asyncio.Queue(10) 
  7. loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() 
  8.  
  9.  
  10. async def demo_request(cnt: int) -> None: 
  11.     """模仿請求""" 
  12.     msg: str = f"I'm mock request:{cnt}" 
  13.     future: asyncio.Future = asyncio.Future() 
  14.     try: 
  15.         leaky_bucket.put_nowait(future) 
  16.     except asyncio.QueueFull: 
  17.         # 代表桶滿了, 溢出來, 該請求要提前拋棄 
  18.         print(f"Fail Request:{msg}"
  19.     else
  20.         # 等待放行 
  21.         await future 
  22.         print(time.time(), msg) 
  23.  
  24.  
  25. def timer() -> None: 
  26.     """定時放行請求""" 
  27.     try: 
  28.         # 放行該請求 
  29.         future: asyncio.Future = leaky_bucket.get_nowait() 
  30.         future.set_result(True
  31.     except asyncio.QueueEmpty: 
  32.         pass 
  33.     # 一秒執(zhí)行一次 
  34.     loop.call_later(1, timer) 
  35.  
  36. timer() 
  37. # 模擬并發(fā)12個請求 
  38. loop.run_until_complete(asyncio.gather(*[demo_request(i) for i in range(12)])) 

但是, 這樣實現(xiàn)的漏桶算法依然需要占用一些空間用來存儲等待放行的請求, 直到放行才被釋放。為了解決空間占用的問題, 可以采用GCRA算法, 它從另外一個角度看起來跟漏桶算法很像(GRRA應(yīng)該被認為是計量器實現(xiàn)的漏桶版本, 而不是上面所說的隊列形漏桶), 但很省空間占用, 因為無論漏桶多大, 它的空間占用都是恒定的, 只需要存漏水速率(可以認為是該時間段可以放行的請求量)以及桶目前的容量即可。

使用GCRA算法之所以能這樣省空間, 主要還是它是基于虛擬調(diào)度實現(xiàn)的, 它只需要存一個漏水速率,然后每次有請求進來時判斷現(xiàn)在可否可以漏水, 如果可以就放行, 如果不可以則判斷桶是否滿, 滿則拋棄請求, 沒滿則讓請求等待, 直到可以放行為止。常見的GCRA限流實現(xiàn)一般都考慮使用redis-cell, 它的使用方法如下:

  1. # 第一個參數(shù)為命令, 第二個參數(shù)是要限流的key, 第三個參數(shù)是桶容量, 第四第五綜合起來為漏桶速率, 第五個為每次漏多少 
  2.  
  3. # 第一次請求放行, 可以發(fā)現(xiàn)容量變多了一個 
  4. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  5. 1) "0"     # 0表示允許, 1表示拒絕 
  6. 2) "3"     # 漏桶容量(會比輸入的多1) 
  7. 3) "2"     # 漏斗剩余空間 
  8. 4) "-1"    # 如果拒絕, 需要多長時間后重試, 單位秒 
  9. 5) "10"    # 多少時間后,漏桶完全空了, 單位秒 
  10.  
  11. # 第二次請求被放行, 但是漏斗已經(jīng)被占了一個空位 
  12. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  13. 1) "0" 
  14. 2) "3" 
  15. 3) "1" 
  16. 4) "-1" 
  17. 5) "18" 
  18. # 第三次請求被放行, 但是漏斗已經(jīng)被占了兩個個空位 
  19. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  20. 1) "0" 
  21. 2) "3" 
  22. 3) "0" 
  23. 4) "-1" 
  24. 5) "27" 
  25. # 第四次請求不被放行, 但是漏斗沒有空位了 
  26. 127.0.0.1:6379> CL.THROTTLE demo_leaky_bucket 2 1 10 1 
  27. 1) "1" 
  28. 2) "3" 
  29. 3) "0" 
  30. 4) "6" 
  31. 5) "26" 

從命令可以看出, 即使漏斗中還有數(shù)據(jù)沒漏出去, 返回值得第一個也還是0, 表示放行, 這樣并不是一個完善的GCRA。為了實現(xiàn)一個完備的GCRA, 我們需要額外的在代碼判斷漏桶是否全空, 如果放行且桶不是全空, 則需要在代碼判斷多久后才能會為空, 這個時間也就是請求的等待放行時間, 在等待這段時間后才能放行請求, 如果不是放行, 則直接丟棄請求即可。

不過, 如果不做任何判斷, 直接使用返回值的第一個值來判斷是否放行請求, 那這個實現(xiàn)就很像下面所說的令牌桶的實現(xiàn)。

1.3.4 令牌桶

漏桶能很好的控制速率, 使其變得平滑, 但是它沒辦法應(yīng)對突發(fā)流量, 比如我們把規(guī)則定義為10秒內(nèi)可以請求10次, 對于漏桶來說, 它會控制為1秒放行一個請求, 如果同時收到10個請求時它則會分開10秒放行每個請求。然而10秒內(nèi)可以請求10次的含義是10秒內(nèi)總共可以請求10次, 也就是允許在這10秒內(nèi)的某個瞬間同時放行10個請求, 對于這個問題可以使用令牌桶來解決, 令牌桶和漏桶很像, 只是漏桶控制的是請求, 令牌桶控制的是令牌發(fā)放速度。

令牌桶算法規(guī)定每個請求需要從桶里拿到并消耗一個令牌才可以放行, 拿不到則會被拋棄, 同時令牌桶本身會以恒定的速率產(chǎn)生令牌, 直到桶滿為止, 這樣就可以保證限流的平緩, 同時又能應(yīng)對突發(fā)請求, 令牌桶的原理圖和限流曲線圖如下, 其中限流曲線圖表示初始時桶里面放滿了令牌, 所以放行的請求很多, 隨著令牌被逐漸消耗并消耗光了, 限流的曲率會穩(wěn)定在一條線上, 也就是令牌的生產(chǎn)速率:

同樣的, 在實現(xiàn)令牌桶時為了減少空間的占用, 也會使用虛擬調(diào)度方法, 只存一個時間和容量到內(nèi)存中, 每次收到請求時都會根據(jù)請求的時間和在內(nèi)存中的時間差值再乘以速率計算這段時間應(yīng)該產(chǎn)生的令牌數(shù)量并存到內(nèi)存中, 然后再判斷是否有足夠的令牌來判斷是否放行請求, 具體的Redis lua代碼實現(xiàn)如下:

  1. local key = KEYS[1] -- key 
  2. local current_time = redis.call('TIME')[1] -- redis時間戳 
  3. local interval_per_token = tonumber(ARGV[1]) --每個單位產(chǎn)生多少個token 
  4. local max_token = tonumber(ARGV[2]) -- 桶最大的量 
  5. local init_token = tonumber(ARGV[3]) -- 桶初始量 
  6. local tokens 
  7. -- 上次請求時保留的桶數(shù)據(jù) 
  8. local bucket = redis.call("hmget"key"last_time""last_token"
  9. local last_time= bucket[1] 
  10. local last_token = bucket[2] 
  11. if last_time == false or last_token == false then 
  12.     -- 如果沒數(shù)據(jù), 則代表該資源是第一次訪問, 進行初始化 
  13.     tokens = init_token 
  14.     redis.call('hset'key'last_time'current_time
  15. else 
  16.     -- 算出間隔時間 
  17.     local this_interval = current_time - tonumber(last_time) 
  18.     if this_interval > 1 then 
  19.         -- 算出該時間應(yīng)該產(chǎn)生的令牌 
  20.         local tokens_to_add = math.floor(this_interval * interval_per_token) 
  21.         -- 算出真實可以擁有的令牌 
  22.         tokens = math.min(last_token + tokens_to_add, max_token) 
  23.         -- 保存數(shù)據(jù) 
  24.         redis.call('hset'key'last_time'current_time
  25.     else 
  26.         tokens = tonumber(last_token) 
  27.     end 
  28. end 
  29. if tokens < 1 then 
  30.     -- 令牌不夠消費 
  31.     redis.call('hset'key'last_token', tokens) 
  32.     return -1 
  33. else 
  34.     -- 消費令牌并返回令牌數(shù), 代表可以消費 
  35.     tokens = tokens - 1 
  36.     redis.call('hset'key'last_token', tokens) 
  37.     return tokens 
  38. end 

2.具體實現(xiàn)

上面說完了算法實現(xiàn)后, 接下來來看看該如何結(jié)合算法進行實現(xiàn), 由于代碼會隨時更新,具體源碼更新見:https://github.com/so1n/rap/tree/master/rap/server/plugin/processor/limit

項目的代碼結(jié)構(gòu)如下, 在常見的后端服務(wù)中需要占用空間少, 然后速度盡量快點限流組件, 所以一般只用漏桶或者令牌桶且基于Redis的實現(xiàn), 這里就不會去實現(xiàn)窗口相關(guān)的限流了:

  1. ├── backend  # 算法 
  2. │   ├── base.py  # 封裝的協(xié)議 
  3. │   └── redis.py  # 基于redis當做banckend的算法實現(xiàn) 
  4. ├── core.py  # 核心判斷代碼, 實際上是一個中間流量處理 
  5. ├── rule.py  # 規(guī)則聲明 
  6. └── util.py  # 其它小代碼 

首先是rule.py里的規(guī)則類, 它主要是聲明了限流速率, 初始化token數(shù)量, 最多的tokens數(shù)量以及停用時間, 其中停用時間是用來防止惡意用戶頻繁刷新, 它的邏輯是當漏桶已經(jīng)滿了或者令牌桶沒有令牌的時候, 限流組件會在停用時間內(nèi)不再提供服務(wù)。

然后就是backend.base.py, 它是一個限流算法的統(tǒng)一封裝, 代碼如下:

  1. from typing import Any, Coroutine, Union 
  2.  
  3. from rap.server.plugin.processor.limit.rule import Rule 
  4.  
  5.  
  6. class BaseLimitBackend(object): 
  7.     def can_requests(self, key: str, ruleRule, token_num: int = 1) -> Union[bool, Coroutine[AnyAny, bool]]: 
  8.         raise NotImplementedError 
  9.  
  10.     def expected_time(self, key: str, ruleRule) -> Union[float, Coroutine[AnyAnyfloat]]: 
  11.         raise NotImplementedError 

這個類它聲明了兩個方法, 一個是can_request, 它會根據(jù)算法來判斷是否放行, 如果需要等待, 則會在這個方法里進行等待, 直到到時間后才返回放行標記, 其中can_request還內(nèi)嵌了一個block_time的邏輯;另外一個是expected_time, 用來獲取下次可用的時間, 具體的實現(xiàn)以RedisCellBackend為例子, 它是一個子類。

它的最上層實現(xiàn)是BaseLimitBackend, 然后就是繼承于BaseLimitBackend的BaseRedisBackend, 這個組件Redis限流算法的基礎(chǔ)實現(xiàn), 主要是實現(xiàn)了一個停用時間的邏輯, 當發(fā)現(xiàn)不放行請求的時候, 會啟用停用邏輯, 以停用后續(xù)相同key的請求:

  1. class BaseRedisBackend(BaseLimitBackend, ABC): 
  2.     def __init__(self, redis: Union[StrictRedis, StrictRedisCluster]): 
  3.         # 初始化Redis模塊 
  4.         self._redis: "Union[StrictRedis, StrictRedisCluster]" = redis 
  5.  
  6.     async def _block_time_handle(self, key: str, ruleRule, func: Callable[..., Awaitable[bool]]) -> bool: 
  7.         """處理block_time邏輯""" 
  8.         block_time_key: str = f"{key}:block_time" 
  9.         bucket_block_time: Optional[int] = rule.block_time 
  10.  
  11.         if bucket_block_time is not None and await self._redis.exists(block_time_key): 
  12.             # 啟用block_time邏輯, 且key已經(jīng)存在, 那么直接返回False告訴該請求應(yīng)該被拒絕  
  13.             return False 
  14.  
  15.         # 執(zhí)行正真的判斷是否限流邏輯 
  16.         can_requests: bool = await func() 
  17.  
  18.         if not can_requests and bucket_block_time is not None: 
  19.             # 啟用block_time邏輯且被限流時, 正式啟用block time邏輯  
  20.             await self._redis.set(block_time_key, bucket_block_time, ex=bucket_block_time) 
  21.  
  22.         return can_requests 

接著就是繼承于BaseRedisBackend的BaseRedisCellBackend, 它主要是提供一個命令調(diào)用的封裝以及獲取還有多久后才能請求的封裝:

  1. class BaseRedisCellBackend(BaseRedisBackend): 
  2.     ""
  3.     use redis-cell module 
  4.     learn more:https://github.com/brandur/redis-cell 
  5.  
  6.     input: CL.THROTTLE user123 15 30 60 1 
  7.         # param  |  desc 
  8.         # user123 key 
  9.         # 15 maxburst 
  10.         # 30 token 
  11.         # 60 seconds 
  12.         # 1 apply 1token 
  13.     output
  14.         1) (integer) 0        # is allowed 
  15.         2) (integer) 16       # total bucket num 
  16.         3) (integer) 15       # the remaining limit of the key
  17.         4) (integer) -1       # the number of seconds until the user should retry, 
  18.                               #   and always -1 if the action was allowed. 
  19.         5) (integer) 2        # The number of seconds until the limit will reset to its maximum capacity 
  20.     ""
  21.  
  22.     async def _call_cell(self, key: str, ruleRule, token_num: int = 1) -> List[int]: 
  23.         """調(diào)用redis_cell""" 
  24.         result: List[int] = await self._redis.execute_command( 
  25.             "CL.THROTTLE"keyrule.max_token - 1, rule.gen_token, int(rule.total_second), token_num 
  26.         ) 
  27.         return result 
  28.  
  29.     def expected_time(self, key: str, ruleRule) -> Union[float, Coroutine[AnyAnyfloat]]: 
  30.         """獲取下次可請求時間""" 
  31.         async def _expected_time() -> float
  32.             block_time_key: str = key + ":block_time" 
  33.             block_time = await self._redis.get(block_time_key) 
  34.             if block_time: 
  35.                 return await self._redis.ttl(block_time_key) 
  36.  
  37.             result: List[int] = await self._call_cell(keyrule, 0) 
  38.             return float(max(result[3], 0)) 
  39.  
  40.         return _expected_time() 

最后就是真正的對外使用的限流組件實現(xiàn), 這個實現(xiàn)是基于漏桶算法的, 它繼承于BaseRedisCellBackend(另外一個繼承于BaseRedisCellBackend的實現(xiàn)是基于令牌桶算法的, 可以通過源碼了解), 可以看到非常的簡單, 本質(zhì)上是基于redis-cell的返回判斷是否放行。

  1. class RedisCellBackend(BaseRedisCellBackend): 
  2.  
  3.     def can_requests(self, key: str, ruleRule, token_num: int = 1) -> Union[bool, Coroutine[AnyAny, bool]]: 
  4.         """通過redis-cell判斷是否可以請求,以及是否需要休眠等待, 如果需要則休眠固定的時間后再放行""" 
  5.         async def _can_requests() -> bool: 
  6.             result: List[int] = await self._call_cell(keyrule, token_num) 
  7.             can_requests: bool = result[0] == 0 
  8.             if can_requests and result[4]: 
  9.                 await asyncio.sleep(result[4]) 
  10.             return can_requests 
  11.  
  12.         return self._block_time_handle(keyrule, _can_requests) 

了解完算法的實現(xiàn)后, 接下來就是核心的判斷邏輯, 具體見注釋:

  1. class LimitProcessor(BaseProcessor): 
  2.     def __init__(self, backend: BaseLimitBackend, rule_list: List[Tuple[RULE_FUNC_TYPE, Rule]]): 
  3.         """初始化規(guī)則和算法邏輯, 這里的規(guī)則之所以是使用傳參的方式是僅供參考, 后續(xù)整個框架的配置都會抽離成一個config, 供其它組件調(diào)用""" 
  4.         self._backend: BaseLimitBackend = backend 
  5.         self._rule_list: List[Tuple[RULE_FUNC_TYPE, Rule]] = rule_list 
  6.  
  7.     async def process_request(self, request: Request) -> Request: 
  8.         # not limit client event 
  9.         if request.msg_type == constant.CLIENT_EVENT: 
  10.             # 屏蔽event請求 
  11.             return request 
  12.  
  13.         for func, rule in self._rule_list: 
  14.             # 獲取該請求的key 
  15.             if inspect.iscoroutinefunction(func): 
  16.                 key, is_ignore_limit = await func(request)  # type: ignore 
  17.             else
  18.                 key, is_ignore_limit = func(request) 
  19.             if is_ignore_limit: 
  20.                 # 如果該請求不應(yīng)該限流, 直接跳過限流邏輯 
  21.                 return request 
  22.             if key
  23.                 # 匹配到key, 進入限流邏輯 
  24.                 break 
  25.         else
  26.             raise TooManyRequest() 
  27.  
  28.         # 通過backend判斷是否限流 
  29.         key = f"rap:processor:{self.__class__.__name__}:{key}" 
  30.         can_requests: Union[bool, Awaitable[bool]] = self._backend.can_requests(keyrule
  31.         if inspect.isawaitable(can_requests): 
  32.             can_requests = await can_requests  # type: ignore 
  33.         if not can_requests: 
  34.             # 如果被限流, 返回異常, 并告知要何時后才可以再次請求 
  35.             expected_time: Union[float, Awaitable[float]] = self._backend.expected_time(keyrule
  36.             if inspect.isawaitable(expected_time): 
  37.                 expected_time = await expected_time  # type: ignore 
  38.             raise TooManyRequest(extra_msg=f"expected time: {expected_time}"
  39.         return request 

至此, 整個限流邏輯實現(xiàn)完畢, 本章內(nèi)容完。

3.其它碎碎念

3.1.熱點參數(shù)實現(xiàn)

由于大部分的限流實現(xiàn)的backend都只要依賴于Redis, 所以代碼倉里面只有Redis一種類型的backend,但是有一些限流實現(xiàn)需要依賴于一些特殊的backend,比如熱點參數(shù)限流, 還有蜜罐之類的場景。

以熱點參數(shù)限流場景為例子, 熱點參數(shù)是一個寫大于讀的應(yīng)用場景, 而且跟時間強相關(guān), 所以選用時序數(shù)據(jù)庫做backend,之前選用過Graphite當做backend, 具體實現(xiàn)如圖后端服務(wù)會把每次請求參數(shù)都記錄到時序數(shù)據(jù)庫中, 并使用一個定時腳本每隔一段時間把最近的熱點參數(shù)數(shù)據(jù)拉取到緩存中, 供后端服務(wù)的限流組件判斷是否該放行。其中, 這個間隔一般控制在1秒左右, 所以這是一個近實時的實現(xiàn), 具體的實現(xiàn)圖如下:

當請求進來的時候, 限流中間件會通過異步的方法把數(shù)據(jù)記錄到時序數(shù)據(jù)庫中, 比如一個請求為http://127.0.0.1:80?q=1&b=2,中間件就會發(fā)送一個以{prefix}.hot_param.b=2&b=2為key, value為1的標準Statsd的count類型數(shù)據(jù)到Statsd組件中。

這個Key采用標準的Statsd命令, 以.分割有三個值, 第一個是前綴它與業(yè)務(wù)相關(guān), 如業(yè)務(wù)名, 函數(shù)名,namespace等等; 第二個是代表是熱點參數(shù)的業(yè)務(wù);第三個是參數(shù)Key, 這里以&為分割號, 然后按Key順序排序,重新拼接為一個字符串, 這樣即使請求時順序不一致也能識別到時同種請求。

Statsd組件收到了數(shù)據(jù)后會自行進行統(tǒng)計, 統(tǒng)計一個時間區(qū)間都數(shù)據(jù)并寫入到Graphite中, 然后通過定時腳本使用Graphite API拉取統(tǒng)計次數(shù)大于條件的數(shù)據(jù)寫入到Redis緩存中, 其中Statsd組件的時間區(qū)間和定時腳本的定時時間都會控制在一秒左右, 所以這是一個近實時的實現(xiàn), 在計算性能消耗和實現(xiàn)效果直接做取舍。

數(shù)據(jù)寫入到Redis后, 這個寫入數(shù)據(jù)和統(tǒng)計的異步流程就結(jié)束了, 中間件在記錄數(shù)據(jù)后, 會通過Redis判斷是否是熱點參數(shù), 并根據(jù)規(guī)則判斷是否放行, 到了這里就跟上面的限流流程差不多了。

3.2.限流算法拓展

 

限流算法不止是用于算法, 也可以用于別的地方,比如有一些游戲活動,體力值滿時為5, 然后玩家每次出發(fā)活動會減少1個體力值, 然后可以使用限流算法每隔一個固定時間則增加一個體力值等等。在遇到業(yè)務(wù)需求有跟時間相關(guān)且像上述所說的體力值會恢復的情況時, 可以往限流算法思考。

 

責任編輯:武曉燕 來源: 博海拾貝diary
相關(guān)推薦

2022-08-15 08:01:35

微服務(wù)框架RPC

2012-10-10 09:14:50

PHPRPCPHP框架

2023-01-18 08:32:13

2014-09-02 10:43:45

RedisRPC

2024-01-02 12:17:44

Go傳統(tǒng)遠程

2012-11-23 14:26:40

IBMdW

2022-02-14 21:17:21

RPC框架協(xié)議

2011-04-06 09:39:49

mysql5存儲

2011-02-17 09:45:40

云計算RPC框架

2021-04-21 08:01:31

Googleprotobuf嵌入式系統(tǒng)

2024-07-02 10:40:35

2021-11-15 14:02:27

RPCSpringBootRabbitMQ

2017-01-13 10:51:13

RPC模型解析

2022-01-10 17:18:26

框架 RPC架構(gòu)

2018-08-02 15:24:05

RPCJava微服務(wù)

2020-11-02 08:19:18

RPC框架Java

2020-10-20 17:35:42

srpcRPC語言

2023-03-06 07:28:57

RPC框架序列化

2024-05-31 08:45:24

2019-08-21 08:44:52

RPC框架Java
點贊
收藏

51CTO技術(shù)棧公眾號