偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

兩個實驗讓我徹底弄懂了「訂閱關(guān)系一致」

開發(fā) 前端
RocketMQ 4.X 源碼實現(xiàn)就是為了和消費組的定義保持一致 ,假如訂閱關(guān)系不一致,那么代碼執(zhí)行邏輯就會出現(xiàn)混亂。

這篇文章,筆者想聊聊 RocketMQ 最佳實踐之一:保證訂閱關(guān)系一致。

訂閱關(guān)系一致指的是同一個消費者 Group ID 下所有 Consumer 實例所訂閱的 Topic 、Tag 必須完全一致。

如果訂閱關(guān)系不一致,消息消費的邏輯就會混亂,甚至導(dǎo)致消息丟失。

1 訂閱關(guān)系演示

首先我們展示正確的訂閱關(guān)系:多個 Group ID 訂閱了多個 Topic,并且每個 Group ID 里的多個消費者的訂閱關(guān)系保持了一致。

正確的訂閱關(guān)系正確的訂閱關(guān)系

接下來,我們展示錯誤的訂閱關(guān)系。

錯誤的訂閱關(guān)系錯誤的訂閱關(guān)系

從上圖中,單個 Group ID 訂閱了多個 Topic,但是該 Group ID 里的多個消費者的訂閱關(guān)系并沒有保持一致。

代碼邏輯角度來看,每個消費者實例內(nèi)訂閱方法的主題、 TAG、監(jiān)聽邏輯都需要保持一致。

圖片圖片

接下來,我們實驗相同消費組,兩種不正確的場景,看看消費者和 Broker 服務(wù)有什么異常。

訂閱主題不同,標簽相同

訂閱主題相同,標簽不同

2 訂閱主題不同,標簽相同

圖片圖片

當我們啟動兩個消費者后,消費者組名:myconsumerGroup。

C1消費者訂閱主題 TopicTest , C2消費者訂閱主題  mytest。

在 Broker 端的日志里,會不停的打印拉取消息失敗的日志 :

2023-10-09 14:52:53 WARN PullMessageThread_2 - 
the consumer's subscription not exist, group: myconsumerGroup, topic:TopicTest

那么在這種情況下,C1 消費者是不可能拉取到消息,也就不可能消費到最新的消息。

為什么呢 ?我們知道客戶端會定時的發(fā)送心跳包到 Broker 服務(wù),心跳包中會包含消費者訂閱信息,數(shù)據(jù)格式樣例如下:

"subscriptionDataSet": [
  {
    "classFilterMode": false,
    "codeSet": [],
    "expressionType": "TAG",
    "subString": "*",
    "subVersion": 1696832107020,
    "tagsSet": [],
    "topic": "TopicTest"
  },
  {
    "classFilterMode": false,
    "codeSet": [],
    "expressionType": "TAG",
    "subString": "*",
    "subVersion": 1696832098221,
    "tagsSet": [],
    "topic": "%RETRY%myconsumerGroup"
  }
]

Broker 服務(wù)會調(diào)用 ClientManageProcessor 的 heartBeat方法處理心跳請求。

最終跟蹤到代碼:org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer

圖片圖片

Broker 服務(wù)的會保存消費者信息,消費者信息存儲在消費者表 consumerTable 。消費者表以消費組名為 key , 值為消費者組信息 ConsumerGroupInfo 。

#org.apache.rocketmq.broker.client.ConsumerManager
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
    new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

如果消費組的消費者信息 ConsumerGroupInfo 為空,則新建新的對象。

更新訂閱信息時,訂閱信息是按照消費組存放的,這步驟就會導(dǎo)致同一個消費組內(nèi)的各個消費者客戶端的訂閱信息相互被覆蓋。

回到消費者客戶端,當消費者拉取消息時,Broker 服務(wù)會調(diào)用 PullMessageProcessor 的 processRequest 方法 。

首先會進行前置判斷,查詢當前的主題的訂閱信息若該主題的訂閱信息為空,則打印告警日志,并返回異常的響應(yīng)結(jié)果。

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());    
if (null == subscriptionData) {
     log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), 
     response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
     response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
     return response;
}

