偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

事物消息的實現(xiàn)-RocketMQ知識體系6

開發(fā) 前端
RocketMQ提供了事務(wù)消息的功能,采用2PC(兩段式協(xié)議)+補償機制(事務(wù)回查)的分布式事務(wù)功能,通過消息隊列 RocketMQ 版事務(wù)消息能達到分布式事務(wù)的最終一致。

[[411281]]

分布式事務(wù)是指事務(wù)的參與者、支持事務(wù)的服務(wù)器、資源服務(wù)器以及事務(wù)管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點之上。例如在大型電商系統(tǒng)中,下單接口通常會扣減庫存、減去優(yōu)惠、生成訂單 id, 而訂單服務(wù)與庫存、優(yōu)惠、訂單 id 都是不同的服務(wù),下單接口的成功與否,不僅取決于本地的 db 操作,而且依賴第三方系統(tǒng)的結(jié)果,這時候分布式事務(wù)就保證這些操作要么全部成功,要么全部失敗。本質(zhì)上來說,分布式事務(wù)就是為了保證不同數(shù)據(jù)庫的數(shù)據(jù)一致性。

目前解決分布式事物的解決方案有seata,lcn 等。

RocketMQ 分布式事物實現(xiàn)

RocketMQ提供了事務(wù)消息的功能,采用2PC(兩段式協(xié)議)+補償機制(事務(wù)回查)的分布式事務(wù)功能,通過消息隊列 RocketMQ 版事務(wù)消息能達到分布式事務(wù)的最終一致。

首先,我們要知道什么是半事物消息和消息回查:

  • 半事務(wù)消息:

暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊列 RocketMQ 版服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。

  • 消息回查:

由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認丟失,消息隊列 RocketMQ 版服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時,需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit 或是 Rollback),該詢問過程即消息回查。

【交互流程】

事務(wù)消息發(fā)送步驟如下:

  1. 發(fā)送方將半事務(wù)消息發(fā)送至消息隊列 RocketMQ 版服務(wù)端。
  2. 消息隊列 RocketMQ 版服務(wù)端將消息持久化成功之后,向發(fā)送方返回 Ack。確認消息已經(jīng)發(fā)送成功,此時消息為半事務(wù)消息。
  3. 發(fā)送方開始執(zhí)行本地事務(wù)邏輯。
  4. 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(Commit 或是 Rollback),服務(wù)端收到 Commit 狀態(tài)則將半事務(wù)消息標記為可投遞,訂閱方最終將收到該消息;服務(wù)端收到 Rollback 狀態(tài)則刪除半事務(wù)消息,訂閱方將不會接受該消息。

事務(wù)消息回查步驟如下:

  1. 在斷網(wǎng)或者是應(yīng)用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務(wù)端,經(jīng)過固定時間后服務(wù)端將對該消息發(fā)起消息回查。
  2. 發(fā)送方收到消息回查后,需要檢查對應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
  3. 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認,服務(wù)端仍按照步驟 4 對半事務(wù)消息進行操作。

總體而言RocketMQ事務(wù)消息分為兩條主線:

  • 發(fā)送流程:發(fā)送half message(半消息),執(zhí)行本地事務(wù),發(fā)送事務(wù)執(zhí)行結(jié)果
  • 定時任務(wù)回查流程:MQ定時任務(wù)掃描半消息,回查本地事務(wù),發(fā)送事務(wù)執(zhí)行結(jié)果

源碼相關(guān)

Producer發(fā)送事務(wù)半消息的(prepare)

在本地應(yīng)用發(fā)送事務(wù)消息的核心類是TransactionMQProducer,該類通過繼承DefaultMQProducer來復(fù)用大部分發(fā)送消息相關(guān)的邏輯,這個類的代碼量非常少只有100來行,下面是這個類的sendMessageTransaction方法

這里的transactionListener就是上面所說的消息回查的類,它提供了2個方法:

  • executeLocalTransaction

執(zhí)行本地事務(wù)

  • checkLocalTransaction

回查本地事務(wù)

接著看DefaultMQProducer.sendMessageInTransaction()方法:

