微服務(wù)架構(gòu):利用事件驅(qū)動(dòng)實(shí)現(xiàn)最終一致性
事務(wù)一致性
首先,我們來回顧一下ACID原則:
- Atomicity:原子性,改變數(shù)據(jù)狀態(tài)要么是一起完成,要么一起失敗
- Consistency:一致性,數(shù)據(jù)的狀態(tài)是完整一致的
- Isolation:隔離線,即使有并發(fā)事務(wù),互相之間也不影響
- Durability:持久性, 一旦事務(wù)提交,不可撤銷
在單體應(yīng)用中,我們可以利用關(guān)系型數(shù)據(jù)庫的特性去完成事務(wù)一致性,但是一旦應(yīng)用往微服務(wù)發(fā)展,根據(jù)業(yè)務(wù)拆分成不用的模塊,而且每個(gè)模塊的數(shù)據(jù)庫已經(jīng)分離開了,這時(shí)候,我們要面對的就是分布式事務(wù)了,需要自己在代碼里頭完成ACID了。比較流行的解決方案有:兩階段提交、補(bǔ)償機(jī)制、本地消息表(利用本地事務(wù)和MQ)、MQ的事務(wù)消息(RocketMQ)。
CAP定理
1998年,加州大學(xué)的計(jì)算機(jī)科學(xué)家 Eric Brewer 提出,分布式系統(tǒng)有三個(gè)指標(biāo)。
- Consistency:一致性
- Availability:可用性
- Partition tolerance:分區(qū)容錯(cuò)
Eric Brewer 說,這三個(gè)指標(biāo)不可能同時(shí)做到。這個(gè)結(jié)論就叫做 CAP 定理。
微服務(wù)中,不同模塊之間使用的數(shù)據(jù)庫是不同的,不同模塊之間部署的服務(wù)去也有可能是不用的,那么分區(qū)容錯(cuò)是無法避免的,因?yàn)榉?wù)之間的調(diào)用不能保證百分百的沒問題,所以系統(tǒng)設(shè)計(jì)必須考慮這種情況。因此,我們可以認(rèn)為CAP的P總是成立的,剩下的C和A無法同時(shí)做到。
實(shí)際上根據(jù)分布式系統(tǒng)中CAP原則,當(dāng)P(分區(qū)容忍)發(fā)生的時(shí)候,強(qiáng)行追求C(一致性),會(huì)導(dǎo)致(A)可用性、吞吐量下降,此時(shí)我們一般用最終一致性來保證我們系統(tǒng)的AP能力。當(dāng)然不是放棄C,而是放棄強(qiáng)一致性,而且在一般情況下CAP都能保證,只是在發(fā)生分區(qū)容錯(cuò)的情況下,我們可以通過最終一致性來保證數(shù)據(jù)一致。
事件驅(qū)動(dòng)實(shí)現(xiàn)最終一致性
事件驅(qū)動(dòng)架構(gòu)在領(lǐng)域?qū)ο笾g通過異步的消息來同步狀態(tài),有些消息也可以同時(shí)發(fā)布給多個(gè)服務(wù),在消息引起了一個(gè)服務(wù)的同步后可能會(huì)引起另外消息,事件會(huì)擴(kuò)散開。嚴(yán)格意義上的事件驅(qū)動(dòng)是沒有同步調(diào)用的。
例子:
在電商里面,用戶下單必須根據(jù)庫存來確定訂單是否成交。
項(xiàng)目架構(gòu):SpringBoot2+Mybatis+tk-Mybatis+ActiveMQ【因?yàn)樾±?,不做成Spring Cloud架構(gòu)】
首先,我們來看看正常的服務(wù)之間調(diào)用:

