談一下關(guān)于CQRS架構(gòu)如何實(shí)現(xiàn)高性能
CQRS架構(gòu)簡(jiǎn)介
關(guān)于CQRS(Command Query Responsibility Segration)架構(gòu),大家應(yīng)該不會(huì)陌生了。簡(jiǎn)單的說,就是一個(gè)系統(tǒng),從架構(gòu)上把它拆分為兩部分:命令處理(寫請(qǐng)求)+查詢處理(讀請(qǐng)求)。然后讀寫兩邊可以用不同的架構(gòu)實(shí)現(xiàn),以實(shí)現(xiàn)CQ兩端(即Command Side,簡(jiǎn)稱C端;Query Side,簡(jiǎn)稱Q端)的分別優(yōu)化。CQRS作為一個(gè)讀寫分離思想的架構(gòu),在數(shù)據(jù)存儲(chǔ)方面,沒有做過多的約束。所以,我覺得CQRS可以有不同層次的實(shí)現(xiàn),比如:
- CQ兩端數(shù)據(jù)庫共享,CQ兩端只是在上層代碼上分離;這種做法,帶來的好處是可以讓我們的代碼讀寫分離,更好維護(hù),且沒有CQ兩端的數(shù)據(jù)一致性問題,因?yàn)槭枪蚕硪粋€(gè)數(shù)據(jù)庫的。我個(gè)人認(rèn)為,這種架構(gòu)很實(shí)用,既兼顧了數(shù)據(jù)的強(qiáng)一致性,又能讓代碼好維護(hù)。
- CQ兩端數(shù)據(jù)庫和上層代碼都分離,然后Q的數(shù)據(jù)由C端同步過來,一般是通過Domain Event進(jìn)行同步。同步方式有兩種,同步或異步,如果需要CQ兩端的強(qiáng)一致性,則需要用同步;如果能接受CQ兩端數(shù)據(jù)的最終一致性,則可以使用異步。采用這種方式的架構(gòu),個(gè)人覺得,C端應(yīng)該采用Event Sourcing(簡(jiǎn)稱ES)模式才有意義,否則就是自己給自己找麻煩。因?yàn)檫@樣做你會(huì)發(fā)現(xiàn)會(huì)出現(xiàn)冗余數(shù)據(jù),同樣的數(shù)據(jù),在C端的db中有,而在Q端的db中也有。和上面***種做法相比,我想不到什么好處。而采用ES,則所有C端的***數(shù)據(jù)全部用Domain Event表達(dá)即可;而要查詢顯示用的數(shù)據(jù),則從Q端的ReadDB(關(guān)系型數(shù)據(jù)庫)查詢即可。
我覺得要實(shí)現(xiàn)高性能,可以談的東西還有很多。下面我想重點(diǎn)說說我想到的一些設(shè)計(jì)思路:
避開資源爭(zhēng)奪
秒殺活動(dòng)的例子分析
我覺得這是很重要的一點(diǎn)。什么是資源爭(zhēng)奪?我想就是多個(gè)線程同時(shí)修改同一個(gè)數(shù)據(jù)。就像阿里秒殺活動(dòng)一樣,秒殺開搶時(shí),很多人同時(shí)搶一個(gè)商品,導(dǎo)致商品的庫存會(huì)被并發(fā)更新減庫存,這就是一個(gè)資源爭(zhēng)奪的例子。一般如果資源競(jìng)爭(zhēng)不激烈,那無所謂,不會(huì)影響性能;但是如果像秒殺這種場(chǎng)景,那db就會(huì)抗不住了。在秒殺這種場(chǎng)景下,大量線程需要同時(shí)更新同一條記錄,進(jìn)而導(dǎo)致MySQL內(nèi)部大量線程堆積,對(duì)服務(wù)性能、穩(wěn)定性造成很大傷害。那怎么辦呢?我記得阿里的丁奇寫過一個(gè)分享,思路就是當(dāng)MySQL的服務(wù)端多個(gè)線程同時(shí)修改一條記錄時(shí),可以對(duì)這些修改請(qǐng)求進(jìn)行排隊(duì),然后對(duì)于InnoDB引擎層,就是串行的。這樣排隊(duì)后,不管上層應(yīng)用發(fā)過來多少并行的修改同一行的請(qǐng)求,對(duì)于MySQL Server端來說,內(nèi)部總是會(huì)聰明的對(duì)同一行的修改請(qǐng)求都排隊(duì)處理;這樣就能確保不會(huì)有并發(fā)產(chǎn)生,從而不會(huì)導(dǎo)致線程浪費(fèi)堆積,導(dǎo)致數(shù)據(jù)庫性能下降。這個(gè)方案可以見下圖所示:
如上圖所示,當(dāng)很多請(qǐng)求都要修改A記錄時(shí),MySQL Server內(nèi)部會(huì)對(duì)這些請(qǐng)求進(jìn)行排隊(duì),然后一個(gè)個(gè)將對(duì)A的修改請(qǐng)求提交到InnoDB引擎層。這樣看似在排隊(duì),實(shí)際上會(huì)確保MySQL Server不會(huì)死掉,可以保證對(duì)外提供穩(wěn)定的TPS。
但是,對(duì)于商品秒殺這個(gè)場(chǎng)景,還有優(yōu)化的空間,就是Group Commit技術(shù)。Group Commit就是對(duì)多個(gè)請(qǐng)求合并為一次操作進(jìn)行處理。秒殺時(shí),大家都在購買這個(gè)商品,A買2件,B買3件,C買1件;其實(shí)我們可以把A,B,C的這三個(gè)請(qǐng)求合并為一次減庫存操作,就是一次性減6件。這樣,對(duì)于A,B,C的這三個(gè)請(qǐng)求,在InnoDB層我們只需要做一次減庫存操作即可。假設(shè)我們Group Commit的每一批的size是50,那就是可以將50個(gè)減操作合并為一次減操作,然后提交到InnoDB。這樣,將大大提高秒殺場(chǎng)景下,商品減庫存的TPS。但是這個(gè)Group Commit的每批大小不是越大越好,而是要根據(jù)并發(fā)量以及服務(wù)器的實(shí)際情況做測(cè)試來得到一個(gè)***的值。通過Group Commit技術(shù),根據(jù)丁奇的PPT,商品減庫存的TPS性能從原來的1.5W提高到了8.5W。
從上面這個(gè)例子,我們可以看到阿里是如何在實(shí)際場(chǎng)景中,通過優(yōu)化MySQL Server來實(shí)現(xiàn)高并發(fā)的商品減庫存的。但是,這個(gè)技術(shù)一般人還真的不會(huì)!因?yàn)闆]多少人有能力去優(yōu)化MySQL的服務(wù)端,排隊(duì)也不行,更別說Group Commit了。這個(gè)功能并不是MySQL Server自帶的,而是需要自己實(shí)現(xiàn)的。但是,這個(gè)思路我想我們都可以借鑒。
CQRS如何實(shí)現(xiàn)避免資源競(jìng)爭(zhēng)
那么對(duì)于CQRS架構(gòu),如何按照這個(gè)思路來設(shè)計(jì)呢?我想重點(diǎn)說一下我上面提到的第二種CQRS架構(gòu)。對(duì)于C端,我們的目標(biāo)是盡可能的在1s內(nèi)處理更多的Command,也就是數(shù)據(jù)寫請(qǐng)求。在經(jīng)典DDD的四層架構(gòu)中,我們會(huì)有一個(gè)模式叫工作單元模式,即Unit of Work(簡(jiǎn)稱UoW)模式。通過該模式,我們能在應(yīng)用層,一次性以事務(wù)的方式將當(dāng)前請(qǐng)求所涉及的多個(gè)對(duì)象的修改提交到DB。微軟的EF實(shí)體框架的DbContext就是一個(gè)UoW模式的實(shí)現(xiàn)。這種做法的好處是,一個(gè)請(qǐng)求對(duì)多個(gè)聚合根的修改,能做到強(qiáng)一致性,因?yàn)槭鞘聞?wù)的。但是這種做法,實(shí)際上,沒有很好的遵守避開資源競(jìng)爭(zhēng)的原則。試想,事務(wù)A要修改a1,a2,a3三個(gè)聚合根;事務(wù)B要修改a2,a3,a4;事務(wù)C要修改a3,a4,a5三個(gè)聚合根。那這樣,我們很容易理解,這三個(gè)事務(wù)只能串行執(zhí)行,因?yàn)樗鼈円薷南嗤馁Y源。比如事務(wù)A和事務(wù)B都要修改a2,a3這兩個(gè)聚合根,那同一時(shí)刻,只能由一個(gè)事務(wù)能被執(zhí)行。同理,事務(wù)B和事務(wù)C也是一樣。如果A,B,C這種事務(wù)執(zhí)行的并發(fā)很高,那數(shù)據(jù)庫就會(huì)出現(xiàn)嚴(yán)重的并發(fā)沖突,甚至死鎖。那要如何避免這種資源競(jìng)爭(zhēng)呢?我覺得我們可以采取三個(gè)措施:
讓一個(gè)Command總是只修改一個(gè)聚合根
這個(gè)做法其實(shí)就是縮小事務(wù)的范圍,確保一個(gè)事務(wù)一次只涉及一條記錄的修改。也就是做到,只有單個(gè)聚合根的修改才是事務(wù)的,讓聚合根成為數(shù)據(jù)強(qiáng)一致性的最小單位。這樣我們就能***化的實(shí)現(xiàn)并行修改。但是你會(huì)問,但是我一個(gè)請(qǐng)求就是會(huì)涉及多個(gè)聚合根的修改的,這種情況怎么辦呢?在CQRS架構(gòu)中,有一個(gè)東西叫Saga。Saga是一種基于事件驅(qū)動(dòng)的思想來實(shí)現(xiàn)業(yè)務(wù)流程的技術(shù),通過Saga,我們可以用最終一致性的方式最終實(shí)現(xiàn)對(duì)多個(gè)聚合根的修改。對(duì)于一次涉及多個(gè)聚合根修改的業(yè)務(wù)場(chǎng)景,一般總是可以設(shè)計(jì)為一個(gè)業(yè)務(wù)流程,也就是可以定義出要先做什么后做什么。比如以銀行轉(zhuǎn)賬的場(chǎng)景為例子,如果是按照傳統(tǒng)事務(wù)的做法,那可能是先開啟一個(gè)事務(wù),然后讓A賬號(hào)扣減余額,再讓B賬號(hào)加上余額,***提交事務(wù);如果A賬號(hào)余額不足,則直接拋出異常,同理B賬號(hào)如果加上余額也遇到異常,那也拋出異常即可,事務(wù)會(huì)保證原子性以及自動(dòng)回滾。也就是說,數(shù)據(jù)一致性已經(jīng)由DB幫我們做掉了。
但是,如果是Saga的設(shè)計(jì),那就不是這樣了。我們會(huì)把整個(gè)轉(zhuǎn)賬過程定義為一個(gè)業(yè)務(wù)流程。然后,流程中會(huì)包括多個(gè)參與該流程的聚合根以及一個(gè)用于協(xié)調(diào)聚合根交互的流程管理器(ProcessManager,無狀態(tài)),流程管理器負(fù)責(zé)響應(yīng)流程中的每個(gè)聚合根產(chǎn)生的領(lǐng)域事件,然后根據(jù)事件發(fā)送相應(yīng)的Command,從而繼續(xù)驅(qū)動(dòng)其他的聚合根進(jìn)行操作。
轉(zhuǎn)賬的例子,涉及到的聚合根有:兩個(gè)銀行賬號(hào)聚合根,一個(gè)交易(Transaction)聚合根,它用于負(fù)責(zé)存儲(chǔ)流程的當(dāng)前狀態(tài),它還會(huì)維護(hù)流程狀態(tài)變更時(shí)的規(guī)則約束;然后當(dāng)然還有一個(gè)流程管理器。轉(zhuǎn)賬開始時(shí),我們會(huì)先創(chuàng)建一個(gè)Transaction聚合根,然后它產(chǎn)生一個(gè)TransactionStarted的事件,然后流程管理器響應(yīng)事件,然后發(fā)送一個(gè)Command讓A賬號(hào)聚合根做減余額的操作;A賬號(hào)操作完成后,產(chǎn)生領(lǐng)域事件;然后流程管理器響應(yīng)事件,然后發(fā)送一個(gè)Command通知Transaction聚合根確認(rèn)A賬號(hào)的操作;確認(rèn)完成后也會(huì)產(chǎn)生事件,然后流程管理器再響應(yīng),然后發(fā)送一個(gè)Command通知B賬號(hào)做加上余額的操作;后續(xù)的步驟就不詳細(xì)講了。大概意思我想已經(jīng)表達(dá)了??傊?,通過這樣的設(shè)計(jì),我們可以通過事件驅(qū)動(dòng)的方式,來完成整個(gè)業(yè)務(wù)流程。如果流程中的任何一步出現(xiàn)了異常,那我們可以在流程中定義補(bǔ)償機(jī)制實(shí)現(xiàn)回退操作。或者不回退也沒關(guān)系,因?yàn)門ransaction聚合根記錄了流程的當(dāng)前狀態(tài),這樣我們可以很方便的后續(xù)排查有狀態(tài)沒有正常結(jié)束的轉(zhuǎn)賬交易。具體的設(shè)計(jì)和代碼,有興趣的可以去看一下ENode源代碼中的銀行轉(zhuǎn)賬的例子,里面有完整的實(shí)現(xiàn)。
對(duì)修改同一個(gè)聚合根的Command進(jìn)行排隊(duì)
和上面秒殺的設(shè)計(jì)一樣,我們可以對(duì)要同時(shí)修改同一個(gè)聚合根的Command進(jìn)行排隊(duì)。只不過這里的排隊(duì)不是在MySQL Server端,而是在我們自己程序里做這個(gè)排隊(duì)。如果我們是單臺(tái)服務(wù)器處理所有的Command,那排隊(duì)很容易做。就是只要在內(nèi)存中,當(dāng)要處理某個(gè)Command時(shí),判斷當(dāng)前Command要修改的聚合根是否前面已經(jīng)有Command在處理,如果有,則排隊(duì);如果沒有,則直接執(zhí)行。然后當(dāng)這個(gè)聚合根的前一個(gè)Command執(zhí)行完后,我們就能處理該聚合根的下一個(gè)Command了;但是如果是集群的情況下呢,也就是你不止有一臺(tái)服務(wù)器在處理Command,而是有十臺(tái),那要怎么辦呢?因?yàn)橥粫r(shí)刻,完全有可能有兩個(gè)不同的Command在修改同一個(gè)聚合根。這個(gè)問題也簡(jiǎn)單,就是我們可以對(duì)要修改聚合根的Command根據(jù)聚合根的ID進(jìn)行路由,根據(jù)聚合根的ID的hashcode,然后和當(dāng)前處理Command的服務(wù)器數(shù)目取模,就能確定當(dāng)前Command要被路由到哪個(gè)服務(wù)器上處理了。這樣我們能確保在服務(wù)器數(shù)目不變的情況下,針對(duì)同一個(gè)聚合根實(shí)例修改的所有Command都是被路由到同一臺(tái)服務(wù)器處理。然后加上我們前面在單個(gè)服務(wù)器里面內(nèi)部做的排隊(duì)設(shè)計(jì),就能最終保證,對(duì)同一個(gè)聚合根的修改,同一時(shí)刻只有一個(gè)線程在進(jìn)行。
通過上面這兩個(gè)設(shè)計(jì),我們可以確保C端所有的Command,都不會(huì)出現(xiàn)并發(fā)沖突。但是也要付出代價(jià),那就是要接受最終一致性。比如Saga的思想,就是在最終一致性的基礎(chǔ)上而實(shí)現(xiàn)的一種設(shè)計(jì)。然后,基于以上兩點(diǎn)的這種架構(gòu)的設(shè)計(jì),我覺得最關(guān)鍵的是要做到:1)分布式消息隊(duì)列的可靠,不能丟消息,否則Saga流程就斷了;2)消息隊(duì)列要高性能,支持高吞吐量;這樣才能在高并發(fā)時(shí),實(shí)現(xiàn)整個(gè)系統(tǒng)的整體的高性能。我開發(fā)的EQueue就是為了這個(gè)目標(biāo)而設(shè)計(jì)的一個(gè)分布式消息隊(duì)列,有興趣的朋友可以去了解下哦。
Command和Event的冪等處理
CQRS架構(gòu)是基于消息驅(qū)動(dòng)的,所以我們要盡量避免消息的重復(fù)消費(fèi)。否則,可能會(huì)導(dǎo)致某個(gè)消息被重復(fù)消費(fèi)而導(dǎo)致最終數(shù)據(jù)無法一致。對(duì)于CQRS架構(gòu),我覺得主要考慮三個(gè)環(huán)節(jié)的消息冪等處理。
Command的冪等處理
這一點(diǎn),我想不難理解。比如轉(zhuǎn)賬的例子中,假如A賬號(hào)扣減余額的命令被重復(fù)執(zhí)行了,那會(huì)導(dǎo)致A賬號(hào)扣了兩次錢。那***就數(shù)據(jù)無法一致了。所以,我們要保證Command不能被重復(fù)執(zhí)行。那怎么保證呢?想想我們平時(shí)一些判斷重復(fù)的操作怎么做的?一般有兩個(gè)做法:1)db對(duì)某一列建唯一索引,這樣可以嚴(yán)格保證某一列數(shù)據(jù)的值不會(huì)重復(fù);2)通過程序保證,比如插入前先通過select查詢判斷是否存在,如果不存在,則insert,否則就認(rèn)為重復(fù);顯然通過第二種設(shè)計(jì),在并發(fā)的情況下,是不能保證絕對(duì)的唯一性的。然后CQRS架構(gòu),我認(rèn)為我們可以通過持久化Command的方式,然后把CommandId作為主鍵,確保Command不會(huì)重復(fù)。那我們是否要每次執(zhí)行Command前線判斷該Command是否存在呢?不用。因?yàn)槌霈F(xiàn)Command重復(fù)的概率很低,一般只有是在我們服務(wù)器機(jī)器數(shù)量變動(dòng)時(shí)才會(huì)出現(xiàn)。比如增加了一臺(tái)服務(wù)器后,會(huì)影響到Command的路由,從而最終會(huì)導(dǎo)致某個(gè)Command會(huì)被重復(fù)處理,關(guān)于這里的細(xì)節(jié),我這里不想多展開了,呵呵。有問題到回復(fù)里討論吧。這個(gè)問題,我們也可以***程度上避免,比如我們可以在某一天系統(tǒng)最空的時(shí)候預(yù)先增加好服務(wù)器,這樣可以把出現(xiàn)重復(fù)消費(fèi)消息的情況降至***。自然也就***化的避免了Command的重復(fù)執(zhí)行。所以,基于這個(gè)原因,我們沒有必要在每次執(zhí)行一個(gè)Command時(shí)先判斷該Command是否已執(zhí)行。而是只要在Command執(zhí)行完之后,直接持久化該Command即可,然后因?yàn)閐b中以CommandId為主鍵,所以如果出現(xiàn)重復(fù),會(huì)主鍵重復(fù)的異常。我們只要捕獲該異常,然后就知道了該Command已經(jīng)存在,這就說明該Command之前已經(jīng)被處理過了,那我們只要忽略該Command即可(當(dāng)然實(shí)際上不能直接忽略,這里我由于篇幅問題,我就不詳細(xì)展開了,具體我們可以再討論)。然后,如果持久化沒有問題,說明該Command之前沒有被執(zhí)行過,那就OK了。這里,還有個(gè)問題也不能忽視,就是某個(gè)Command***次執(zhí)行完成了,也持久化成功了,但是它由于某種原因沒有從消息隊(duì)列中刪除。所以,當(dāng)它下次再被執(zhí)行時(shí),Command Handler里可能會(huì)報(bào)異常,所以,健壯的做法時(shí),我們要捕獲這個(gè)異常。當(dāng)出現(xiàn)異常時(shí),我們要檢查該Command是否之前已執(zhí)行過,如果有,就要認(rèn)為當(dāng)前Command執(zhí)行正確,然后要把之前Command產(chǎn)生的事件拿出來做后續(xù)的處理。這個(gè)問題有點(diǎn)深入了,我暫時(shí)不細(xì)化了。有興趣的可以找我私聊。
Event持久化的冪等處理
然后,因?yàn)槲覀兊募軜?gòu)是基于ES的,所以,針對(duì)新增或修改聚合根的Command,總是會(huì)產(chǎn)生相應(yīng)的領(lǐng)域事件(Domain Event)。我們接下來的要做的事情就是要先持久化事件,再分發(fā)這些事件給所有的外部事件訂閱者。大家知道,聚合根有生命周期,在它的生命周期里,會(huì)經(jīng)歷各種事件,而事件的發(fā)生總有確定的時(shí)間順序。所以,為了明確哪個(gè)事件先發(fā)生,哪個(gè)事件后發(fā)生,我們可以對(duì)每個(gè)事件設(shè)置一個(gè)版本號(hào),即version。聚合根***個(gè)產(chǎn)生的事件的version為1,第二個(gè)為2,以此類推。然后聚合根本身也有一個(gè)版本號(hào),用于記錄當(dāng)前自己的版本是什么,它每次產(chǎn)生下一個(gè)事件時(shí),也能根據(jù)自己的版本號(hào)推導(dǎo)出下一個(gè)要產(chǎn)生的事件的版本號(hào)是什么。比如聚合根當(dāng)前的版本號(hào)為5,那下一個(gè)事件的版本號(hào)則為6。通過為每個(gè)事件設(shè)計(jì)一個(gè)版本號(hào),我們就能很方便的實(shí)現(xiàn)聚合根產(chǎn)生事件時(shí)的并發(fā)控制了,因?yàn)橐粋€(gè)聚合根不可能產(chǎn)生兩個(gè)版本號(hào)一樣的事件,如果出現(xiàn)這種情況,那說明一定是出現(xiàn)并發(fā)沖突了。也就是一定是出現(xiàn)了同一個(gè)聚合根同時(shí)被兩個(gè)Command修改的情況了。所以,要實(shí)現(xiàn)事件持久化的冪等處理,也很好做了,就是db中的事件表,對(duì)聚合根ID+聚合根當(dāng)前的version建唯一索引。這樣就能在db層面,確保Event持久化的冪等處理。另外,對(duì)于事件的持久化,我們也可以像秒殺那樣,實(shí)現(xiàn)Group Commit。就是Command產(chǎn)生的事件不用立馬持久化,而是可以先積累到一定的量,比如50個(gè),然后再一次性Group Commit所有的事件。然后事件持久化完成后,再修改每個(gè)聚合根的狀態(tài)即可。如果Group Commit事件時(shí)遇到并發(fā)沖突(由于某個(gè)聚合根的事件的版本號(hào)有重復(fù)),則退回為單個(gè)一個(gè)個(gè)持久化事件即可。為什么可以放心的這樣做?因?yàn)槲覀円呀?jīng)基本做到確保一個(gè)聚合根同一時(shí)刻只會(huì)被一個(gè)Command修改。這樣就能基本保證,這些Group Commit的事件也不會(huì)出現(xiàn)版本號(hào)沖突的情況。所以,大家是否覺得,很多設(shè)計(jì)其實(shí)是一環(huán)套一環(huán)的。Group Commit何時(shí)出發(fā)?我覺得可以只要滿足兩個(gè)條件了就可以觸發(fā):1)某個(gè)定時(shí)的周期到了就可以觸發(fā),這個(gè)定時(shí)周期可以根據(jù)自己的業(yè)務(wù)場(chǎng)景進(jìn)行配置,比如每隔50ms觸發(fā)一次;2)要Commit的事件到達(dá)某個(gè)***值,即每批可以持久化的事件個(gè)數(shù)的***值,比如每50個(gè)事件為一批,這個(gè)BatchSize也需要根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景和你的存儲(chǔ)db的性能綜合測(cè)試評(píng)估來得到一個(gè)最適合的值;何時(shí)可以使用Group Commit?我覺得只有是在并發(fā)非常高,當(dāng)單個(gè)持久化事件遇到性能瓶頸時(shí),才需要使用。否則反而會(huì)降低事件持久化的實(shí)時(shí)性,Group Commit提高的是高并發(fā)下單位時(shí)間內(nèi)持久化的事件數(shù)。目的是為了降低應(yīng)用和DB之間交互的次數(shù),從而減少IO的次數(shù)。不知不覺就說到了最開始說的那3點(diǎn)性能優(yōu)化中的,盡量減少IO了,呵呵。
Event消費(fèi)時(shí)的冪等處理
CQRS架構(gòu)圖中,事件持久化完成后,接下來就是會(huì)把這些事件發(fā)布出去(發(fā)送到分布式消息隊(duì)列),給消費(fèi)者消費(fèi)了,也就是給所有的Event Handler處理。這些Event Handler可能是更新Q端的ReadDB,也可能是發(fā)送郵件,也可能是調(diào)用外部系統(tǒng)的接口。作為框架,應(yīng)該有職責(zé)盡量保證一個(gè)事件盡量不要被某個(gè)Event Handler重復(fù)消費(fèi),否則,就需要Event Handler自己保證了。這里的冪等處理,我能想到的辦法就是用一張表,存儲(chǔ)某個(gè)事件是否被某個(gè)Event Handler處理的信息。每次調(diào)用Event Handler之前,判斷該Event Handler是否已處理過,如果沒處理過,就處理,處理完后,插入一條記錄到這個(gè)表。這個(gè)方法相信大家也都很容易想到。如果框架不做這個(gè)事情,那Event Handler內(nèi)部就要自己做好冪等處理。這個(gè)思路就是select if not exist, then handle, and at last insert的過程??梢钥吹竭@個(gè)過程不像前面那兩個(gè)過程那樣很嚴(yán)謹(jǐn),因?yàn)樵诓l(fā)的情況下,理論上還是會(huì)出現(xiàn)重復(fù)執(zhí)行Event Handler的情況?;蛘呒幢悴皇遣l(fā)時(shí)也可能會(huì)造成,那就是假如event handler執(zhí)行成功了,但是last insert失敗了,那框架還是會(huì)重試執(zhí)行event handler。這里,你會(huì)很容易想到,為了做這個(gè)冪等支持,Event Handler的一次完整執(zhí)行,需要增加不少時(shí)間,從而會(huì)***導(dǎo)致Query Side的數(shù)據(jù)更新的延遲。不過CQRS架構(gòu)的思想就是Q端的數(shù)據(jù)由C端通過事件同步過來,所以Q端的更新本身就是有一定的延遲的。這也是CQRS架構(gòu)所說的要接收最終一致性的原因。
關(guān)于冪等處理的性能問題的思考
關(guān)于CommandStore的性能瓶頸分析
大家知道,整個(gè)CQRS架構(gòu)中,Command,Event的產(chǎn)生以及處理是非常頻繁的,數(shù)據(jù)量也是非常大的。那如何保證這幾步冪等處理的高性能呢?對(duì)于Command的冪等處理,如果對(duì)性能要求不是很高,那我們可以簡(jiǎn)單使用關(guān)系型DB即可,比如Sql Server, MySQL都可以。要實(shí)現(xiàn)冪等處理,只需要把主鍵設(shè)計(jì)為CommandId即可。其他不需要額外的唯一索引。所以這里的性能瓶頸相當(dāng)于是對(duì)單表做大量insert操作的***TPS。一般MySQL數(shù)據(jù)庫,SSD硬盤,要達(dá)到2W TPS應(yīng)該沒什么問題。對(duì)于這個(gè)表,我們基本只有寫入操作,不需要讀取操作。只有是在Command插入遇到主鍵沖突,然后才可能需要偶爾根據(jù)主鍵讀取一下已經(jīng)存在的Command的信息。然后,如果單表數(shù)據(jù)量太大,那怎么辦,就是分表分庫了。這就是最開始談到的,要避開海量數(shù)據(jù)這個(gè)原則了,我想就是通過sharding避開大數(shù)據(jù)來實(shí)現(xiàn)繞過IO瓶頸的設(shè)計(jì)了。不過一旦涉及到分庫,分表,就又涉及到根據(jù)什么分庫分表了,對(duì)于存儲(chǔ)Command的表,我覺得比較簡(jiǎn)單,我們可以先根據(jù)Command的類型(相當(dāng)于根據(jù)業(yè)務(wù)做垂直拆分)做***級(jí)路由,然后相同Command類型的Command,根據(jù)CommandId的hashcode路由(水平拆分)即可。這樣就能解決Command通過關(guān)系型DB存儲(chǔ)的性能瓶頸問題。其實(shí)我們還可以通過流行的基于key/value的NoSQL來存儲(chǔ),比如可以選擇本地運(yùn)行的leveldb,或者支持分布式的ssdb,或者其他的,具體選擇哪個(gè),可以結(jié)合自己的業(yè)務(wù)場(chǎng)景來選擇??傊珻ommand的存儲(chǔ)可以有很多選擇。
關(guān)于EventStore的性能瓶頸分析
通過上面的分析,我們知道Event的存儲(chǔ)唯一需要的是AggregateRootId+Version的唯一索引,其他就無任何要求了。那這樣就和CommandStore一樣好辦了。如果也是采用關(guān)系型DB,那只要用AggregateRootId+Version這兩個(gè)作為聯(lián)合主鍵即可。然后如果要分庫分表,我們可以先根據(jù)AggregateRootType做***級(jí)垂直拆分,即把不同的聚合根類型產(chǎn)生的事件分開存儲(chǔ)。然后和Command一樣,相同聚合根產(chǎn)生的事件,可以根據(jù)AggregateRootId的hashcode來拆分,同一個(gè)AggregateRootId的所有事件都放一起。這樣既能保證AggregateRootId+Version的唯一性,又能保證數(shù)據(jù)的水平拆分。從而讓整個(gè)EventStore可以***制水平伸縮。當(dāng)然,我們也完全可以采用基于key/value的NoSQL來存儲(chǔ)。另外,我們查詢事件,也都是會(huì)確定聚合根的類型以及聚合根的ID,所以,這和路由機(jī)制一直,不會(huì)導(dǎo)致我們無法知道當(dāng)前要查詢的聚合根的事件在哪個(gè)分區(qū)上。
設(shè)計(jì)存儲(chǔ)時(shí)的重點(diǎn)考慮點(diǎn)
在設(shè)計(jì)command, event的存儲(chǔ)時(shí),我認(rèn)為主要考慮的應(yīng)該是提高整體的吞吐量,而不是追求單機(jī)存儲(chǔ)的性能。因?yàn)榧偃缥覀兊南到y(tǒng)平均每秒產(chǎn)生1W個(gè)事件,那一天就是8.64億個(gè)事件。已經(jīng)是很大的數(shù)據(jù)量。所以,我們必須要對(duì)command, event這種進(jìn)行分片。比如我們?cè)O(shè)計(jì)864個(gè)表,那每個(gè)表每天產(chǎn)生100W條記錄,這是在可以接受的范圍內(nèi)。然后,我們一旦分了864個(gè)表了,肯定會(huì)把它們分布在不同的物理數(shù)據(jù)庫上。這樣就是多個(gè)物理數(shù)據(jù)庫同時(shí)提供存儲(chǔ)服務(wù),可以整體提高存儲(chǔ)的吞吐量。我個(gè)人比較傾向于使用MySQL來存儲(chǔ)即可,因?yàn)橐环矫鍹ySQL是開源的,各種分庫分表的成熟做法比較多。另一方面,關(guān)系型數(shù)據(jù)庫相比Mongodb這種,自己更熟悉,能更好的控制。比如數(shù)據(jù)擴(kuò)容方案可以自己做,不像MongoDB這種,雖然它都幫我們搞定了大數(shù)據(jù)存儲(chǔ),但一旦出了問題,也許自己無法掌控。另一方面,關(guān)于RT,即單條數(shù)據(jù)存儲(chǔ)時(shí)的響應(yīng)時(shí)間,這個(gè)我覺得不管是關(guān)系型數(shù)據(jù)庫還是NoSQL,最終的瓶頸都是在磁盤IO。NoSQL之所以這么快,無非就是異步刷盤;而關(guān)系型DB不是很快,因?yàn)樗WC數(shù)據(jù)的落地,要保證數(shù)據(jù)的更高級(jí)別的可靠性。所以,我覺得,要在保證數(shù)據(jù)不會(huì)丟失的情況下,盡量提高RT,可以考慮使用SSD硬盤。另一方面,我覺得由于我們已經(jīng)做了分庫分表了,所以單個(gè)DB的壓力不會(huì)太大,所以一般局域網(wǎng)內(nèi)的RT也不會(huì)延遲很大,應(yīng)該可以接受。
聚合根的內(nèi)存模式(In-Memory)
In-Memory模式也是一種減少網(wǎng)絡(luò)IO的一種設(shè)計(jì),通過讓所有生命周期還沒結(jié)束的聚合根一直常駐在內(nèi)存,從而實(shí)現(xiàn)當(dāng)我們要修改某個(gè)聚合根時(shí),不必再像傳統(tǒng)的方式那樣,先從db獲取聚合根,再更新,完成后再保存到db了。而是聚合根一直在內(nèi)存,當(dāng)Command Handler要修改某個(gè)聚合根時(shí),直接從內(nèi)存拿到該聚合根對(duì)象即可,不需要任何序列化反序列化或IO的操作。基于ES模式,我們不需要直接保存聚合根,而是只要簡(jiǎn)單的保存聚合根產(chǎn)生的事件即可。當(dāng)服務(wù)器斷電要恢復(fù)聚合根時(shí),則只要用事件溯源(Event Sourcing, ES)的方式恢復(fù)聚合根到***狀態(tài)即可。
了解過actor的人應(yīng)該也知道actor也是整個(gè)集群中就一個(gè)實(shí)例,然后每個(gè)actor自己都有一個(gè)mailbox,這個(gè)mailbox用于存放當(dāng)前actor要處理的所有的消息。只要服務(wù)器不斷電,那actor就一直存活在內(nèi)存。所以,In-Memory模式也是actor的一個(gè)設(shè)計(jì)思想之一。像之前很轟動(dòng)的國外的一個(gè)LMAX架構(gòu),號(hào)稱每秒單機(jī)單核可以處理600W訂單,也是完全基于in-memory模式。不過LMAX架構(gòu)我覺得只要作為學(xué)習(xí)即可,要大范圍使用,還是有很多問題要解決,老外他們使用這種架構(gòu)來處理訂單,也是基于特定場(chǎng)景的,并且對(duì)編程(代碼質(zhì)量)和運(yùn)維的要求都非常高。具體有興趣的可以去搜一下相關(guān)資料。
關(guān)于in-memory架構(gòu),想法是好的,通過將所有數(shù)據(jù)都放在內(nèi)存,所有持久化都異步進(jìn)行。也就是說,內(nèi)存的數(shù)據(jù)才是***的,db的數(shù)據(jù)是異步持久化的,也就是某個(gè)時(shí)刻,內(nèi)存中有些數(shù)據(jù)可能還沒有被持久化到db。當(dāng)然,如果你說你的程序不需要持久化數(shù)據(jù),那另當(dāng)別論了。那如果是異步持久化,主要的問題就是宕機(jī)恢復(fù)的問題了。我們看一下akka框架是怎么持久化akka的狀態(tài)的吧。
- 多個(gè)消息同時(shí)發(fā)送給actor時(shí),全部會(huì)先放入該actor的mailbox里排隊(duì);
- 然后actor單線程從mailbox順序消費(fèi)消息;
- 消費(fèi)一個(gè)后產(chǎn)生事件;
- 持久化事件,akka-persistence也是采用了ES的方式持久化;
- 持久化完成后,更新actor的狀態(tài);
- 更新狀態(tài)完成后,再處理mailbox中的下一個(gè)消息;
從上面的過程,我們可以看出,akka框架本質(zhì)上也實(shí)現(xiàn)了避免資源競(jìng)爭(zhēng)的原則,因?yàn)槊總€(gè)actor是單線程處理它的mailbox中的每個(gè)消息的,從而就避免了并發(fā)沖突。然后我們可以看到akka框架也是先持久化事件之后,再更新actor的狀態(tài)的。這說明,akka采用的也叫保守的方式,即必須先確保數(shù)據(jù)落地,再更新內(nèi)存,再處理下一個(gè)消息。真正理想的in-memory架構(gòu),應(yīng)該是可以忽略持久化,當(dāng)actor處理完一個(gè)消息后,立即修改自己的狀態(tài),然后立即處理下一個(gè)消息。然后actor產(chǎn)生的事件的持久化,完全是異步的;也就是不用等待持久化事件完成后再更新actor的狀態(tài),然后處理下一個(gè)消息。
我認(rèn)為,是不是異步持久化不重要,因?yàn)榧热淮蠹叶家媾R一個(gè)問題,就是要在宕機(jī)后,恢復(fù)actor的狀態(tài),那持久化事件是不可避免的。所以,我也是認(rèn)為,事件不必異步持久化,完全可以像akka框架那樣,產(chǎn)生的事件先同步持久化,完成后再更新actor的狀態(tài)即可。這樣做,在宕機(jī)恢復(fù)actor的狀態(tài)到***時(shí),就只要簡(jiǎn)單的從db獲取所有事件,然后通過ES得到actor***狀態(tài)即可。然后如果擔(dān)心事件同步持久化有性能瓶頸,那這個(gè)總是不可避免,這塊不做好,那整個(gè)系統(tǒng)的性能就上不去,所以我們可以采用SSD,sharding, Group Commit, NoSQL等方法,優(yōu)化持久化的性能即可。當(dāng)然,如果采用異步持久化事件的方式,確實(shí)能大大提高actor的處理性能。但是要做到這點(diǎn),還需要有一些前提的。比如要確保整個(gè)集群中一個(gè)actor只有一個(gè)實(shí)例,不能有兩個(gè)一樣的actor在工作。因?yàn)槿绻霈F(xiàn)這種情況,那這兩個(gè)一樣的actor就會(huì)同時(shí)產(chǎn)生事件,導(dǎo)致***事件持久化的時(shí)候必定會(huì)出現(xiàn)并發(fā)沖突(事件版本號(hào)相同)的問題。但要保證急群眾一個(gè)actor只有一個(gè)實(shí)例,是很困難的,因?yàn)槲覀兛赡軙?huì)動(dòng)態(tài)往集群中增加服務(wù)器,此時(shí)必定會(huì)有一些actor要遷移到新服務(wù)器。這個(gè)遷移過程也很復(fù)雜,一個(gè)actor從原來的服務(wù)器遷移到新的服務(wù)器,意味著要先停止原服務(wù)器的actor的工作。然后還要把a(bǔ)ctor再新服務(wù)器上啟動(dòng);然后原服務(wù)器上的actor的mailbox中的消息還要發(fā)給新的actor,然后后續(xù)可能還在發(fā)給原actor的消息也要轉(zhuǎn)發(fā)到新的actor。然后新的actor重啟也很復(fù)雜,因?yàn)橐_保啟動(dòng)之后的actor的狀態(tài)一定是***的,而我們知道這種純in-memory模式下,事件的持久化時(shí)異步的,所以可能還有一些事件還在消息隊(duì)列,還沒被持久化。所以重啟actor時(shí)還要檢查消息隊(duì)列中是否還有未消費(fèi)的事件。如果還有,就需要等待。否則,我們恢復(fù)的actor的狀態(tài)就不是***的,這樣就無法保證內(nèi)存數(shù)據(jù)是***的這個(gè)目的,這樣in-memory也就失去了意義。這些都是麻煩的技術(shù)問題??傊?,要實(shí)現(xiàn)真正的in-memory架構(gòu),沒那么容易。當(dāng)然,如果你說你可以用數(shù)據(jù)網(wǎng)格之類的產(chǎn)品,無分布式,那也許可行,不過這是另外一種架構(gòu)了。
上面說了,akka框架的核心工作原理,以及其他一些方面,比如akka會(huì)確保一個(gè)actor實(shí)例在集群中只有一個(gè)。這點(diǎn)其實(shí)也是和本文說的一樣,也是為了避免資源競(jìng)爭(zhēng),包括它的mailbox也是一樣。之前我設(shè)計(jì)ENode時(shí),沒了解過akka框架,后來我學(xué)習(xí)后,發(fā)現(xiàn)和ENode的思想是如此接近,呵呵。比如:1)都是集群中只有一個(gè)聚合根實(shí)例;2)都對(duì)單個(gè)聚合根的操作的Command做排隊(duì)處理;3)都采用ES的方式進(jìn)行狀態(tài)持久化;4)都是基于消息驅(qū)動(dòng)的架構(gòu)。雖然實(shí)現(xiàn)方式有所區(qū)別,但目的都是相同的。
小結(jié)
本文,從CQRS+Event Sourcing的架構(gòu)出發(fā),結(jié)合實(shí)現(xiàn)高性能的幾個(gè)要注意的點(diǎn)(避開網(wǎng)絡(luò)開銷(IO),避開海量數(shù)據(jù),避開資源爭(zhēng)奪),分析了這種架構(gòu)下,我所想到的一些可能的設(shè)計(jì)。整個(gè)架構(gòu)中,一個(gè)Command在被處理時(shí),一般是需要做兩次IO,1)持久化Command;2)持久化事件;當(dāng)然,這里沒有算上消息的發(fā)送和接收的IO。整個(gè)架構(gòu)完全基于消息驅(qū)動(dòng),所以擁有一個(gè)穩(wěn)定可擴(kuò)展高性能的分布式消息隊(duì)列中間件是比不可少的,EQueue正是在向這個(gè)目標(biāo)努力的一個(gè)成果。目前EQueue的TCP通信層,可以做到發(fā)送100W消息,在一臺(tái)i7 CPU的普通機(jī)器上,只需3s;有興趣的同學(xué)可以看一下。***,ENode框架就是按照本文中所說的這些設(shè)計(jì)來實(shí)現(xiàn)的,有興趣的朋友歡迎去下載并和我交流哦!