偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

1.5萬(wàn)字 + 25張圖盤點(diǎn)RocketMQ 11種消息類型,你知道幾種?

開發(fā) 前端
所謂的單向消息就是指,生產(chǎn)者發(fā)送消息給服務(wù)端之后,就直接不管了,所以對(duì)于生產(chǎn)者來說,他是不會(huì)去care消息發(fā)送的結(jié)果了,即使發(fā)送失敗了,對(duì)于生產(chǎn)者來說也是無(wú)所謂的。

大家好,我是三友~~

故事的開頭是這樣的

最近有個(gè)兄弟私信了我一張截圖

圖片圖片

我一看截圖內(nèi)容,好家伙,原來是我一年多前立的flag

倒不是我忘了這件事,我后來也的確寫了一篇的關(guān)于RocketMQ運(yùn)行的原理的文章

只不過這篇文章是從上帝的視角去看待RocektMQ一條消息整個(gè)生命周期的過程

所以就沒有具體的分析事務(wù)和延遲消息的實(shí)現(xiàn)原理,也算是留下了一個(gè)小小的坑吧

不過,既然現(xiàn)在有兄弟問了,那么今天我這就來把這個(gè)坑填上

并且,索性咱就直接把這個(gè)坑填得滿滿的,直接盤點(diǎn)RocketMQ支持的11種消息類型以及背后的實(shí)現(xiàn)原理

圖片圖片

本文是基于RocketMQ 4.9版本講解

前置知識(shí)

為了幫助大家更好地理解這些消息底層的實(shí)現(xiàn)原理,這里我就通過三個(gè)問題來講一講RocketMQ最最基本的原理

1、生產(chǎn)者如何發(fā)送消息

在RocketMQ中有兩個(gè)重要的角色

  • NameServer:就相當(dāng)于一個(gè)注冊(cè)中心
  • Broker:RocketMQ服務(wù)端

當(dāng)RocketMQ服務(wù)端,也就是Broker在啟動(dòng)的時(shí)候,會(huì)往NameServer注冊(cè)自己的信息

圖片圖片

這些信息其中就包括

  • 當(dāng)前Broker所在機(jī)器的ip和端口
  • 當(dāng)前Broker管理的Topic的名稱以及每個(gè)Topic有幾個(gè)隊(duì)列

當(dāng)生產(chǎn)者和消費(fèi)者啟動(dòng)的時(shí)候,就會(huì)從NameServer拉取這些信息,這樣生產(chǎn)者和消費(fèi)者就可以通過NameServer中獲取到Broker的ip和端口,跟Broker通信了

而Topic我們也都知道,是消息隊(duì)列中一個(gè)很重要的概念,代表了一類消息的集合

在RocketMQ中,每個(gè)Topic默認(rèn)都會(huì)有4個(gè)隊(duì)列,并且每個(gè)隊(duì)列都有一個(gè)id,默認(rèn)從0開始,依次遞增

圖片圖片

當(dāng)生產(chǎn)者發(fā)送消息的時(shí)候,就會(huì)從消息所在Topic的隊(duì)列中,根據(jù)一定的算法選擇一個(gè),然后攜帶這個(gè)隊(duì)列的id(queueId),再發(fā)送給Broker

攜帶的隊(duì)列的id就代表了這條消息屬于這個(gè)隊(duì)列的

所以從更細(xì)化的來說,消息雖然是在Topic底下,但是真正是分布在不同的隊(duì)列上的,每個(gè)隊(duì)列會(huì)有這個(gè)Topic下的部分消息。

2、消息存在哪

當(dāng)消息被Broker接收到的時(shí)候,Broker會(huì)將消息存到本地的磁盤文件中,保證Broker重啟之后消息也不丟失

RocketMQ給這個(gè)存消息的文件起了一個(gè)高大上的名字:CommitLog

由于消息會(huì)很多,所以為了防止文件過大,CommitLog在物理磁盤文件上被分為多個(gè)磁盤文件,每個(gè)文件默認(rèn)的固定大小是1G

圖片圖片

消息在寫入到文件時(shí),除了包含消息本身的內(nèi)容數(shù)據(jù),也還會(huì)包含其它信息,比如

  • 消息的Topic
  • 消息所在隊(duì)列的id,前面提到過
  • 消息生產(chǎn)者的ip和端口
  • ...

這些數(shù)據(jù)會(huì)和消息本身按照一定的順序同時(shí)寫到CommitLog文件中

圖片圖片

上圖中黃色排列順序和實(shí)際的存的內(nèi)容并非實(shí)際情況,我只是舉個(gè)例子

3、消費(fèi)者如何消費(fèi)消息

消費(fèi)者是如何拉取消息的

在RocketMQ中,消息的消費(fèi)單元是以隊(duì)列來的

圖片圖片

所以RocketMQ為了方便快速的查找和消費(fèi)消息,會(huì)為每個(gè)Topic的每個(gè)隊(duì)列也單獨(dú)創(chuàng)建一個(gè)文件

RocketMQ給這個(gè)文件也起了一個(gè)高大上的名字:ConsumeQueue

當(dāng)消息被存到CommitLog之后,其實(shí)還會(huì)往這條消息所在隊(duì)列的ConsumeQueue文件中插一條數(shù)據(jù)