代碼:
- @Override
- @Transactional(rollbackFor = Exception.class)
- public Result placeOrder(OrderQuery query) {
- Result result = new Result();
- // 先遠(yuǎn)程調(diào)用Stock-Service去減少庫存
- RestTemplate restTemplate = new RestTemplate();
- //請求頭
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- //封裝成一個(gè)請求對象
- HttpEntity entity = new HttpEntity(query, headers);
- // 同步調(diào)用庫存服務(wù)的接口
- Result stockResult = restTemplate.postForObject("http://127.0.0.1:8081/stock/reduceStock",entity,Result.class);
- if (stockResult.getCode() == Result.ResultConstants.SUCCESS){
- Order order = new Order();
- BeanUtils.copyProperties(query,order);
- order.setOrderStatus(1);
- Integer insertCount = orderMapper.insertSelective(order);
- if (insertCount == 1){
- result.setMsg("下單成功");
- }else {
- result.setMsg("下單失敗");
- }
- }else {
- result.setCode(Result.ResultConstants.FAIL);
- result.setMsg("下單失?。?quot;+stockResult.getMsg());
- }
- return result;
- }
我們可以看到,這樣的服務(wù)調(diào)用的弊端多多:
1、訂單服務(wù)需同步等待庫存服務(wù)的返回結(jié)果,接口結(jié)果返回延誤。2、訂單服務(wù)直接依賴于庫存服務(wù),只要庫存服務(wù)崩了,訂單服務(wù)不能再正常運(yùn)行。3、訂單服務(wù)需考慮并發(fā)問題,庫存最后可能為負(fù)。
下面開始利用事件驅(qū)動(dòng)實(shí)現(xiàn)最終一致性
1、在訂單服務(wù)新增訂單后,訂單的狀態(tài)是“已開啟”,然后發(fā)布一個(gè)Order Created事件到消息隊(duì)列上

代碼:
- @Transactional(rollbackFor = Exception.class)
- public Result placeOrderByMQ(OrderQuery query) {
- Result result = new Result();
- // 先創(chuàng)建訂單,狀態(tài)為下單0
- Order order = new Order();
- BeanUtils.copyProperties(query,order);
- order.setOrderStatus(0);
- Integer insertCount = orderMapper.insertSelective(order);
- if (insertCount == 1){
- // 發(fā)送 訂單消息
- MqOrderMsg mqOrderMsg = new MqOrderMsg();
- mqOrderMsg.setId(order.getId());
- mqOrderMsg.setGoodCount(query.getGoodCount());
- mqOrderMsg.setGoodName(query.getGoodName());
- mqOrderMsg.setStockId(query.getStockId());
- jmsProducer.sendOrderCreatedMsg(mqOrderMsg);
- // 此時(shí)的訂單只是開啟狀態(tài)
- result.setMsg("下單成功");
- }
- return result;
- }
2、庫存服務(wù)在監(jiān)聽到消息隊(duì)列OrderCreated中的消息,將庫存表中商品的庫存減去下單數(shù)量,然后再發(fā)送一個(gè)Stock Locked事件給消息隊(duì)列。

代碼:
- /**
- * 接收下單消息
- * @param message 接收到的消息
- * @param session 上下文
- */
- @JmsListener(destination = ORDER_CREATE,containerFactory = "myListenerContainerFactory")
- @Transactional(rollbackFor = Exception.class)
- public void receiveOrderCreatedMsg(Message message, Session session){
- try {
- if (message instanceof ActiveMQObjectMessage){
- MqStockMsg result = new MqStockMsg();
- ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
- MqOrderMsg msg = (MqOrderMsg)objectMessage.getObject();
- Integer updateCount = stockMapper.updateNumByStockId(msg.getStockId(),msg.getGoodCount());
- if (updateCount >= 1){
- result.setSuccess(true);
- result.setOrderId(msg.getId());
- }else {
- result.setSuccess(false);
- }
- // 手動(dòng)ack,使消息出隊(duì)列,不然會(huì)不斷消費(fèi)
- message.acknowledge();
- // 發(fā)送庫存鎖定消息到MQ
- jmsProducer.sendStockLockedMsg(result);
- }
- } catch (JMSException e) {
- log.error("接收訂單創(chuàng)建消息報(bào)錯(cuò):"+e.getMessage());
- }
- }
仔細(xì)的朋友可能會(huì)看到:message.acknowledge(),即手動(dòng)確認(rèn)消息。因?yàn)樵诒WC庫存服務(wù)的邏輯能正常執(zhí)行后再確認(rèn)消息已消費(fèi),可以保證消息的投遞可靠性,萬一在庫存服務(wù)執(zhí)行時(shí)報(bào)出異常,我們可以做到重新消費(fèi)該下單消息。
3、訂單服務(wù)接收到Stock Locked事件,將訂單的狀態(tài)改為“已確認(rèn)”

