偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

RocketMQ事務(wù)消息如何保證數(shù)據(jù)的一致性

開發(fā) 前端
分布式事務(wù)是大廠面試必問題,而目前大部分公司都是采用可靠消息來保證數(shù)據(jù)的最終一種性,通常會采用 RocketMQ 來實現(xiàn)。如果想去阿里的同學(xué),建議 MQ 這塊,選擇 RocketMQ 多復(fù)習(xí)一下。

[[385043]]

本文轉(zhuǎn)載自微信公眾號「菜鳥飛呀飛」,作者劉進坤。轉(zhuǎn)載本文請聯(lián)系菜鳥飛呀飛公眾號。  

前言

在面過的幾家大廠中,幾乎每輪的面試官(「沒寫錯,幾乎是每輪面試官」)都問了同樣一個問題:你們的系統(tǒng)是分布式的系統(tǒng)嗎?

答:是。

面試官:那么你們分布式的系統(tǒng)是如何解決分布式事務(wù)這個問題的呢?也就是如何保證數(shù)據(jù)的一致性。

答:我們的系統(tǒng)中通過 RocketMQ 的事務(wù)消息來保證數(shù)據(jù)的最終一致性。

面試官:那你說說它是如何來保證數(shù)據(jù)的最終一致性的?

答:分兩部分來回答,第一部分先回答事務(wù)消息的實現(xiàn)流程,第二部分解釋為什么它能保證數(shù)據(jù)的最終一致性。

事務(wù)消息的實現(xiàn)流程

事務(wù)消息

  1. 首先服務(wù) A 發(fā)送一個半事務(wù)消息(也稱 half 消息)至 MQ 中。為什么要先發(fā)送一個 half 消息呢?這是為了保證服務(wù) A 和 MQ 之間的通信正常,如果無法正常通信,則服務(wù) A 可以直接返回一個異常,也就不用處理后面的邏輯的了。
  2. 如果 half 消息發(fā)送成功,MQ 收到這個 half 消息后,會返回一個 success 響應(yīng)給服務(wù) A。
  3. 服務(wù) A 接收到 MQ 返回的 success 響應(yīng)后,開始處理本地的業(yè)務(wù)邏輯,并提交本地事務(wù)。
  4. 如果服務(wù) A 本地事務(wù)提交成功,則會向 MQ 中發(fā)送 commit,表示將 half 消息提交,MQ 就會執(zhí)行第 5 步操作;如果服務(wù) A 本地事務(wù)提交失敗,則直接回滾本地事務(wù),并向 MQ 中發(fā)送 rollback,表示將之前的 half 消息進行回滾,MQ 接收到 rollback 消息后,就會將 half 消息刪除。
  5. 如果 commit,則將 half 消息寫入到磁盤。
  6. 如果 MQ 長時間沒有接收到 commit 或者 rollback 消息,例如:服務(wù) A 在處理本地業(yè)務(wù)時宕機了,或者發(fā)送的 commit、rollback 因為在弱網(wǎng)環(huán)境,數(shù)據(jù)丟失了。那么 MQ 就會在一定時間后嘗試調(diào)用服務(wù) A 提供的一個接口,通過這個接口來判斷 half 消息的狀態(tài)。所以服務(wù) A 提供的接口,需要實現(xiàn)的業(yè)務(wù)邏輯是:通過數(shù)據(jù)庫中對應(yīng)數(shù)據(jù)的狀態(tài)來判斷,之前的 half 消息對應(yīng)的業(yè)務(wù)是否執(zhí)行成功。如果 MQ 從這個接口中得知 half 消息執(zhí)行成功了,那么 MQ 就會將 half 消息持久化到本地磁盤,如果得知沒有執(zhí)行成功,那么就會將 half 消息刪除。
  7. 服務(wù) B 從 MQ 中消費到對應(yīng)的消息。
  8. 服務(wù) B 處理本地業(yè)務(wù)邏輯,然后提交本地事務(wù)。

如何保證數(shù)據(jù)的最終一致性

實現(xiàn)流程說完了,可能你現(xiàn)在有各種各樣的疑惑?

Q: half 消息是個啥?

