深度解析:基于 RocketMQ 實(shí)現(xiàn)分布式事務(wù)的技術(shù)實(shí)踐與原理探究
在上一篇文章Spring Boot自動(dòng)裝配原理以及實(shí)踐我們完成了服務(wù)通用日志監(jiān)控組件的開發(fā),確保每個(gè)服務(wù)都可以基于一個(gè)注解實(shí)現(xiàn)業(yè)務(wù)功能的監(jiān)控。 而本文我們嘗試基于RocketMQ實(shí)現(xiàn)下單的分布式的事務(wù)??赡軙?huì)有讀者會(huì)有疑問,之前我們不是基于Seata完成了分布式事務(wù),為什么我們還要用到RocketMQ呢?
我們的再來回顧一下我們下單功能大抵是做以下三件事情:
- 創(chuàng)建訂單,將訂單記錄存到數(shù)據(jù)庫中。
- 扣款,記錄用戶扣款后錢包所剩下的額度。
- 扣除商品庫存,并發(fā)放商品。

我們將該場景放到高并發(fā)場景下,這個(gè)功能勢必要考慮性能和可靠性問題,所以我們在業(yè)務(wù)需求清楚明了的情況下,就希望能有一種方式確保下單功能在高并發(fā)場景保證性能、可靠性。 而Seata的AT模式確實(shí)可以保證最終一致性,但是seata的AT模式本質(zhì)上是依賴于global_table、branch_table等數(shù)據(jù)表維護(hù)應(yīng)用層分布式事務(wù),在操作期間會(huì)涉及大量的更新和刪除操作,隨著時(shí)間的推移還是會(huì)出現(xiàn)大量的索引碎片,導(dǎo)致索引性能下降。
所以我們就考慮采用RocketMQ實(shí)現(xiàn)分布式事務(wù),盡管RocketMQ對于分布式事務(wù)的實(shí)現(xiàn)業(yè)務(wù)侵入性相對強(qiáng)一些,但它可以保證業(yè)務(wù)層面的功能解耦從而提升并發(fā)性能,且RocketMQ還對消息消費(fèi)可靠性做了許多不錯(cuò)的優(yōu)化,例如:失敗重試、死信隊(duì)列等,所以我們還是嘗試使用RocketMQ來改良我們的下單分布式事務(wù)問題。
一、詳解RocketMQ落地分布式事務(wù)案例
1. 需求說明
用戶下單大抵需要在三個(gè)服務(wù)中完成:
- 訂單服務(wù)完成訂單創(chuàng)建,基于用戶傳入的產(chǎn)品編碼、用戶編碼、產(chǎn)品購買數(shù)生成訂單信息,對應(yīng)的調(diào)用參數(shù)如下:
{
"accountCode": "0932897",
"productCode": "P003",
"count": 1
}- 基于入?yún)⒌挠脩舸a定位到用戶錢包金額,完成賬戶扣款。
- 基于產(chǎn)品和購買數(shù)完成庫存扣減。
這其中會(huì)跨域三個(gè)服務(wù),分別是訂單服務(wù)創(chuàng)建訂單、賬戶服務(wù)扣款、商品服務(wù)扣減庫存。