代碼:
- /**
- * 判斷是否還有庫存,有庫存更新訂單狀態(tài)為1,無庫存更新訂單狀態(tài)為2,并且通知用戶(WebSocket)
- * @param message
- */
- @JmsListener(destination = STOCK_LOCKED,containerFactory = "myListenerContainerFactory")
- @Transactional(rollbackFor = Exception.class)
- public void receiveStockLockedMsg(Message message, Session session){
- try {
- if (message instanceof ActiveMQObjectMessage){
- ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
- MqStockMsg msg = (MqStockMsg)objectMessage.getObject();
- if (msg.isSuccess()){
- Order updateOrder = new Order();
- updateOrder.setId(msg.getOrderId());
- updateOrder.setOrderStatus(1);
- orderMapper.updateByPrimaryKeySelective(updateOrder);
- log.info("訂單【"+msg.getOrderId()+"】下單成功");
- }else {
- Order updateOrder = new Order();
- updateOrder.setId(msg.getOrderId());
- updateOrder.setOrderStatus(2);
- orderMapper.updateByPrimaryKeySelective(updateOrder);
- // 通知用戶庫存不足,訂單被取消
- log.error("訂單【"+msg.getOrderId()+"】因庫存不足被取消");
- }
- // 手動(dòng)ack,使消息出隊(duì)列,不然會(huì)不斷消費(fèi)
- message.acknowledge();
- }
- } catch (JMSException e) {
- log.error("接收庫存鎖定消息報(bào)錯(cuò):"+e.getMessage());
- }
- }
同樣,這里我們也是會(huì)利用手動(dòng)確認(rèn)消息來保證消息的投遞可靠性。
至此,已經(jīng)全部搞定了。我們看一下和正常的服務(wù)調(diào)用對比如何:
1、訂單服務(wù)不再直接依賴于庫存服務(wù),而是將下單事件發(fā)送到MQ中,讓庫存監(jiān)聽。
2、訂單服務(wù)能真正的作為一個(gè)模塊獨(dú)立運(yùn)行。
3、解決了并發(fā)問題,而且MQ的隊(duì)列處理效率非常的高。
但是也存在下面的問題:
1、用戶體驗(yàn)改變了:因?yàn)槭褂檬录C(jī)制,訂單是立即生成的,可是很有可能過一會(huì),系統(tǒng)會(huì)提醒你沒貨了。。這就像是排隊(duì)搶購一樣,排著排著就被通知沒貨了,不用再排隊(duì)了。
2、數(shù)據(jù)庫可能會(huì)存在很對沒有完成下單的訂單。
最后,如果真的要考慮用戶體驗(yàn),并且不想數(shù)據(jù)庫存在很多不必要的數(shù)據(jù),該怎么辦?
那就把訂單服務(wù)和庫存服務(wù)聚合在一起吧。解決當(dāng)前的問題應(yīng)當(dāng)是首先要考慮的,我們設(shè)計(jì)微服務(wù)的目的是本想是解決業(yè)務(wù)并發(fā)量。而現(xiàn)在面臨的卻是用戶體驗(yàn)的問題,所以架構(gòu)設(shè)計(jì)也是需要妥協(xié)的。
最主要是,我們是經(jīng)過思考和分析的,每個(gè)方案能做到哪種程度,能應(yīng)用到哪種場景。正所謂,技術(shù)要和實(shí)際場景結(jié)合,我們不能為了追求新技術(shù)而生搬硬套。