每個(gè)隊(duì)列的ConsumeQueue也是由多個(gè)文件組成,每個(gè)文件默認(rèn)是存30萬(wàn)條數(shù)據(jù)

插入ConsumeQueue中的每條數(shù)據(jù)由20個(gè)字節(jié)組成,包含3部分信息

  • 消息在CommitLog的起始位置(8個(gè)字節(jié))
  • 消息在CommitLog存儲(chǔ)的長(zhǎng)度(8個(gè)字節(jié))
  • 消息tag的hashCode(4個(gè)字節(jié))

圖片圖片

每條數(shù)據(jù)也有自己的編號(hào)(offset),默認(rèn)從0開始,依次遞增

當(dāng)消費(fèi)者拉取消息的時(shí)候,會(huì)告訴服務(wù)端自己消費(fèi)哪個(gè)隊(duì)列(queueId),哪個(gè)位置的消息(offset)的消息

服務(wù)端接收到消息之后,會(huì)找到queueId對(duì)應(yīng)的ConsumeQueue,然后找到offset位置的數(shù)據(jù),最后根據(jù)這條數(shù)據(jù)到CommitLog文件查找真正的消息內(nèi)容

所以,從這可以看出,ConsumeQueue其實(shí)就相當(dāng)于是一個(gè)索引文件,方便我們快速查找在CommitLog中的消息

所以,記住下面這個(gè)非常重要的結(jié)論,有助于后面的文章內(nèi)容的理解

要想查找到某個(gè)Topic下的消息,那么一定是先找這個(gè)Topic隊(duì)列對(duì)應(yīng)的ConsumeQueue,之后再通過ConsumeQueue中的數(shù)據(jù)去CommitLog文件查找真正的消息內(nèi)容

消費(fèi)者組和消費(fèi)模式

在RocketMQ,消費(fèi)者是有個(gè)消費(fèi)者組的概念,在啟動(dòng)消費(fèi)者的時(shí)候會(huì)指定該消費(fèi)者屬于哪個(gè)消費(fèi)者組。

//創(chuàng)建一個(gè)消費(fèi)者,指定消費(fèi)者組的名稱為sanyouConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");

一個(gè)消費(fèi)者組中可以有多個(gè)消費(fèi)者,不同消費(fèi)者組之間消費(fèi)消息是互不干擾的

圖片圖片

在同一個(gè)消費(fèi)者組中,消息消費(fèi)有兩種模式

  • 集群模式
  • 廣播模式

同一條消息在同一個(gè)消費(fèi)者組底下只會(huì)被消費(fèi)一次,這就叫集群模式

集群消費(fèi)的實(shí)現(xiàn)就是將隊(duì)列按照一定的算法分配給消費(fèi)者,默認(rèn)是按照平均分配的

圖片圖片

廣播模式剛好相反,同一條消息能被同一個(gè)消費(fèi)者組底下所有的消費(fèi)者消費(fèi)一次

圖片圖片

RocketMQ默認(rèn)是集群模式,如果你想用廣播模式,只需設(shè)置一下即可

consumer.setMessageModel(MessageModel.BROADCASTING);

好了,到這就講完了前置知識(shí),這些前置知識(shí)后面或多或少都有提到

如果你覺得看的不過癮,更詳細(xì)的文章奉上RocketMQ消息短暫而又精彩的一生

普通消息

普通消息其實(shí)就很簡(jiǎn)單,如下面代碼所示,就是發(fā)送一條普通的消息

public class Producer {
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)生產(chǎn)者,指定生產(chǎn)者組為 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 啟動(dòng)生產(chǎn)者
        producer.start();

        //創(chuàng)建一條消息 topic為 sanyouTopic 消息內(nèi)容為 三友的java日記
        Message msg = new Message("sanyouTopic", "三友的java日記".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 發(fā)送消息并得到消息的發(fā)送結(jié)果,然后打印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }

}

構(gòu)建的消息的topic為sanyouTopic,內(nèi)容為三友的java日記,這就是一條很普通的消息

批量消息

批量消息從名字也可以看出來,就是將多個(gè)消息同時(shí)發(fā)過去,減少網(wǎng)絡(luò)請(qǐng)求的次數(shù)

public class Producer {
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)生產(chǎn)者,指定生產(chǎn)者組為 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 啟動(dòng)生產(chǎn)者
        producer.start();

        //用以及集合保存多個(gè)消息
        List<Message> messages = new ArrayList<>();
        messages.add(new Message("sanyouTopic", "三友的java日記 0".getBytes()));
        messages.add(new Message("sanyouTopic", "三友的java日記 1".getBytes()));
        messages.add(new Message("sanyouTopic", "三友的java日記 2".getBytes()));
        // 發(fā)送消息并得到消息的發(fā)送結(jié)果,然后打印
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);

        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }

}

多個(gè)普通消息同時(shí)發(fā)送,這就是批量消息

不過在使用批量消息的時(shí)候,需要注意以下兩點(diǎn)

  • 每條消息的Topic必須都得是一樣的
  • 不支持延遲消息和事務(wù)消息

普通消息和批量消息比較簡(jiǎn)單,沒有復(fù)雜的邏輯,就是將消息發(fā)送過去,在ConsumeQueue和CommitLog存上對(duì)應(yīng)的數(shù)據(jù)就可以了

