五張圖帶你了解分布式事務(wù) Saga 模式中的狀態(tài)機(jī)
大家好,我是君哥。
狀態(tài)機(jī)在我們的工作中應(yīng)用非常廣泛,今天聊一聊分布式事務(wù)中間件 Seata 中 Saga 模式的狀態(tài)機(jī)。
1.狀態(tài)機(jī)簡(jiǎn)介
狀態(tài)機(jī)是一個(gè)數(shù)學(xué)模型,它將工作中的運(yùn)行狀態(tài)和流轉(zhuǎn)規(guī)則抽象出來(lái),可以協(xié)調(diào)相關(guān)信號(hào)來(lái)完成預(yù)先設(shè)定的操作。
下面介紹狀態(tài)機(jī)中的幾個(gè)概念:
- 狀態(tài):狀態(tài)機(jī)目前的狀態(tài)標(biāo)識(shí);
- 狀態(tài)轉(zhuǎn)移:定義狀態(tài)之間的轉(zhuǎn)移路由;
- 動(dòng)作(Action):狀態(tài)轉(zhuǎn)移需要的操作;
- 事件:要執(zhí)行某個(gè)操作時(shí)的觸發(fā)器或者口令。
狀態(tài)機(jī)一般用在狀態(tài)類型比較多(超過(guò) 3 個(gè)),分支流程比較多,初始狀態(tài)經(jīng)過(guò)多個(gè)流程的流轉(zhuǎn)達(dá)到最終狀態(tài)的場(chǎng)景。
2.Saga 模式
Saga 模式是分布式事務(wù)中長(zhǎng)事務(wù)的一種解決方案,Seata 中 Saga 模式的理論基礎(chǔ)是 Hector & Kenneth 在 1987 年發(fā)表的論文 Sagas。下圖(來(lái)自官網(wǎng))是 Seata 中 Saga 模型:
在 Saga 模式中,如果一部分分支事務(wù)已經(jīng)提交成功,當(dāng)其中一個(gè)分支事務(wù)提交失敗,狀態(tài)機(jī)就會(huì)觸發(fā)所有提交成功的分支事務(wù)進(jìn)行回滾。
分支事務(wù)中提交和回滾的邏輯需要由業(yè)務(wù)代碼來(lái)實(shí)現(xiàn)。
3.Saga 實(shí)現(xiàn)
Seata 中 Saga 模式是基于狀態(tài)機(jī)來(lái)實(shí)現(xiàn)的,使用 Saga 模式時(shí),先畫一張狀態(tài)圖,這個(gè)狀態(tài)圖定義服務(wù)調(diào)用流程,每個(gè)節(jié)點(diǎn)調(diào)用一個(gè)分支事務(wù),并且每個(gè)節(jié)點(diǎn)需要配備一個(gè)補(bǔ)償節(jié)點(diǎn)用于分支事務(wù)失敗后的補(bǔ)償動(dòng)作。
以經(jīng)典電商案例來(lái)講,一個(gè)分布式事務(wù)中有三個(gè)分支事務(wù)參數(shù)者:
分支事務(wù) | 動(dòng)作 | 狀態(tài) |
訂單服務(wù) | 保存訂單 | 保存成功、失敗 |
賬戶服務(wù) | 扣減金額 | 扣減成功、失敗 |
庫(kù)存服務(wù) | 扣減庫(kù)存 | 扣減成功、失敗 |
在這個(gè)分布式事務(wù)中,只有訂單、賬戶、庫(kù)存這三個(gè)分支事務(wù)都提交成功,整個(gè)事務(wù)才能成功。每一個(gè)分支事務(wù)提交失敗,其他執(zhí)行成功的事務(wù)都需要反向補(bǔ)償。如下圖:
圖片
比如扣減金額這個(gè)分支事務(wù)失敗了,需要反向補(bǔ)償扣減金額、保存訂單這兩個(gè)分支事務(wù)。那 Seata 是怎么做到事件觸發(fā)、狀態(tài)流轉(zhuǎn)和補(bǔ)償操作的呢?
使用 Seata 狀態(tài)機(jī),首先需要定義一個(gè) Json 文件,這個(gè) Json 文件把圖中的每個(gè)節(jié)點(diǎn)都定義成一個(gè) State,State 的類型共有四種:
- ServiceTask:對(duì)應(yīng)分支事務(wù)的提交操作。
- Choice:對(duì)應(yīng)流程中下一個(gè) State 的選擇。
- CompensationTrigger:觸發(fā)補(bǔ)償服務(wù)。
- Succeed:成功狀態(tài),當(dāng)所有分支事務(wù)都成功后才會(huì)流轉(zhuǎn)到這個(gè)狀態(tài)。
- Fail:失敗狀態(tài)。
(1)ServiceTask
下面我們看"保存訂單"這個(gè)狀態(tài):
"SaveOrder": {
"Type": "ServiceTask",
"ServiceName": "orderSave",
"ServiceMethod": "saveOrder",
"CompensateState": "DeleteOrder",
"Next": "ChoiceAccountState",
"Input": [
"$.[businessKey]",
"$.[order]"
],
"Output": {
"SaveOrderResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
]
},
這個(gè) State 的類型是 ServiceTask,上面圖中的分支服務(wù)和補(bǔ)償服務(wù)都是這種類型,也對(duì)應(yīng)代碼中的一個(gè) Service。上面的 Json 中主要定義了三個(gè)內(nèi)容:
- 這個(gè) state 調(diào)用的 Service 方法.
- 提交失敗后的補(bǔ)償 State(CompensateState)。
- 提交成功后應(yīng)該跳轉(zhuǎn)的下一個(gè) State(ChoiceAccountState)。
(2)Choice
下面來(lái)看 ChoiceAccountState 這個(gè)狀態(tài)節(jié)點(diǎn),Json 文件定義如下:
"ChoiceAccountState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[SaveOrderResult] == true",
"Next":"ReduceAccount"
}
],
"Default":"Fail"
}
對(duì)應(yīng)的下個(gè)節(jié)點(diǎn)是 ReduceAccount,如果失敗就會(huì)跳轉(zhuǎn) Fail 狀態(tài)。
(3)Fail
上面 orderSave 這個(gè)狀態(tài)節(jié)點(diǎn)如果發(fā)生異常,會(huì)跳轉(zhuǎn)到 CompensationTrigger,CompensationTrigger 狀態(tài)節(jié)點(diǎn)定義如下:
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
}
這個(gè)節(jié)點(diǎn)會(huì)觸發(fā) SaveOrder 中定義的補(bǔ)償服務(wù),然后將最終狀態(tài)流轉(zhuǎn)到 Fail。同時(shí)我們也看到,只要到了 CompensationTrigger 這個(gè)狀態(tài)節(jié)點(diǎn),最終狀態(tài)就會(huì)流轉(zhuǎn)到 Fail。
下面我們把整個(gè) Json 文件的定義貼出來(lái)看一下:
{
"Name": "buyGoodsOnline",
"Comment": "buy a goods on line, add order, deduct account, deduct storage ",
"StartState": "SaveOrder",
"Version": "0.0.1",
#定義狀態(tài)
"States": {
"SaveOrder": {
"Type": "ServiceTask",
"ServiceName": "orderSave",
"ServiceMethod": "saveOrder",
"CompensateState": "DeleteOrder",
"Next": "ChoiceAccountState",
"Input": [
"$.[businessKey]",
"$.[order]"
],
"Output": {
"SaveOrderResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
]
},
"ChoiceAccountState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[SaveOrderResult] == true",
"Next":"ReduceAccount"
}
],
"Default":"Fail"
},
"ReduceAccount": {
"Type": "ServiceTask",
"ServiceName": "accountService",
"ServiceMethod": "decrease",
"CompensateState": "CompensateReduceAccount",
"Next": "ChoiceStorageState",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[money]",
{
"throwException" : "$.[mockReduceAccountFail]"
}
],
"Output": {
"ReduceAccountResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
]
},
"ChoiceStorageState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[ReduceAccountResult] == true",
"Next":"ReduceStorage"
}
],
"Default":"Fail"
},
"ReduceStorage": {
"Type": "ServiceTask",
"ServiceName": "storageService",
"ServiceMethod": "decrease",
"CompensateState": "CompensateReduceStorage",
"Input": [
"$.[businessKey]",
"$.[productId]",
"$.[count]",
{
"throwException" : "$.[mockReduceStorageFail]"
}
],
"Output": {
"ReduceStorageResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"DeleteOrder": {
"Type": "ServiceTask",
"ServiceName": "orderSave",
"ServiceMethod": "deleteOrder",
"Input": [
"$.[businessKey]",
"$.[order]"
]
},
"CompensateReduceAccount": {
"Type": "ServiceTask",
"ServiceName": "accountService",
"ServiceMethod": "compensateDecrease",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[money]"
]
},
"CompensateReduceStorage": {
"Type": "ServiceTask",
"ServiceName": "storageService",
"ServiceMethod": "compensateDecrease",
"Input": [
"$.[businessKey]",
"$.[productId]",
"$.[count]"
]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type":"Succeed"
},
"Fail": {
"Type":"Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}
上面 Json 文件中定義的 buyGoodsOnline,是狀態(tài)機(jī)加載的入口,狀態(tài)機(jī)會(huì)找到這個(gè) name,然后把狀態(tài)加載到自己的內(nèi)存中。下面,我們?cè)賮?lái)總結(jié)一下電商案例中分布式事務(wù)狀態(tài)流轉(zhuǎn)過(guò)程:
4.狀態(tài)機(jī)應(yīng)用
上面的電商例子中,三個(gè)分支服務(wù)分別定義了三個(gè) State,對(duì)應(yīng)的 ServiceMethod 如下:
- SaveOrder#saveOrder:
public boolean saveOrder(String businessKey, Order order) {
logger.info("保存訂單, businessKey:{}, order: {}", businessKey, order);
orderDao.create(order);
return true;
}
- ReduceAccount#decrease
public boolean decrease(String businessKey, Long userId, BigDecimal money) {
return accountApi.decrease(businessKey, userId, money);
}
- ReduceStorage#decrease
public boolean decrease(String businessKey, Long productId, Integer count) {
return storageApi.decrease(businessKey, productId, count);
}
狀態(tài)機(jī)在啟動(dòng)的時(shí)候,需要把上面方法中的參數(shù)都傳入,實(shí)例代碼如下:
StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext().getBean("stateMachineEngine");
Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("order", order);
startParams.put("mockReduceAccountFail", "true");
startParams.put("userId", order.getUserId());
startParams.put("money", order.getPayAmount());
startParams.put("productId", order.getProductId());
startParams.put("count", order.getCount());
//這里采用同步方法
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("buyGoodsOnline", null, businessKey, startParams);
5.狀態(tài)機(jī)原理
下面這張圖來(lái)自于 Seata 官網(wǎng),主要講解了狀態(tài)機(jī)的工作原理:
圖片
- 狀態(tài)機(jī)啟動(dòng)時(shí),首先啟動(dòng)了全局事務(wù)。
- 將狀態(tài)機(jī)的參數(shù)記錄在本地 seata_state_machine_inst 表。
- 向 Seata Server 注冊(cè)分支事務(wù)。
- 執(zhí)行 StateA 并記錄狀態(tài)到本地?cái)?shù)據(jù)庫(kù),同時(shí)會(huì)產(chǎn)生路由事件放入 EventQueue,執(zhí)行 StateB 時(shí)取出路由消息觸發(fā)執(zhí)行。同樣 StateB 執(zhí)行時(shí)也會(huì)產(chǎn)生路由消息放入 EventQueue。
- 從 EventQueue 取出路由消息執(zhí)行 StateC。
- 狀態(tài)機(jī)結(jié)束流程,提交或回滾全局事務(wù)。
6.高可用
Seata 中的狀態(tài)機(jī)并不是獨(dú)立部署,而是內(nèi)嵌在應(yīng)用中,由于狀態(tài)機(jī)上下文和執(zhí)行日志都記錄在本地?cái)?shù)據(jù)庫(kù)中,所以狀態(tài)機(jī)本身是無(wú)狀態(tài)的。
狀態(tài)機(jī)啟動(dòng)時(shí),會(huì)發(fā)送狀態(tài)到 Seata Server,當(dāng)一個(gè)應(yīng)用宕機(jī)后,Seata Server 能感知到,并會(huì)把恢復(fù)請(qǐng)求發(fā)送到存活的實(shí)例,收到請(qǐng)求的實(shí)例從數(shù)據(jù)庫(kù)取出狀態(tài)機(jī)上下文和執(zhí)行日志進(jìn)行恢復(fù)。如下圖:
7.總結(jié)
本文講解了分布式事務(wù)中間件 Seata 給 Saga 模式設(shè)計(jì)的狀態(tài)機(jī)使用方式和原理。狀態(tài)機(jī)在我們的日常工作中使用非常廣泛,希望 Seata 的設(shè)計(jì)能對(duì)我們?cè)O(shè)計(jì)狀態(tài)機(jī)提供思路和參考。