三劍客 RocketMQ 事務(wù)消息 + 本地消息表 + XXL-Job 對賬,實現(xiàn)分布式事務(wù) 1w-10wqps 高并發(fā)
CP (強(qiáng)一致)和AP(高并發(fā))的根本沖突
從上面的指標(biāo)數(shù)據(jù)可以知道, Seata AT/TCC是 強(qiáng)一致,并發(fā)能力弱。
CP (強(qiáng)一致)和AP(高并發(fā))是一對 根本矛盾,存在根本沖突。
10Wqps 的高并發(fā)事務(wù),并不是CP,而是屬于AP 高并發(fā)。Seata 如果不做特殊改造,很難滿足。
CAP 定理
CAP 該定理指出一個 分布式系統(tǒng) 最多只能同時滿足一致性(Consistency)、可用性(Availability)和分區(qū)容錯性(Partition tolerance)這三項中的兩項。
CAP定理的三個要素可以用來描述分布式系統(tǒng)的一致性和可用性。
如果事務(wù)要追求高并發(fā),根據(jù)cap定理,需要放棄強(qiáng)一致性,只需要保證數(shù)據(jù)的最終一致性。
所以,在實踐可以使用本地消息表的方案來解決分布式事務(wù)問題。
經(jīng)典ebay 本地消息表方案
本地消息表方案最初是ebay提出的,其實也是BASE理論的應(yīng)用,屬于可靠消息最終一致性的范疇。
消息生產(chǎn)方/ 消息消費方,需要額外建一個消息表,并記錄消息發(fā)送狀態(tài)。
一個簡單的本地消息表, 設(shè)計如下:
字段 | 類型 | 注釋 |
id | long | id |
msg_type | varchar | 消息類型 |
biz_id | varchar | 業(yè)務(wù)唯一標(biāo)志 |
content | text | 消息體 |
state | varchar | 狀態(tài)(待發(fā)送,已消費) |
create_time | datetime | 創(chuàng)建時間 |
update_time | datetime | 更新時間 |
消息表和業(yè)務(wù)數(shù)據(jù)要在一個事務(wù)里提交,也就是說他們要在一個數(shù)據(jù)庫里面。
然后消息會經(jīng)過MQ發(fā)送到消息的消費方。如果消息發(fā)送失敗,會進(jìn)行重試發(fā)送。
消息消費方 需要處理這個消息,并完成自己的業(yè)務(wù)邏輯。
此時如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會重試執(zhí)行。
如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個業(yè)務(wù)補(bǔ)償消息,通知生產(chǎn)方進(jìn)行回滾等操作。
經(jīng)典 ebay 本地消息表步驟
生產(chǎn)方定時掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發(fā)送一遍。
發(fā)送消息方:
- 需要有一個消息表,記錄著消息狀態(tài)相關(guān)信息。
- 業(yè)務(wù)數(shù)據(jù)和消息表在同一個數(shù)據(jù)庫,要保證它倆在同一個本地事務(wù)。直接利用本地事務(wù),將業(yè)務(wù)數(shù)據(jù)和事務(wù)消息直接寫入數(shù)據(jù)庫。
- 在本地事務(wù)中處理完業(yè)務(wù)數(shù)據(jù)和寫消息表操作后,通過寫消息到 MQ 消息隊列。使用專門的投遞工作線程進(jìn)行事務(wù)消息投遞到MQ,根據(jù)投遞ACK去刪除事務(wù)消息表記錄。
- 消息會發(fā)到消息消費方,如果發(fā)送失敗,即進(jìn)行重試。
消息消費方:
- 處理消息隊列中的消息,完成自己的業(yè)務(wù)邏輯。
- 如果本地事務(wù)處理成功,則表明已經(jīng)處理成功了。
- 如果本地事務(wù)處理失敗,那么就會重試執(zhí)行。
- 如果是業(yè)務(wù)層面的失敗,給消息生產(chǎn)方發(fā)送一個業(yè)務(wù)補(bǔ)償消息,通知進(jìn)行回滾等操作。
生產(chǎn)方和消費方定時掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發(fā)送一遍。
圖片
經(jīng)典ebay本地消息表方案中,還設(shè)計了靠譜的自動對賬補(bǔ)賬邏輯,確保數(shù)據(jù)的最終一致性。
經(jīng)典ebay本地消息表的注意事項
使用本地消息表實現(xiàn)分布式事務(wù)可以確保消息在分布式環(huán)境中的可靠傳遞和一致性。
然而,需要注意以下幾點:
- 消息的冪等性: 消費者一定需要保證接口的冪等性,消息的冪等性非常重要,以防止消息重復(fù)處理導(dǎo)致的數(shù)據(jù)不一致。
- 本地消息表的設(shè)計: 本地消息表的設(shè)計需要考慮到消息狀態(tài)、重試次數(shù)、創(chuàng)建時間等字段,以便實現(xiàn)消息的跟蹤和管理。
- 定時任務(wù)和重試機(jī)制: 需要實現(xiàn)定時任務(wù)或者重試機(jī)制來確保消息的可靠發(fā)送和處理。
經(jīng)典ebay本地消息表訪問的優(yōu)點和缺點:
優(yōu)點:
- 本地消息表建設(shè)成本比較低,實現(xiàn)了可靠消息的傳遞確保了分布式事務(wù)的最終一致性。
- 無需提供回查方法,進(jìn)一步減少的業(yè)務(wù)的侵入。
- 在某些場景下,還可以進(jìn)一步利用注解等形式進(jìn)行解耦,有可能實現(xiàn)無業(yè)務(wù)代碼侵入式的實現(xiàn)。
缺點:
- 本地消息表與業(yè)務(wù)耦合在一起,難于做成通用性,不可獨立伸縮。
- 本地消息表是基于數(shù)據(jù)庫來做的,而數(shù)據(jù)庫是要讀寫磁盤IO的,因此在高并發(fā)下是有性能瓶頸的。
- 數(shù)據(jù)大時,消息積壓問題,掃表效率慢。
- 數(shù)據(jù)大時,事務(wù)表數(shù)據(jù)爆炸,定時掃表存在延遲問題。
使用定時任務(wù)(如 XXL-Job )實現(xiàn)分布式事務(wù)最終一致性方案
通過 XXL-Job 定時任務(wù)替代延遲消息,定期查詢 “待對賬” 業(yè)務(wù)數(shù)據(jù),對比 Service A 與 Service B 的業(yè)務(wù)狀態(tài),通過 “主動核查 + 差異修復(fù)” 確保最終一致性。
核心是 “本地事務(wù)保初始一致 + 定時任務(wù)查狀態(tài)差異 + 人工 / 自動補(bǔ)單修偏差”。
1. Service A(發(fā)起方):本地消息表設(shè)計與事務(wù)保障
Service A 在執(zhí)行核心業(yè)務(wù)(如創(chuàng)建訂單)時,需在本地數(shù)據(jù)庫事務(wù)中同時完成兩件事:
- 執(zhí)行核心業(yè)務(wù)邏輯(如插入訂單表,狀態(tài)標(biāo)記為 “已創(chuàng)建”);
- 插入 “本地消息對賬表”(字段含:對賬 ID、業(yè)務(wù) ID(如訂單 ID)、業(yè)務(wù)類型(如 “訂單扣庫存”)、Service A 狀態(tài)(如 “訂單已創(chuàng)建”)、Service B 狀態(tài)(初始為 “未確認(rèn)”)、對賬狀態(tài)(初始為 “待對賬”)、創(chuàng)建時間、最后對賬時間),確保 “業(yè)務(wù)成功則對賬記錄必存在”,避免初始數(shù)據(jù)缺失。
2. Service B(依賴方):業(yè)務(wù)狀態(tài)可查與結(jié)果反饋
Service B 執(zhí)行依賴業(yè)務(wù)(如扣減庫存)時,需:
- 執(zhí)行核心業(yè)務(wù)邏輯(如扣減庫存表,標(biāo)記 “已扣減”,并關(guān)聯(lián) Service A 的業(yè)務(wù) ID(訂單 ID));
- 提供狀態(tài)查詢接口(如 “根據(jù)訂單 ID 查詢庫存扣減狀態(tài)”),返回 “已成功”“已失敗”“處理中” 三種明確狀態(tài),方便定時任務(wù)核查;
- 若 Service B 執(zhí)行成功 / 失敗,可主動調(diào)用 Service A 的 “狀態(tài)回調(diào)接口” 更新本地消息對賬表的 “Service B 狀態(tài)”(非強(qiáng)制,定時任務(wù)會兜底核查)。
3、XXL-Job 定時任務(wù)設(shè)計:對賬與修復(fù)
定時任務(wù)執(zhí)行時,按以下步驟完成對賬:
1)篩選待對賬數(shù)據(jù):查詢 Service A 本地消息對賬表中 “對賬狀態(tài) = 待對賬” 且 “創(chuàng)建時間超過 5 分鐘”(避免業(yè)務(wù)未執(zhí)行完就對賬)的記錄,按分片范圍批量獲?。ㄈ缑看尾?1000 條,避免一次性查太多導(dǎo)致 OOM);
2) 雙端狀態(tài)查詢:對每條待對賬記錄,分別調(diào)用 Service A 的 “業(yè)務(wù)狀態(tài)接口”(確認(rèn)訂單是否真的已創(chuàng)建)、Service B 的 “狀態(tài)查詢接口”(確認(rèn)庫存是否已扣減);
3)狀態(tài)一致性判斷與處理:
- 若 “Service A 成功 + Service B 成功”:更新本地消息對賬表 “對賬狀態(tài) = 已一致”“Service B 狀態(tài) = 已成功”,完成對賬;
- 若 “Service A 失敗 + Service B 失敗”:更新 “對賬狀態(tài) = 已一致”“Service B 狀態(tài) = 已失敗”,無需額外處理;
- 若 “Service A 成功 + Service B 失敗 / 處理中”:觸發(fā)自動重試(調(diào)用 Service B 的 “重試執(zhí)行接口”,如重新扣減庫存),重試 3 次仍失敗則標(biāo)記 “對賬狀態(tài) = 不一致”,生成業(yè)務(wù)工單;
- 若 “Service A 失敗 + Service B 成功”:屬于異常數(shù)據(jù)(Service A 業(yè)務(wù)失敗但 Service B 執(zhí)行成功),直接標(biāo)記 “對賬狀態(tài) = 不一致”,生成業(yè)務(wù)工單;
4)異常兜底:若調(diào)用 Service A/Service B 接口超時,標(biāo)記該記錄 “對賬狀態(tài) = 待重試”,下次定時任務(wù)重新核查,避免因臨時網(wǎng)絡(luò)問題誤判不一致。
RocketMQ 事務(wù)消息 + 本地消息表 + XXL-Job 對賬 分布式式事務(wù)方案實操
本方案將規(guī)整為標(biāo)準(zhǔn) Markdown 格式,同時補(bǔ)充 XXL-Job 定時任務(wù)事務(wù)對賬機(jī)制,通過 “事務(wù)消息保初始一致性 + 定時對賬兜底差異” 的雙層保障,確保電商下單(生成訂單→扣減庫存→發(fā)送通知)場景的分布式事務(wù)最終一致。
1. 系統(tǒng)架構(gòu)
以電商下單場景為核心,涉及 3 個服務(wù)與 1 個中間件,職責(zé)分工明確:
- 訂單服務(wù)(發(fā)起方):核心服務(wù),負(fù)責(zé)生成訂單、記錄本地消息表、發(fā)送 RocketMQ 事務(wù)消息,同時提供訂單狀態(tài)查詢接口。
- 庫存服務(wù)(依賴方 1):訂閱訂單消息,執(zhí)行庫存扣減,提供庫存扣減狀態(tài)查詢接口,支持重試執(zhí)行。
- 通知服務(wù)(依賴方 2):訂閱訂單消息,發(fā)送短信 / APP 通知,提供通知發(fā)送狀態(tài)查詢接口。
- RocketMQ:作為事務(wù)協(xié)調(diào)中間件,接收訂單服務(wù)的事務(wù)消息,確認(rèn)本地事務(wù)成功后投遞消息至下游服務(wù)。
- XXL-Job:定時任務(wù)調(diào)度中心,部署對賬任務(wù),定期核查訂單服務(wù)與下游服務(wù)的業(yè)務(wù)狀態(tài)差異,兜底修復(fù)不一致數(shù)據(jù)。
2. 數(shù)據(jù)庫表設(shè)計
訂單服務(wù)的 本地消息表(message_log)包括對賬相關(guān)字段(如對賬狀態(tài)、重試次數(shù)、下次對賬時間),支撐定時對賬邏輯;
message_log 同時 包含 核心業(yè)務(wù)字段,確保消息與訂單的關(guān)聯(lián)可追溯。
-- 訂單服務(wù):本地消息表(含對賬字段)
CREATE TABLE message_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id BIGINT NOT NULL COMMENT '訂單ID(關(guān)聯(lián)t_order表,唯一)',
rocketmq_msg_id VARCHAR(64) DEFAULT NULL COMMENT 'RocketMQ消息唯一ID(關(guān)聯(lián)消息中間件)',
message_content TEXT NOT NULL COMMENT '消息內(nèi)容(JSON格式,含orderId、skuId、quantity等)',
business_type VARCHAR(32) NOT NULL COMMENT '業(yè)務(wù)類型:ORDER_CREATE(創(chuàng)建訂單)、INVENTORY_DEDUCT(扣庫存)、NOTICE_SEND(發(fā)通知)',
msg_status ENUM('INIT','SENT','CONSUMED','FAIL') DEFAULT 'INIT' COMMENT '消息狀態(tài):INIT=初始,SENT=已投遞,CONSUMED=已消費,F(xiàn)AIL=失敗',
reconcile_status ENUM('PENDING','SUCCESS','FAIL','RETRY') DEFAULT 'PENDING' COMMENT '對賬狀態(tài):PENDING=待對賬,SUCCESS=對賬一致,F(xiàn)AIL=對賬不一致,RETRY=待重試',
retry_count TINYINT DEFAULT 0 COMMENT '對賬重試次數(shù)(最大5次)',
next_reconcile_time DATETIME NOT NULL COMMENT '下次對賬時間(定時任務(wù)篩選依據(jù))',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '消息創(chuàng)建時間',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '狀態(tài)更新時間',
UNIQUE KEY uk_order_id_business_type (order_id, business_type) COMMENT '避免同一訂單同一業(yè)務(wù)類型重復(fù)發(fā)消息'
);
-- 訂單服務(wù):訂單表(簡化,僅保留核心字段)
CREATE TABLE t_order (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(64) NOT NULL COMMENT '訂單唯一編號',
user_id BIGINT NOT NULL COMMENT '用戶ID',
sku_id BIGINT NOT NULL COMMENT '商品SKU ID',
quantity INT NOT NULL COMMENT '購買數(shù)量',
order_status ENUM('CREATED','PAID','SHIPPED','FINISHED','CANCELED') DEFAULT 'CREATED' COMMENT '訂單狀態(tài)',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '訂單創(chuàng)建時間',
UNIQUE KEY uk_order_id (order_id)
);
-- 庫存服務(wù):庫存扣減記錄表(支撐狀態(tài)查詢與冪等)
CREATE TABLE inventory_deduct_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(64) NOT NULL COMMENT '訂單ID(關(guān)聯(lián)訂單服務(wù))',
sku_id BIGINT NOT NULL COMMENT '商品SKU ID',
deduct_quantity INT NOT NULL COMMENT '扣減數(shù)量',
deduct_status ENUM('SUCCESS','FAIL','PROCESSING') DEFAULT 'PROCESSING' COMMENT '扣減狀態(tài)',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_order_id (order_id) COMMENT '按訂單ID冪等,避免重復(fù)扣減'
);3. 代碼實現(xiàn)
3.1 Producer 端(訂單服務(wù)):事務(wù)消息發(fā)送
通過 TransactionMQProducer 發(fā)送事務(wù)消息,確保 “生成訂單” 與 “記錄本地消息” 在同一本地事務(wù)中,保證初始數(shù)據(jù)一致性;
同時初始化消息的對賬狀態(tài)(PENDING)與下次對賬時間(默認(rèn) 5 分鐘后,避免業(yè)務(wù)未執(zhí)行完就對賬)。
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Service
public class OrderServiceImpl implements OrderService {
// 注入RocketMQ事務(wù)生產(chǎn)者(單例,由Spring容器初始化)
@Resource
private TransactionMQProducer transactionMQProducer;
@Resource
private OrderMapper orderMapper;
@Resource
private MessageLogMapper messageLogMapper;
/
* 創(chuàng)建訂單 + 發(fā)送事務(wù)消息
*/
@Override
public void createOrder(OrderCreateDTO dto) throws Exception {
// 1. 構(gòu)造訂單數(shù)據(jù)(生成唯一訂單號)
String orderId = generateOrderId();
TOrder order = TOrder.builder()
.orderId(orderId)
.userId(dto.getUserId())
.skuId(dto.getSkuId())
.quantity(dto.getQuantity())
.orderStatus("CREATED")
.build();
// 2. 構(gòu)造RocketMQ事務(wù)消息(主題:OrderTopic,標(biāo)簽:INVENTORY_DEDUCT+NOTICE_SEND,支持多下游訂閱)
String msgContent = JSON.toJSONString(dto);
Message message = new Message(
"OrderTopic", // 主題:下游服務(wù)訂閱此主題
"INVENTORY_DEDUCT||NOTICE_SEND", // 標(biāo)簽:區(qū)分業(yè)務(wù)類型,下游可按標(biāo)簽過濾
orderId.getBytes(StandardCharsets.UTF_8), // 消息Key:訂單ID,便于定位
msgContent.getBytes(StandardCharsets.UTF_8)
);
// 3. 發(fā)送事務(wù)消息(將訂單數(shù)據(jù)作為參數(shù)透傳給事務(wù)監(jiān)聽器)
transactionMQProducer.sendMessageInTransaction(message, order);
}
/
* 事務(wù)監(jiān)聽器:執(zhí)行本地事務(wù) + 事務(wù)回查
/
@Resource
private TransactionListener orderTransactionListener;
// 初始化事務(wù)生產(chǎn)者時綁定監(jiān)聽器(Spring Bean初始化方法)
@PostConstruct
public void initProducer() {
transactionMQProducer.setTransactionListener(orderTransactionListener);
}
/
* 本地事務(wù)邏輯(由監(jiān)聽器調(diào)用,確保訂單與消息表同成功/同失?。?/
@Transactional(rollbackFor = Exception.class)
public LocalTransactionState executeLocalTransaction(TOrder order, Message message) {
try {
// 步驟1:保存訂單到訂單表
orderMapper.insert(order);
// 步驟2:記錄本地消息表(對賬狀態(tài)初始為PENDING,下次對賬時間5分鐘后)
Date nextReconcileTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5));
MessageLog log = MessageLog.builder()
.orderId(order.getOrderId())
.rocketmqMsgId(message.getMsgId())
.messageContent(new String(message.getBody()))
.businessType("ORDER_CREATE")
.msgStatus("INIT")
.reconcileStatus("PENDING")
.retryCount(0)
.nextReconcileTime(nextReconcileTime)
.build();
messageLogMapper.insert(log);
// 步驟3:提交本地事務(wù),返回COMMIT(通知RocketMQ投遞消息)
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事務(wù)失敗,回滾,返回ROLLBACK(通知RocketMQ丟棄消息)
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/*
* 事務(wù)回查邏輯(Broker未收到Commit/Rollback時觸發(fā))
/
public LocalTransactionState checkLocalTransaction(String orderId) {
// 查本地消息表,按訂單ID判斷本地事務(wù)狀態(tài)
MessageLog log = messageLogMapper.selectByOrderId(orderId);
if (log == null) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 本地?zé)o記錄,回滾
}
// 本地消息已記錄,說明本地事務(wù)成功,返回COMMIT
if ("INIT".equals(log.getMsgStatus()) || "PENDING".equals(log.getReconcileStatus())) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 工具方法:生成唯一訂單號
private String generateOrderId() {
return "ORDER_" + System.currentTimeMillis() + RandomUtils.nextInt(1000, 9999);
}
}3.2 Consumer 端(庫存服務(wù)):消息消費與冪等控制
下游服務(wù)消費消息時,需通過 “訂單 ID” 實現(xiàn)冪等(避免重復(fù)扣庫存),同時記錄消費狀態(tài),為后續(xù)對賬提供查詢依據(jù);消費失敗時返回RECONSUME_LATER,觸發(fā) RocketMQ 重試,重試耗盡后進(jìn)入死信隊列。
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Component
// 訂閱訂單主題,僅消費“扣庫存”標(biāo)簽的消息
@RocketMQMessageListener(topic = "OrderTopic", selectorExpression = "INVENTORY_DEDUCT", consumerGroup = "inventory_consumer_group")
public class InventoryConsumer implements RocketMQListener<MessageExt> {
@Resource
private InventoryMapper inventoryMapper;
@Resource
private InventoryDeductLogMapper deductLogMapper;
@Resource
private MessageLogFeignClient messageLogFeignClient; // 調(diào)用訂單服務(wù)的消息表接口
@Override
@Transactional(rollbackFor = Exception.class)
public void onMessage(MessageExt messageExt) {
// 1. 解析消息(獲取訂單ID、商品ID、扣減數(shù)量)
String msgContent = new String(messageExt.getBody());
OrderCreateDTO dto = JSON.parseObject(msgContent, OrderCreateDTO.class);
String orderId = dto.getOrderId();
Long skuId = dto.getSkuId();
Integer quantity = dto.getQuantity();
// 2. 冪等控制:查詢是否已扣減(按訂單ID)
InventoryDeductLog existLog = deductLogMapper.selectByOrderId(orderId);
if (existLog != null && "SUCCESS".equals(existLog.getDeductStatus())) {
// 已成功扣減,直接返回成功
messageLogFeignClient.updateMsgStatus(orderId, "CONSUMED"); // 通知訂單服務(wù)更新消息狀態(tài)
return;
}
try {
// 3. 執(zhí)行庫存扣減(先查庫存是否充足)
Inventory inventory = inventoryMapper.selectBySkuId(skuId);
if (inventory == null || inventory.getStock() < quantity) {
// 庫存不足,記錄失敗狀態(tài),返回失敗(不重試,避免無效循環(huán))
deductLogMapper.insert(InventoryDeductLog.builder()
.orderId(orderId)
.skuId(skuId)
.deductQuantity(quantity)
.deductStatus("FAIL")
.build());
messageLogFeignClient.updateMsgStatus(orderId, "FAIL"); // 通知訂單服務(wù)更新消息狀態(tài)
throw new RuntimeException("庫存不足,扣減失敗");
}
// 4. 扣減庫存并記錄日志
inventory.setStock(inventory.getStock() - quantity);
inventoryMapper.updateById(inventory);
deductLogMapper.insert(InventoryDeductLog.builder()
.orderId(orderId)
.skuId(skuId)
.deductQuantity(quantity)
.deductStatus("SUCCESS")
.build());
// 5. 通知訂單服務(wù)更新消息狀態(tài)為“已消費”
messageLogFeignClient.updateMsgStatus(orderId, "CONSUMED");
// 返回消費成功
} catch (Exception e) {
// 消費失敗,記錄“處理中”狀態(tài),返回重試
deductLogMapper.insertOrUpdate(InventoryDeductLog.builder()
.orderId(orderId)
.skuId(skuId)
.deductQuantity(quantity)
.deductStatus("PROCESSING")
.build());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
/
* 對外提供庫存扣減狀態(tài)查詢接口(供XXL-Job對賬調(diào)用)
*/
public String queryDeductStatus(String orderId) {
InventoryDeductLog log = deductLogMapper.selectByOrderId(orderId);
if (log == null) {
return "NOT_PROCESSED"; // 未處理
}
return log.getDeductStatus(); // SUCCESS/FAIL/PROCESSING
}
/
* 對外提供庫存扣減重試接口(供XXL-Job對賬修復(fù)調(diào)用)
*/
public boolean retryDeduct(String orderId) {
// 邏輯同onMessage,僅針對“PROCESSING/FAIL”狀態(tài)的記錄重試,此處省略
return true;
}
}4. 基礎(chǔ)流程說明
4.1 正常流程
1、用戶發(fā)起下單請求,訂單服務(wù)調(diào)用 createOrder 方法,發(fā)送事務(wù)消息。
2、RocketMQ 收到 “半消息” 后,觸發(fā)訂單服務(wù)的本地事務(wù)(executeLocalTransaction): - 本地事務(wù)成功:保存訂單、記錄本地消息表(msg_status=INIT,reconcile_status=PENDING),返回 COMMIT_MESSAGE。 - RocketMQ 確認(rèn)后,將消息投遞至庫存服務(wù)、通知服務(wù)。
3、庫存服務(wù) / 通知服務(wù)消費消息,執(zhí)行業(yè)務(wù)邏輯(扣庫存 / 發(fā)通知),消費成功后通知訂單服務(wù)更新消息狀態(tài)為 CONSUMED。
4、5 分鐘后,XXL-Job 對賬任務(wù)觸發(fā),核查訂單服務(wù)與下游服務(wù)狀態(tài)一致,更新對賬狀態(tài)為 SUCCESS,流程閉環(huán)。
4.2 基礎(chǔ)異常流程(無對賬時)
- 本地事務(wù)失敗:訂單 / 消息表插入失敗,返回
ROLLBACK_MESSAGE,RocketMQ 不投遞消息,無下游影響。 - Broker 超時未收狀態(tài):RocketMQ 觸發(fā)事務(wù)回查(
checkLocalTransaction),按本地消息表狀態(tài)返回COMMIT_MESSAGE,重新投遞消息。 - Consumer 消費失敗:返回
RECONSUME_LATER,RocketMQ 按重試策略重試(默認(rèn) 16 次),重試耗盡后進(jìn)入死信隊列。
5. XXL-Job 定時事務(wù)對賬機(jī)制
5.1 對賬任務(wù)核心目標(biāo)
解決 “基礎(chǔ)流程無法覆蓋的一致性問題”,例如:
- Consumer 消費成功但未通知訂單服務(wù)更新消息狀態(tài);
- RocketMQ 投遞成功但 Consumer 因網(wǎng)絡(luò)問題未接收,重試超時;
- 本地事務(wù)成功、消息投遞成功,但 Consumer 業(yè)務(wù)執(zhí)行一半(如庫存扣減成功但日志未記錄)。
5.2 對賬任務(wù)配置
- 執(zhí)行頻率:每 5 分鐘執(zhí)行一次(與消息表
next_reconcile_time匹配,避免高頻占用資源)。 - 分片策略:按
order_id尾號分片(如 10 個分片,尾號 0-9),多執(zhí)行器并行對賬,支撐百萬級訂單對賬效率。 - 超時控制:單個分片任務(wù)超時時間設(shè)為 30 秒,超時標(biāo)記為 “待重試”,下次對賬重新處理。
- 任務(wù)依賴:依賴訂單服務(wù)、庫存服務(wù)、通知服務(wù)的 “狀態(tài)查詢接口” 與 “重試執(zhí)行接口”。
5.3 核心對賬邏輯(XXL-Job 任務(wù)代碼)
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
public class OrderReconcileJob {
@Resource
private MessageLogMapper messageLogMapper;
@Resource
private OrderService orderService;
@Resource
private InventoryFeignClient inventoryFeignClient; // 調(diào)用庫存服務(wù)接口
@Resource
private NoticeFeignClient noticeFeignClient; // 調(diào)用通知服務(wù)接口
@Resource
private ReconcileWorkOrderMapper workOrderMapper; // 對賬工單表
/*
* XXL-Job 對賬任務(wù)(分片執(zhí)行)
/
@XxlJob("orderReconcileJob")
public void execute() throws Exception {
// 1. 獲取分片參數(shù)(當(dāng)前分片號、總分片數(shù))
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
// 2. 篩選待對賬數(shù)據(jù):
// - 對賬狀態(tài)為PENDING/RETRY
// - 下次對賬時間 <= 當(dāng)前時間
// - 重試次數(shù) < 5次
// - 按order_id尾號分片查詢(避免重復(fù))
List<MessageLog> pendingLogs = messageLogMapper.selectPendingReconcile(
shardIndex, shardTotal, 5, new Date()
);
if (pendingLogs.isEmpty()) {
XxlJobHelper.log("當(dāng)前分片無待對賬數(shù)據(jù),分片號:{}", shardIndex);
return;
}
// 3. 遍歷待對賬記錄,逐一對賬
for (MessageLog log : pendingLogs) {
String orderId = log.getOrderId();
try {
XxlJobHelper.log("開始對賬訂單:{}", orderId);
// 步驟1:查詢各服務(wù)狀態(tài)
// - 訂單服務(wù)狀態(tài):是否已創(chuàng)建(CREATED)
String orderStatus = orderService.queryOrderStatus(orderId);
// - 庫存服務(wù)狀態(tài):是否已扣減(SUCCESS/FAIL/PROCESSING)
String inventoryStatus = inventoryFeignClient.queryDeductStatus(orderId);
// - 通知服務(wù)狀態(tài):是否已發(fā)送(SUCCESS/FAIL/PROCESSING)
String noticeStatus = noticeFeignClient.queryNoticeStatus(orderId);
// 步驟2:狀態(tài)一致性判斷與處理
// 場景1:訂單已創(chuàng)建,庫存/通知均成功 → 對賬一致
if ("CREATED".equals(orderStatus)
&& "SUCCESS".equals(inventoryStatus)
&& "SUCCESS".equals(noticeStatus)) {
messageLogMapper.updateReconcileStatus(orderId, "SUCCESS");
XxlJobHelper.log("訂單{}對賬一致,狀態(tài)更新為SUCCESS", orderId);
continue;
}
// 場景2:訂單已創(chuàng)建,庫存/通知存在PROCESSING → 待重試(下次對賬再查)
if ("CREATED".equals(orderStatus)
&& ("PROCESSING".equals(inventoryStatus) || "PROCESSING".equals(noticeStatus))) {
// 更新下次對賬時間(10分鐘后)和重試次數(shù)
Date nextTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, nextTime, "RETRY");
XxlJobHelper.log("訂單{}存在處理中狀態(tài),下次對賬時間:{}", orderId, nextTime);
continue;
}
// 場景3:訂單已創(chuàng)建,庫存/通知存在FAIL → 觸發(fā)自動重試
if ("CREATED".equals(orderStatus)
&& ("FAIL".equals(inventoryStatus) || "FAIL".equals(noticeStatus))) {
// 重試次數(shù)未超5次,調(diào)用下游重試接口
if (log.getRetryCount() < 5) {
boolean inventoryRetry = "FAIL".equals(inventoryStatus)
? inventoryFeignClient.retryDeduct(orderId) : true;
boolean noticeRetry = "FAIL".equals(noticeStatus)
? noticeFeignClient.retrySend(orderId) : true;
if (inventoryRetry && noticeRetry) {
// 重試成功,更新下次對賬時間(5分鐘后)
Date nextTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5));
messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, nextTime, "RETRY");
XxlJobHelper.log("訂單{}重試下游服務(wù)成功,下次對賬時間:{}", orderId, nextTime);
} else {
// 重試失敗,更新重試次數(shù),下次繼續(xù)
Date nextTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(30));
messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, nextTime, "RETRY");
XxlJobHelper.log("訂單{}重試下游服務(wù)失敗,下次對賬時間:{}", orderId, nextTime);
}
continue;
}
// 重試次數(shù)超5次,生成人工工單
workOrderMapper.insert(ReconcileWorkOrder.builder()
.orderId(orderId)
.workOrderNo("RECONCILE_" + System.currentTimeMillis())
.faultDesc("訂單" + orderId + ":庫存狀態(tài)=" + inventoryStatus + ",通知狀態(tài)=" + noticeStatus + ",重試5次失敗")
.workOrderStatus("PENDING")
.createTime(new Date())
.build());
// 更新對賬狀態(tài)為FAIL
messageLogMapper.updateReconcileStatus(orderId, "FAIL");
XxlJobHelper.log("訂單{}對賬失敗,生成人工工單", orderId);
continue;
}
// 場景4:訂單狀態(tài)異常(如CANCELED)→ 對賬一致(無需下游處理)
if ("CANCELED".equals(orderStatus)) {
messageLogMapper.updateReconcileStatus(orderId, "SUCCESS");
XxlJobHelper.log("訂單{}已取消,對賬狀態(tài)更新為SUCCESS", orderId);
continue;
}
} catch (Exception e) {
// 對賬過程異常(如接口超時),標(biāo)記為RETRY,下次再查
Date nextTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, nextTime, "RETRY");
XxlJobHelper.log("訂單{}對賬異常,原因:{},下次對賬時間:{}", orderId, e.getMessage(), nextTime);
}
}
XxlJobHelper.handleSuccess("當(dāng)前分片對賬完成,分片號:{},處理記錄數(shù):{}", shardIndex, pendingLogs.size());
}
}5.4 對賬關(guān)鍵保障
1、冪等性:對賬任務(wù)按 order_id 與 reconcile_status 篩選,僅處理 “待對賬 / 待重試” 記錄,避免重復(fù)對賬。
2、重試策略:采用 “指數(shù)退避” 重試(5 分鐘→10 分鐘→30 分鐘),減少無效重試對服務(wù)的壓力。
3、人工兜底:重試 5 次仍失敗后生成工單,由運維 / 業(yè)務(wù)人員介入(如手動補(bǔ)扣庫存、重發(fā)通知),確保無數(shù)據(jù)遺漏。
4、數(shù)據(jù)清理:對賬狀態(tài)為 SUCCESS 且超過 30 天的記錄,定期歸檔至歷史表(如 message_log_hist),避免主表數(shù)據(jù)量過大影響查詢效率。
6. 完整異常場景處理
異常場景 | 現(xiàn)象 | 處理機(jī)制 |
本地事務(wù)失敗 | 訂單 / 消息表未插入,返回 ROLLBACK | RocketMQ 不投遞消息,無下游影響 |
Broker 回查 | 未收到 Commit/Rollback,觸發(fā)回查 | 查本地消息表,存在則返回 Commit,重新投遞 |
Consumer 消費超時 | 消息未被消費,RocketMQ 重試 | 重試 16 次后進(jìn)入死信隊列,對賬任務(wù)發(fā)現(xiàn)后重試 |
消費成功未更狀態(tài) | Consumer 成功但未通知訂單服務(wù),消息表仍為 INIT | 對賬任務(wù)查下游狀態(tài)為 SUCCESS,更新消息表狀態(tài)為 CONSUMED |
下游業(yè)務(wù)失?。◣齑娌蛔悖?/span> | 庫存扣減失敗,狀態(tài)為 FAIL | 對賬任務(wù)重試 5 次后生成人工工單,手動處理(如補(bǔ)充庫存) |
經(jīng)典ebay本地消息表 事務(wù)表數(shù)據(jù)爆炸 問題
經(jīng)典ebay本地消息表 事務(wù)表數(shù)據(jù)爆炸, 定時任務(wù)掃表會很慢,存在巨大的延遲問題
解決的方案如下:
1、索引優(yōu)化:在消息表中對狀態(tài)字段增加索引,以加速掃表操作。索引可以加速消息的檢索和篩選,從而提高操作效率。
2、分頁查詢:將掃表操作劃分為多次分頁查詢,避免一次性查詢大量數(shù)據(jù)造成的性能問題。
3、多線程 + 分段查詢:
- 如果有業(yè)務(wù)標(biāo)識,可以通過業(yè)務(wù)標(biāo)識進(jìn)行多線程分段掃表查詢。
- 如果沒有業(yè)務(wù)標(biāo)識可以按區(qū)間查詢比如線程1查詢0-1000的數(shù)據(jù),線程2查詢1001-2000的數(shù)據(jù)。
4、表較大時進(jìn)行分庫分表:如果表較大可以進(jìn)行分庫分表操作。
10Wqps 本地消息表事務(wù)架構(gòu)方案大總結(jié)
最終,通過引入一個中間的Rocketmq承擔(dān)本地消息表的職責(zé),除了解決事務(wù)的一致性外,同樣可以解決消息的丟失與冪等性問題,一舉多得。
而且從業(yè)務(wù)的健壯性與數(shù)據(jù)一致性來看,一般都會增加一個補(bǔ)償機(jī)制, 實現(xiàn)數(shù)據(jù)的 最終一致性。這也是BASE理論所支持的。
如何設(shè)計 10Wqps高并發(fā)分布式事務(wù)? 如果能講 到尼恩答案 的 水平 , 面試官一定口水直流, 大廠 offer 就到手啦。

































