Redis5新特性Streams作消息隊(duì)列
前言
Redis 5 新特性中,Streams 數(shù)據(jù)結(jié)構(gòu)的引入,可以說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作為消息隊(duì)列使用時(shí),得到更完善,更強(qiáng)大的原生支持,其中尤為明顯的是持久化消息隊(duì)列。同時(shí),stream 借鑒了 kafka 的消費(fèi)組模型概念和設(shè)計(jì),使消費(fèi)消息處理上更加高效快速。本文就 Streams 數(shù)據(jù)結(jié)構(gòu)中常用 API 進(jìn)行分析。
準(zhǔn)備
本文所使用 Redis 版本為 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同。
添加消息
Streams 添加數(shù)據(jù)使用 XADD 指令進(jìn)行添加,消息中的數(shù)據(jù)以 K-V 鍵值對的形式進(jìn)行操作。一條消息可以存在多個(gè)鍵值對,添加命令格式:
- XADD key ID field string [field string ...]
其中 key 為 Streams 的名稱,ID 為消息的唯一標(biāo)志,不可重復(fù),field string 就為鍵值對。下面我們就添加以 person 為名稱的流,進(jìn)行操作。
- XADD person * name ytao des https://ytao.top
上面添加案例中,ID 使用 * 號復(fù)制,這里代表著服務(wù)端自動(dòng)生成 Id,添加后返回?cái)?shù)據(jù) "1578238486193-0"
這里自動(dòng)生成的 Id 格式為 <millisecondstime>-<sequencenumber> Id 是由兩部分組成:
- millisecondsTime 為當(dāng)前服務(wù)器時(shí)間毫秒時(shí)間戳。
- sequenceNumber 當(dāng)前序列號,取值來源于當(dāng)前毫秒內(nèi),生成消息的順序,默認(rèn)從 0 開始加 1 遞增。
比如:1578238486193-3 表示在 1578238486193 毫秒的時(shí)間戳?xí)r,添加的第 4 條消息。
除了服務(wù)端自動(dòng)生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下條件限制:
- Id 中的前后部分必須為數(shù)字。
- 最小 Id 為 0-1,不能為 0-0,但是 2-0,3-0 .... 是被允許的。
- 添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。
否則,當(dāng)不滿足上述條件時(shí),添加后會(huì)拋出異常:
- (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
實(shí)際上,當(dāng)添加一條消息時(shí),會(huì)進(jìn)行兩部操作。第一步,先判斷如果不存在 Streams,則創(chuàng)建 Streams 的名稱,再添加消息到 Streams 中。即使添加消息時(shí),由于 Id 異常,也可以在 Redis 中存在以當(dāng)前 Streams 的名稱。 Streams 中 Id 也可作為指針使用,因?yàn)樗且粋€(gè)有序的標(biāo)記。
生產(chǎn)中,如果這樣使用添加消息,會(huì)存在一個(gè)問題,那就是消息數(shù)量太大時(shí),會(huì)使服務(wù)宕機(jī)。這里 Streams 的設(shè)計(jì)初期也有考慮到這個(gè)問題,那就是可以指定 Streams 的容量。如果容量操作這個(gè)設(shè)定的值,就會(huì)對調(diào)舊的消息。在添加消息時(shí),設(shè)置 MAXLEN 參數(shù)。
- XADD person MAXLEN 5 * name ytao des https://ytao.top
這樣就指定該了 Streams 中的容量為 5 條消息。也可使用 XTRIM 截取消息,從小到大剔除多余的消息:
- XTRIM person MAXLEN 8
消息數(shù)量
查看消息數(shù)量使用 XLEN 指令進(jìn)行操作。
- XLEN key
例:查看 person 流中的消息數(shù)量:
- > XLEN person
- (integer) 5
查詢消息
查詢 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。
XRANGE
查詢數(shù)據(jù)時(shí),可以按照指定 Id 范圍進(jìn)行查詢,XRANGE 查詢指令格式:
- XRANGE key start end [COUNT count]
參數(shù)說明:
- key 為 Streams 的名稱
- start 為范圍查詢開始 Id,包含本 Id。
- start 為范圍查詢結(jié)束 Id,包含本 Id。
- Count 為查詢返回最大的消息數(shù)量,非必填。
這里 start 和 end 有-和+兩個(gè)非指定值,他們分別表示無窮小和無窮大,所以當(dāng)使用這個(gè)兩個(gè)值時(shí),會(huì)查詢出全部的消息。
- > XRANGE person - +
- 1) 1) "0-1"
- 2) 1) "name"
- 2) "ytao"
- 3) "des"
- 4) "https://ytao.top"
- 2) 1) "0-2"
- 2) 1) "name"
- 2) "luffy"
- 3) "des"
- 4) "valiant!"
- 3) 1) "2-0"
- 2) 1) "name"
- 2) "gaga"
- 3) "des"
- 4) "fishion!"
上面查詢的消息數(shù)據(jù),可以看到是按照先進(jìn)先出的順序查詢出來的。
使用 COUNT 指定查詢返回的數(shù)量:
- # 查詢所有的消息,并且返回一條數(shù)據(jù)
- > XRANGE person - + COUNT 1
- 1) 1) "0-1"
- 2) 1) "name"
- 2) "ytao"
- 3) "des"
- 4) "https://ytao.top"
在范圍查詢中,Id 的后半部分可省略,后半部分中的數(shù)據(jù)會(huì)全部查詢到。
XREVRANGE
XREVRANGE 的查詢和 XRANGE 指令中的使用類似,但查詢的 start 和 end 參數(shù)順序進(jìn)行了調(diào)換:
- XREVRANGE key end start [COUNT count]
使用案例:
- > XREVRANGE person + -
- 1) 1) "2-0"
- 2) 1) "name"
- 2) "gaga"
- 3) "des"
- 4) "fishion!"
- 2) 1) "0-2"
- 2) 1) "name"
- 2) "luffy"
- 3) "des"
- 4) "valiant!"
- 3) 1) "0-1"
- 2) 1) "name"
- 2) "ytao"
- 3) "des"
- 4) "https://ytao.top"
查詢后的結(jié)果與 XRANGE 的結(jié)果順序剛好相反,其他都一樣,這兩個(gè)指令可進(jìn)行消息的升序和降序的返回。
刪除消息
刪除消息使用 XDEL 指令操作,只需指定將要?jiǎng)h除的 Streams 名稱和 Id 即可,支持一次刪除多個(gè)消息 。
- XDEL key ID [ID ...]
刪除案例:
- # 查詢所有消息
- > XRANGE person - +
- 1) 1) "0-1"
- 2) 1) "name"
- 2) "ytao"
- 3) "des"
- 4) "https://ytao.top"
- 2) 1) "0-2"
- 2) 1) "name"
- 2) "luffy"
- 3) "des"
- 4) "valiant!"
- 3) 1) "2-0"
- 2) 1) "name"
- 2) "gaga"
- 3) "des"
- 4) "fishion!"
- # 刪除消息
- > XDEL person 2-0
- (integer) 1
- # 再次查詢刪除后的所有消息
- > XRANGE person - +
- 1) 1) "0-1"
- 2) 1) "name"
- 2) "ytao"
- 3) "des"
- 4) "https://ytao.top"
- 2) 1) "0-2"
- 2) 1) "name"
- 2) "luffy"
- 3) "des"
- 4) "valiant!"
- # 查詢刪除后的長度
- > XLEN person
- (integer) 2
從上面可以看到,刪除消息后,長度也會(huì)減少相應(yīng)的數(shù)量。
消費(fèi)消息
在 Redis 的 PUB/SUB 中,我們是通過訂閱來消費(fèi)消息,在 Streams 數(shù)據(jù)結(jié)構(gòu)中,同樣也能實(shí)現(xiàn)同等功能,當(dāng)沒有新的消息時(shí),可進(jìn)行阻塞等待。不僅支持單獨(dú)消費(fèi),而且還可以支持群組消費(fèi)。
單獨(dú)消費(fèi)
單獨(dú)消費(fèi)使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 為必填項(xiàng)。ID 表示將要讀取大于該 ID 的消息。當(dāng) ID 值使用 $ 賦予時(shí),表示已存在消息的最大 Id 值。
- XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
上面的 COUNT 參數(shù)用來指定讀取的最大數(shù)量,與 XRANGE 的用法一樣。
- > XREAD COUNT 1 STREAMS person 0
- 1) 1) "person"
- 2) 1) 1) "0-1"
- 2) 1) "name"
- 2) "ytao"
- 3) "des"
- 4) "https://ytao.top"
- > XREAD COUNT 2 STREAMS person 0
- 1) 1) "person"
- 2) 1) 1) "0-1"
- 2) 1) "name"
- 2) "ytao"
- 3) "des"
- 4) "https://ytao.top"
- 2) 1) "0-2"
- 2) 1) "name"
- 2) "luffy"
- 3) "des"
- 4) "valiant!"
在 XREAD 里面還有個(gè) BLOCK 參數(shù),這個(gè)是用來阻塞訂閱消息的,BLOCK 攜帶的參數(shù)為阻塞時(shí)間,單位為毫秒,如果在這個(gè)時(shí)間內(nèi)沒有新的消息消費(fèi),那么就會(huì)釋放該阻塞。當(dāng)這里的時(shí)間指定為 0 時(shí),會(huì)一直阻塞,直到有新的消息來消費(fèi)到。
- # 窗口 1 開啟阻塞,等待新消息的到來
- > XREAD BLOCK 0 STREAMS person $
- # 另開一個(gè)連接窗口 2,添加一條新的消息
- > XADD person 2-2 name tao des coder
- "2-2"
- # 窗口 1,獲取到有新的消息來消費(fèi),并且?guī)в凶枞臅r(shí)間
- > XREAD BLOCK 0 STREAMS person $
- 1) 1) "person"
- 2) 1) 1) "2-2"
- 2) 1) "name"
- 2) "tao"
- 3) "des"
- 4) "coder"
- (60.81s)
當(dāng)使用 XREAD 進(jìn)行順序消費(fèi)時(shí),需要額外記錄下讀取到位置的 Id,方便下次繼續(xù)消費(fèi)。
群組消費(fèi)
群組消費(fèi)的主要目的也就是為了分流消息給不同的客戶端處理,以更高效的速率處理消息。為達(dá)到這一肝功能需求,我們需要做三件事:創(chuàng)建群組,群組讀取消息,向服務(wù)端確認(rèn)消息以處理。
群組操作
操作群組使用 XGROUP 指令:
- XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
上面命令中,包含操作有:
- CREATE 創(chuàng)建消費(fèi)組。
- SETID 修改下一個(gè)處理消息的 Id。
- DESTROY 銷毀消費(fèi)組。
- DELCONSUMER 刪除消費(fèi)組中指定的消費(fèi)者。
我們當(dāng)前需要使用的是創(chuàng)建消費(fèi)組:
- # 以當(dāng)前存在的最大 Id 作為消費(fèi)起始
- > XGROUP CREATE person group1 $
- OK
群組讀取消息
群組讀取使用 XREADGROUP 指令,COUNT和BLOCK的使用類似 XREAD 的操作,只是多了個(gè)群組和消費(fèi)者的指定:
- XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
由于群組消費(fèi)和單獨(dú)消費(fèi)類似,這里只進(jìn)行個(gè)阻塞分析,這里 Id 也有個(gè)特殊值>,表示還未進(jìn)行消費(fèi)的消息:
- # 窗口 1,消費(fèi)群組中,taotao 消費(fèi)者建立阻塞監(jiān)聽
- XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
- # 窗口 2,消費(fèi)群組中,yangyang 消費(fèi)者建立阻塞監(jiān)聽
- XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
- # 窗口 3,添加消費(fèi)消息
- > XADD person 3-1 name tony des 666
- "3-1"
- # 窗口 1,讀取到新消息,此時(shí) 窗口 2 沒有任何反應(yīng)
- > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
- 1) 1) "person"
- 2) 1) 1) "3-1"
- 2) 1) "name"
- 2) "tony"
- 3) "des"
- 4) "666"
- (77.54s)
- # 窗口 3,再次添加消費(fèi)消息
- > XADD person 3-2 name james des abc!
- "3-2"
- # 窗口 2,讀取到新消息,此時(shí) 窗口 1 沒有任何反應(yīng)
- > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
- 1) 1) "person"
- 2) 1) 1) "3-2"
- 2) 1) "name"
- 2) "james"
- 3) "des"
- 4) "abc!"
- (76.36s)
以上執(zhí)行流程中,group1 群組中有兩個(gè)消費(fèi)者,當(dāng)添加兩條消息后,這兩個(gè)消費(fèi)者輪流消費(fèi)。
消息ACK
消息消費(fèi)后,為避免再次重復(fù)消費(fèi),這是需要向服務(wù)端發(fā)送 ACK,確保消息被消費(fèi)后的標(biāo)記。 例如下列情況,我們上面我們將最新兩條消息已進(jìn)行了消費(fèi),但是當(dāng)我們再次讀取消息時(shí),還是被讀到:
- > XREADGROUP GROUP group1 yangyang STREAMS person 0
- 1) 1) "person"
- 2) 1) 1) "3-2"
- 2) 1) "name"
- 2) "james"
- 3) "des"
- 4) "abc!"
這時(shí),我們使用 XACK 指令告訴服務(wù)器,我們已處理的消息:
- XACK key group ID [ID ...]0
讓服務(wù)器標(biāo)記 3-2 已處理:
- > XACK person group1 3-2
- (integer) 1
再次獲取群組讀取消息:
- > XREADGROUP GROUP group1 yangyang STREAMS person 0
- 1) 1) "person"
- 2) (empty list or set)
隊(duì)列中沒有了可讀消息。 除了上面以講解到的 API 外,查看消費(fèi)群組信息可使用 XINFO 指令查看,本文不做分析。
總結(jié)
上面對 Streams 常用 API 進(jìn)行了分析,我們可以感受到 Redis 在消息隊(duì)列支持的道路上,也越來越強(qiáng)大。如果使用過它的 PUB/SUB 功能的話,就會(huì)感受到 5.x 迭代正是將你的一些痛點(diǎn)進(jìn)行了優(yōu)化。