偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

別再用 Redis List 實現(xiàn)消息隊列了,Stream 專為隊列而生

數(shù)據(jù)庫 Redis
Stream 是 Redis 5.0 引入的一種專門為消息隊列設(shè)計的數(shù)據(jù)類型,Stream 是一個包含 0 個或者多個元素的有序隊列,這些元素根據(jù) ID 的大小進(jìn)行有序排列。

上回說到使用 Redis 的 List 實現(xiàn)消息隊列有很多局限性,比如:

  • 沒有良好的 ACK 機(jī)制;
  • 沒有 ConsumerGroup 消費(fèi)組概念;
  • 消息堆積。
  • List 是線性結(jié)構(gòu),想要查詢指定數(shù)據(jù)需要遍歷整個列表;

Stream 是 Redis 5.0 引入的一種專門為消息隊列設(shè)計的數(shù)據(jù)類型,Stream 是一個包含 0 個或者多個元素的有序隊列,這些元素根據(jù) ID 的大小進(jìn)行有序排列。

它實現(xiàn)了大部分消息隊列的功能:

  • 消息 ID 系列化生成;
  • 消息遍歷;
  • 消息的阻塞和非阻塞讀;
  • Consumer Groups 消費(fèi)組;
  • ACK 確認(rèn)機(jī)制。
  • 支持多播。

提供了很多消息隊列操作命令,并且借鑒 Kafka 的 Consumer Groups 的概念,提供了消費(fèi)組功能。

同時提供了消息的持久化和主從復(fù)制機(jī)制,客戶端可以訪問任何時刻的數(shù)據(jù),并且能記住每一個客戶端的訪問位置,從而保證消息不丟失。

廢話少說,先來看下如何使用,官網(wǎng)文檔詳見:https://redis.io/topics/streams-intro

XADD:插入消息

「云嵐宗眾弟子聽命,擊殺蕭炎!」

當(dāng)云山最后一字落下,那彌漫的緊繃氣氛,頓時宣告破碎,懸浮半空的眾多云嵐宗長老背后雙翼一振,便是咻咻的劃過天際,追殺蕭炎。

云山使用以下指令向隊列中插入「追殺蕭炎」命令,讓長老帶領(lǐng)子弟去執(zhí)行。

XADD 云嵐宗 * task kill name 蕭炎
"1645936602161-0"

Stream 中的每個元素由鍵值對的形式組成,不同元素可以包含不同數(shù)量的鍵值對。

該命令的語法如下:

XADD streamName id field value [field value ...]

消息隊列名稱后面的 「*」 ,表示讓 Redis 為插入的消息自動生成唯一 ID,當(dāng)然也可以自己定義。

消息 ID 由兩部分組成:

  • 當(dāng)前毫秒內(nèi)的時間戳;
  • 順序編號。從 0 為起始值,用于區(qū)分同一時間內(nèi)產(chǎn)生的多個命令。

通過將元素 ID 與時間進(jìn)行關(guān)聯(lián),并強(qiáng)制要求新元素的 ID 必須大于舊元素的 ID, Redis 從邏輯上將流變成了一種只執(zhí)行追加操作(append only)的數(shù)據(jù)結(jié)構(gòu)。

這種特性對于使用流實現(xiàn)消息隊列和事件系統(tǒng)的用戶來說是非常重要的:

用戶可以確信,新的消息和事件只會出現(xiàn)在已有消息和事件之后,就像現(xiàn)實世界里新事件總是發(fā)生在已有事件之后一樣,一切都是有序進(jìn)行的。

XREAD:讀取消息

云凌老狗使用如下指令接收云山的命令:

XREAD COUNT 1 BLOCK 0 STREAMS 云嵐宗 0-0
1) 1) "\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
2) 1) 1) "1645936602161-0"
2) 1) "task"
2) "kill"
3) "name"
4) "蕭炎" # 蕭炎

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

該指令可以同時對多個流進(jìn)行讀取,每個心法對應(yīng)含義如下:

  • COUNT:表示每個流中最多讀取的元素個數(shù);
  • BLOCK:阻塞讀取,當(dāng)消息隊列沒有消息的時候,則阻塞等待, 0 表示無限等待,單位是毫秒。
  • ID:消息 ID,在讀取消息的時候可以指定 ID,并從這個 ID 的下一條消息開始讀取,0-0 則表示從第一個元素開始讀取。

如果想使用 XREAD 進(jìn)行順序消費(fèi),每次讀取后要記住返回的消息 ID,下次調(diào)用 XREAD 就將上一次返回的消息 ID 作為參數(shù)傳遞到下一次調(diào)用就可以繼續(xù)消費(fèi)后續(xù)的消息了。

云韻宗主,我今天剛到云嵐宗,歷史的消息就不接了,只想接收我使用 XREAD 阻塞等待的那一刻開始通過 XADD 發(fā)布的消息要咋整?