該方法主要做了以下事情

  • 給消息打上事務(wù)消息相關(guān)的tag,用于broker區(qū)分普通消息和事務(wù)消息
  • 發(fā)送半消息(half message)
  • 發(fā)送成功則由transactionListener執(zhí)行本地事務(wù)
  • 執(zhí)行endTransaction方法,告訴 broker 執(zhí)行 commit/rollback。

執(zhí)行本地事務(wù)

Producer 半事務(wù)消息發(fā)送成功后,會調(diào)用transactionListener.executeLocalTransaction方法執(zhí)行本地事務(wù)。只有半消息發(fā)送成功后,才會執(zhí)行本地事務(wù),如果半消息發(fā)送失敗,則設(shè)置回滾。

結(jié)束事務(wù)(commit/rollback)

本地事務(wù)執(zhí)行后,則調(diào)用this.endTransaction()方法,根據(jù)本地事務(wù)執(zhí)行狀態(tài),去提交事務(wù)或者回滾事務(wù)。

如果半消息發(fā)送失敗或本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息,半消息發(fā)送成功且本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息

Broker端處理事務(wù)消息

Broker端通過SendMessageProcessor.processRequest()方法接收處理 Producer 發(fā)送的消息 最后會調(diào)用到SendMessageProcessor.sendMessage(),判斷消息類型,進行消息存儲。

存儲半消息

代碼 prepareMessage(msgInner) :

在這一步,備份消息的原主題名稱與原隊列ID,然后取消事務(wù)消息的消息標簽,重新設(shè)置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定為0。與其他普通消息區(qū)分開,然后完成消息持久化。

到這里,Broker 就初步處理完了 Producer 發(fā)送的事務(wù)半消息。

半消息事務(wù)回查

兩段式協(xié)議發(fā)送與提交回滾消息,執(zhí)行完本地事務(wù)消息的狀態(tài)為UNKNOW時,結(jié)束事務(wù)不做任何操作。通過事務(wù)狀態(tài)定時回查得到發(fā)送端的事務(wù)狀態(tài)是rollback或commit。

通過TransactionalMessageCheckService線程定時去檢測RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務(wù)狀態(tài)。

  • RMQ_SYS_TRANS_HALF_TOPIC

prepare消息的主題,事務(wù)消息首先先進入到該主題。

  • RMQ_SYS_TRANS_OP_HALF_TOPIC

當消息服務(wù)器收到事務(wù)消息的提交或回滾請求后,會將消息存儲在該主題下。

Broker處理END_TRANSACTION

當Producer或者回查定時任務(wù)提交/回滾事務(wù)的時候,Broker如何處理事務(wù)消息提交、回滾命令的?其核心實現(xiàn)如下:

  • 根據(jù)commitlogOffset找到消息
  • 如果是提交動作,就恢復(fù)原消息的主題與隊列,再次存入commitlog文件進而轉(zhuǎn)到消息消費隊列,供消費者消費,然后將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
  • 回滾消息,則直接將原預(yù)處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理。

整體實現(xiàn)流程

如果消費端消費失敗了怎么辦?

如果有消息消費失敗了,則將失敗的消息回傳給broker,即重新寫入commitLog文件,消費者將重新消費;如果消息回傳的時候,consumer和broker之間網(wǎng)絡(luò)斷開,則consumer會調(diào)用submitConsumeRequestLater()方法,在consumer端進行重新消費,如果仍然消費失敗,會不斷重試直到達到默認的16次,你可以使用msg.getReconsumeTimes()方法來獲取當前重試次數(shù),如果重試次數(shù)足夠多之后仍然無法消費成功,必須通過工單、日志等方式進行人工干預(yù)以讓producer事務(wù)進行回退處理。

Producer發(fā)送半消息失敗

可能由于網(wǎng)絡(luò)或者mq故障,導(dǎo)致 Producer 訂單系統(tǒng) 發(fā)送半消息(prepare)失敗。

這時訂單系統(tǒng)可以執(zhí)行回滾操作,比如“訂單關(guān)閉”等,走逆向流程退款給用戶。

半消息發(fā)送成功,本地事務(wù)執(zhí)行失敗

