偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Redis5新特性Streams作消息隊(duì)列

數(shù)據(jù)庫 其他數(shù)據(jù)庫 Redis
本文所使用 Redis 版本為 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同。

前言

Redis 5 新特性中,Streams 數(shù)據(jù)結(jié)構(gòu)的引入,可以說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作為消息隊(duì)列使用時(shí),得到更完善,更強(qiáng)大的原生支持,其中尤為明顯的是持久化消息隊(duì)列。同時(shí),stream 借鑒了 kafka 的消費(fèi)組模型概念和設(shè)計(jì),使消費(fèi)消息處理上更加高效快速。本文就 Streams 數(shù)據(jù)結(jié)構(gòu)中常用 API 進(jìn)行分析。

準(zhǔn)備

本文所使用 Redis 版本為 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同。

添加消息

Streams 添加數(shù)據(jù)使用 XADD 指令進(jìn)行添加,消息中的數(shù)據(jù)以 K-V 鍵值對的形式進(jìn)行操作。一條消息可以存在多個(gè)鍵值對,添加命令格式: 

  1. XADD key ID field string [field string ...] 

其中 key 為 Streams 的名稱,ID 為消息的唯一標(biāo)志,不可重復(fù),field string 就為鍵值對。下面我們就添加以 person 為名稱的流,進(jìn)行操作。 

  1. XADD person * name ytao des https://ytao.top 

上面添加案例中,ID 使用 * 號復(fù)制,這里代表著服務(wù)端自動(dòng)生成 Id,添加后返回?cái)?shù)據(jù) "1578238486193-0"

這里自動(dòng)生成的 Id 格式為 <millisecondstime>-<sequencenumber> Id 是由兩部分組成:

  1.  millisecondsTime 為當(dāng)前服務(wù)器時(shí)間毫秒時(shí)間戳。
  2.  sequenceNumber 當(dāng)前序列號,取值來源于當(dāng)前毫秒內(nèi),生成消息的順序,默認(rèn)從 0 開始加 1 遞增。

比如:1578238486193-3 表示在 1578238486193 毫秒的時(shí)間戳?xí)r,添加的第 4 條消息。

除了服務(wù)端自動(dòng)生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下條件限制:

  1.  Id 中的前后部分必須為數(shù)字。
  2.  最小 Id 為 0-1,不能為 0-0,但是 2-0,3-0 .... 是被允許的。
  3.  添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。

否則,當(dāng)不滿足上述條件時(shí),添加后會(huì)拋出異常: 

  1. (error) ERR The ID specified in XADD is equal or smaller than the target stream top item 

實(shí)際上,當(dāng)添加一條消息時(shí),會(huì)進(jìn)行兩部操作。第一步,先判斷如果不存在 Streams,則創(chuàng)建 Streams 的名稱,再添加消息到 Streams 中。即使添加消息時(shí),由于 Id 異常,也可以在 Redis 中存在以當(dāng)前 Streams 的名稱。 Streams 中 Id 也可作為指針使用,因?yàn)樗且粋€(gè)有序的標(biāo)記。

生產(chǎn)中,如果這樣使用添加消息,會(huì)存在一個(gè)問題,那就是消息數(shù)量太大時(shí),會(huì)使服務(wù)宕機(jī)。這里 Streams 的設(shè)計(jì)初期也有考慮到這個(gè)問題,那就是可以指定 Streams 的容量。如果容量操作這個(gè)設(shè)定的值,就會(huì)對調(diào)舊的消息。在添加消息時(shí),設(shè)置 MAXLEN 參數(shù)。 

  1. XADD person MAXLEN 5 * name ytao des https://ytao.top 

這樣就指定該了 Streams 中的容量為 5 條消息。也可使用 XTRIM 截取消息,從小到大剔除多余的消息: 

  1. XTRIM person MAXLEN 8 

消息數(shù)量

查看消息數(shù)量使用 XLEN 指令進(jìn)行操作。 

  1. XLEN key 

例:查看 person 流中的消息數(shù)量: 

  1. > XLEN person  
  2. (integer) 5 

查詢消息

查詢 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。

XRANGE

查詢數(shù)據(jù)時(shí),可以按照指定 Id 范圍進(jìn)行查詢,XRANGE 查詢指令格式: 

  1. XRANGE key start end [COUNT count] 