順序消息

所謂的順序消息就是指

生產(chǎn)者發(fā)送消息的順序跟消費(fèi)者消費(fèi)消息的順序是一致的

RocketMQ可以保證同一個(gè)隊(duì)列的消息絕對(duì)順序,先進(jìn)入隊(duì)列的消息會(huì)先被消費(fèi)者拉取到,但是無(wú)法保證一個(gè)Topic內(nèi)消息的絕對(duì)順序

所以要想通過RocketMQ實(shí)現(xiàn)順序消費(fèi),需要保證兩點(diǎn)

  • 生產(chǎn)者將需要保證順序的消息發(fā)送到同一個(gè)隊(duì)列
  • 消費(fèi)者按照順序消費(fèi)拉取到的消息

圖片圖片

那么,第一個(gè)問題,如何消息發(fā)送到同一個(gè)隊(duì)列

前面有提到,RocketMQ發(fā)送消息的時(shí)候會(huì)選擇一個(gè)隊(duì)列進(jìn)行發(fā)送

而RocketMQ默認(rèn)是通過輪詢算法來選擇隊(duì)列的,這就無(wú)法保證需要順序消費(fèi)的消息會(huì)存到同一個(gè)隊(duì)列底下

所以,默認(rèn)情況下是不行了,我們需要自定義隊(duì)列的選擇算法,才能保證消息都在同一個(gè)隊(duì)列中

RocketMQ提供了自定義隊(duì)列選擇的接口MessageQueueSelector

比如我們可以實(shí)現(xiàn)這個(gè)接口,保證相同訂單id的消息都選擇同一個(gè)隊(duì)列,在消息發(fā)送的時(shí)候指定一下就可以了

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        //可以根據(jù)業(yè)務(wù)的id從mqs中選擇一個(gè)隊(duì)列
        return null;
    }
}, new Object());

保證消息順序發(fā)送之后,第二個(gè)問題,消費(fèi)者怎么按照順序消費(fèi)拉取到的消息?

這個(gè)問題RocketMQ已經(jīng)考慮到了,看看RocketMQ多么地貼心

RocketMQ在消費(fèi)消息的時(shí)候,提供了兩種方式:

  • 并發(fā)消費(fèi)
  • 順序消費(fèi)

并發(fā)消費(fèi),多個(gè)線程同時(shí)處理同一個(gè)隊(duì)列拉取到的消息

順序消費(fèi),同一時(shí)間只有一個(gè)線程會(huì)處理同一個(gè)隊(duì)列拉取到的消息

至于是并發(fā)消費(fèi)還是順序消費(fèi),需要我們自己去指定

對(duì)于順序處理,只需要實(shí)現(xiàn)MessageListenerOrderly接口,處理消息就可以了

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 創(chuàng)建一個(gè)消費(fèi)者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
        // 指定NameServer的地址
        consumer.setNamesrvAddr("192.168.200.143:9876");

        // 訂閱sanyouTopic這個(gè)topic下的所有的消息
        consumer.subscribe("sanyouTopic", "*");
        // 注冊(cè)一個(gè)消費(fèi)的監(jiān)聽器,當(dāng)有消息的時(shí)候,會(huì)回調(diào)這個(gè)監(jiān)聽器來消費(fèi)消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消費(fèi)消息:%s", new String(msg.getBody()) + "\n");
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 啟動(dòng)消費(fèi)者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

如果想并發(fā)消費(fèi),換成實(shí)現(xiàn)MessageListenerConcurrently即可

到這你可能會(huì)有一個(gè)疑問

并發(fā)消費(fèi)和順序消費(fèi)跟前面提到的集群消費(fèi)和廣播消費(fèi)有什么區(qū)別?

集群消費(fèi)和廣播消費(fèi)指的是一個(gè)消費(fèi)者組里的每個(gè)消費(fèi)者是去拉取全部隊(duì)列的消息還是部分隊(duì)列的消息,也就是選擇需要拉取的隊(duì)列

而并發(fā)和順序消費(fèi)的意思是,是對(duì)已經(jīng)拉到的同一個(gè)隊(duì)列的消息,是并發(fā)處理還是按照消息的順序去處理

延遲消息

延遲消息就是指生產(chǎn)者發(fā)送消息之后,消息不會(huì)立馬被消費(fèi),而是等待一定的時(shí)間之后再被消息

RocketMQ的延遲消息用起來非常簡(jiǎn)單,只需要在創(chuàng)建消息的時(shí)候指定延遲級(jí)別,之后這條消息就成為延遲消息了

Message message = new Message("sanyouTopic", "三友的java日記 0".getBytes());
//延遲級(jí)別
message.setDelayTimeLevel(1);

雖然用起來簡(jiǎn)單,但是背后的實(shí)現(xiàn)原理還是有點(diǎn)意思,我們接著往下看

RocketMQ延遲消息的延遲時(shí)間默認(rèn)有18個(gè)級(jí)別,不同的延遲級(jí)別對(duì)應(yīng)的延遲時(shí)間不同

圖片圖片

RocketMQ內(nèi)部有一個(gè)Topic,專門用來表示是延遲消息的,叫SCHEDULE_TOPIC_XXXX,XXXX不是占位符,就是XXXX

