偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

分布式事務(wù)實(shí)現(xiàn)方案:一文詳解RocketMQ事務(wù)消息

云計(jì)算 分布式
RocketMQ 事務(wù)消息是分布式事務(wù)中一種常見(jiàn)的實(shí)現(xiàn)方案,只是把發(fā)送消息和本地事務(wù)放在一個(gè)事務(wù)中,并且只保證最終一致性,無(wú)法保證強(qiáng)一致性。

常見(jiàn)的分布式事務(wù)實(shí)現(xiàn)方案有以下幾種:兩階段提交(2PC)、兩階段提交(2PC)、補(bǔ)償事務(wù)(Saga)、MQ事務(wù)消息等。今天就講一下 RocketMQ 的事務(wù)消息,是一種非常特殊的分布式事務(wù)實(shí)現(xiàn)方案,基于半消息(Half Message)機(jī)制實(shí)現(xiàn)的。 看完這篇想一下,RocketMQ事務(wù)消息到底能不能保證分布式系統(tǒng)中數(shù)據(jù)的強(qiáng)一致性?

實(shí)現(xiàn)原理

RocketMQ事務(wù)消息執(zhí)行流程如下:

  1. 生產(chǎn)者將消息發(fā)送至RocketMQ服務(wù)端。
  2. RocketMQ服務(wù)端將消息持久化成功之后,向生產(chǎn)者返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息被標(biāo)記為"暫不能投遞",這種狀態(tài)下的消息即為半事務(wù)消息(Half Message)。
  3. 生產(chǎn)者開始執(zhí)行本地事務(wù)邏輯。
  4. 生產(chǎn)者根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)結(jié)果(Commit或是Rollback),服務(wù)端收到確認(rèn)結(jié)果后處理邏輯如下:

二次確認(rèn)結(jié)果為Commit:服務(wù)端將半事務(wù)消息標(biāo)記為可投遞,并投遞給消費(fèi)者。

二次確認(rèn)結(jié)果為Rollback:服務(wù)端將回滾事務(wù),不會(huì)將半事務(wù)消息投遞給消費(fèi)者。

  1. 在斷網(wǎng)或者是生產(chǎn)者應(yīng)用重啟的特殊情況下,若服務(wù)端未收到發(fā)送者提交的二次確認(rèn)結(jié)果,或服務(wù)端收到的二次確認(rèn)結(jié)果為Unknown未知狀態(tài),經(jīng)過(guò)固定時(shí)間后,服務(wù)端將對(duì)消息生產(chǎn)者即生產(chǎn)者集群中任一生產(chǎn)者實(shí)例發(fā)起消息回查。
  2. 生產(chǎn)者收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
  3. 生產(chǎn)者根據(jù)檢查到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行處理。

圖片圖片

代碼實(shí)現(xiàn)

RocketMQ事務(wù)消息示例如下:

//演示demo,模擬訂單表查詢服務(wù),用來(lái)確認(rèn)訂單事務(wù)是否提交成功。
private static boolean checkOrderById(String orderId) {
    return true;
}

//演示demo,模擬本地事務(wù)的執(zhí)行結(jié)果。
private static boolean doLocalTransaction() {
    return true;
}