2. 落地思路
以我們業(yè)務(wù)為最終目標(biāo),RocketMQ實(shí)現(xiàn)分布式事務(wù)的原理是基于2PC的,流程大抵如下:
- 訂單服務(wù)發(fā)送一個(gè)事務(wù)消息到消息隊(duì)列,消息內(nèi)容就是我們的訂單信息,這里面包含用戶賬號、購買的產(chǎn)品代碼、購買產(chǎn)品數(shù)量等數(shù)據(jù)。
- MQ收到half消息,并回復(fù)ack確認(rèn)。
- 生產(chǎn)者(訂單服務(wù)order-service)得知我們發(fā)送的消息已被收到,訂單服務(wù)則執(zhí)行本地事務(wù)并提交事務(wù),即將訂單信息寫入數(shù)據(jù)庫中,同時(shí)在該事務(wù)內(nèi)將訂單插入結(jié)果寫入transaction_log表中。
- 生產(chǎn)者(訂單服務(wù)order-service)完成本地事務(wù)的提交,告知MQ將事務(wù)消息commit,此時(shí)消費(fèi)者就可以消費(fèi)這條消息了,注意若生產(chǎn)者消費(fèi)失敗,則將消息rollback,一切就當(dāng)沒有發(fā)生過。
- 如果上述的消息是commit則將消息持久化到commitLog中,以便后續(xù)MQ宕機(jī)或者服務(wù)宕機(jī)后依然可以繼續(xù)消費(fèi)這條沒有被消費(fèi)的消息。
- (非必要步驟)若MQ長時(shí)間沒有收到生產(chǎn)者的commit或者rollback的信號,則攜帶事務(wù)id找生產(chǎn)者查詢transaction_log索要當(dāng)前消息狀態(tài),如果看到對應(yīng)的消息則判定生產(chǎn)者事務(wù)成功將消息commit給消費(fèi)者消費(fèi),若沒看到則說明生產(chǎn)者本地事務(wù)執(zhí)行失敗,回滾該消息。
- 消費(fèi)者即我們的用戶服務(wù)或者庫存服務(wù)收到消息則執(zhí)行本地事務(wù)并提交,若失敗則會(huì)不斷重試,直到達(dá)到上限則將消息存到死信隊(duì)列并告警。
- 人工介入查看死信隊(duì)列查看失敗消息手工補(bǔ)償數(shù)據(jù)。