RocketMQ會(huì)根據(jù)延遲級(jí)別的個(gè)數(shù)為SCHEDULE_TOPIC_XXXX這個(gè)Topic創(chuàng)建相對(duì)應(yīng)數(shù)量的隊(duì)列

比如默認(rèn)延遲級(jí)別是18,那么SCHEDULE_TOPIC_XXXX就有18個(gè)隊(duì)列,隊(duì)列的id從0開始,所以延遲級(jí)別為1時(shí),對(duì)應(yīng)的隊(duì)列id就是0,為2時(shí)對(duì)應(yīng)的就是1,依次類推

圖片圖片

那SCHEDULE_TOPIC_XXXX這個(gè)Topic有什么作用呢?

這就得從消息存儲(chǔ)時(shí)的一波偷梁換柱的騷操作了說起了

當(dāng)服務(wù)端接收到消息的時(shí)候,判斷延遲級(jí)別大于0的時(shí)候,說明是延遲消息,此時(shí)會(huì)干下面三件事:

  • 將消息的Topic改成SCHEDULE_TOPIC_XXXX
  • 將消息的隊(duì)列id設(shè)置為延遲級(jí)別對(duì)應(yīng)的隊(duì)列id
  • 將消息真正的Topic和隊(duì)列id存到前面提到的消息存儲(chǔ)時(shí)的額外信息中

之后消息就按照正常存儲(chǔ)的步驟存到CommitLog文件中

由于消息存到的是SCHEDULE_TOPIC_XXXX這個(gè)Topic中,而不是消息真正的目標(biāo)Topic中,所以消費(fèi)者此時(shí)是消費(fèi)不到消息的

舉個(gè)例子,比如有條消息,Topic為sanyou,所在的隊(duì)列id = 1,延遲級(jí)別 = 1,那么偷梁換柱之后的結(jié)果如下圖所示

圖片圖片

代碼如下

圖片圖片

所以從上分析可以得出一個(gè)結(jié)論

所有RocketMQ的延遲消息,最終都會(huì)存儲(chǔ)到SCHEDULE_TOPIC_XXXX這個(gè)Topic中,并且同一個(gè)延遲級(jí)別的消息在同一個(gè)隊(duì)列中

在存消息偷梁換柱之后,實(shí)現(xiàn)延遲消費(fèi)的最關(guān)鍵的一個(gè)步驟來了

BocketMQ在啟動(dòng)的時(shí)候,除了為每個(gè)延遲級(jí)別創(chuàng)建一個(gè)隊(duì)列之后,還會(huì)為每個(gè)延遲級(jí)別創(chuàng)建一個(gè)延遲任務(wù),也就相當(dāng)于一個(gè)定時(shí)任務(wù),每隔100ms執(zhí)行一次

圖片圖片

這個(gè)延遲任務(wù)會(huì)去檢查這個(gè)隊(duì)列中的消息有沒有到達(dá)延遲時(shí)間,也就是不是可以消費(fèi)了

前面的結(jié)論,每個(gè)隊(duì)列都有一個(gè)ConsumeQueue文件,可以通過ConsumeQueue找到這個(gè)隊(duì)列中的消息

一旦發(fā)現(xiàn)到達(dá)延遲時(shí)間,可以消費(fèi)了,此時(shí)就會(huì)從這條消息額外存儲(chǔ)的消息中拿到真正的Topic和隊(duì)列id,重新構(gòu)建一條新的消息,將新的消息的Topic和隊(duì)列id設(shè)置成真正的Topic和隊(duì)列id,內(nèi)容還是原來消息的內(nèi)容

之后再一次將新構(gòu)建的消息存儲(chǔ)到CommitLog中

由于新消息的Topic變成消息真正的Topic了,所以之后消費(fèi)者就能夠消費(fèi)到這條消息了

圖片圖片

所以,從整體來說,RocketMQ延遲消息的實(shí)現(xiàn)本質(zhì)上就是最開始消息是存在SCHEDULE_TOPIC_XXXX這個(gè)中轉(zhuǎn)的Topic中

然后會(huì)有一個(gè)類似定時(shí)任務(wù)的東西,不停地去找到這個(gè)Topic中的消息

一旦發(fā)現(xiàn)這個(gè)消息達(dá)到了延遲任務(wù),說明可以消費(fèi)了,那么就重新構(gòu)建一條消息,這條消息的Topic和隊(duì)列id都是實(shí)際上的Topic和隊(duì)列id,然后存到CommitLog

之后消費(fèi)者就能夠在目標(biāo)的Topic獲取到消息了

事務(wù)消息

事務(wù)消息用起來也比較簡(jiǎn)單,如下所示:

public class TransactionMessageDemo {

