偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

RocketMQ順序消息解析!

開發(fā) 前端
順序消息是消息隊列 RocketMQ 提供的一種高級消息類型。對于一個指定的Topic,消息嚴(yán)格按照先進(jìn)先出(FIFO)的原則進(jìn)行消息發(fā)布和消費。

順序消息是消息隊列 RocketMQ 提供的一種高級消息類型。

對于一個指定的Topic,消息嚴(yán)格按照先進(jìn)先出(FIFO)的原則進(jìn)行消息發(fā)布和消費。

  • 即先發(fā)送的消息先消費,后發(fā)送的消息后消費。

順序消息適用于對消息發(fā)送和消費順序有嚴(yán)格要求的情況。

應(yīng)用場景

順序消息和普通消息的對比如下:

消息類型

消費順序

性能

適用場景

普通消息

無順序

適用于對吞吐量要求高,且對生產(chǎn)和消費順序無要求

順序消息

指定的 Topic 內(nèi)的消息遵循先入先出(FIFO)規(guī)則

一般

吞吐量要求一般

但是要求特定的 Topic 嚴(yán)格地按照 FIFO 原則進(jìn)行消息發(fā)布和消費的場景

訂單創(chuàng)建場景:

在一些電商系統(tǒng)中,同一個訂單相關(guān)的創(chuàng)建訂單消息、訂單支付消息、訂單退款消息、訂單物流消息。

必須嚴(yán)格按照先后順序來進(jìn)行生產(chǎn)或者消費,否則消費中傳遞訂單狀態(tài)會發(fā)生紊亂,影響業(yè)務(wù)的正常進(jìn)行。

因此,該訂單的消息必須按照一定的順序在客戶端和消息隊列中進(jìn)行生產(chǎn)和消費。

  • 同時消息之間有先后的依賴關(guān)系,后一條消息需要依賴于前一條消息的處理結(jié)果。

順序消息分為全局有序和局部有序。

全局有序

可以為Topic設(shè)置一個消息隊列,使用一個生產(chǎn)者單線程發(fā)送數(shù)據(jù),消費者端也使用單線程進(jìn)行消費。

從而保證消息的全局有序,但是這種方式效率低,一般不使用。

圖片

局部有序

假設(shè)一個Topic分配了兩個消息隊列,生產(chǎn)者在發(fā)送消息的時候,可以對消息設(shè)置一個路由ID。

  • 比如想保證一個訂單的相關(guān)消息有序,那么就使用訂單ID當(dāng)做路由ID。

在發(fā)送消息的時候,通過訂單ID對消息隊列的個數(shù)取余,根據(jù)取余結(jié)果選擇消息隊列。

  • 這樣同一個訂單的數(shù)據(jù)就可以保證發(fā)送到一個消息隊列中。

消費者端使用MessageListenerOrderly處理有序消息。

這就是RocketMQ的局部有序,保證消息在某個消息隊列中有序。

圖片圖片

實現(xiàn)原理

消費者在啟動時會調(diào)用DefaultMQPushConsumerImpl的start方法。

圖片圖片

在DefaultMQPushConsumerImpl的start方法中,對消息監(jiān)聽器類型進(jìn)行了判斷。

如果類型是MessageListenerOrderly表示要進(jìn)行順序消費。

此時使用ConsumeMessageOrderlyService對ConsumeMessageService進(jìn)行實例化。

  • 然后調(diào)用它的start方法進(jìn)行啟動。

圖片圖片

加鎖定時任務(wù)

進(jìn)入到ConsumeMessageOrderlyService的start方法中。

可以看到,如果是集群模式,會啟動一個定時加鎖的任務(wù),周期性的對訂閱的消息隊列進(jìn)行加鎖。

具體是通過調(diào)用RebalanceImpl的lockAll方法實現(xiàn)的。

圖片圖片

為什么集群模式下需要加鎖?

因為廣播模式下,消息隊列會分配給消費者下的每一個消費者。

而在集群模式下,一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進(jìn)行。

  • 所以在廣播模式下不存在競爭關(guān)系,也就不需要對消息隊列進(jìn)行加鎖。

而在集群模式下,有可能因為負(fù)載均衡等原因?qū)⒛骋粋€消息隊列分配到了另外一個消費者中。

  • 因此在集群模式下就要加鎖,當(dāng)某個消息隊列被鎖定時,其他的消費者不能進(jìn)行消費。

整個順序消費過程涉及了三把鎖,它們分別對應(yīng)不同的情況。

向Broker申請的消息隊列鎖

集群模式下一個消息隊列同一時刻只能被同一個消費組下的某一個消費者進(jìn)行。

為了避免負(fù)載均衡等原因引起的變動,消費者會向Broker發(fā)送請求對消息隊列進(jìn)行加鎖。

如果加鎖成功,記錄到消息隊列對應(yīng)的ProcessQueue中的locked變量中,它是boolean類型的。

public class ProcessQueue {
    private volatile boolean locked = false;
}

消費者處理拉取消息時的消息隊列鎖

消費者在處理拉取到的消息時,由于可以開啟多線程進(jìn)行處理。

所以處理消息前通過MessageQueueLock中的mqLockTable獲取到了消息隊列對應(yīng)的鎖。

鎖住要處理的消息隊列,這里加消息隊列鎖主要是處理多線程之間的競爭。

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

消息消費鎖

消費者在調(diào)用consumeMessage方法之前會加消費鎖。

主要是為了避免在消費消息時,由于負(fù)載均衡等原因,ProcessQueue被刪除。

public class ProcessQueue {
    private final Lock consumeLock = new ReentrantLock();
}

圖片圖片

順序消息缺陷

消費順序消息的并行度依賴于隊列的數(shù)量。

隊列熱點問題,個別隊列由于哈希不均導(dǎo)致消息過多,消費速度跟不上,產(chǎn)生消息堆積問題。

遇到消息失敗的消息,無法跳過,當(dāng)前隊列消費暫停。

熱點問題,只能通過拆分MessageQueue和優(yōu)化路由方法來盡量均衡的將消息分配到不同的MessageQueue。

消費并行度理論上不會有太大問題,因為MessageQueue的數(shù)量可以調(diào)整。

消費失敗的無法跳過是不可避免的。

因為跳過可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯誤的。

不過可以提供一些策略,由用戶根據(jù)錯誤類型來決定是否跳過,并且提供重試隊列之類的功能。

  • 在跳過之后用戶可以在其他地方重新消費到這條消息。

資料分享:

參考:

丁威、周繼鋒《RocketMQ技術(shù)內(nèi)幕》

https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage/

責(zé)任編輯:武曉燕 來源: 月伴飛魚
相關(guān)推薦

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2021-04-15 09:17:01

SpringBootRocketMQ

2024-09-25 08:32:05

2024-08-22 18:49:23

2023-12-15 13:08:00

RocketMQ中間件消費順序

2023-09-04 08:00:53

提交事務(wù)消息

2021-07-13 11:52:47

順序消息RocketMQkafka

2024-10-11 09:15:33

2024-11-11 00:00:10

2022-06-27 11:04:24

RocketMQ順序消息

2022-12-22 10:03:18

消息集成

2023-12-04 09:23:49

分布式消息

2023-07-18 09:03:01

RocketMQ場景消息

2025-04-09 08:20:00

RocketMQ消息隊列開發(fā)

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-17 08:34:03

RocketMQ消息初體驗

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2023-09-21 09:02:03

RocketMQ全局有序局部有序

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)
點贊
收藏

51CTO技術(shù)棧公眾號