Redis 定長隊列的探索和實踐
一、業(yè)務(wù)背景
從技術(shù)的角度來說,技術(shù)方案的選型都是受限于實際的業(yè)務(wù)場景,都以解決實際業(yè)務(wù)場景為目標。
在我們的實際業(yè)務(wù)場景中,需要以游戲的維度收集和上報行為數(shù)據(jù),考慮數(shù)據(jù)的量級,執(zhí)行盡最大努力交付且允許數(shù)據(jù)的部分丟棄。
數(shù)據(jù)上報支持游戲的維度的批量上報,支持同一款游戲128個行為進行批量上報。
數(shù)據(jù)上報需要時效控制,上報的數(shù)據(jù)必須是上報時刻的前3分鐘的數(shù)據(jù)。
整體數(shù)據(jù)的業(yè)務(wù)形態(tài)如下圖所示:
二、技術(shù)選型
從業(yè)務(wù)的角度來說包含數(shù)據(jù)的收集和數(shù)據(jù)的上報,我們把數(shù)據(jù)的收集比作生產(chǎn)者,數(shù)據(jù)的上報比作消費者,是一個典型的生產(chǎn)消費模型。
生產(chǎn)消費模型在JVM進程內(nèi)部通過隊列+鎖或者無鎖的Disruptor來實現(xiàn),在跨進程場景下通過MQ(RocketMQ/kafka)進行處理解耦。
但是細化到具體業(yè)務(wù)場景來看,消息的消費有諸多限制,包括: 游戲維度的批量行為上報,行為上報的時效限制,細化到各個技術(shù)方案選型進行對比。
方案一
使用RocketMQ 或者Kafaka等消息隊列來存儲上報的消息,但是消費側(cè)需要考慮在業(yè)務(wù)進程中按照游戲維度進行聚合,其中技術(shù)細節(jié)涉及按照游戲維度進行拆分,在滿足消息時效性和批量性的前提下觸發(fā)上報。在這種方案下消息中間件扮演的角色本質(zhì)上消息的中轉(zhuǎn)站, 沒有解決任何業(yè)務(wù)場景中提及的游戲維度拆分、批量性和時效性。
方案二
在方案一的基礎(chǔ)上,尋求一種技術(shù)方案來解決游戲維度的 消息分組、批量消費 、時效性 。通過Redis的list結(jié)構(gòu)來實現(xiàn)隊列(進一步要求實現(xiàn)定長隊列)來解決游戲維度的消息分組;通過Redis的list支持的Lrange來實現(xiàn)批量消費;通過業(yè)務(wù)側(cè)的多線程來解決時效問題,針對高頻的游戲使用單獨的線程池進行處理,上述兩個手段能夠保證消費速度大于生產(chǎn)速度。
方案對比
對比兩種方案后決定使用Redis的實現(xiàn)了一個偽消息中間件:
- 通過List對象實現(xiàn)定長隊列來保存游戲維度的行為消息(以游戲作為key的List對象來保存用戶行為);
- 通過List來保存所有的存在行為數(shù)據(jù)的游戲列表;
- 通過Set來進行去重判斷來保證2中的List對象的唯一性。
整體的技術(shù)方案如下圖所示:
生產(chǎn)過程
步驟一:游戲維度的某行為數(shù)據(jù)PUSH到游戲維度的隊列當中。
步驟二:判斷游戲是否在游戲的集合Set中,如果在就直接返回,如果不在進行步驟三。
步驟三:往游戲列表中PUSH游戲。
消費過程
步驟一:從游戲?qū)ο蟮牧斜碇醒h(huán)取出一款游戲。
步驟二:通過步驟一獲取的游戲?qū)ο笕ピ撚螒驅(qū)ο蟮男袨閿?shù)據(jù)隊列中批量獲取數(shù)據(jù)處理。
三、技術(shù)原理
在Redis的支持命令中,在List和Set的基礎(chǔ)命令,結(jié)合Lua腳本來實現(xiàn)整個技術(shù)方案。
消息數(shù)據(jù)層面,通過單獨的List循環(huán)維護待消費的游戲維度的數(shù)據(jù),每個游戲維度使用定長的List來保存消息。
消息生產(chǎn)過程中,通過結(jié)合List的llen+lpop+rpush來實現(xiàn)游戲維度的定長隊列,保證隊列的長度可控。
消息消費過程中,通過結(jié)合List的lrange+ltrim來實現(xiàn)游戲維度的消息的批量消費。
在整個執(zhí)行的復(fù)雜度層面,需要保證時間復(fù)雜度在0(N)常量維度,保證時間可控。
3.1 Lua 腳本
EVAL script numkeys key [key ] arg [arg ]
時間復(fù)雜度:取決于腳本本身的執(zhí)行的時間復(fù)雜度。
> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"
Redis uses the same Lua interpreter to run all the commands.
Also Redis guarantees that a script is executed in an atomic way:
no other script or Redis command will be executed while a script is being executed.
This semantic is similar to the one of MULTI / EXEC.
From the point of view of all the other clients the effects of a script are either still not visible or already completed.
Redis采用相同的Lua解釋器去運行所有命令,我們可以保證,腳本的執(zhí)行是原子性的。作用就類似于加了MULTI/EXEC。
- Lua 腳本內(nèi)多個命令以原子性的方式執(zhí)行,保證了命令執(zhí)行的線程安全。
- Lua 腳本結(jié)合List命令實現(xiàn)定長隊列,實現(xiàn)批量消費。
- Lua 腳本僅支持單個key的操作,不支持多key的操作。
3.2 List 對象
LLEN key
計算List的長度
時間復(fù)雜度:O(1)。
LPOP key [count]
從List的左側(cè)移除元素
時間復(fù)雜度:O(N),N為移除元素的個數(shù)。
RPUSH key element [element ]
從List的右側(cè)保存元素
時間復(fù)雜度:O(N),N為保存元素的個數(shù)。
- List的基礎(chǔ)命令包括計算List的長度,移除數(shù)據(jù),添加數(shù)據(jù),整體命令的復(fù)雜度都在O(N)的常量時間。
- 整合上述三個命令,我們能保證實現(xiàn)固定長度的隊列,通過判斷隊列長度是否達到定長結(jié)合新增隊列元素和移除隊列元素來完成。
LRANGE key start end
時間復(fù)雜度:O(S+N), S為偏移量start, N為指定區(qū)間內(nèi)元素的數(shù)量。
下標(index)參數(shù) start 和 stop 都以 0 為底,也就是說,以 0 表示列表的第一個元素,以 1 表示列表的第二個元素,以此類推。
你也可以使用負數(shù)下標,以 -1 表示列表的最后一個元素, -2 表示列表的倒數(shù)第二個元素,以此類推。
LTRIM key start stop
時間復(fù)雜度:O(N) where N is the number of elements to be removed by the operation.
修剪(trim)一個已存在的 list,這樣 list 就會只包含指定范圍的指定元素。
- List的基礎(chǔ)命令包括批量返回數(shù)據(jù)和裁剪數(shù)據(jù),整體命令的復(fù)雜度都在O(N)的常量時間。
- 整合上述兩個命令,我們能夠批量消費數(shù)據(jù)并移除隊列數(shù)據(jù),通過LRANGE批量返回數(shù)據(jù)并通過LTRIM保留剩余數(shù)據(jù)。
3.3 Set 對象
SADD key member [member ]
往Set集合添加數(shù)據(jù)。
時間復(fù)雜度:O(1)。
SISMEMBER key member
判斷Set集合是否存在元素。
時間復(fù)雜度:O(1)。
- 通過Set集合來保證數(shù)據(jù)的唯一性,且時間復(fù)雜度可控。
四、技術(shù)應(yīng)用
4.1 生產(chǎn)消息
定義LUA腳本
CACHE_NPPA_EVENT_LUA =
"local retVal = 0 " +
"local key = KEYS[1] " +
"local num = tonumber(ARGV[1]) " +
"local val = ARGV[2] " +
"local expire = tonumber(ARGV[3]) " +
"if (redis.call('llen', key) < num) then redis.call('rpush', key, val) " +
"else redis.call('lpop', key) redis.call('rpush', key, val) retVal = 1 end " +
"redis.call('expire', key, expire) return retVal";
執(zhí)行LUA腳本
String data = JSON.toJSONString(nppaBehavior);
Long retVal = (Long)jedisClusterTemplate.eval(CACHE_NPPA_EVENT_LUA, 1, NPPA_PREFIX + nppaBehavior.getGamePackage(), String.valueOf(MAX_GAME_EVENT_PER_GAME), data, String.valueOf(NPPA_TTL_MINUTE * 60));
執(zhí)行效果
實現(xiàn)固長隊列的數(shù)據(jù)存儲并設(shè)置過期時間
- 通過整合llen+rpush+lpop三個命令實現(xiàn)定長隊列。
- 通過lua腳本保證上述命令的原子性執(zhí)行。
- 整體的執(zhí)行流程如上圖所示,核心理念通過lua腳本的原子性保證了隊列長度計算(llen)、隊列數(shù)據(jù)移除(lpop)、隊列數(shù)據(jù)保存(rpush)的原子性執(zhí)行。
4.2 消費消息
定義LUA腳本
QUERY_NPPA_EVENT_LUA =
"local data = {} " +
"local key = KEYS[1] " +
"local num = tonumber(ARGV[1]) " +
"data = redis.call('lrange', key, 0, num) redis.call('ltrim', key, num+1, -1) return data";
執(zhí)行LUA腳本
Integer batchSize = NppaConfigUtils.getInteger("nppa.report.batch.size", 1);
Object result = jedisClusterTemplate.eval(QUERY_NPPA_EVENT_LUA, 1,NPPA_PREFIX + gamePackage, String.valueOf(batchSize));
執(zhí)行效果
取固定數(shù)量的對象,然后保留隊列的剩余的消息對象。
- 通過整合lrange+ltrim兩個命令實現(xiàn)消息的批量消費。
- 通過lua腳本保證上述命令的原子性執(zhí)行。
- 整體的執(zhí)行流程如上圖所示,核心理念通過lua腳本的原子性保證了數(shù)據(jù)獲取(Lrange)和數(shù)據(jù)裁剪(Ltrim)的原子性執(zhí)行。
- 整體的消費流程選擇pull模式,通過多線程循環(huán)輪詢可消費的隊列進行消費。與借助于redis的pub/sub的通知機制實現(xiàn)消費流程的push模式相比,pull模式成本更低效果更佳。
4.3 注意事項
- Redis集群模式下,執(zhí)行Lua腳本建議傳單key,多key會報重定向錯誤。
- 在不同的Redis版本下,Lua腳本針對null的返回值處理不同,參考官方文檔。
- 消費者的消費過程中通過循環(huán)遍歷游戲列表,然后根據(jù)游戲去獲取對應(yīng)的消息對象,但是不同的游戲?qū)?yīng)的熱度不同,所以在消費端我們通過配置的方式為熱門游戲單獨開啟消費線程進行消費,相當于針對不同游戲配置不同優(yōu)先級的消費者。
五、線上效果
- 生產(chǎn)和消費的QPS約為1w qps左右,整體上報QPS通過批量上報后會遠低于生產(chǎn)的消息生產(chǎn)和消費的QPS。
- 整體數(shù)據(jù)的使用游戲包名作為key進行存儲,性能上不存在熱點的問題。
六、適用場景
在描述完方案的原理和實現(xiàn)細節(jié)之后,進一步對適用的業(yè)務(wù)場景進行下總結(jié)。整體方案是基于redis的基本數(shù)據(jù)結(jié)構(gòu)構(gòu)建一個偽消息隊列,用以解決 消息的單個生產(chǎn)批量消費 的場景,通過多key形式實現(xiàn)消息隊列的多Topic模式,重要的是能夠借助于redis的原生能力在O(N)的時間復(fù)雜度完成批量消費。另外該方案也可以降級作為實現(xiàn)先進先出定長的日志隊列。
七、總結(jié)
本文主要探索在特定業(yè)務(wù)場景下通過Redis的原生命令實現(xiàn)類MQ的功能,創(chuàng)新式的通過Lua腳本組合Redis的List的基礎(chǔ)命令,實現(xiàn)了消息的分組,消息的定長隊列,消息的批量消費功能;整體解決方案在線上環(huán)境落地并平穩(wěn)運行,為特定場景提供了一種通用的解決方案。