電商真實對賬系統(tǒng)是如何設(shè)計并優(yōu)化的
前言
往期文章在熱點數(shù)據(jù)如何更新的一篇文章中有提到對賬系統(tǒng)。其實我在實際業(yè)務(wù)場景中是有遇到過類似對賬的優(yōu)化問題的。說優(yōu)化之前要掌握一點就是一定要掌握Java并發(fā)包的相關(guān)特性。本章節(jié)對此有很大依賴。
- 熱點數(shù)據(jù)高效更新文章:
inventory hint,解決熱點數(shù)據(jù)如何高效更新
Java并發(fā)包簡說
CountDownLatch和CyclicBarrier
區(qū)別
CountDownLatch:
- 不可以重復(fù)使用,計數(shù)器無法被重置
經(jīng)典案例比如門衛(wèi)休息休要等所有人下班才可以關(guān)門休息 CyclicBarrier:
- 可以重復(fù)使用,是一個同步輔助類,允許一組線程相互等待,直到到達某個公共屏障點(common barrier point)。假設(shè)設(shè)計一組固定大小的線程的程序中,這些線程必須不是的互相等待,此時就可以使用CyclicBarrier。因為該barrier在釋放等待線程后可以重用,從而稱之為循環(huán)的barrier。
經(jīng)典案例:比如運動員跑步,需要所有人準(zhǔn)備好之后裁判才可以發(fā)令讓大家在同一時刻去跑。有依賴關(guān)系
案例
有一天,老大匆忙趕來,提到對賬系統(tǒng)最近變得越來越緩慢,希望能迅速進行優(yōu)化。經(jīng)過了解對賬系統(tǒng)的業(yè)務(wù)流程,發(fā)現(xiàn)其實相當(dāng)簡單:用戶通過在線商城下單會生成電子訂單并存儲在訂單數(shù)據(jù)庫中;隨后物流會生成派送單用于發(fā)貨,派送單則保存在派送單庫中。為了避免漏發(fā)或重復(fù)派送,對賬系統(tǒng)每天會核查是否存在異常訂單。
目前對賬系統(tǒng)的處理邏輯很簡單:首先查詢訂單,然后查詢派送單,接著比對訂單和派送單,將差異記錄寫入差異庫。對賬系統(tǒng)的核心代碼經(jīng)過抽象后,也并不復(fù)雜,主要是在單線程中循環(huán)執(zhí)行訂單和派送單的查詢,進行對賬操作,最后將結(jié)果寫入差異庫。
偽代碼
while(存在未對賬訂單){
// 查詢未對賬訂單
pos = getPOrders();
// 查詢派送單
dos = getDOrders();
// 執(zhí)行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
利用Java并行優(yōu)化對賬系統(tǒng)
首先要找出對賬系統(tǒng)的瓶頸所在。目前,由于訂單量和派送單量龐大,導(dǎo)致查詢未對賬訂單 getPOrders() 和查詢派送單 getDOrders() 的速度較慢。是否有一種快速優(yōu)化的方法呢?目前對賬系統(tǒng)是單線程執(zhí)行的。對于這樣的串行系統(tǒng),優(yōu)化性能的第一個想法是能否利用多線程并行處理。
因此,我們可以看出對賬系統(tǒng)的瓶頸在哪里:查詢未對賬訂單 getPOrders() 和查詢派送單 getDOrders() 是否能夠并行處理呢?很顯然,這兩個操作之間并沒有依賴關(guān)系。將這兩個耗時操作并行化后,與單線程執(zhí)行相比,您會發(fā)現(xiàn)在相同時間段內(nèi),并行執(zhí)行的吞吐量接近單線程的兩倍,優(yōu)化效果頗為明顯。
有了這個思路,接下來我們看看如何用代碼實現(xiàn)。在下面的代碼中,我們創(chuàng)建了兩個線程 T1 和 T2,分別并行執(zhí)行查詢未對賬訂單 getPOrders() 和查詢派送單 getDOrders() 的操作。主線程則負責(zé)執(zhí)行對賬操作 check()和將差異寫入 save() 的操作。值得注意的是:主線程需要等待線程 T1 和 T2 執(zhí)行完畢后才能執(zhí)行 check() 和 save() 這兩個操作。為此,我們通過調(diào)用 T1.join() 和 T2.join() 實現(xiàn)等待,當(dāng)線程 T1 和 T2 結(jié)束時,調(diào)用了 T1.join() 和 T2.join() 的主線程將從阻塞狀態(tài)中解除,隨后執(zhí)行后續(xù)的 check() 和 save() 操作。
偽代碼
while(存在未對賬訂單){
// 查詢未對賬訂單
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查詢派送單
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 結(jié)束
T1.join();
T2.join();
// 執(zhí)行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
用 CountDownLatch 實現(xiàn)線程等待
經(jīng)過上述優(yōu)化,基本上可以向老板報告工作完成了,但仍有一些遺憾之處。我相信您也已經(jīng)注意到了,在 while 循環(huán)中每次都會創(chuàng)建新的線程,而創(chuàng)建線程是一個耗時的操作。因此,最好能夠重復(fù)利用已創(chuàng)建的線程。您想到了線程池,確實,線程池能夠解決這個問題。
通過線程池進行優(yōu)化后:我們首先創(chuàng)建了一個固定大小為2的線程池,并在 while 循環(huán)中重復(fù)利用這些線程。一切看起來都進行得很順利,但似乎有一個問題無法解決,即主線程如何知道 getPOrders() 和 getDOrders() 這兩個操作何時執(zhí)行完成。在前面的方案中,主線程通過調(diào)用線程 T1 和 T2 的 join() 方法來等待它們退出,但是在線程池方案中,線程根本就不會退出,因此 join() 方法失效了。
那么,如何解決這個問題呢?您可以想出許多方法,其中最直接的方法是使用一個計數(shù)器。將其初始值設(shè)為2,執(zhí)行完 pos = getPOrders(); 后減 1,執(zhí)行完 dos = getDOrders(); 后也減 1。主線程在這之后等待計數(shù)器等于0;當(dāng)計數(shù)器等于0時,說明這兩個查詢操作已執(zhí)行完畢。等待計數(shù)器為0實際上是一種條件變量,使用管程實現(xiàn)起來也并不復(fù)雜。
然而,我并不建議在實際項目中實施上述方案,因為Java并發(fā)包中已經(jīng)提供了類似功能的工具類:CountDownLatch,我們直接使用即可。在下面的代碼示例中,我們在 while 循環(huán)中首先創(chuàng)建了一個CountDownLatch,計數(shù)器的初始值為2。在 pos = getPOrders(); 和 dos = getDOrders(); 兩個語句后,通過調(diào)用 latch.countDown(); 實現(xiàn)對計數(shù)器的減1操作。在主線程中,通過調(diào)用 latch.await(); 實現(xiàn)對計數(shù)器等于0的等待。
偽代碼
// 創(chuàng)建 2 個線程的線程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未對賬訂單){
// 計數(shù)器初始化為 2
CountDownLatch latch =
new CountDownLatch(2);
// 查詢未對賬訂單
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待兩個查詢操作結(jié)束
latch.await();
// 執(zhí)行對賬操作
diff = check(pos, dos);
// 差異寫入差異庫
save(diff);
}
進一步優(yōu)化性能
經(jīng)過以上一系列的優(yōu)化,終于可以松一口氣,準(zhǔn)備交付項目。然而,在交付之前,再次審視一番是值得的,或許還存在優(yōu)化的空間。
前面我們已經(jīng)實現(xiàn)了將 getPOrders() 和 getDOrders() 這兩個查詢操作并行化,但是這兩個查詢操作與對賬操作 check() 和 save() 之間仍然是串行執(zhí)行的。很顯然,這兩個查詢操作與對賬操作也可以并行執(zhí)行,即在執(zhí)行對賬操作的同時可以進行下一輪的查詢操作。
接下來,我們再思考如何實現(xiàn)這一優(yōu)化。兩次查詢操作能夠與對賬操作并行執(zhí)行,而對賬操作又依賴于查詢操作的結(jié)果,這明顯具有生產(chǎn)者-消費者模型的特征。兩次查詢操作充當(dāng)生產(chǎn)者,對賬操作為消費者。為了實現(xiàn)這種模型,我們需要一個隊列來存儲生產(chǎn)者產(chǎn)生的數(shù)據(jù),消費者則從隊列中取出數(shù)據(jù)執(zhí)行相應(yīng)操作。
針對這個對賬項目,我設(shè)計了兩個隊列,其元素之間存在對應(yīng)關(guān)系。具體來說,訂單查詢操作將訂單查詢結(jié)果插入訂單隊列,派送單查詢操作將派送單插入派送單隊列,這兩個隊列的元素之間是一一對應(yīng)的。使用兩個隊列的好處在于,對賬操作可以每次從訂單隊列取出一個元素和派送單隊列中取出一個元素,然后執(zhí)行對賬操作,確保數(shù)據(jù)的一致性。
接下來,讓我們看看如何通過雙隊列實現(xiàn)完全并行化。一個直接的思路是:一個線程 T1 執(zhí)行訂單查詢工作,另一個線程 T2 執(zhí)行派送單查詢工作。當(dāng)線程 T1 和 T2 都各自生產(chǎn)完一條數(shù)據(jù)時,通知線程 T3 執(zhí)行對賬操作。這一想法看似簡單,實際上仍然存在一個條件:T1 和 T2 的工作節(jié)奏必須一致,保持同步,否則一個快一個慢將影響各自生產(chǎn)數(shù)據(jù)并通知 T3 的過程。
只有在T1和T2各自生產(chǎn)完一條數(shù)據(jù)時才能繼續(xù)執(zhí)行,也就是說,T1和T2需要相互等待,保持步調(diào)一致。同時,當(dāng)T1和T2都生產(chǎn)完一條數(shù)據(jù)時,還需能夠通知T3執(zhí)行對賬操作。
用 CyclicBarrier 實現(xiàn)線程同步
接下來我們將實現(xiàn)上述方案中提到的方法。該方案的難點在于兩個方面:一是確保線程 T1 和 T2 的步調(diào)一致,二是能夠有效通知線程 T3。
在解決這兩個難點的過程中,仍然可以利用一個計數(shù)器。將計數(shù)器初始化為2,每當(dāng)線程 T1 和 T2 生產(chǎn)完一條數(shù)據(jù)時,都將計數(shù)器減1。若計數(shù)器大于0,則線程 T1 或 T2需要等待。當(dāng)計數(shù)器等于0時,通知線程 T3,喚醒等待的線程 T1 或 T2,并將計數(shù)器重置為2。如此,線程 T1 和 T2 在生產(chǎn)下一條數(shù)據(jù)時,可以繼續(xù)使用這個計數(shù)器。
建議不要在實際項目中直接實現(xiàn)這一邏輯,因為Java并發(fā)包中已經(jīng)提供了相關(guān)的工具類:CyclicBarrier。在下面的代碼中,我們首先創(chuàng)建了一個初始值為2的CyclicBarrier計數(shù)器。需要注意的是,在創(chuàng)建CyclicBarrier時,傳入了一個回調(diào)函數(shù)。當(dāng)計數(shù)器減至0時,該回調(diào)函數(shù)會被調(diào)用。
線程 T1 負責(zé)查詢訂單,每查到一條數(shù)據(jù),調(diào)用barrier.await()將計數(shù)器減1,并等待計數(shù)器變?yōu)?。線程 T2 負責(zé)查詢派送單,處理方式與線程 T1 類似。當(dāng) T1 和 T2 都調(diào)用barrier.await()時,計數(shù)器會減至0,此時 T1 和 T2可以繼續(xù)執(zhí)行下一步操作,并調(diào)用barrier的回調(diào)函數(shù)執(zhí)行對賬操作。
值得一提的是,CyclicBarrier的計數(shù)器具有自動重置功能。當(dāng)計數(shù)器減至0時,會自動重新設(shè)定為您設(shè)置的初始值。這一特性確實方便實用。
偽代碼
// 訂單隊列
Vector<P> pos;
// 派送單隊列
Vector<D> dos;
// 執(zhí)行回調(diào)的線程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 執(zhí)行對賬操作
diff = check(p, d);
// 差異寫入差異庫
save(diff);
}
void checkAll(){
// 循環(huán)查詢訂單庫
Thread T1 = new Thread(()->{
while(存在未對賬訂單){
// 查詢訂單庫
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循環(huán)查詢運單庫
Thread T2 = new Thread(()->{
while(存在未對賬訂單){
// 查詢運單庫
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
CountDownLatch 和 CyclicBarrier 是Java并發(fā)包提供的兩個非常便捷的線程同步工具類。在這里,有必要再次強調(diào)它們之間的區(qū)別:CountDownLatch 主要用于解決一個線程等待多個線程的情況,可以類比于旅游團團長必須等待所有游客齊集后才能繼續(xù)前行;而CyclicBarrier 則是一組線程相互等待,有點像幾個驢友之間的互助合作。此外,CountDownLatch 的計數(shù)器不支持重復(fù)利用,即一旦計數(shù)器降至0,后續(xù)調(diào)用await()的線程將直接通過。相比之下,CyclicBarrier 的計數(shù)器可以循環(huán)利用,同時具有自動重置功能,一旦計數(shù)器減至0,將會自動重置為設(shè)定的初始值。此外,CyclicBarrier 還支持設(shè)置回調(diào)函數(shù),功能更加豐富。