A: 它和我們正常發(fā)送的普通消息是一樣的,都是存儲在 MQ 中,唯一不同的是 half 在 MQ 中不會立馬被消費者消費到,除非這個 half 消息被 commit 了。(至于為什么未 commit 的 half 消息無法被消費者讀取到,這是因為在 MQ 內(nèi)部,對于事務(wù)消息而言,在 commit 之前,會先放在一個內(nèi)部隊列中,只有 commit 了,才會真正將消息放在消費者能讀取到的 topic 隊列中)

Q: 為什么要先發(fā)送 half 消息?

A: 前面已經(jīng)解釋過了,主要是為了保證服務(wù) A 和 MQ 之間是否能正常通信,如果兩者之間都不能正常通信,后面還玩?zhèn)€錘子,直接返回異常就可以了。

Q: 如果 MQ 接收到了 half 消息,但是在返回 success 響應(yīng)的時候,因為網(wǎng)絡(luò)原因,導(dǎo)致服務(wù) A 沒有接收到 success 響應(yīng),這個時候是什么現(xiàn)象?

A: 當(dāng)服務(wù) A 發(fā)送 half 消息后,它會等待 MQ 給自己返回 success 響應(yīng),如果沒有接收到,那么服務(wù) A 也會直接結(jié)束,返回異常,不再執(zhí)行后續(xù)邏輯。不執(zhí)行后續(xù)邏輯,這樣服務(wù) A 也就不會提交 commit 消息給 MQ,MQ 長時間沒接收到 commit 消息,那么它就會主動回調(diào)服務(wù) A 的一個接口,服務(wù) A 通過接口,查詢本地數(shù)據(jù)后,發(fā)現(xiàn)這條消息對應(yīng)的業(yè)務(wù)并沒有正常執(zhí)行,那么就告訴 MQ,這個 half 消息不能 commit,需要 rollback,MQ 知道后,就將 half 消息進行刪除。

Q: 如果服務(wù) A 本地事務(wù)執(zhí)行失敗了,怎么辦?

A: 服務(wù) A 本地事務(wù)執(zhí)行失敗后,先對自己本地事務(wù)進行回滾,然后再向 MQ 發(fā)送 rollback 操作。

Q: 服務(wù) A 本地事務(wù)提交成功或失敗后,向 MQ 發(fā)送的 commit 或者 rollback 消息,因為網(wǎng)絡(luò)問題丟失了,又該怎么處理?

A: 和上一個問題一樣,MQ 長時間沒有接收到 half 消息的 commit 或者 rollback 消息,MQ 會主動回調(diào)服務(wù) A 的接口,通過這個接口來判斷自己該對這個 half 消息如何處理。

Q: 前面說的全是事務(wù)消息的實現(xiàn)流程,這和事務(wù)消息如何保證數(shù)據(jù)的最終一致性有什么關(guān)系呢?

A: 有關(guān)系。首先,服務(wù) A 執(zhí)行本地事務(wù)并提交和向 MQ 中發(fā)送消息這是兩個寫操作,然后通過 RocketMQ 的事務(wù)消息,我們保證了這兩個寫操作要么都執(zhí)行成功,要么都執(zhí)行失敗。然后讓其他系統(tǒng),如服務(wù) B 通過消費 MQ 中的消息,然后再去執(zhí)行自己本地的事務(wù),這樣到最后,服務(wù) A 和服務(wù) B 這兩個系統(tǒng)的數(shù)據(jù)狀態(tài)是不是達到了一致?這就是最終一致性的含義。

如果要求服務(wù) A 和服務(wù) B 的數(shù)據(jù)狀態(tài),在服務(wù) A 返回給客戶端之間,這兩者就達到一致,這是強一致性,RocketMQ 是沒法保證強一致性的。

目前通過「可靠消息來保證數(shù)據(jù)的最終一致性」是很多大廠都采用的方案,基本都是通過 MQ 和補償機制來保證數(shù)據(jù)的一致性。(所謂的可靠消息,就是消息不丟失,如何保證 MQ 的消息不丟失,下篇文章會寫,這也是面試??碱})

Q: 服務(wù) B 本地事務(wù)提交失敗了,怎么辦?

A: 如果服務(wù) B 本地事務(wù)提交失敗了,可以進行多次重試,直到成功。如果重試多次后,還是提交失敗,例如此時服務(wù) B 對應(yīng)的 DB 宕機了,這個時候只要服務(wù) B 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 會在一定時間后,繼續(xù)將這條消息推送給服務(wù) B,服務(wù) B 就可以繼續(xù)執(zhí)行本地事務(wù)并提交了,直到成功。這樣,依舊是保證了服務(wù) A 和服務(wù) B 數(shù)據(jù)的最終一致性。