public static void main(String[] args) throws ClientException {
    ClientServiceProvider provider = new ClientServiceProvider();
    MessageBuilder messageBuilder = new MessageBuilderImpl();
    //構(gòu)造事務(wù)生產(chǎn)者:事務(wù)消息需要生產(chǎn)者構(gòu)建一個(gè)事務(wù)檢查器,用于檢查確認(rèn)異常半事務(wù)的中間狀態(tài)。
    Producer producer = provider.newProducerBuilder()
            .setTransactionChecker(messageView -> {
                /**
                 * 事務(wù)檢查器一般是根據(jù)業(yè)務(wù)的ID去檢查本地事務(wù)是否正確提交還是回滾,此處以訂單ID屬性為例。
                 * 在訂單表找到了這個(gè)訂單,說(shuō)明本地事務(wù)插入訂單的操作已經(jīng)正確提交;如果訂單表沒(méi)有訂單,說(shuō)明本地事務(wù)已經(jīng)回滾。
                 */
                final String orderId = messageView.getProperties().get("OrderId");
                if (Strings.isNullOrEmpty(orderId)) {
                    // 錯(cuò)誤的消息,直接返回Rollback。
                    return TransactionResolution.ROLLBACK;
                }
                return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
            })
            .build();
    //開啟事務(wù)
    final Transaction transaction;
    try {
        transaction = producer.beginTransaction();
    } catch (ClientException e) {
        e.printStackTrace();
        //事務(wù)開啟失敗,直接退出。
        return;
    }
    Message message = messageBuilder.setTopic("topic")
            //設(shè)置消息索引鍵,可根據(jù)關(guān)鍵字精確查找某條消息。
            .setKeys("messageKey")
            //設(shè)置消息Tag,用于消費(fèi)端根據(jù)指定Tag過(guò)濾消息。
            .setTag("messageTag")
            //一般事務(wù)消息都會(huì)設(shè)置一個(gè)本地事務(wù)關(guān)聯(lián)的唯一ID,用來(lái)做本地事務(wù)回查的校驗(yàn)。
            .addProperty("OrderId", "xxx")
            //消息體。
            .setBody("messageBody".getBytes())
            .build();
    //發(fā)送半事務(wù)消息
    final SendReceipt sendReceipt;
    try {
        sendReceipt = producer.send(message, transaction);
    } catch (ClientException e) {
        //半事務(wù)消息發(fā)送失敗,事務(wù)可以直接退出并回滾。
        return;
    }
    /**
     * 執(zhí)行本地事務(wù),并確定本地事務(wù)結(jié)果。
     * 1. 如果本地事務(wù)提交成功,則提交消息事務(wù)。
     * 2. 如果本地事務(wù)提交失敗,則回滾消息事務(wù)。
     * 3. 如果本地事務(wù)未知異常,則不處理,等待事務(wù)消息回查。
     *
     */
    boolean localTransactionOk = doLocalTransaction();
    if (localTransactionOk) {
        try {
            transaction.commit();
        } catch (ClientException e) {
            // 業(yè)務(wù)可以自身對(duì)實(shí)時(shí)性的要求選擇是否重試,如果放棄重試,可以依賴事務(wù)消息回查機(jī)制進(jìn)行事務(wù)狀態(tài)的提交。
            e.printStackTrace();
        }
    } else {
        try {
            transaction.rollback();
        } catch (ClientException e) {
            // 建議記錄異常信息,回滾異常時(shí)可以無(wú)需重試,依賴事務(wù)消息回查機(jī)制進(jìn)行事務(wù)狀態(tài)的提交。
            e.printStackTrace();
        }
    }
}

注意事項(xiàng)

  • 冪等性: 消費(fèi)者處理消息時(shí)需要確保業(yè)務(wù)邏輯的冪等性,以應(yīng)對(duì)消息可能的重復(fù)消費(fèi)。
  • 超時(shí)和監(jiān)控: 設(shè)置合理的超時(shí)時(shí)間,并監(jiān)控事務(wù)消息的性能

總結(jié)

RocketMQ 事務(wù)消息是分布式事務(wù)中一種常見(jiàn)的實(shí)現(xiàn)方案,只是把發(fā)送消息和本地事務(wù)放在一個(gè)事務(wù)中,并且只保證最終一致性,無(wú)法保證強(qiáng)一致性。 原因有兩點(diǎn):

  1. 執(zhí)行完成本地事務(wù)后,在commit事務(wù)消息之前,這段時(shí)間內(nèi)數(shù)據(jù)是不一致的,所以只是保證了發(fā)送消息和本地事務(wù)的最終一致性。
  2. 在commit事務(wù)消息之后,然后把消息投遞給消費(fèi)者。至于消費(fèi)者是否消費(fèi)消息,什么時(shí)候消費(fèi)?也都是不可控的,所以也只能盡量保證數(shù)據(jù)最終一致性。
責(zé)任編輯:武曉燕 來(lái)源: 一燈架構(gòu)
相關(guān)推薦

2025-04-29 04:00:00

分布式事務(wù)事務(wù)消息

2021-06-28 10:03:44

分布式數(shù)據(jù)庫(kù)架構(gòu)

2022-06-27 08:21:05

Seata分布式事務(wù)微服務(wù)

2022-05-30 10:37:35

分布式事務(wù)反向補(bǔ)償

2023-11-06 13:15:32

分布式事務(wù)Seata

2024-06-11 13:50:43

2024-03-29 13:30:41

分布式事務(wù)節(jié)點(diǎn)

2019-10-10 09:16:34

Zookeeper架構(gòu)分布式

2025-06-04 01:00:00

2024-01-26 13:17:00

rollbackMQ訂單系統(tǒng)

2023-01-06 09:19:12

Seata分布式事務(wù)

2024-08-19 09:05:00

Seata分布式事務(wù)

2022-07-10 20:24:48

Seata分布式事務(wù)

2017-07-26 15:08:05

大數(shù)據(jù)分布式事務(wù)

2022-06-21 08:27:22

Seata分布式事務(wù)

2021-06-16 08:33:02

分布式事務(wù)ACID

2023-09-04 08:00:53

提交事務(wù)消息

2019-08-19 10:24:33

分布式事務(wù)數(shù)據(jù)庫(kù)

2019-11-04 08:38:45

分布式事務(wù)主流TCC

2016-10-25 14:35:05

分布式系統(tǒng) 存儲(chǔ)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)