參數(shù)說明:

  •  key 為 Streams 的名稱
  •  start 為范圍查詢開始 Id,包含本 Id。
  •  start 為范圍查詢結(jié)束 Id,包含本 Id。
  •  Count 為查詢返回最大的消息數(shù)量,非必填。

這里 start 和 end 有-和+兩個(gè)非指定值,他們分別表示無窮小和無窮大,所以當(dāng)使用這個(gè)兩個(gè)值時(shí),會(huì)查詢出全部的消息。 

  1. > XRANGE person - +  
  2. 1) 1) "0-1"  
  3.    2) 1) "name"  
  4.       2) "ytao"  
  5.       3) "des"  
  6.       4) "https://ytao.top"  
  7. 2) 1) "0-2"  
  8.    2) 1) "name"  
  9.       2) "luffy"  
  10.       3) "des"  
  11.       4) "valiant!"  
  12. 3) 1) "2-0"  
  13.    2) 1) "name"  
  14.       2) "gaga"  
  15.       3) "des"  
  16.       4) "fishion!" 

上面查詢的消息數(shù)據(jù),可以看到是按照先進(jìn)先出的順序查詢出來的。

使用 COUNT 指定查詢返回的數(shù)量: 

  1. # 查詢所有的消息,并且返回一條數(shù)據(jù)  
  2. > XRANGE person - + COUNT 1  
  3. 1) 1) "0-1"  
  4.    2) 1) "name"  
  5.       2) "ytao"  
  6.       3) "des"  
  7.       4) "https://ytao.top" 

在范圍查詢中,Id 的后半部分可省略,后半部分中的數(shù)據(jù)會(huì)全部查詢到。

XREVRANGE

XREVRANGE 的查詢和 XRANGE 指令中的使用類似,但查詢的 start 和 end 參數(shù)順序進(jìn)行了調(diào)換: 

  1. XREVRANGE key end start [COUNT count] 

使用案例: 

  1. > XREVRANGE person +  -  
  2. 1) 1) "2-0"  
  3.    2) 1) "name"  
  4.       2) "gaga"  
  5.       3) "des"  
  6.       4) "fishion!"  
  7. 2) 1) "0-2"  
  8.    2) 1) "name"  
  9.       2) "luffy"  
  10.       3) "des"  
  11.       4) "valiant!"  
  12. 3) 1) "0-1"  
  13.    2) 1) "name"  
  14.       2) "ytao"  
  15.       3) "des"  
  16.       4) "https://ytao.top" 

查詢后的結(jié)果與 XRANGE 的結(jié)果順序剛好相反,其他都一樣,這兩個(gè)指令可進(jìn)行消息的升序和降序的返回。

刪除消息

刪除消息使用 XDEL 指令操作,只需指定將要?jiǎng)h除的 Streams 名稱和 Id 即可,支持一次刪除多個(gè)消息 。 

  1. XDEL key ID [ID ...] 

刪除案例: 

  1. # 查詢所有消息  
  2. > XRANGE person - +  
  3. 1) 1) "0-1"  
  4.    2) 1) "name"  
  5.       2) "ytao"  
  6.       3) "des"  
  7.       4) "https://ytao.top"  
  8. 2) 1) "0-2"  
  9.    2) 1) "name"  
  10.       2) "luffy"  
  11.       3) "des"  
  12.       4) "valiant!"  
  13. 3) 1) "2-0"  
  14.    2) 1) "name"  
  15.       2) "gaga"  
  16.       3) "des"  
  17.       4) "fishion!"  
  18. # 刪除消息        
  19. > XDEL person 2-0  
  20. (integer) 1  
  21. # 再次查詢刪除后的所有消息  
  22. > XRANGE person - +  
  23. 1) 1) "0-1"  
  24.    2) 1) "name"  
  25.       2) "ytao"  
  26.       3) "des"  
  27.       4) "https://ytao.top"  
  28. 2) 1) "0-2"  
  29.    2) 1) "name"  
  30.       2) "luffy"  
  31.       3) "des"  
  32.       4) "valiant!"  
  33. # 查詢刪除后的長度        
  34. > XLEN person  
  35. (integer) 2        

從上面可以看到,刪除消息后,長度也會(huì)減少相應(yīng)的數(shù)量。

消費(fèi)消息

在 Redis 的 PUB/SUB 中,我們是通過訂閱來消費(fèi)消息,在 Streams 數(shù)據(jù)結(jié)構(gòu)中,同樣也能實(shí)現(xiàn)同等功能,當(dāng)沒有新的消息時(shí),可進(jìn)行阻塞等待。不僅支持單獨(dú)消費(fèi),而且還可以支持群組消費(fèi)。