代碼實現(xiàn)

使用 RokcetMQ 的事務(wù)消息主要涉及到兩個部分:

如何發(fā)送半事務(wù)消息,這個可以通過「TransactionMQProducer」 類來實現(xiàn)。

  1. TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup"); 
  2. TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(msg, null); 
  3. // 通過result來判斷half消息是否發(fā)送成功 
  4. if(result.getSendStatus() == SendStatus.SEND_OK){ 
  5.     // 成功 
  6. }else
  7.     // 失敗 

在前面我們提到了服務(wù) A 需要提供一個接口,用來供 MQ 回調(diào)服務(wù) A,實際上這個接口就是一個監(jiān)聽器:「TransactionListener」的方法。這是一個接口,提供了兩個方法。

  1. public interface TransactionListener { 
  2.  
  3.      // 當(dāng)half消息發(fā)送成功后,我們在這里實現(xiàn)自己的業(yè)務(wù)邏輯,然后commit或者rollback 給MQ 
  4.     LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); 
  5.  
  6.  
  7.      // 這個方法就是供MQ回調(diào)的方法,MQ通過回調(diào)該方法來判斷half消息的狀態(tài) 
  8.      // 可以看到,這個方法的參數(shù)是MessageExt,也就是half消息的內(nèi)容,如果根據(jù)MessageExt,我們完全能在服務(wù)A中判斷之前的業(yè)務(wù)是否處理成功 
  9.     LocalTransactionState checkLocalTransaction(final MessageExt msg); 

實際使用時,我們需要實現(xiàn)該接口,例如:

  1. public class MyTransactionListener implements TransactionListener { 
  2.  
  3.     @Override 
  4.     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
  5.         try{ 
  6.             // 處理業(yè)務(wù)邏輯 
  7.             // .... 
  8.  
  9.             // 業(yè)務(wù)邏輯處理成功,commit 
  10.             return LocalTransactionState.COMMIT_MESSAGE; 
  11.         }catch (Exception e){ 
  12.  
  13.         } 
  14.         // 業(yè)務(wù)處理失敗,rollback 
  15.         return LocalTransactionState.ROLLBACK_MESSAGE; 
  16.     } 
  17.  
  18.     @Override 
  19.     public LocalTransactionState checkLocalTransaction(MessageExt msg) { 
  20.         return null
  21.     } 

另外,在創(chuàng)建 producer 時,指定我們實現(xiàn)實現(xiàn)的監(jiān)聽器

  1. TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup"); 
  2. transactionMQProducer.setTransactionListener(new MyTransactionListener()); 

總結(jié)

分布式事務(wù)是大廠面試必問題,而目前大部分公司都是采用可靠消息來保證數(shù)據(jù)的最終一種性,通常會采用 RocketMQ 來實現(xiàn)。如果想去阿里的同學(xué),建議 MQ 這塊,選擇 RocketMQ 多復(fù)習(xí)一下。

 

責(zé)任編輯:武曉燕 來源: 菜鳥飛呀飛
相關(guān)推薦

2019-08-30 12:46:10

并發(fā)扣款查詢SQL

2025-03-27 08:20:54

2022-10-19 12:22:53

并發(fā)扣款一致性

2024-12-26 15:01:29

2023-09-07 08:11:24

Redis管道機制

2025-02-10 03:00:00

2023-10-08 08:29:31

2023-11-07 07:32:46

RocketMQ數(shù)據(jù)一致性

2020-08-05 08:46:10

NFS網(wǎng)絡(luò)文件系統(tǒng)

2024-08-20 16:13:52

2023-05-26 07:34:50

RedisMySQL緩存

2021-12-14 07:15:57

MySQLRedis數(shù)據(jù)

2022-03-29 10:39:10

緩存數(shù)據(jù)庫數(shù)據(jù)

2024-10-28 12:41:25

2024-10-16 09:53:07

2024-01-10 08:01:55

高并發(fā)場景悲觀鎖

2022-04-06 15:19:32

數(shù)據(jù)庫MySQL一致性

2024-01-22 08:52:00

AQS雙異步數(shù)據(jù)一致性

2020-01-02 09:06:23

微服務(wù)數(shù)據(jù)框架

2024-07-04 12:36:50

點贊
收藏

51CTO技術(shù)棧公眾號