二、實(shí)踐-基于RocketMQ實(shí)現(xiàn)分布式事務(wù)
1. 部署RocketMQ(Linux環(huán)境)
在編寫業(yè)務(wù)代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個(gè)版本:https://rocketmq.apache.org/download/。
完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環(huán)境變量,完成后鍵入source /etc/profile使之生效,對應(yīng)的配置內(nèi)容如下所示:
export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin需要注意的是筆者本次采用WSL的Ubuntu子系統(tǒng)時(shí)啟動(dòng)時(shí)腳本會(huì)拋出runserver.sh: 70: [[: Exec format error錯(cuò)誤,嘗試格式化和指令配置后都沒有很好的解決,于是循著報(bào)錯(cuò)找到runserver.sh這行對應(yīng)的腳本內(nèi)容,該括弧本質(zhì)上就是基于JDK內(nèi)容配置對應(yīng)的GC算法:

以筆者為里系統(tǒng)是jdk8,所以直接去掉判斷用走JDK8的配置即可:
choose_gc_options()
{
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
}完成后鍵入./mqnamesrv &將MQ啟動(dòng),如果彈窗輸出下面這條結(jié)果,則說明mq的NameServer啟動(dòng)成功。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON然后我們再鍵入./mqbroker -n 127.0.0.1:9876啟動(dòng)broker,需要注意的是默認(rèn)情況下broker占用堆內(nèi)存差不多是4g,所以讀者本地部署時(shí)建議修改一下runbroker.sh的堆內(nèi)存,如下圖所示:

若彈窗輸出下面所示的文字,則說明broker啟動(dòng)成功,自此mq就在windows環(huán)境部署成功了。我們就可以開始編碼工作了。
The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:98762. 服務(wù)引入MQ完成下單功能開發(fā)
(1) 服務(wù)引入RocketMQ依賴
完成RocketMQ部署之后,我們就可以著手編碼工作了,首先我們要在在三個(gè)服務(wù)中引入RocketMQ的依賴,由于筆者的spring-boot版本比較老,所以這里筆者為了統(tǒng)一管理在父pom中指定了mq較新的版本號:
<!--rocketmq-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>然后我們分別對order、account、product三個(gè)服務(wù)中引入依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>(2) 注冊中心配置RocketMQ信息
由于我們的分布式事務(wù)涉及3個(gè)服務(wù),而且mq的消費(fèi)模式采用的是發(fā)布訂閱模式,所以我們的生產(chǎn)者(order-service)和消費(fèi)者(account-serivce)都配置為cloud-group
rocketmq.name-server=172.29.193.12:9876
# 指定消費(fèi)者組
rocketmq.producer.group=cloud-group之所以沒有沒將消費(fèi)者2(product-service)也配置到cloud-group中的原因也很簡單,同一個(gè)消息只能被同一個(gè)消費(fèi)者組中的一個(gè)成員消費(fèi),假如我們的將product-service配置到同一個(gè)消費(fèi)者組中就會(huì)出現(xiàn)因一條消息只能被一個(gè)服務(wù)消費(fèi)而導(dǎo)致product-service收不到消息。

對此我們實(shí)現(xiàn)思路有兩種:
- 將服務(wù)都放到同一個(gè)消費(fèi)者組,消費(fèi)模式改為廣播模式。
- 將product-service設(shè)置到別的消費(fèi)者組中。
考慮后續(xù)擴(kuò)展筆者選擇方案2,將產(chǎn)品服務(wù)的訂閱者放到消費(fèi)者組2中:
rocketmq.name-server=172.29.193.12:9876
rocketmq.producer.group=cloud-group2(3) 創(chuàng)建消息日志表
我們在上文進(jìn)行需求梳理時(shí)有提到一個(gè)MQServer沒收到生產(chǎn)者本地事務(wù)執(zhí)行狀態(tài)進(jìn)行回查的操作,所以我們在生產(chǎn)者在執(zhí)行本地事務(wù)時(shí),需要?jiǎng)?chuàng)建一張表記錄生產(chǎn)者本地事務(wù)執(zhí)行狀態(tài),建表SQL如下:
DROP TABLE IF EXISTS `rocketmq_transaction_log`;
CREATE TABLE `rocketmq_transaction_log` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`transaction_id` varchar(50) DEFAULT NULL,
`log` varchar(500) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;(4) 完成order服務(wù)half消息發(fā)送、監(jiān)聽、回查回調(diào)邏輯
我們的訂單服務(wù)需要做以下三件事:
- 發(fā)送half消息給MQ。
- half消息發(fā)送成功執(zhí)行本地事務(wù)并記錄日志。
- 告知MQ可以提交事務(wù)消息。
所以我們需要定義一下消息格式,對象類中必須包含訂單號、產(chǎn)品編碼、用戶編碼、購買產(chǎn)品數(shù)量等信息。
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class OrderDto {
private static final long serialVersionUID = 1L;
//設(shè)置主鍵自增,避免插入時(shí)沒必要的報(bào)錯(cuò)
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
/**
* 訂單號
*/
private String orderNo;
/**
* 用戶編碼
*/
private String accountCode;
/**
* 產(chǎn)品編碼
*/
private String productCode;
/**
* 產(chǎn)品扣減數(shù)量
*/
private Integer count;
/**
* 余額
*/
private BigDecimal amount;
/**
* 本次扣減金額
*/
private BigDecimal price;
}然后我們就可以編寫控制層的代碼了,通過獲取前端傳輸?shù)膮?shù)調(diào)用orderService完成half消息發(fā)送。
@PostMapping("/order/createOrderByMQ")
public ResultData<String> createOrderByMQ(@RequestBody OrderDto orderDTO) {
log.info("基于mq完成用戶下單流程,請求參數(shù): " + JSON.toJSONString(orderDTO));
orderService.createOrderByMQ(orderDTO);
return ResultData.success("基于mq完成用戶下單完成");
}orderService的實(shí)現(xiàn)邏輯很簡單,定義好消息設(shè)置消息頭內(nèi)容和消息載體的對象,通過sendMessageInTransaction方法完成半消息發(fā)送,需要了解一下消息的主題(topic)為ORDER_MSG_TOPIC,只有訂閱這個(gè)主題的消費(fèi)者才能消費(fèi)這條消息:
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void createOrderByMQ(OrderDto orderDto) {
//創(chuàng)建half消息對應(yīng)的事務(wù)日志的id
String transactionId = UUID.randomUUID().toString();
//調(diào)用產(chǎn)品服務(wù)獲取商品詳情
ResultData<ProductDTO> productInfo = productFeign.getByCode(orderDto.getProductCode());
//計(jì)算總售價(jià)
BigDecimal amount = productInfo.getData().getPrice().multiply(new BigDecimal(orderDto.getCount()));
orderDto.setAmount(amount);
//將訂單信息作為載體
Message<OrderDto> message = MessageBuilder.withPayload(orderDto)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
//下單用戶編碼
.setHeader("accountCode", orderDto.getAccountCode())
//產(chǎn)品編碼
.setHeader("productCode", orderDto.getProductCode())
//產(chǎn)品購買數(shù)
.setHeader("count", orderDto.getCount())
//下單金額
.setHeader("amount", amount)
.build();
//發(fā)送half消息
rocketMQTemplate.sendMessageInTransaction("ORDER_MSG_TOPIC", message, orderDto);
}完成half消息發(fā)送之后,我們就必須知曉消息發(fā)送結(jié)果才能確定是否執(zhí)行本地事務(wù)并提交,所以我們的訂單服務(wù)必須創(chuàng)建一個(gè)監(jiān)聽器了解half消息的發(fā)送情況,executeLocalTransaction方法就是mq成功收到半消息后的回調(diào)函數(shù),一旦我們得知消息成功發(fā)送之后,MQ就會(huì)執(zhí)行這個(gè)方法,筆者通過這個(gè)方法獲取消息頭的參數(shù)創(chuàng)建訂單對象,調(diào)用createOrderWithRocketMqLog完成訂單的創(chuàng)建的本地事務(wù)成功的日志記錄。
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {
private final IOrderService orderService;
private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;
/**
* 監(jiān)聽到發(fā)送half消息,執(zhí)行本地事務(wù)
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
log.info("order執(zhí)行本地事務(wù)");
try {
//解析消息頭
MessageHeaders headers = message.getHeaders();
//獲取購買金額
BigDecimal amount = new BigDecimal(String.valueOf(headers.get("amount")));
//獲取訂單信息
Order order = Order.builder()
.accountCode((String) headers.get("accountCode"))
.amount(amount)
.productCode((String) headers.get("productCode"))
.count(Integer.valueOf(String.valueOf(headers.get("count"))))
.build();
//獲取事務(wù)id
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
//執(zhí)行本地事務(wù)和記錄事務(wù)日志
orderService.createOrderWithRocketMqLog(order, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("創(chuàng)建訂單失敗,失敗原因: {}", e.getMessage(), e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 本地事務(wù)的檢查,檢查本地事務(wù)是否成功
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
//獲取事務(wù)ID
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("檢查本地事務(wù),事務(wù)ID:{}", transactionId);
//根據(jù)事務(wù)id從日志表檢索
QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("transaction_id", transactionId);
RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
//如果消息表存在,則說明生產(chǎn)者事務(wù)執(zhí)行完成,回復(fù)commit
if (null != rocketmqTransactionLog) {
return RocketMQLocalTransactionState.COMMIT;
}
//回復(fù)rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}createOrderWithRocketMqLog做了兩件事,分別是插入訂單信息和創(chuàng)建消息日志,這里筆者用到了事務(wù)注解確保了兩個(gè)操作的原子性。 這樣一來,MQserver后續(xù)的回查邏輯完全可以基于RocketmqTransactionLog 進(jìn)行判斷,如果消息的事務(wù)id在表中存在,則說明生產(chǎn)者本地事務(wù)成功,反之就是失敗。
@Transactional(rollbackFor = Exception.class)
@Override
public void createOrderWithRocketMqLog(Order order, String transactionId) {
//創(chuàng)建訂單編號
order.setOrderNo(UUID.randomUUID().toString());
//插入訂單信息
orderMapper.insert(order);
//事務(wù)日志
RocketmqTransactionLog log = RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("執(zhí)行創(chuàng)建訂單操作")
.build();
rocketmqTransactionLogMapper.insert(log);
}補(bǔ)充一下基于MP生成的RocketmqTransactionLog 類代碼:
@TableName("rocketmq_transaction_log")
@ApiModel(value = "RocketmqTransactionLog對象", description = "")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RocketmqTransactionLog implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
private String transactionId;
private String log;
}(5) 完成account、product監(jiān)聽事件
然后我們就可以實(shí)現(xiàn)用戶服務(wù)和商品服務(wù)的監(jiān)聽事件了,一旦生產(chǎn)者提交事務(wù)消息之后,這幾個(gè)消費(fèi)者都會(huì)收到這個(gè)topic(主題)的消息,進(jìn)而完成當(dāng)前服務(wù)的業(yè)務(wù)邏輯。
先來看看實(shí)現(xiàn)扣款的用戶服務(wù),我們的監(jiān)聽器繼承了RocketMQListener,基于@RocketMQMessageListener注解設(shè)置它訂閱的主題為createByRocketMQ,一旦收到這個(gè)主題的消息時(shí)這個(gè)監(jiān)聽器就會(huì)執(zhí)行onMessage方法,我們的邏輯很簡單,就是獲取消息的內(nèi)容完成扣款,唯一需要注意的就是線程安全問題。我們的壓測的情況下,單用戶可能會(huì)頻繁創(chuàng)建訂單,在并發(fā)期間同一個(gè)用戶的扣款消息可能同時(shí)到達(dá)扣款服務(wù)中,這就導(dǎo)致單位時(shí)間內(nèi)扣款服務(wù)從數(shù)據(jù)庫中查詢到相同的余額,執(zhí)行相同的扣款邏輯,導(dǎo)致金額少扣了。

所以我們必須保證扣款操作互斥和原子化,考慮到筆者當(dāng)前項(xiàng)目環(huán)境是單體,所以就用簡單的synchronized 關(guān)鍵字解決問題。
@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SubtracAmountListener implements RocketMQListener<OrderDto> {
@Resource
private AccountMapper accountMapper;
//強(qiáng)制轉(zhuǎn)為runTimeException
@SneakyThrows
@Override
public void onMessage(OrderDto orderDto) {
log.info("賬戶服務(wù)收到消息,開始消費(fèi)");
QueryWrapper<Account> query = new QueryWrapper<>();
query.eq("account_code", orderDto.getAccountCode());
//解決單體服務(wù)下線程安全問題
synchronized (this){
Account account = accountMapper.selectOne(query);
BigDecimal subtract = account.getAmount().subtract(orderDto.getAmount());
if (subtract.compareTo(BigDecimal.ZERO)<0){
throw new Exception("用戶余額不足");
}
account.setAmount(subtract);
log.info("更新賬戶服務(wù),請求參數(shù):{}", JSON.toJSONString(account));
accountMapper.updateById(account);
}
}
}然后就說商品服務(wù),邏輯也很簡單,也同樣要注意一下線程安全問題:
@Slf4j
@Service
@RocketMQMessageListener(topic = "ORDER_MSG_TOPIC", consumerGroup = "cloud-group2")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductSubtractListener implements RocketMQListener<OrderDto> {
@Resource
private ProductMapper productMapper;
@Override
public void onMessage(OrderDto orderDto) {
log.info(" 產(chǎn)品服務(wù)收到消息,開始消費(fèi)");
QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
queryWrapper.eq("product_code",orderDto.getProductCode());
synchronized (this){
Product product = productMapper.selectOne(queryWrapper);
if (product.getCount()<orderDto.getCount()){
throw new RuntimeException("庫存不足");
}
product.setCount(product.getCount()-orderDto.getCount());
log.info("更新產(chǎn)品庫存信息,請求參數(shù):{}", JSON.toJSONString(product));
productMapper.updateById(product);
}
}
}三、基于幾個(gè)測試用例驗(yàn)證MQ半消息事務(wù)
1. 前置準(zhǔn)備與說明
完整編碼工作后,自測是非常有必要的,我們?nèi)粘M瓿砷_發(fā)任務(wù)后,都會(huì)結(jié)合需求場景以及功能編排一些自測用例查看最終結(jié)果是否與預(yù)期一致。 需要注意的是由于訂單業(yè)務(wù)邏輯較為復(fù)雜,很多業(yè)務(wù)場景一篇博客是不可能全部覆蓋,所以這里我們就測試一下基于RocketMQ實(shí)現(xiàn)分布式事務(wù)常見的幾個(gè)問題場景是否和預(yù)期一致。
在測試前我們必須做好前置準(zhǔn)備工作,準(zhǔn)備功能測試時(shí)涉及到的SQL語句,以本次用戶購買產(chǎn)品的業(yè)務(wù)為例,涉及到訂單表、用戶賬戶信息表、產(chǎn)品表、以及生產(chǎn)者本地事務(wù)日志表。
SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;在每次測試完成之后,我們希望數(shù)據(jù)能夠還原,所以這里也需要準(zhǔn)備一下每次測試結(jié)束后的更新語句,由于訂單表和消息日志表都是主鍵自增,考慮到這兩張表只涉及插入,所以筆者為了重置主鍵的值采取的是truncate語句。
truncate table t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount=10000 ;
UPDATE product set count=10000;2. 測試正常消費(fèi)
第一個(gè)用例是查看所有服務(wù)都正常的情況下,訂單表是否有數(shù)據(jù),用戶表的用戶是否會(huì)正??劭?,以及商品表庫存是否會(huì)扣減。
測試前,我們先查看訂單表,確認(rèn)沒有數(shù)據(jù)

查看我們的測試用戶,錢包額度為10000

再查看庫存表,可以看到數(shù)量為1000

確認(rèn)完數(shù)據(jù)之后,我們就可以測試服務(wù)是否按照預(yù)期的方式執(zhí)行,將所有服務(wù)啟動(dòng)。

我們通過網(wǎng)關(guān)發(fā)起調(diào)用,請求地址如下:
http://localhost:8090/order/order/createOrderByMQ請求參數(shù)如下,從參數(shù)可以看出這個(gè)請求意為用戶代碼(accountCode)為demoData這個(gè)用戶希望購買1個(gè)(count)產(chǎn)品代碼(productCode)為P001的產(chǎn)品,該產(chǎn)品當(dāng)前售價(jià)(price)為1元。
{
"accountCode": "0932897",
"productCode": "P003",
"count": 1
}調(diào)用完成后,查看訂單表,訂單數(shù)據(jù)生成無誤:
圖片
查看用戶服務(wù)是否完成用戶扣款,扣款無誤:

查看產(chǎn)品表,可以看到產(chǎn)品數(shù)量也準(zhǔn)確扣減:

3. 測試生產(chǎn)者commit提交失敗
我們希望測試一下發(fā)送完half消息之后,執(zhí)行本地事務(wù)完成,但是未提交commit請求時(shí),MQServer是否會(huì)調(diào)用回查邏輯。
為了完成這一點(diǎn)我們必須按照以下兩個(gè)步驟執(zhí)行:
- 在訂單服務(wù)提交事務(wù)消息處打個(gè)斷點(diǎn)。

- 發(fā)起請求,當(dāng)代碼執(zhí)行到這里的時(shí)候通過jps定位到進(jìn)程號,將其強(qiáng)制殺死。如下所示,我們的代碼執(zhí)行到了提交事務(wù)消息這一步:

我們通過jps定位并將其殺死::

完成這些步驟后,我們再次將服務(wù)啟動(dòng),等待片刻之后可以發(fā)現(xiàn),MQServer會(huì)調(diào)用checkLocalTransaction回查生產(chǎn)者本地事務(wù)的情況。我們放行這塊代碼讓程序執(zhí)行下去,最后再查看數(shù)據(jù)庫中的數(shù)據(jù)結(jié)果是否符合預(yù)期。

4. 測試消費(fèi)者消費(fèi)失敗
測試消費(fèi)者執(zhí)行報(bào)錯(cuò)后是否會(huì)進(jìn)行重試,這一點(diǎn)就比較好測試了,我們在消費(fèi)者監(jiān)聽器中插入隨便插入一個(gè)報(bào)錯(cuò)查看其是否會(huì)不斷重試。這里筆者就不多做演示,實(shí)驗(yàn)結(jié)果是會(huì)進(jìn)行不斷重試,當(dāng)重試次數(shù)達(dá)到閾值時(shí)會(huì)將結(jié)果存到死信隊(duì)列中。

四、壓測MQ和Seata的性能
由于MQ是采用異步消費(fèi)的形式解耦了服務(wù)間的業(yè)務(wù),而我們的Seata采用默認(rèn)的AT模式每次執(zhí)行分布式事務(wù)時(shí)都會(huì)需要借助undo-log、全局鎖等的方式保證最終一致性。所以理論上RocketMQ的性能肯定是高于Seata的,對此我們不妨使用Jmeter進(jìn)行壓測來驗(yàn)證一下。
本次壓測只用了1000個(gè)并發(fā),MQ和seata的壓測結(jié)果如下,可以看到MQ無論從執(zhí)行時(shí)間還是成功率都遠(yuǎn)遠(yuǎn)優(yōu)秀于Seata的。
MQ的壓測結(jié)果:

Seata的壓測結(jié)果,可以看到大量的數(shù)據(jù)因?yàn)閘ock_table鎖超時(shí)而導(dǎo)致失敗,所以整體性能表現(xiàn)非常差勁:

五、詳解RocketMQ落地分布式事務(wù)常見問題
1. RocketMQ 如何保證事務(wù)的最終一致性
最終一致性是一種允許軟狀態(tài)存在的分布式事務(wù)解決方案,RocketMQ 保證事務(wù)最終一致性的方式主要是依賴生產(chǎn)者本地事務(wù)和消息可靠發(fā)送的原子性來最大努力保證最終一致性,注意這里筆者所強(qiáng)調(diào)的盡最大努力交付。
之所以說是最大努力交付是說RocketMQ是通過保證生產(chǎn)者事務(wù)和消息發(fā)送可靠性的原子性和一致性,由此保證消費(fèi)者一定能夠消費(fèi)到消息,理想情況下,只要消費(fèi)者能夠正確消費(fèi)消息,事務(wù)結(jié)果最終是可以保證一致性的,但是復(fù)雜的系統(tǒng)因素消費(fèi)者可能會(huì)存在消費(fèi)失敗的情況,此時(shí)事務(wù)最終一致性就無法保證,業(yè)界的做法是通過手動(dòng)操作或者腳本等方式完成數(shù)據(jù)補(bǔ)償。

2. 什么是half消息
half消息即半消息,和普通消息的區(qū)別是該消息不會(huì)立馬被消費(fèi)者消費(fèi),原因是half消息的存在是為了保證生產(chǎn)者本地事務(wù)和消費(fèi)者的原子性和一致性,其過程如上文所介紹,初始發(fā)送的half消息是存儲(chǔ)在MQ一個(gè)內(nèi)存隊(duì)列中(并未投遞到topic中),只有生產(chǎn)者本地事務(wù)成功并發(fā)送commit通知后,這個(gè)消息才會(huì)被持久化到commitLog同時(shí)提交到topic隊(duì)列中,此時(shí)消費(fèi)者才能夠消費(fèi)該消息并執(zhí)行本地事務(wù)。
3. 為什么要先發(fā)送half消息再執(zhí)行本地事務(wù)?先執(zhí)行本地事務(wù),成功后在發(fā)送不行嗎?
先發(fā)送half消息的原因是為了盡可能確保生產(chǎn)者和消息隊(duì)列通信正常,只有通信正常了才能確保生產(chǎn)者本地事務(wù)和消息發(fā)送的原子性和一致性,由此保證分布式事務(wù)的可靠性。
先執(zhí)行本地事務(wù),執(zhí)行成功后再發(fā)送存在一個(gè)問題,試想一下,假設(shè)我們本地事務(wù)執(zhí)行成功,但是發(fā)送的消息因?yàn)榫W(wǎng)絡(luò)波動(dòng)等諸多原因?qū)е翸Q沒有收到消息,此時(shí)生產(chǎn)者和消費(fèi)者的分布式事務(wù)就會(huì)出現(xiàn)數(shù)據(jù)不一致問題。

而half消息則不同,它會(huì)優(yōu)先發(fā)送一個(gè)消費(fèi)者感知不到的half消息確認(rèn)通信可達(dá),然后執(zhí)行本地事務(wù)后降消息設(shè)置未commit讓消費(fèi)者消費(fèi),即使說commit消息未收到,因?yàn)閔alf消息的存在,MQ在指定超時(shí)先限制后也可以通過回查的方式到生產(chǎn)者事務(wù)表查詢執(zhí)行情況。
4. 如果mq收到half消息,準(zhǔn)備發(fā)送success信號的消息給生產(chǎn)者,但因?yàn)榫W(wǎng)絡(luò)波動(dòng)導(dǎo)致生產(chǎn)者沒有收到這個(gè)消息要怎么辦?
此時(shí)生產(chǎn)者就會(huì)認(rèn)為half消息發(fā)送失敗,本地事務(wù)不執(zhí)行,隨著時(shí)間推移MQ長時(shí)間沒收到commit或者rollback消息就會(huì)回查生產(chǎn)者消息日志表,明確沒看到數(shù)據(jù)則知曉生產(chǎn)者本地事務(wù)執(zhí)行失敗,直接rollback掉half消息,而消費(fèi)者全程無感知,業(yè)務(wù)上的一致性也是可以保證。

5. MQ沒有收到生產(chǎn)者(訂單服務(wù))的commit或者rollback信號怎么保證事務(wù)最終一致性?
常規(guī)的做法就是建立一張表記錄消息狀態(tài),只要我們訂單信息插入成功就需要日志一下這條數(shù)據(jù),所以我們必須保證訂單數(shù)據(jù)插入和日志插入表中的原子性,確保生產(chǎn)者的事務(wù)和消息日志的ACID:

6. 如果生產(chǎn)者執(zhí)行本地事務(wù)失敗了怎么辦?
這一點(diǎn)前面的部分也已經(jīng)說明,首先將本地會(huì)事務(wù)回滾,并向消息隊(duì)列提交一個(gè)rollback的請求不提交half消息,消息就不會(huì)被消費(fèi)者消費(fèi),保證最終一致性。
7. 前面說的都是事務(wù)流程?這和事務(wù)消息如何保證數(shù)據(jù)最終一致性有什么關(guān)系?
生產(chǎn)者和消息隊(duì)列事務(wù)流程可以確保生產(chǎn)者和消息隊(duì)列發(fā)送的一致性,確保寫操作都是同時(shí)成功或者失敗。只有保證兩者正常通信,才能確保消費(fèi)者可以消費(fèi)MQ中的消息從而完成數(shù)據(jù)最終一致性。
8. 消費(fèi)者提交本地事務(wù)失敗了怎么辦?
我們都知道消息隊(duì)列只能保證消息可靠性,而無法保證分布式事務(wù)的強(qiáng)一致性,出現(xiàn)這種情況,消費(fèi)者 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 會(huì)在一定時(shí)間后,繼續(xù)將這條消息推送給消費(fèi)者,消費(fèi)者就可以繼續(xù)執(zhí)行本地事務(wù)并提交了,直到成功消息隊(duì)列會(huì)進(jìn)行N次重試,如果還是失敗,則可以到死信隊(duì)列中查看失敗消息,然后通過補(bǔ)償機(jī)制實(shí)現(xiàn)分布式事務(wù)最終一致性。

