    public static void main(String[] args) throws Exception {
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("sanyouProducer");
        transactionMQProducer.setNamesrvAddr("192.168.200.143:9876");

        //設(shè)置事務(wù)監(jiān)聽器
        transactionMQProducer.setTransactionListener(new TransactionListener() {

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                //處理本次事務(wù)
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                //檢查本地事務(wù)
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        transactionMQProducer.start();

        Message message = new Message("sanyouTopic", "三友的java日記".getBytes());

        //發(fā)送消息
        transactionMQProducer.sendMessageInTransaction(message, new Object());
    }

}

事務(wù)消息發(fā)送相對(duì)于前面的例子主要有以下不同:

  • 將前面的DefaultMQProducer換成TransactionMQProducer
  • 需要設(shè)置事務(wù)的監(jiān)聽器TransactionListener,來執(zhí)行本地事務(wù)
  • 發(fā)送方法改成 sendMessageInTransaction

為什么要這么改,接下來我們來講講背后的實(shí)現(xiàn)原理

上一節(jié)在說延遲消息的時(shí)候提到,RocketMQ使用到了SCHEDULE_TOPIC_XXXX這個(gè)中轉(zhuǎn)Topic,來偷梁換柱實(shí)現(xiàn)延遲消息

不僅僅是延遲消息,事務(wù)消息其實(shí)也是這么干的,它也會(huì)進(jìn)行偷梁換柱,將消息先存在RMQ_SYS_TRANS_HALF_TOPIC這個(gè)Topic下,同時(shí)也會(huì)將消息真正的Topic和隊(duì)列id存到額外信息中,操作都是一樣滴

圖片圖片

由于消息不在真正目標(biāo)的Topic下,所以這條消息消費(fèi)者也是消費(fèi)不到滴

當(dāng)消息成功存儲(chǔ)之后,服務(wù)端會(huì)向生產(chǎn)者響應(yīng),告訴生產(chǎn)者我消息存儲(chǔ)成功了,你可以執(zhí)行本地事務(wù)了

之后生產(chǎn)者就會(huì)執(zhí)行本地執(zhí)行事務(wù),也就是執(zhí)行如下方法

TransactionListener#executeLocalTransaction

當(dāng)本地事務(wù)執(zhí)行完之后,會(huì)將執(zhí)行的結(jié)果發(fā)送給服務(wù)端

服務(wù)端會(huì)根據(jù)事務(wù)的執(zhí)行狀態(tài)來執(zhí)行對(duì)應(yīng)的處理結(jié)果

  • commit:提交事務(wù)消息,跟延遲消息一樣,重新構(gòu)建一條消息,Topic和隊(duì)列id都設(shè)置成消息真正的Topic和隊(duì)列id,然后重新存到CommitLog文件,這樣消費(fèi)者就可以消費(fèi)到消息了
  • rollback:回滾消息,其實(shí)并沒有實(shí)際的操作,因?yàn)橄⒈旧砭筒辉谡嬲腡opic下,所以消費(fèi)者壓根就消費(fèi)不到,什么都不做就可以了
  • unknown:本地事務(wù)執(zhí)行異常時(shí)就是這個(gè)狀態(tài),這個(gè)狀態(tài)下會(huì)干一些事,咱們后面再說

所以在正常情況下,事務(wù)消息整個(gè)運(yùn)行流程如下圖所示

圖片圖片

既然有正常情況下,那么就有非正常情況下

比如前面提到的拋異常導(dǎo)致unknown,又或者什么亂七八糟的原因,導(dǎo)致無(wú)法正常提交本地事務(wù)的執(zhí)行狀態(tài),那么此時(shí)該怎么辦呢?

RocketMQ當(dāng)然也想到了,他有自己的一套補(bǔ)償機(jī)制

RocketMQ內(nèi)部會(huì)起動(dòng)一個(gè)線程,默認(rèn)每隔1分鐘去檢查沒有被commit或者rollback的事務(wù)消息

RocketMQ內(nèi)部有一套機(jī)制,可以找出哪些事務(wù)消息沒有commit或者rollback,這里就不細(xì)說了

當(dāng)發(fā)現(xiàn)這條消息超過6s沒有提交事務(wù)狀態(tài),那么此時(shí)就會(huì)向生產(chǎn)者發(fā)送一個(gè)請(qǐng)求,讓生產(chǎn)者去檢查一下本地的事務(wù)執(zhí)行的狀態(tài),就是執(zhí)行下面這行代碼

TransactionListener#checkLocalTransaction

之后會(huì)將這個(gè)方法返回的事務(wù)狀態(tài)提交給服務(wù)端,服務(wù)端就可以知道事務(wù)的執(zhí)行狀態(tài)了

圖片

這里有一個(gè)細(xì)節(jié)需要注意,事務(wù)消息檢查次數(shù)不是無(wú)限的,默認(rèn)最大為15次,一旦超過15次,那么就不會(huì)再被檢查了,而是會(huì)直接把這個(gè)消息存到TRANS_CHECK_MAX_TIME_TOPIC中

所以你可以從這個(gè)Topic讀取那些無(wú)法正常提交事務(wù)的消息

這就是RocketMQ事務(wù)消息的原理

小總結(jié)

RocketMQ事務(wù)消息的實(shí)現(xiàn)主要是先將消息存到RMQ_SYS_TRANS_HALF_TOPIC這個(gè)中間Topic,有些資料會(huì)把這個(gè)消息稱為半消息(half消息),這是因?yàn)檫@個(gè)消息不能被消費(fèi)

之后會(huì)執(zhí)行本地的事務(wù),提交本地事務(wù)的執(zhí)行狀態(tài)

RocketMQ會(huì)根據(jù)事務(wù)的執(zhí)行狀態(tài)去判斷commit或者是rollback消息,也就是是不是可以讓消費(fèi)者消費(fèi)這條消息的意思