通過調(diào)研 Broker 端的代碼,我們發(fā)現(xiàn):相同消費組的訂閱信息必須保持一致 , 否則同一個消費組內(nèi)的各個消費者客戶端的訂閱信息相互被覆蓋,從而導(dǎo)致某個消費者客戶端無法拉取到新的消息。

C1消費者無法消費主題 TopicTest 的消息數(shù)據(jù),那么 C2 消費者訂閱主題 mytest,消費會正常嗎 ?

圖片圖片

從上圖來看,依然有問題。主題 mytest 有四個隊列,但只有兩個隊列被分配了, 另外兩個隊列的消息就沒有辦法消費了。

要解釋這個問題,我們需要重新溫習(xí)負載均衡的原理。

負載均衡服務(wù)會根據(jù)消費模式為”廣播模式”還是“集群模式”做不同的邏輯處理,這里主要來看下集群模式下的主要處理流程:

(1) 獲取該主題下的消息消費隊列集合;

(2) 查詢 Broker 端獲取該消費組下消費者 Id 列表;

(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法),計算出待拉取的消息隊列;

圖片圖片

這里的平均分配算法,類似于分頁的算法,將所有 MessageQueue 排好序類似于記錄,將所有消費端排好序類似頁數(shù),并求出每一頁需要包含的平均 size 和每個頁面記錄的范圍 range ,最后遍歷整個 range 而計算出當前消費端應(yīng)該分配到的記錄。

(4) 分配到的消息隊列集合與 processQueueTable 做一個過濾比對操作。

圖片圖片

消費者實例內(nèi) ,processQueueTable 對象存儲著當前負載均衡的隊列 ,以及該隊列的處理隊列 processQueue (消費快照)。

  1. 標紅的 Entry 部分表示與分配到的消息隊列集合互不包含,則需要將這些紅色隊列 Dropped 屬性為 true , 然后從 processQueueTable 對象中移除。
  2. 綠色的 Entry 部分表示與分配到的消息隊列集合的交集,processQueueTable 對象中已經(jīng)存在該隊列。
  3. 黃色的 Entry 部分表示這些隊列需要添加到 processQueueTable 對象中,為每個分配的新隊列創(chuàng)建一個消息拉取請求 pullRequest , 在消息拉取請求中保存一個處理隊列 processQueue (隊列消費快照),內(nèi)部是紅黑樹(TreeMap),用來保存拉取到的消息。

最后創(chuàng)建拉取消息請求列表,并將請求分發(fā)到消息拉取服務(wù),進入拉取消息環(huán)節(jié)。

通過上面的介紹 ,通過負載均衡的原理推導(dǎo),原因就顯而易見了。

圖片圖片

C1消費者被分配了隊列 0、隊列 1 ,但是 C1消費者本身并沒有訂閱主題 mytest , 所以無法消費該主題的數(shù)據(jù)。

從本次實驗來看,C1消費者無法消費主題 TopicTest 的消息數(shù)據(jù) , C2 消費者只能部分消費主題 mytest的消息數(shù)據(jù)。

但是因為在 Broker 端,同一個消費組內(nèi)的各個消費者客戶端的訂閱信息相互被覆蓋,所以這種消費狀態(tài)非?;靵y,偶爾也會切換成:C1消費者可以部分消費主題 TopicTest 的消息數(shù)據(jù) , C2消費者無法消費主題  mytest的消息數(shù)據(jù)。

3 訂閱主題相同,標簽不同

圖片圖片

如圖,C1 消費者和 C2 消費者訂閱主題 TopicTest ,但兩者的標簽 TAG 并不相同。

啟動消費者服務(wù)之后,從控制臺觀察,負載均衡的效果也如預(yù)期一般正常。

圖片圖片

筆者在 Broker 端打印埋點日志,發(fā)現(xiàn)主題 TopicTest 的訂閱信息為 :

{
  "classFilterMode": false,
  "codeSet": [66],
  "expressionType": "TAG",
  "subString": "B",
  "subVersion": 1696901014319,
  "tagsSet": ["B"],
  "topic": "TopicTest"
}

那么這種狀態(tài),消費正常嗎 ?筆者做了一組實驗,消費依然混亂:

C1 消費者無法消費 TAG 值為 A 的消息 ,C2 消費者只能消費部分 TAG 值為 B 的消息。

想要理解原因,我們需要梳理消息過濾機制。