單獨(dú)消費(fèi)

單獨(dú)消費(fèi)使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 為必填項(xiàng)。ID 表示將要讀取大于該 ID 的消息。當(dāng) ID 值使用 $ 賦予時(shí),表示已存在消息的最大 Id 值。 

  1. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] 

上面的 COUNT 參數(shù)用來指定讀取的最大數(shù)量,與 XRANGE 的用法一樣。 

  1. > XREAD COUNT 1 STREAMS person 0  
  2. 1) 1) "person"  
  3.    2) 1) 1) "0-1"  
  4.          2) 1) "name"  
  5.             2) "ytao"  
  6.             3) "des"  
  7.             4) "https://ytao.top"  
  8. > XREAD COUNT 2 STREAMS person 0  
  9. 1) 1) "person"  
  10.    2) 1) 1) "0-1"  
  11.          2) 1) "name"  
  12.             2) "ytao"  
  13.             3) "des"  
  14.             4) "https://ytao.top"  
  15.       2) 1) "0-2"  
  16.          2) 1) "name"  
  17.             2) "luffy"  
  18.             3) "des"  
  19.             4) "valiant!" 

在 XREAD 里面還有個(gè) BLOCK 參數(shù),這個(gè)是用來阻塞訂閱消息的,BLOCK 攜帶的參數(shù)為阻塞時(shí)間,單位為毫秒,如果在這個(gè)時(shí)間內(nèi)沒有新的消息消費(fèi),那么就會(huì)釋放該阻塞。當(dāng)這里的時(shí)間指定為 0 時(shí),會(huì)一直阻塞,直到有新的消息來消費(fèi)到。 

  1. # 窗口 1 開啟阻塞,等待新消息的到來  
  2. > XREAD BLOCK 0 STREAMS person $  
  3. # 另開一個(gè)連接窗口 2,添加一條新的消息  
  4. > XADD person 2-2 name tao des coder  
  5. "2-2"  
  6. # 窗口 1,獲取到有新的消息來消費(fèi),并且?guī)в凶枞臅r(shí)間  
  7. > XREAD BLOCK 0 STREAMS person $  
  8. 1) 1) "person"  
  9.    2) 1) 1) "2-2"  
  10.          2) 1) "name"  
  11.             2) "tao"  
  12.             3) "des"  
  13.             4) "coder"  
  14. (60.81s) 

當(dāng)使用 XREAD 進(jìn)行順序消費(fèi)時(shí),需要額外記錄下讀取到位置的 Id,方便下次繼續(xù)消費(fèi)。

群組消費(fèi)

群組消費(fèi)的主要目的也就是為了分流消息給不同的客戶端處理,以更高效的速率處理消息。為達(dá)到這一肝功能需求,我們需要做三件事:創(chuàng)建群組,群組讀取消息,向服務(wù)端確認(rèn)消息以處理。

群組操作

操作群組使用 XGROUP 指令: 

  1. XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername] 

上面命令中,包含操作有:

  •  CREATE 創(chuàng)建消費(fèi)組。
  •  SETID 修改下一個(gè)處理消息的 Id。
  •  DESTROY 銷毀消費(fèi)組。
  •  DELCONSUMER 刪除消費(fèi)組中指定的消費(fèi)者。

我們當(dāng)前需要使用的是創(chuàng)建消費(fèi)組: 

  1. # 以當(dāng)前存在的最大 Id 作為消費(fèi)起始   
  2. > XGROUP CREATE person group1 $  
  3. OK 

群組讀取消息

群組讀取使用 XREADGROUP 指令,COUNT和BLOCK的使用類似 XREAD 的操作,只是多了個(gè)群組和消費(fèi)者的指定: 

  1. XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] 