在一些異常情況下,生產(chǎn)者無(wú)法及時(shí)正確提交事務(wù)執(zhí)行狀態(tài)

RocketMQ會(huì)向生產(chǎn)者發(fā)送消息,讓生產(chǎn)者去檢查本地的事務(wù),之后再提交事務(wù)狀態(tài)

當(dāng)然,這個(gè)檢查次數(shù)默認(rèn)不超過15次,如果超過15次還未成功提交事務(wù)狀態(tài),RocketMQ就會(huì)直接把這個(gè)消息存到TRANS_CHECK_MAX_TIME_TOPIC中

請(qǐng)求-應(yīng)答消息

這個(gè)消息類型比較有意思,類似一種RPC的模式

生產(chǎn)者發(fā)送消息之后可以阻塞等待消費(fèi)者消費(fèi)這個(gè)消息的之后返回的結(jié)果

生產(chǎn)者通過過調(diào)用request方法發(fā)送消息,接收回復(fù)消息

public class Producer {
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)生產(chǎn)者,指定生產(chǎn)者組為 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 啟動(dòng)生產(chǎn)者
        producer.start();


        Message message = new Message("sanyouTopic", "三友的java日記".getBytes());
        
        //發(fā)送消息,拿到響應(yīng)結(jié)果, 3000代表超時(shí)時(shí)間,3s內(nèi)未拿到響應(yīng)結(jié)果,就超時(shí),會(huì)拋出RequestTimeoutException異常
        Message result = producer.request(message, 3000);
        System.out.println("接收到響應(yīng)消息:" + result);

        // 關(guān)閉生產(chǎn)者
        producer.shutdown();
    }

}