首先 ConsumeQueue 文件的格式如下 :

圖片圖片

  1. Broker 端在接收到拉取請求后,根據(jù)請求參數(shù)定位 ConsumeQueue 文件,然后遍歷 ConsumeQueue 待檢索的條目, 判斷條目中存儲 Tag 的 hashcode 是否和訂閱信息中 TAG 的 hashcode 是否相同,若不符合,則跳過,繼續(xù)對比下一個, 符合條件的聚合后返回給消費者客戶端。
  2. 消費者在收到過濾后的消息后,也要執(zhí)行過濾機制,只不過過濾的是 TAG 字符串的值,而不是 hashcode 。

我們模擬下消息過濾的過程:

圖片圖片

首先,生產(chǎn)者將不同的消息發(fā)送到 Broker 端,不同的 TAG 的消息會發(fā)送到保存的不同的隊列中。

C1 消費者從隊列 0 ,隊列 1 中拉取消息時,因為 Broker 端該主題的訂閱信息中 TAG 值為 B ,經(jīng)過服務(wù)端過濾后, C1 消費者拉取到的消息的 TAG 值都是 B  , 但消費者在收到過濾的消息后,也需要進行客戶端過濾,A 并不等于 B ,所以 C1 消費者無法消費 TAG 值為 A 的消息。

C2 消費者從隊列 2, 隊列 3 中拉取消息,整個邏輯鏈路是正常的 ,但是因為負載均衡的緣故,它無法消費隊列 0 ,隊列 1的消息。

4 總結(jié)

什么是消費組 ?消費同一類消息且消費邏輯一致 。

RocketMQ 4.X 源碼實現(xiàn)就是為了和消費組的定義保持一致 ,假如訂閱關(guān)系不一致,那么代碼執(zhí)行邏輯就會出現(xiàn)混亂。

規(guī)避訂閱關(guān)系不一致這個問題有兩種方式 :

  • 嚴格規(guī)范上線流程在上線之前,梳理好相關(guān)依賴服務(wù),梳理好上線流程,做好上線評審,并嚴格按照流程執(zhí)行。
  • 合理定義好主題和標簽當我們定義好主題和標簽后,需要添加新的標簽時,是否可以換一個思路:換一個新的消費組或者新建一個主題。

最后的思考:

假如從基礎(chǔ)架構(gòu)層面來思考,將訂閱關(guān)系信息中心化來設(shè)計,應(yīng)該也可以實現(xiàn) ,但成本較高,對于中小企業(yè)來講,并不合算。

參考資料:

RocketMQ為什么要保證訂閱關(guān)系的一致性

https://cloud.tencent.com/developer/article/1474885

RocketMQ最佳實踐之坑?

https://mp.weixin.qq.com/s/Ypk-U8uVu4aZKMinbfU3xQ

源碼分析RocketMQ消息過濾機制

https://blog.csdn.net/prestigeding/article/details/79255328


責任編輯:武曉燕 來源: 勇哥java實戰(zhàn)分享
相關(guān)推薦

2021-02-02 05:41:16

底層設(shè)計頂層

2021-02-07 21:59:39

Java回調(diào)機制

2018-07-05 09:41:08

一致性哈希算法

2021-10-26 00:17:21

Linux網(wǎng)絡(luò)命名

2020-07-02 09:15:59

Netty內(nèi)存RPC

2022-12-05 16:49:05

volatileJava

2021-01-18 07:52:08

Dom節(jié)點Element

2023-12-12 07:31:51

Executors工具開發(fā)者

2023-02-28 23:04:15

2023-11-06 09:06:54

分布式一致性數(shù)據(jù)

2024-04-11 08:01:24

RedisMysql分布式鎖

2019-10-28 09:26:35

PylintPython編程語言

2018-01-29 15:25:05

前端JSDate對象

2019-11-08 16:05:54

Promise前端鏈式調(diào)用

2019-09-12 09:40:34

秒殺系統(tǒng)高并發(fā)

2022-01-27 08:31:20

一致性哈希

2017-10-23 10:34:36

服務(wù)器數(shù)據(jù)同步

2023-12-05 14:44:01

2023-11-08 07:56:38

單鏈表雙鏈表

2025-05-28 02:20:00

點贊
收藏

51CTO技術(shù)棧公眾號