如果訂單系統(tǒng)發(fā)送的半消息成功了,但是執(zhí)行本地事務(wù)失敗了,如更新訂單狀態(tài)為“已完成”。

這種情況下,執(zhí)行本地事務(wù)失敗后,會返回rollback給 MQ,MQ會刪除之前發(fā)送的半消息。 也就不會調(diào)用優(yōu)惠券系統(tǒng)了。

半消息發(fā)送成功,沒收到MQ返回的響應(yīng)

假如訂單系統(tǒng)發(fā)送半消息成功后,沒有收到MQ返回的響應(yīng)。

這個時候可能是因為網(wǎng)絡(luò)問題,或者其他異常報錯,訂單系統(tǒng)誤以為發(fā)送MQ半消息失敗,執(zhí)行了逆向回滾流程。

但這個時候其實mq已經(jīng)保存半消息成功了,那這個消息怎么處理?

這個時候MQ的后臺消息回查定時任務(wù)TransactionalMessageCheckService會每隔1分鐘掃描一次半消息隊列,判斷是否需要消息回查,然后回查訂單系統(tǒng)的本地事務(wù),這時MQ就會發(fā)現(xiàn)訂單已經(jīng)變成“已關(guān)閉”,此時就要發(fā)送rollback請求給mq,刪除之前的半消息。

如果commit/rollback失敗了

這個其實也是通過定時任務(wù)TransactionalMessageCheckService,它會發(fā)現(xiàn)這個消息超過一定時間還沒有進行二階段處理,就會回查本地事務(wù)。

小結(jié)

消息隊列RocketMQ分布式事務(wù)消息不僅可以實現(xiàn)應(yīng)用之間的解耦,又能保證數(shù)據(jù)的最終一致性。同時,傳統(tǒng)的大事務(wù)可以被拆分為小事務(wù),不僅能提升效率,還不會因為某一個關(guān)聯(lián)應(yīng)用的不可用導(dǎo)致整體回滾,從而最大限度保證核心系統(tǒng)的可用性。在極端情況下,如果關(guān)聯(lián)的某一個應(yīng)用始終無法處理成功,也只需對當前應(yīng)用進行補償或數(shù)據(jù)訂正處理,而無需對整體業(yè)務(wù)進行回滾。

從RocketMQ事務(wù)型消息鏈路體現(xiàn)了面向失敗的設(shè)計思路,也體現(xiàn)了事務(wù)型系統(tǒng)的嚴謹性,在第二階段的消息沒有送達的時候,broker會主動請求producer端去做check,producer做完check后會將事務(wù)的狀態(tài)再次返回。雖然說實現(xiàn)最終一致的方案有很多,但是事務(wù)型消息是比較優(yōu)雅實現(xiàn)方式之一。

本文轉(zhuǎn)載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系小汪哥寫代碼公眾號。

 

責任編輯:武曉燕 來源: 小汪哥寫代碼
相關(guān)推薦

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-07-08 07:16:24

RocketMQ數(shù)據(jù)結(jié)構(gòu)Message

2021-07-07 15:29:52

存儲RocketMQ體系

2021-07-09 07:15:48

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2021-07-16 18:44:42

RocketMQ知識

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2023-07-18 09:03:01

RocketMQ場景消息

2021-07-07 07:06:31

Brokerkafka架構(gòu)

2015-07-28 17:52:36

IOS知識體系

2017-02-27 16:42:23

Spark識體系

2017-04-03 15:35:13

知識體系架構(gòu)

2017-06-22 13:07:21

2012-03-08 11:13:23

企業(yè)架構(gòu)

2021-07-05 06:26:08

生產(chǎn)者kafka架構(gòu)

2021-07-08 05:52:34

Kafka架構(gòu)主從架構(gòu)

2015-07-16 10:15:44

web前端知識體系

2020-09-09 09:15:58

Nginx體系進程

2020-10-26 08:34:18

知識體系普適性

2020-03-09 10:31:58

vue前端開發(fā)

2017-07-25 17:34:54

大數(shù)據(jù)機器學(xué)習(xí)數(shù)據(jù)
點贊
收藏

51CTO技術(shù)棧公眾號