而對(duì)于消費(fèi)者來著,當(dāng)消費(fèi)完消息之后,也要作為生產(chǎn)者,將響應(yīng)的消息發(fā)送出去

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        //創(chuàng)建一個(gè)生產(chǎn)者,指定生產(chǎn)者組為 sanyouProducer
        DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");
        // 指定NameServer的地址
        producer.setNamesrvAddr("192.168.200.143:9876");
        // 啟動(dòng)生產(chǎn)者
        producer.start();


        // 通過push模式消費(fèi)消息,指定消費(fèi)者組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
        // 指定NameServer的地址
        consumer.setNamesrvAddr("192.168.200.143:9876");

        // 訂閱這個(gè)topic下的所有的消息
        consumer.subscribe("sanyouTopic", "*");
        // 注冊(cè)一個(gè)消費(fèi)的監(jiān)聽器,當(dāng)有消息的時(shí)候,會(huì)回調(diào)這個(gè)監(jiān)聽器來消費(fèi)消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消費(fèi)消息:%s", new String(msg.getBody()) + "\n");

                    try {
                        // 用RocketMQ自帶的工具類創(chuàng)建響應(yīng)消息
                        Message replyMessage = MessageUtil.createReplyMessage(msg, "這是響應(yīng)消息內(nèi)容".getBytes(StandardCharsets.UTF_8));
                        // 將響應(yīng)消息發(fā)送出去,拿到發(fā)送結(jié)果
                        SendResult replyResult = producer.send(replyMessage, 3000);
                        System.out.println("響應(yīng)消息的結(jié)果 = " + replyResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 啟動(dòng)消費(fèi)者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

這種請(qǐng)求-應(yīng)答消息實(shí)現(xiàn)原理也比較簡(jiǎn)單,如下圖所示

圖片圖片

生產(chǎn)者和消費(fèi)者,會(huì)跟RocketMQ服務(wù)端進(jìn)行網(wǎng)絡(luò)連接

所以他們都是通過這個(gè)連接來發(fā)送和拉取消息的

當(dāng)服務(wù)端接收到回復(fù)消息之后,有個(gè)專門處理回復(fù)消息的類

圖片圖片

這個(gè)類就會(huì)直接找到發(fā)送消息的生產(chǎn)者的連接,之后會(huì)通過這個(gè)連接將回復(fù)消息發(fā)送給生產(chǎn)者

RocketMQ底層是基于Netty通信的,所以如果你有用過Netty的話,應(yīng)該都知道,就是通過Channel來發(fā)送的

重試消息

重試消息并不是我們業(yè)務(wù)中主動(dòng)發(fā)送的,而是指當(dāng)消費(fèi)者消費(fèi)消息失敗之后,會(huì)間隔一段時(shí)間之后再次消費(fèi)這條消息

重試的機(jī)制在并發(fā)消費(fèi)模式和順序消費(fèi)模式下實(shí)現(xiàn)的原理并不相同

并發(fā)消費(fèi)模式重試實(shí)現(xiàn)原理

RocetMQ會(huì)為每個(gè)消費(fèi)者組創(chuàng)建一個(gè)重試消息所在的Topic,名字格式為

%RETRY% + 消費(fèi)者組名稱

舉個(gè)例子,假設(shè)消費(fèi)者組為sanyouConsumer,那么重試Topic的名稱為:%RETRY%sanyouConsumer

當(dāng)消息消費(fèi)失敗后,RocketMQ會(huì)把消息存到這個(gè)Topic底下

消費(fèi)者在啟動(dòng)的時(shí)候會(huì)主動(dòng)去訂閱這個(gè)Topic,那么自然而然就能消費(fèi)到消費(fèi)失敗的消息了

圖片圖片

為什么要為每個(gè)消費(fèi)者組創(chuàng)建一個(gè)重試Topic呢?

其實(shí)我前面已經(jīng)說過,每個(gè)消費(fèi)者組的消費(fèi)是隔離的,互不影響

所以,每個(gè)消費(fèi)者組消費(fèi)失敗的消息可能就不一樣,自然要放到不同的Topic下了

重試消息是如何實(shí)現(xiàn)間隔一段時(shí)間來消費(fèi)呢?

說到間隔一段時(shí)間消費(fèi),你有沒有覺得似曾相識(shí)?

不錯(cuò),間隔一段時(shí)間消費(fèi)說白了不就是延遲消費(fèi)么!

所以,并發(fā)消費(fèi)模式下間隔一段時(shí)間底層就是使用的延遲消息來實(shí)現(xiàn)的

RocetMQ會(huì)為重試消息設(shè)置一個(gè)延遲級(jí)別

并且延遲級(jí)別與重試次數(shù)的關(guān)系為

delayLevel = 3 + 已經(jīng)重試次數(shù)

比如第一次消費(fèi)失敗,那么已經(jīng)重試次數(shù)就是0,那么此時(shí)延遲級(jí)別就是3

對(duì)應(yīng)的默認(rèn)的延遲時(shí)間就是10s,也就是一次消息重試消費(fèi)間隔時(shí)間是10s

隨著重試次數(shù)越多,延遲級(jí)別也越來越高,重試的間隔也就越來越長(zhǎng),但是最大也是最大延遲級(jí)別的時(shí)間

不過需要注意的是,在并發(fā)消費(fèi)模式下,只有集群消費(fèi)才支持消息重試,對(duì)于廣播消費(fèi)模式來說,是不支持消息重試的,消費(fèi)失敗就失敗了,不會(huì)管

順序消費(fèi)模式重試實(shí)現(xiàn)原理

順序消費(fèi)模式下重試就比較簡(jiǎn)單了

當(dāng)消費(fèi)失敗的時(shí)候,他并不會(huì)將消息發(fā)送到服務(wù)端,而是直接在本地等1s鐘之后重試

在這個(gè)等待的期間其它消息是不能被消費(fèi)的

這是因?yàn)楸WC消息消費(fèi)的順序性,即使前面的消息消費(fèi)失敗了,它也需要等待前面的消息處理完畢才能處理后面的消息

順序消費(fèi)模式下,并發(fā)消費(fèi)和集群消費(fèi)均支持重試消息

死信消息

死信消息就是指如果消息最終無(wú)法被正常消費(fèi),那么這條消息就會(huì)成為死信消息

RocketMQ中,消息會(huì)變成死信消息有兩種情況

第一種就是消息重試次數(shù)已經(jīng)達(dá)到了最大重試次數(shù)

最大重試次數(shù)取決于并發(fā)消費(fèi)還是順序消費(fèi)

  • 順序消費(fèi),默認(rèn)最大重試次數(shù)就是 Integer.MAX_VALUE,基本上就是無(wú)限次重試,所以默認(rèn)情況下順序消費(fèi)的消息幾乎不可能成為死信消息
  • 并發(fā)消費(fèi)的話,那么最大重試次數(shù)默認(rèn)就是16次

當(dāng)然可以通過如下的方法來設(shè)置最大重試次數(shù)

DefaultMQPushConsumer#setMaxReconsumeTimes

除了上面的情況之外,當(dāng)在并發(fā)消費(fèi)模式下,你可以在消息消費(fèi)失敗之后手動(dòng)指定,直接讓消息變成死信消息

在并發(fā)消費(fèi)消息的模式下,處理消息的方法有這么一個(gè)參數(shù)

ConsumeConcurrentlyContext

圖片圖片

這個(gè)類中有這么一個(gè)屬性

圖片圖片

這個(gè)參數(shù)值有三種情況,注釋也有寫:

  • 小于0,那么直接會(huì)把消息放到死信隊(duì)列,成為死信消息。注釋寫的是=-1,其實(shí)只要小于0就可以成為死信消息,不一定非得是-1
  • 0,默認(rèn)就是0,這個(gè)代表消息重試消費(fèi),并且重試的時(shí)間間隔(也就是延遲級(jí)別)由服務(wù)端決定,也即是前面重試消息提到的 delayLevel = 3 + 已經(jīng)重試次數(shù)
  • 大于0,此時(shí)就表示客戶端指定消息重試的時(shí)間間隔,是幾就代表延遲級(jí)別為幾,比如設(shè)置成1,那么延遲級(jí)別就為1

所以,在并發(fā)消費(fèi)模式下,可以通過設(shè)置這個(gè)參數(shù)值為-1,直接讓處理失敗的消息成為死信消息

當(dāng)消息成為死信消息之后,消息并不會(huì)丟失

RocketMQ會(huì)將死信消息保存在死信Topic底下,Topic格式為

%DLQ% + 消費(fèi)者組名稱

跟重試Topic的格式有點(diǎn)像,只是將%RETRY%換成了%DLQ%

如果你想知道有哪些死信消息,只需要訂閱這個(gè)Topic即可獲得

小總結(jié)

所以總的來說,兩種情況會(huì)讓消息成為死信消息:

  • 消息重試次數(shù)超過最大次數(shù),跟消息的處理方式有關(guān),默認(rèn)情況下順序處理最大次數(shù)是幾乎是無(wú)限次,也就是幾乎不可能成為死信消息;并發(fā)處理的情況下,最大重試次數(shù)默認(rèn)就是16次。最大重試次數(shù)是可以設(shè)置的。
  • 在并發(fā)處理的情況下,通過ConsumeConcurrentlyContext將delayLevelWhenNextConsume屬性設(shè)置成-1,讓消息直接變成死信消息

當(dāng)消息成為死信消息的時(shí)候,會(huì)被存到%DLQ% + 消費(fèi)者組名稱這個(gè)Topic下

用戶可以通過這個(gè)Topic獲取到死信消息,手動(dòng)干預(yù)處理這些消息

同步消息

同步消息是指,當(dāng)生產(chǎn)者發(fā)送消息的時(shí)候,需要阻塞等待服務(wù)端響應(yīng)消息存儲(chǔ)的結(jié)果

同步消息跟前面提到的消息類型并不是互斥的

比如前面說的普通消息時(shí)舉的例子,他就是同步發(fā)送的,那么它也是一個(gè)同步消息

這種模式用于對(duì)數(shù)據(jù)一致性要求較高的場(chǎng)景中,但是等待也會(huì)消耗一定的時(shí)間

異步消息

既然有了同步消息,那么相對(duì)應(yīng)的就有異步消息

異步消息就是指生產(chǎn)者發(fā)送消息后,不需要阻塞等待服務(wù)端存儲(chǔ)消息的結(jié)果

所以異步消息的好處就是可以減少等待響應(yīng)過程消耗的時(shí)間

如果你想知道有沒有發(fā)送成功,可以在發(fā)送消息的時(shí)候傳個(gè)回調(diào)的接口SendCallback的實(shí)現(xiàn)

Message message = new Message("sanyouTopic", "三友的java日記".getBytes());

//異步發(fā)送消息
producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息發(fā)送結(jié)果 = " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("消息發(fā)送異常 = " + e.getMessage());
            }
        }
);