運(yùn)行「」心法即可,心法的最后「」符號表示讀取最新的阻塞消息,讀取不到則一直死等。

等待過程中,其他長老向隊列追加消息,則會立即讀取到。

XREAD COUNT 1 BLOCK 0 STREAMS 云嵐宗 $

這么容易就實現(xiàn)消息隊列了么?說好的 ACK 機(jī)制呢?

這里只是開胃菜,通過 XREAD 讀取的數(shù)據(jù)其實并沒有被刪除,當(dāng)重新執(zhí)行 XREAD COUNT 2 BLOCK 0 STREAMS 云嵐宗 0-0 指令的時候又會重新讀取到。

所以我們還需要 ACK 機(jī)制,

接下來,我們來一個真正的消息隊列。

ConsumerGroup

Redis Stream 的 ConsumerGroup(消費(fèi)者組)允許用戶將一個流從邏輯上劃分為多個不同的流,并讓 ConsumerGroup 的消費(fèi)者去處理。

它是一個強(qiáng)大的支持多播的可持久化的消息隊列。Redis Stream 借鑒了 Kafka 的設(shè)計。

Stream 的高可用是建立主從復(fù)制基礎(chǔ)上的,它和其它數(shù)據(jù)結(jié)構(gòu)的復(fù)制機(jī)制沒有區(qū)別,也就是說在 Sentinel 和 Cluster 集群環(huán)境下 Stream 是可以支持高可用的。

Redis-Stream

  • Redis Stream 的結(jié)構(gòu)如上圖所示。有一個消息鏈表,每個消息都有一個唯一的 ID 和對應(yīng)的內(nèi)容;
  • 消息持久化;
  • 每個消費(fèi)組的狀態(tài)是獨(dú)立的,不不影響,同一份的 Stream 消息會被所有的消費(fèi)組消費(fèi);
  • 一個消費(fèi)組可以由多個消費(fèi)者組成,消費(fèi)者之間是競爭關(guān)系,任意一個消費(fèi)者讀取了消息都會使 last_deliverd_id 往前移動;
  • 每個消費(fèi)者有一個 pending_ids 變量,用于記錄當(dāng)前消費(fèi)者讀取了但是還沒 ack 的消息。它用來保證消息至少被客戶端消費(fèi)了一次。

消費(fèi)組實現(xiàn)的消息隊列主要涉及以下三個指令:

  • XGROUP用于創(chuàng)建、銷毀和管理消費(fèi)者組。
  • XREADGROUP通過消費(fèi)組從流中讀取數(shù)據(jù)。
  • XACK是允許消費(fèi)者將待處理消息標(biāo)記為已正確處理的命令。

創(chuàng)建消費(fèi)組

Stream 通過 XGROUP CREATE 指令創(chuàng)建消費(fèi)組 (Consumer Group),需要傳遞起始消息 ID 參數(shù)用來初始化 last_delivered_id 變量。

我們使用 XADD 往 bossStream 隊列插入一些消息:

XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40

如下指令,為消息隊列名為 bossStream 創(chuàng)建「青龍門」和「六扇門」兩個消費(fèi)組。

# 語法如下
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream 青龍門 0-0 MKSTREAM
XGROUP CREATE bossStream 六扇門 0-0 MKSTREAM
  • stream:指定隊列的名字;
  • group:指定消費(fèi)組名字;
  • start_id:指定消費(fèi)組在 Stream 中的起始 ID,它決定了消費(fèi)者組從哪個 ID 之后開始讀取消息,0-0 從第一條開始讀取, $ 表示從最后一條向后開始讀取,只接收新消息。
  • MKSTREAM:默認(rèn)情況下,XGROUP CREATE命令在目標(biāo)流不存在時返回錯誤??梢允褂每蛇xMKSTREAM子命令作為 之后的最后一個參數(shù)來自動創(chuàng)建流。

讀取消息

讓「青龍門」消費(fèi)組的 consumer1 從bossStream 阻塞讀取一條消息:

XREADGROUP GROUP 青龍門 consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
2) 1) 1) "1645957821396-0"
2) 1) "name"
2) "zhangsan"
3) "age"
4) "26"

語法如下:

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]

[] 內(nèi)的表示可選參數(shù),該命令與 XREAD 大同小異,區(qū)別在于新增 GROUP groupName consumerName 選項。

該選項的兩個參數(shù)分別用于指定被讀取的消費(fèi)者組以及負(fù)責(zé)處理消息的消費(fèi)者。

其中:

  • >:命令的最后參數(shù) >,表示從尚未被消費(fèi)的消息開始讀取;
  • BLOCK:阻塞讀取;

敲黑板了

如果消息隊列中的消息被消費(fèi)組的一個消費(fèi)者消費(fèi)了,這條消息就不會再被這個消費(fèi)組的其他消費(fèi)者讀取到。

比如 consumer2 執(zhí)行讀取操作:

XREADGROUP GROUP 青龍門 consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
2) 1) 1) "1645957838700-0"
2) 1) "name"
2) "lisi"
3) "age"
4) "2"

consumer2 不能再讀取到 zhangsan 了,而是讀取下一條 lisi 因為這條消息已經(jīng)被 consumer1 讀取了。

使用消費(fèi)者的另一個目的可以讓組內(nèi)的多個消費(fèi)者分擔(dān)讀取消息,也就是每個消費(fèi)者讀取部分消息,從而實現(xiàn)均衡負(fù)載。

比如一個消費(fèi)組有三個消費(fèi)者 C1、C2、C3 和一個包含消息 1、2、3、4、5、6、7 的流:

XPENDING 查看已讀未確認(rèn)消息

為了保證消費(fèi)者在消費(fèi)的時候發(fā)生故障或者宕機(jī)重啟后依然可以讀取消息,Stream 內(nèi)部有一個隊列(pending List)保存每個消費(fèi)者讀取但是還沒有執(zhí)行 ACK 的消息。

如果消費(fèi)者使用了 XREADGROUP GROUP groupName consumerName 讀取消息,但是沒有給 Stream 發(fā)送 XACK 命令,消息依然保留。

比如查看 bossStream 中的 消費(fèi)組「青龍門」中各個消費(fèi)者已讀取未確認(rèn)的消息信息:

XPENDING bossStream 青龍門
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"

1)未確認(rèn)消息條數(shù);

2) ~ 3)青龍門中所有消費(fèi)者讀取的消息最小和最大 ID;

查看 consumer1讀取了哪些數(shù)據(jù),使用以下命令:

XPENDING bossStream 青龍門 - + 10 consumer1
1) 1) "1645957821396-0"
2) "consumer1"
3) (integer) 3758384
4) (integer) 1

ACK 確認(rèn)

所以當(dāng)接收到消息并且消費(fèi)成功以后,我們需要手動 ACK 通知 Streams,這條消息就會被刪除了。命令如下:

XACK bossStream 青龍門 1645957821396-0 1645957838700-0
(integer) 2

語法如下:

XACK key group-key ID [ID ...]

消費(fèi)確認(rèn)增加了消息的可靠性,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成,整個流程的執(zhí)行如下圖所示:

Stream 整體流程

使用 Redisson 實戰(zhàn)

使用 maven 添加依賴

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.7</version>
</dependency>

添加 Redis 配置,碼哥的 Redis 沒有配置密碼,大家根據(jù)實際情況配置即可。

spring:
application:
name: redission
redis:
host: 127.0.0.1
port: 6379
ssl: false
@Slf4j
@Service
public class QueueService {
@Autowired
private RedissonClient redissonClient;
/**
* 發(fā)送消息到隊列
*
* @param message
*/
public void sendMessage(String message) {
RStream<String, String> stream = redissonClient.getStream("sensor#4921");
stream.add("speed", "19");
stream.add("velocity", "39%");
stream.add("temperature", "10C");
}
/**
* 消費(fèi)者消費(fèi)消息
*
* @param message
*/
public void consumerMessage(String message) {
RStream<String, String> stream = redissonClient.getStream("sensor#4921");
stream.createGroup("sensors_data", StreamMessageId.ALL);
Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
Map<String, String> msg = entry.getValue();
System.out.println(msg);
stream.ack("sensors_data", entry.getKey());
}
}
}

本文轉(zhuǎn)載自微信公眾號「碼哥字節(jié)」

責(zé)任編輯:姜華 來源: 碼哥字節(jié)
相關(guān)推薦

2022-01-21 19:22:45

RedisList命令

2022-01-15 07:20:18

Redis List 消息隊列

2024-04-19 08:32:07

Redis緩存數(shù)據(jù)庫

2024-10-25 08:41:18

消息隊列RedisList

2024-03-22 12:10:39

Redis消息隊列數(shù)據(jù)庫

2023-12-30 13:47:48

Redis消息隊列機(jī)制

2022-04-12 11:15:31

Redis消息隊列數(shù)據(jù)庫

2017-10-11 15:08:28

消息隊列常見

2024-09-11 14:57:00

Redis消費(fèi)線程模型

2023-09-12 14:58:00

Redis

2021-01-12 08:43:29

Redis ListStreams

2021-03-01 23:31:48

隊列實現(xiàn)棧存儲

2025-06-27 10:41:04

Redis數(shù)據(jù)庫集群

2022-08-11 08:03:43

隊列

2024-11-14 11:56:45

2024-09-12 14:50:08

2022-06-28 08:37:07

分布式服務(wù)器WebSocket

2024-05-08 14:49:22

Redis延遲隊列業(yè)務(wù)

2010-04-21 12:39:48

Unix 消息隊列

2009-12-07 09:23:05

點贊
收藏

51CTO技術(shù)棧公眾號