由于群組消費(fèi)和單獨(dú)消費(fèi)類似,這里只進(jìn)行個(gè)阻塞分析,這里 Id 也有個(gè)特殊值>,表示還未進(jìn)行消費(fèi)的消息: 

  1. # 窗口 1,消費(fèi)群組中,taotao 消費(fèi)者建立阻塞監(jiān)聽  
  2. XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  
  3. # 窗口 2,消費(fèi)群組中,yangyang 消費(fèi)者建立阻塞監(jiān)聽   
  4. XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  
  5. # 窗口 3,添加消費(fèi)消息  
  6. > XADD person 3-1 name tony des 666  
  7. "3-1"  
  8. # 窗口 1,讀取到新消息,此時(shí) 窗口 2 沒有任何反應(yīng)  
  9. > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >  
  10. 1) 1) "person"  
  11.    2) 1) 1) "3-1"  
  12.          2) 1) "name"  
  13.             2) "tony"  
  14.             3) "des"  
  15.             4) "666"  
  16. (77.54s)  
  17. # 窗口 3,再次添加消費(fèi)消息  
  18. > XADD person 3-2 name james des abc!  
  19. "3-2"  
  20. # 窗口 2,讀取到新消息,此時(shí) 窗口 1 沒有任何反應(yīng)  
  21. > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >  
  22. 1) 1) "person"  
  23.    2) 1) 1) "3-2"  
  24.          2) 1) "name"  
  25.             2) "james"  
  26.             3) "des"  
  27.             4) "abc!"  
  28. (76.36s) 

以上執(zhí)行流程中,group1 群組中有兩個(gè)消費(fèi)者,當(dāng)添加兩條消息后,這兩個(gè)消費(fèi)者輪流消費(fèi)。

消息ACK

消息消費(fèi)后,為避免再次重復(fù)消費(fèi),這是需要向服務(wù)端發(fā)送 ACK,確保消息被消費(fèi)后的標(biāo)記。 例如下列情況,我們上面我們將最新兩條消息已進(jìn)行了消費(fèi),但是當(dāng)我們再次讀取消息時(shí),還是被讀到: 

  1. >  XREADGROUP GROUP group1 yangyang STREAMS person 0  
  2. 1) 1) "person"  
  3.    2) 1) 1) "3-2"  
  4.          2) 1) "name"  
  5.             2) "james"  
  6.             3) "des"  
  7.             4) "abc!" 

這時(shí),我們使用 XACK 指令告訴服務(wù)器,我們已處理的消息: 

  1. XACK key group ID [ID ...]0 

讓服務(wù)器標(biāo)記 3-2 已處理: 

  1. > XACK person group1 3-2  
  2. (integer) 1 

再次獲取群組讀取消息: 

  1. >  XREADGROUP GROUP group1 yangyang STREAMS person 0  
  2. 1) 1) "person"  
  3.    2) (empty list or set) 

隊(duì)列中沒有了可讀消息。 除了上面以講解到的 API 外,查看消費(fèi)群組信息可使用 XINFO 指令查看,本文不做分析。

總結(jié)

上面對 Streams 常用 API 進(jìn)行了分析,我們可以感受到 Redis 在消息隊(duì)列支持的道路上,也越來越強(qiáng)大。如果使用過它的 PUB/SUB 功能的話,就會(huì)感受到 5.x 迭代正是將你的一些痛點(diǎn)進(jìn)行了優(yōu)化。 

 

責(zé)任編輯:龐桂玉 來源: 中國開源
相關(guān)推薦

2021-01-12 08:43:29

Redis ListStreams

2025-07-01 01:00:00

Spring消息系統(tǒng)Redis

2022-04-12 11:15:31

Redis消息隊(duì)列數(shù)據(jù)庫

2009-06-29 17:42:03

Tapestry5新特

2011-08-30 09:07:30

HTML 5

2018-07-30 08:37:02

數(shù)據(jù)庫Redis數(shù)據(jù)結(jié)構(gòu)

2024-03-22 12:10:39

Redis消息隊(duì)列數(shù)據(jù)庫

2018-12-05 09:00:00

RedisRedis Strea數(shù)據(jù)庫

2024-10-25 08:41:18

消息隊(duì)列RedisList

2011-07-12 13:21:34

2011-11-09 10:05:26

HTML 5

2021-03-06 08:10:16

Redis6 Java架構(gòu)分布式框架

2023-12-30 13:47:48

Redis消息隊(duì)列機(jī)制

2024-04-19 08:32:07

Redis緩存數(shù)據(jù)庫

2017-10-11 15:08:28

消息隊(duì)列常見

2009-09-25 10:23:51

HTML 5新特性

2014-04-15 15:45:22

Java8Java8教程

2021-07-19 07:55:24

多線程模型Redis

2022-01-15 07:20:18

Redis List 消息隊(duì)列

2022-01-21 19:22:45

RedisList命令
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號