當(dāng)消息發(fā)送之后收到發(fā)送結(jié)果或者出現(xiàn)異常的時(shí)候,RocektMQ就會(huì)回調(diào)這個(gè)SendCallback實(shí)現(xiàn)類,你就可以知道消息發(fā)送的結(jié)果了

單向消息

所謂的單向消息就是指,生產(chǎn)者發(fā)送消息給服務(wù)端之后,就直接不管了

所以對(duì)于生產(chǎn)者來說,他是不會(huì)去care消息發(fā)送的結(jié)果了,即使發(fā)送失敗了,對(duì)于生產(chǎn)者來說也是無(wú)所謂的

所以這種方式的主要應(yīng)用于那種能夠忍受丟消息的操作場(chǎng)景

比如像日志收集就比較適合使用這種方式

單向消息的發(fā)送是通過sendOneway來調(diào)用的

Message message = new Message("sanyouTopic", "三友的java日記".getBytes());

//發(fā)送單向消息
producer.sendOneway(message);

總的來說,同步消息、異步消息、單向消息代表的是消息的發(fā)送方式,主要是針對(duì)消息的發(fā)送方來說,對(duì)消息的存儲(chǔ)之類是的沒有任何影響的

最后

ok,到這本文就結(jié)束了

本文又又是一篇非常非常肝的文章,不知道你是否堅(jiān)持看到這里

我在寫的過程中也是不斷地死磕源碼,盡可能避免出現(xiàn)錯(cuò)誤的內(nèi)容

同時(shí)也在嘗試爭(zhēng)取把我所看到的源碼以一種最簡(jiǎn)單的方式說出來

責(zé)任編輯:武曉燕 來源: 三友的java日記
相關(guān)推薦

2024-09-09 23:15:55

2023-05-23 22:19:04

索引MySQL優(yōu)化

2024-07-02 01:06:33

2023-04-26 10:06:08

RocketMQ屬性Consumer

2020-11-05 08:14:17

鏈表

2022-09-26 10:43:13

RocketMQ保存消息

2024-01-02 22:47:47

Nacos注冊(cè)中心節(jié)點(diǎn)

2023-06-12 08:49:12

RocketMQ消費(fèi)邏輯

2022-06-13 11:05:35

RocketMQ消費(fèi)者線程

2022-07-11 11:06:11

RocketMQ函數(shù).消費(fèi)端

2020-06-22 16:55:49

前端異常處理錯(cuò)誤

2024-06-04 00:01:00

2021-03-19 16:05:33

CSS CSS 屬性CSS 基礎(chǔ)

2022-06-27 11:04:24

RocketMQ順序消息

2022-07-04 11:06:02

RocketMQ事務(wù)消息實(shí)現(xiàn)

2020-09-21 10:50:24

Java多線程代碼

2017-05-22 14:45:51

大數(shù)據(jù)神經(jīng)網(wǎng)絡(luò)架構(gòu)

2022-11-17 09:14:58

MySQL加行級(jí)鎖幻讀

2021-03-03 00:01:30

Redis數(shù)據(jù)結(jié)雙向鏈表

2019-09-02 11:14:08

隔離虛擬機(jī)操作系統(tǒng)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)