一套萬能的異步處理方案(典藏版)
兄弟們,今天咱們來聊個(gè)能讓你代碼從 “卡成 PPT” 變 “飛一般絲滑” 的話題 —— 異步處理。
我敢打賭,你肯定遇到過這種情況:寫了個(gè)接口,邏輯里又要查數(shù)據(jù)庫、又要調(diào)第三方接口、還要算一堆數(shù)據(jù),跑起來那叫一個(gè)慢,用戶在前端戳半天沒反應(yīng),老板在后面盯著問 “怎么回事啊,是不是服務(wù)器該換了?”,結(jié)果你排查半天發(fā)現(xiàn),不是服務(wù)器不行,是代碼里全是 “同步等待” 的坑 —— 查完 A 等 A,查完 B 等 B,CPU 閑著沒事干也不幫你多跑點(diǎn)活,這不純純浪費(fèi)資源嘛!
所以今天這篇,我不整那些 “異步非阻塞 IO 原理”“線程模型深度剖析” 的虛頭巴腦,就給你一套能直接抄作業(yè)、覆蓋 80% 業(yè)務(wù)場景的異步處理方案,從基礎(chǔ)到實(shí)戰(zhàn),從坑點(diǎn)到避坑,保證你看完就能用,用了就見效。
一、先搞懂:為啥咱們非得用異步?
在講方案之前,先掰扯清楚 “異步” 到底是個(gè)啥,以及為啥它能救你的慢接口。
你可以把代碼執(zhí)行想象成去奶茶店買喝的:
- 同步就是你點(diǎn)完單,站在柜臺前一動不動等,不管店員做沒做你的,你都得等著,期間啥也干不了 —— 這就是咱們平時(shí)寫的doA(); doB(); doC();,A 沒干完,B 絕對不開始,哪怕 A 是在等數(shù)據(jù)庫返回(店員做奶茶),CPU(你)也只能閑著。
- 異步就是你點(diǎn)完單拿個(gè)取餐碼,然后找個(gè)位置刷手機(jī)(干別的活),等店員喊你(任務(wù)完成),你再過去拿 —— 對應(yīng)到代碼里,就是發(fā)起一個(gè)任務(wù)后,不用等著它結(jié)束,繼續(xù)執(zhí)行別的代碼,等任務(wù)有結(jié)果了再回來處理。
那異步能解決啥問題?舉個(gè)真實(shí)場景:
比如你寫個(gè) “訂單詳情接口”,要做三件事:
- 查訂單基本信息(DB,100ms)
- 查訂單對應(yīng)的商品列表(DB,150ms)
- 查用戶的收貨地址(調(diào)用用戶服務(wù)接口,200ms)
如果用同步,總耗時(shí)就是 100+150+200=450ms,這還沒算其他邏輯,接口響應(yīng)時(shí)間輕松破 500ms,用戶體驗(yàn)直接拉胯。
但如果用異步,這三件事可以同時(shí)跑,總耗時(shí)差不多就是最長的那個(gè) 200ms,直接把響應(yīng)時(shí)間砍半,CPU 也能充分利用起來 —— 這就是異步的魔力。
不過先別急著興奮,異步雖好,但早年 Java 里的異步方案,坑可不少。
二、那些年我們踩過的異步 “坑”
最早咱們想搞異步,第一反應(yīng)就是new Thread(),對吧?比如這樣:
// 同步代碼
public void syncOrderDetail() {
Order order = orderDao.getById(1L); // 100ms
List<Goods> goodsList = goodsDao.getByOrderId(1L); // 150ms
Address address = userService.getAddress(1L); // 200ms
// 組裝數(shù)據(jù)返回
}
// 想搞異步,就new Thread
public void asyncBadExample() {
Order[] order = new Order[1]; // 用數(shù)組存,因?yàn)槟涿麅?nèi)部類要final
List<Goods>[] goodsList = new List[1];
Address[] address = new Address[1];
// 查訂單
new Thread(() -> order[0] = orderDao.getById(1L)).start();
// 查商品
new Thread(() -> goodsList[0] = goodsDao.getByOrderId(1L)).start();
// 查地址
new Thread(() -> address[0] = userService.getAddress(1L)).start();
// 等結(jié)果
while (order[0] == null || goodsList[0] == null || address[0] == null) {
// 空循環(huán)等,CPU直接干燒
}
// 組裝數(shù)據(jù)
}你瞅瞅這代碼,問題一大堆:
- 線程管不?。好看萎惒骄?new 一個(gè)線程,要是接口并發(fā)高,瞬間幾百上千個(gè)線程,JVM 直接 OOM 給你看 —— 就像奶茶店來了 100 個(gè)客人,每個(gè)客人都讓店員單獨(dú)開個(gè)機(jī)器做奶茶,機(jī)器根本不夠用。
- 拿結(jié)果太費(fèi)勁:用數(shù)組存結(jié)果,還得空循環(huán)等,這叫 “忙等”,CPU 利用率直接拉滿,其他活都沒法干了 —— 相當(dāng)于你刷手機(jī)的時(shí)候,每隔 1 秒就去問店員 “我的奶茶好了嗎”,店員煩,你也沒法專心刷手機(jī)。
- 沒異常處理:要是查商品的時(shí)候數(shù)據(jù)庫崩了,這個(gè)線程直接拋異常死了,外面還不知道,一直在空循環(huán)等,直接超時(shí) —— 就像店員做奶茶的時(shí)候機(jī)器壞了,沒告訴你,你還傻乎乎等半天。
后來 Java 5 出了Future和ExecutorService,算是解決了一部分問題。ExecutorService就是線程池,能幫你管理線程(相當(dāng)于奶茶店固定幾個(gè)機(jī)器,客人多了排隊(duì),不瞎開機(jī)器);Future能幫你拿結(jié)果,還能判斷任務(wù)是否完成。
比如這樣改:
// 先搞個(gè)線程池
private ExecutorService executor = Executors.newFixedThreadPool(3);
public void asyncWithFuture() throws ExecutionException, InterruptedException {
// 提交任務(wù),返回Future
Future<Order> orderFuture = executor.submit(() -> orderDao.getById(1L));
Future<List<Goods>> goodsFuture = executor.submit(() -> goodsDao.getByOrderId(1L));
Future<Address> addressFuture = executor.submit(() -> userService.getAddress(1L));
// 拿結(jié)果(這里會阻塞,直到任務(wù)完成)
Order order = orderFuture.get();
List<Goods> goodsList = goodsFuture.get();
Address address = addressFuture.get();
// 組裝數(shù)據(jù)
}比new Thread()強(qiáng)多了,線程有池管著,也不用空循環(huán)了,但還是有坑:
- get () 方法會阻塞:雖然三個(gè)任務(wù)是同時(shí)跑的,但orderFuture.get()會等到訂單查完才執(zhí)行下一句,要是訂單查了 100ms,商品查了 150ms,那goodsFuture.get()其實(shí)還要等 50ms—— 相當(dāng)于你先問 “我的奶茶好了嗎”,等拿到奶茶,再問 “我的薯?xiàng)l好了嗎”,但其實(shí)薯?xiàng)l早好了,你白等了。
- 沒法鏈?zhǔn)秸{(diào)用:如果查完訂單,需要用訂單里的用戶 ID 查用戶信息,再用用戶信息查會員等級,用Future就得嵌套,代碼越寫越丑,跟回調(diào)地獄似的。
- 異常處理麻煩:得用 try-catch 包著get(),要是三個(gè)任務(wù)都可能拋異常,代碼里全是 try-catch,看著就頭疼。
直到 Java 8 出了CompletableFuture,才算把這些坑都填上了 —— 這玩意兒就是咱們今天這套 “萬能方案” 的核心,相當(dāng)于給異步處理裝了個(gè) “自動擋”,好用到飛起。
三、核心武器:CompletableFuture 詳解(干貨密集區(qū))
CompletableFuture本質(zhì)上是Future的增強(qiáng)版,它解決了Future的阻塞、沒法鏈?zhǔn)秸{(diào)用、異常難處理的問題,還支持多個(gè)異步任務(wù)的組合,簡直是為實(shí)戰(zhàn)而生。
咱們先從基礎(chǔ)用法開始,再講進(jìn)階技巧,最后結(jié)合場景落地。
3.1 基礎(chǔ):怎么用 CompletableFuture 發(fā)起異步任務(wù)?
CompletableFuture提供了幾個(gè)靜態(tài)方法來發(fā)起異步任務(wù),最常用的是這兩個(gè):
- supplyAsync(Supplier<U> supplier):適合有返回值的任務(wù),比如查數(shù)據(jù)庫、調(diào)接口。
- runAsync(Runnable runnable):適合沒返回值的任務(wù),比如寫日志、發(fā)通知。
這倆方法都可以傳一個(gè)Executor(線程池),如果不傳,就用默認(rèn)的ForkJoinPool.commonPool()—— 但強(qiáng)烈不建議用默認(rèn)線程池,因?yàn)槟J(rèn)線程池的線程數(shù)是 CPU 核心數(shù),要是任務(wù)是 IO 密集型(比如調(diào)接口、查數(shù)據(jù)庫),線程會經(jīng)常等 IO,導(dǎo)致任務(wù)堆積,后面再說為啥。
先寫個(gè)基礎(chǔ)例子,還是之前的訂單詳情場景:
// 自定義線程池(后面詳細(xì)講怎么配)
private ExecutorService orderExecutor = new ThreadPoolExecutor(
4, // 核心線程數(shù)
8, // 最大線程數(shù)
60L, // 空閑線程存活時(shí)間
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // 隊(duì)列
new ThreadFactoryBuilder().setNameFormat("order-async-%d").build(), // 線程名
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
public void asyncWithCompletableFuture() {
// 1. 異步查訂單
CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("線程" + Thread.currentThread().getName() + ":查訂單");
return orderDao.getById(1L);
}, orderExecutor);
// 2. 異步查商品(不用等訂單,直接跑)
CompletableFuture<List<Goods>> goodsFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("線程" + Thread.currentThread().getName() + ":查商品");
return goodsDao.getByOrderId(1L);
}, orderExecutor);
// 3. 異步查地址(需要訂單里的userId,所以等訂單查完再跑)
CompletableFuture<Address> addressFuture = orderFuture.thenCompose(order -> {
// thenCompose:用前一個(gè)任務(wù)的結(jié)果,發(fā)起新的異步任務(wù)
return CompletableFuture.supplyAsync(() -> {
System.out.println("線程" + Thread.currentThread().getName() + ":查地址,userId=" + order.getUserId());
return userService.getAddress(order.getUserId());
}, orderExecutor);
});
// 4. 等所有任務(wù)完成,組裝結(jié)果(不阻塞主線程,用whenComplete處理結(jié)果)
CompletableFuture.allOf(orderFuture, goodsFuture, addressFuture)
.whenComplete((v, e) -> {
// v是allOf的返回值,因?yàn)閍llOf沒有返回值,所以v是null
// e是異常,如果三個(gè)任務(wù)中有一個(gè)拋異常,e就是那個(gè)異常
if (e != null) {
System.out.println("異步任務(wù)出錯了:" + e.getMessage());
return;
}
// 拿結(jié)果(這里get()不會阻塞,因?yàn)閍llOf已經(jīng)確保任務(wù)完成了)
try {
Order order = orderFuture.get();
List<Goods> goodsList = goodsFuture.get();
Address address = addressFuture.get();
// 組裝數(shù)據(jù)
OrderDetailVO vo = new OrderDetailVO(order, goodsList, address);
System.out.println("組裝完成:" + vo);
} catch (Exception ex) {
// 這里一般不會拋異常,因?yàn)榍懊鎍llOf已經(jīng)處理了
ex.printStackTrace();
}
});
// 主線程繼續(xù)干別的,不用等上面的任務(wù)完成
System.out.println("主線程:異步任務(wù)已發(fā)起,我先溜了~");
}這段代碼里有幾個(gè)關(guān)鍵點(diǎn),咱們掰開揉碎了說:
- thenCompose 方法:它的作用是 “用前一個(gè)異步任務(wù)的結(jié)果,發(fā)起一個(gè)新的異步任務(wù)”,而且返回的是CompletableFuture,這樣就能鏈?zhǔn)秸{(diào)用,避免嵌套。比如查地址需要訂單里的 userId,所以得等訂單查完,用thenCompose就很優(yōu)雅,要是用Future,就得寫成:
// Future的嵌套寫法,丑哭
Future<Order> orderFuture = executor.submit(() -> orderDao.getById(1L));
Order order = orderFuture.get();
Future<Address> addressFuture = executor.submit(() -> userService.getAddress(order.getUserId()));- allOf 方法:它的作用是 “等待所有異步任務(wù)完成”,返回的是一個(gè)CompletableFuture<Void>(沒有返回值)。適合多個(gè)任務(wù)都完成后再做后續(xù)處理的場景,比如訂單詳情里的三個(gè)任務(wù)都完成了,才能組裝 VO。
還有個(gè)類似的方法叫anyOf,是 “只要有一個(gè)任務(wù)完成就觸發(fā)”,適合比如 “查商品信息,先查本地緩存,再查遠(yuǎn)程服務(wù),哪個(gè)快用哪個(gè)” 的場景。
- whenComplete 方法:它是 “任務(wù)完成后觸發(fā)的回調(diào)”,不管任務(wù)是成功還是失敗,都會執(zhí)行。里面的v是任務(wù)的返回值(allOf 的話 v 是 null),e是異常,如果任務(wù)成功,e 是 null;如果失敗,e 就是拋出的異常。
這里要注意:whenComplete不會阻塞主線程,所以主線程會先打印 “我先溜了~”,等異步任務(wù)都完成了,再執(zhí)行回調(diào)里的組裝邏輯 —— 這才是真正的異步非阻塞。
3.2 進(jìn)階:CompletableFuture 的鏈?zhǔn)秸{(diào)用和異常處理
CompletableFuture的鏈?zhǔn)秸{(diào)用是它的核心優(yōu)勢,除了前面的thenCompose,還有幾個(gè)常用的方法,咱們用一個(gè)場景串起來:
比如 “查完訂單后,計(jì)算訂單的優(yōu)惠金額,然后根據(jù)優(yōu)惠金額和商品列表,計(jì)算最終實(shí)付金額”:
// 鏈?zhǔn)秸{(diào)用示例
CompletableFuture<BigDecimal> payAmountFuture = CompletableFuture.supplyAsync(() -> {
// 1. 查訂單
System.out.println("查訂單");
return orderDao.getById(1L);
}, orderExecutor)
.thenApply(order -> {
// 2. 計(jì)算優(yōu)惠金額(用訂單的金額和用戶等級,同步操作)
System.out.println("計(jì)算優(yōu)惠金額,訂單金額:" + order.getAmount());
BigDecimal discount = calculateDiscount(order); // 同步方法,不用開新線程
order.setDiscount(discount);
return order;
})
.thenCombine(CompletableFuture.supplyAsync(() -> {
// 3. 查商品列表(異步,和計(jì)算優(yōu)惠并行)
System.out.println("查商品列表");
return goodsDao.getByOrderId(1L);
}, orderExecutor), (order, goodsList) -> {
// 4. 結(jié)合訂單(帶優(yōu)惠)和商品列表,計(jì)算實(shí)付金額
System.out.println("計(jì)算實(shí)付金額");
BigDecimal totalGoodsAmount = goodsList.stream()
.map(Goods::getPrice)
.reduce(BigDecimal.ZERO, BigDecimal::add);
// 實(shí)付 = 商品總金額 - 優(yōu)惠
return totalGoodsAmount.subtract(order.getDiscount());
})
.exceptionally(ex -> {
// 5. 異常處理:如果前面任何一步出錯,返回默認(rèn)值0
System.out.println("計(jì)算實(shí)付金額出錯:" + ex.getMessage());
return BigDecimal.ZERO;
});
// 拿結(jié)果(如果想阻塞等結(jié)果,可以用get(),也可以用join(),join()不用拋異常)
BigDecimal payAmount = payAmountFuture.join();
System.out.println("最終實(shí)付金額:" + payAmount);這里又多了幾個(gè)關(guān)鍵方法,咱們一個(gè)個(gè)說:
- thenApply:用前一個(gè)任務(wù)的結(jié)果做同步處理,返回新的結(jié)果。比如計(jì)算優(yōu)惠金額是同步方法,不需要開新線程,就用thenApply—— 它會復(fù)用前一個(gè)任務(wù)的線程,或者在主線程執(zhí)行(如果前一個(gè)任務(wù)已經(jīng)完成)。
- thenCombine:結(jié)合兩個(gè)異步任務(wù)的結(jié)果,做處理后返回新結(jié)果。比如查商品列表是異步的,和計(jì)算優(yōu)惠可以并行,等兩個(gè)都完成了,再計(jì)算實(shí)付金額 —— 這樣比 “等計(jì)算優(yōu)惠完了再查商品” 快多了。
- exceptionally:捕獲前面所有步驟的異常,返回一個(gè)默認(rèn)值。比如前面任何一步出錯(查訂單失敗、查商品失敗、計(jì)算優(yōu)惠失?。?,都會走到這里,返回 0 元,避免整個(gè)流程崩潰。
除了exceptionally,還有個(gè)handle方法也能處理異常,它和exceptionally的區(qū)別是:handle不管成功還是失敗都會執(zhí)行,而exceptionally只在失敗時(shí)執(zhí)行。比如:
.handle((result, ex) -> {
if (ex != null) {
System.out.println("出錯了:" + ex.getMessage());
return BigDecimal.ZERO;
}
return result;
});3.3 關(guān)鍵:超時(shí)控制(避免異步任務(wù) “卡死人”)
異步任務(wù)最怕啥?最怕它一直不完成,比如調(diào)第三方接口,對方服務(wù)掛了,你的任務(wù)就一直阻塞在那,線程被占著不放,最后線程池滿了,整個(gè)服務(wù)都卡了。
所以超時(shí)控制是異步處理的必選項(xiàng),CompletableFuture提供了兩個(gè)方法來做超時(shí):
- orTimeout(long timeout, TimeUnit unit):超時(shí)后拋出TimeoutException。
- completeOnTimeout(U value, long timeout, TimeUnit unit):超時(shí)后返回一個(gè)默認(rèn)值,不拋異常。
比如查地址的時(shí)候,設(shè)置 200ms 超時(shí),超時(shí)了就返回默認(rèn)地址:
CompletableFuture<Address> addressFuture = CompletableFuture.supplyAsync(() -> {
// 模擬調(diào)用第三方接口超時(shí)
try {
Thread.sleep(300); // 睡300ms,超過200ms的超時(shí)時(shí)間
} catch (InterruptedException e) {
e.printStackTrace();
}
return userService.getAddress(1L);
}, orderExecutor)
// 超時(shí)控制:200ms超時(shí),返回默認(rèn)地址
.completeOnTimeout(new Address("默認(rèn)地址:未知"), 200, TimeUnit.MILLISECONDS);
// 或者超時(shí)拋異常
// .orTimeout(200, TimeUnit.MILLISECONDS);這里要注意:超時(shí)后,原來的異步任務(wù)其實(shí)還在跑(比如那個(gè)睡 300ms 的線程),只是CompletableFuture會忽略它的結(jié)果,直接返回默認(rèn)值或拋異常。如果原來的任務(wù)是寫數(shù)據(jù)庫、調(diào)支付接口這種 “有狀態(tài)” 的操作,得自己處理 “任務(wù)超時(shí)但實(shí)際還在執(zhí)行” 的問題,比如用分布式鎖、冪等設(shè)計(jì)。
四、異步的 “基石”:線程池配置(90% 的人都配錯了)
講完CompletableFuture,咱們得聊聊它的 “搭檔”—— 線程池。線程池是異步處理的基石,配置得不好,異步不僅沒效果,還會出大問題。
很多人用線程池,要么直接用Executors.newFixedThreadPool(10),要么用默認(rèn)的ForkJoinPool,這都是坑。咱們先搞懂線程池的核心參數(shù),再講怎么根據(jù)業(yè)務(wù)場景配置。
4.1 線程池的 7 個(gè)核心參數(shù)(必懂)
線程池的核心類是ThreadPoolExecutor,它的構(gòu)造方法有 7 個(gè)參數(shù),每個(gè)參數(shù)都影響線程池的行為,咱們用 “奶茶店” 的例子一個(gè)個(gè)解釋:
new ThreadPoolExecutor(
int corePoolSize, // 核心線程數(shù)
int maximumPoolSize, // 最大線程數(shù)
long keepAliveTime, // 空閑線程存活時(shí)間
TimeUnit unit, // 存活時(shí)間單位
BlockingQueue<Runnable> workQueue, // 任務(wù)隊(duì)列
ThreadFactory threadFactory, // 線程工廠
RejectedExecutionHandler handler // 拒絕策略
);- corePoolSize(核心線程數(shù)):奶茶店的 “常駐店員”,不管有沒有訂單,這些店員都在店里(線程不會被銷毀)。比如核心線程數(shù)設(shè) 4,就是不管忙不忙,都有 4 個(gè)店員在。
- maximumPoolSize(最大線程數(shù)):奶茶店的 “最多能有多少店員”,包括常駐店員和臨時(shí)店員。比如最大線程數(shù)設(shè) 8,就是忙的時(shí)候最多再招 4 個(gè)臨時(shí)店員,總共 8 個(gè)。
- keepAliveTime(空閑線程存活時(shí)間):臨時(shí)店員沒事干的時(shí)候,能在店里待多久。比如設(shè) 60 秒,就是臨時(shí)店員閑了 60 秒還沒活干,就讓他走了(線程被銷毀)。
- unit(時(shí)間單位):keepAliveTime 的單位,比如秒、毫秒。
- workQueue(任務(wù)隊(duì)列):訂單太多,店員忙不過來的時(shí)候,訂單排隊(duì)的地方。比如設(shè) 100,就是最多能排 100 個(gè)訂單,超過了就拒絕。
- threadFactory(線程工廠):用來創(chuàng)建線程的工廠,主要是給線程起個(gè)名字,方便排查問題。比如給線程起名 “order-async-1”“order-async-2”,后面查日志的時(shí)候就知道哪個(gè)線程出的問題。
- handler(拒絕策略):訂單太多,店員忙不過來,隊(duì)列也排滿了,該怎么處理新訂單。有四種默認(rèn)策略:
- CallerRunsPolicy:讓發(fā)起訂單的人自己處理(比如讓顧客自己做奶茶),也就是讓調(diào)用線程執(zhí)行任務(wù),這樣能減緩請求速度,避免任務(wù)丟失。
- AbortPolicy:直接拋異常(RejectedExecutionException),默認(rèn)策略,容易導(dǎo)致業(yè)務(wù)報(bào)錯。
- DiscardPolicy:直接扔掉新訂單,不拋異常,適合不重要的任務(wù)(比如日志)。
- DiscardOldestPolicy:扔掉隊(duì)列里最老的訂單,再把新訂單加進(jìn)去,適合任務(wù)有時(shí)效性的場景(比如實(shí)時(shí)統(tǒng)計(jì))。
4.2 怎么配置?看業(yè)務(wù)場景!
線程池的配置沒有 “萬能值”,但有兩個(gè)核心場景:CPU 密集型和 IO 密集型,配置思路完全不同。
場景 1:CPU 密集型任務(wù)(比如復(fù)雜計(jì)算、數(shù)據(jù)處理)
CPU 密集型任務(wù)的特點(diǎn)是:線程一直在用 CPU 算,很少等 IO(比如查數(shù)據(jù)庫、調(diào)接口)。比如計(jì)算訂單的優(yōu)惠金額、處理 Excel 數(shù)據(jù)。
這種場景下,線程數(shù)太多反而會導(dǎo)致 CPU 頻繁切換線程,效率下降。所以核心線程數(shù)和最大線程數(shù)建議設(shè)為:CPU 核心數(shù) + 1。
比如你的服務(wù)器是 4 核 CPU,就設(shè)為 5。為什么加 1?因?yàn)槿绻硞€(gè)線程偶爾等 IO(比如讀本地文件),多出來的那個(gè)線程能利用 CPU 的空閑時(shí)間,提高利用率。
怎么獲取 CPU 核心數(shù)?用Runtime.getRuntime().availableProcessors()。
場景 2:IO 密集型任務(wù)(比如查數(shù)據(jù)庫、調(diào)第三方接口、發(fā) MQ)
IO 密集型任務(wù)的特點(diǎn)是:線程大部分時(shí)間都在等 IO 完成(比如等數(shù)據(jù)庫返回、等第三方接口響應(yīng)),CPU 大部分時(shí)間是空閑的。比如查訂單、查商品、調(diào)用支付接口。
這種場景下,線程數(shù)可以多設(shè)一點(diǎn),讓 CPU 能充分利用起來。建議核心線程數(shù)設(shè)為:CPU 核心數(shù) * 2,最大線程數(shù)可以設(shè)為 CPU 核心數(shù) * 4,或者根據(jù)實(shí)際并發(fā)調(diào)整。
比如 4 核 CPU,核心線程數(shù)設(shè) 8,最大線程數(shù)設(shè) 16。
場景 3:混合任務(wù)(既有 CPU 密集又有 IO 密集)
如果一個(gè)線程里既有 IO 操作又有 CPU 計(jì)算,建議把任務(wù)拆成兩個(gè):IO 密集的任務(wù)和 CPU 密集的任務(wù),分別用兩個(gè)線程池處理。比如查訂單(IO 密集)用一個(gè)線程池,計(jì)算優(yōu)惠(CPU 密集)用另一個(gè)線程池,這樣能提高效率。
4.3 實(shí)戰(zhàn)配置示例(直接抄)
咱們以 “訂單服務(wù)” 為例,訂單服務(wù)里的異步任務(wù)大多是 IO 密集型(查 DB、調(diào)接口),所以線程池配置如下:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
publicclass ThreadPoolConfig {
// CPU核心數(shù)
privatestaticfinalint CPU_CORES = Runtime.getRuntime().availableProcessors();
// 訂單相關(guān)異步任務(wù)線程池(IO密集型)
@Bean
public Executor orderAsyncExecutor() {
// 線程工廠:給線程起名字
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("order-async-%d") // 線程名:order-async-1, order-async-2...
.setDaemon(false) // 非守護(hù)線程(守護(hù)線程會隨著主線程退出而退出,這里要避免)
.build();
// 線程池參數(shù)
int corePoolSize = CPU_CORES * 2; // 核心線程數(shù):CPU*2
int maximumPoolSize = CPU_CORES * 4; // 最大線程數(shù):CPU*4
long keepAliveTime = 60L; // 空閑線程存活時(shí)間:60秒
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100); // 任務(wù)隊(duì)列:100個(gè)
RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.CallerRunsPolicy(); // 拒絕策略:調(diào)用者執(zhí)行
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
rejectedHandler
);
// 允許核心線程超時(shí)銷毀(默認(rèn)核心線程不會超時(shí),這里設(shè)為true,空閑時(shí)可以銷毀,節(jié)省資源)
executor.allowCoreThreadTimeOut(true);
return executor;
}
// 計(jì)算相關(guān)異步任務(wù)線程池(CPU密集型,比如計(jì)算優(yōu)惠、統(tǒng)計(jì)數(shù)據(jù))
@Bean
public Executor calculateAsyncExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("calculate-async-%d")
.build();
int corePoolSize = CPU_CORES + 1; // 核心線程數(shù):CPU+1
int maximumPoolSize = CPU_CORES + 1; // 最大線程數(shù)和核心線程數(shù)一致,不需要臨時(shí)線程
long keepAliveTime = 60L;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(50);
RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
threadFactory,
rejectedHandler
);
return executor;
}
}這里有兩個(gè)關(guān)鍵點(diǎn)要注意:
- 給線程起名字:用ThreadFactoryBuilder給線程起有意義的名字,比如 “order-async-% d”,后面查日志的時(shí)候,看到線程名就知道是哪個(gè)線程池的任務(wù),排查問題效率翻倍。
- 拒絕策略選 CallerRunsPolicy:對于業(yè)務(wù)任務(wù)(比如訂單處理),不建議用 AbortPolicy(拋異常)或 DiscardPolicy(扔任務(wù)),因?yàn)闀?dǎo)致業(yè)務(wù)失敗。CallerRunsPolicy 讓調(diào)用線程自己執(zhí)行任務(wù),雖然會讓主線程(比如 Tomcat 的線程)變慢,但能避免任務(wù)丟失,是比較安全的選擇。
五、實(shí)戰(zhàn)場景:一套方案覆蓋 80% 業(yè)務(wù)
前面講了CompletableFuture和線程池,現(xiàn)在咱們結(jié)合實(shí)際業(yè)務(wù)場景,把這套方案落地。以 “電商下單流程” 為例,看看異步怎么用。
5.1 下單流程分析
一個(gè)典型的電商下單流程包括:
- 參數(shù)校驗(yàn)(必須同步,因?yàn)橐磿r(shí)返回錯誤)
- 庫存扣減(必須同步,因?yàn)橐WC庫存不超賣)
- 創(chuàng)建訂單(必須同步,因?yàn)橐祷赜唵翁柦o用戶)
- 記錄下單日志(異步,不影響下單主流程)
- 更新用戶積分(異步,積分晚一點(diǎn)更新沒關(guān)系)
- 發(fā)送下單成功通知(異步,通知晚一點(diǎn)沒關(guān)系)
- 同步商品銷量(異步,銷量統(tǒng)計(jì)可以延遲)
其中 1-3 是核心主流程,必須同步執(zhí)行,4-7 是非核心流程,可以異步執(zhí)行,這樣能減少下單接口的響應(yīng)時(shí)間。
5.2 異步方案落地代碼
咱們用 Spring Boot 來寫這個(gè)下單接口,結(jié)合CompletableFuture和自定義線程池:
@RestController
@RequestMapping("/order")
publicclassOrderController {
@Autowired
private OrderService orderService;
@Autowired
private Executor orderAsyncExecutor; // 注入前面配置的訂單異步線程池
/**
* 下單接口
*/
@PostMapping("/create")
public Result<OrderVO> createOrder(@RequestBody OrderCreateDTO dto) {
// 1. 參數(shù)校驗(yàn)(同步)
validateParams(dto);
// 2. 核心流程:創(chuàng)建訂單(同步,包括庫存扣減、創(chuàng)建訂單記錄)
Order order = orderService.createOrder(dto);
// 3. 非核心流程:異步執(zhí)行
asyncProcessAfterCreate(order);
// 4. 返回結(jié)果(核心流程完成就返回,不用等異步流程)
OrderVO vo = convertToVO(order);
return Result.success(vo);
}
/**
* 參數(shù)校驗(yàn)(同步)
*/
private void validateParams(OrderCreateDTO dto) {
if (dto.getUserId() == null) {
thrownew BusinessException("用戶ID不能為空");
}
if (CollectionUtils.isEmpty(dto.getGoodsList())) {
thrownew BusinessException("商品列表不能為空");
}
// 其他校驗(yàn)...
}
/**
* 下單后異步處理(非核心流程)
*/
private void asyncProcessAfterCreate(Order order) {
// 3.1 異步記錄日志
CompletableFuture.runAsync(() -> {
orderService.recordOrderLog(order.getId(), "訂單創(chuàng)建成功");
}, orderAsyncExecutor)
.exceptionally(ex -> {
// 日志記錄失敗不影響主流程,只打日志
log.error("記錄訂單日志失敗,orderId={}", order.getId(), ex);
returnnull;
});
// 3.2 異步更新用戶積分(需要訂單金額,所以用supplyAsync)
CompletableFuture.supplyAsync(() -> {
// 計(jì)算積分:訂單金額1元=1積分
int points = order.getAmount().intValue();
return orderService.updateUserPoints(order.getUserId(), points);
}, orderAsyncExecutor)
.exceptionally(ex -> {
log.error("更新用戶積分失敗,userId={}, orderId={}", order.getUserId(), order.getId(), ex);
// 積分更新失敗可以重試,這里用定時(shí)任務(wù)重試,或者發(fā)MQ重試
sendRetryUpdatePointsMQ(order.getUserId(), order.getAmount().intValue());
returnnull;
});
// 3.3 異步發(fā)送通知(需要用戶手機(jī)號,所以先查用戶信息)
CompletableFuture.supplyAsync(() -> {
// 查用戶信息(IO密集型)
return userService.getUserById(order.getUserId());
}, orderAsyncExecutor)
.thenCompose(user -> {
// 用用戶手機(jī)號發(fā)送短信通知(異步)
return CompletableFuture.runAsync(() -> {
notifyService.sendSms(user.getPhone(), "您的訂單" + order.getId() + "已創(chuàng)建成功");
}, orderAsyncExecutor);
})
.exceptionally(ex -> {
log.error("發(fā)送下單通知失敗,orderId={}", order.getId(), ex);
returnnull;
});
// 3.4 異步同步商品銷量
CompletableFuture.runAsync(() -> {
for (OrderGoods goods : order.getOrderGoodsList()) {
goodsService.syncSalesCount(goods.getGoodsId(), goods.getQuantity());
}
}, orderAsyncExecutor)
.exceptionally(ex -> {
log.error("同步商品銷量失敗,orderId={}", order.getId(), ex);
returnnull;
});
}
/**
* 轉(zhuǎn)換VO
*/
private OrderVO convertToVO(Order order) {
OrderVO vo = new OrderVO();
vo.setOrderId(order.getId());
vo.setUserId(order.getUserId());
vo.setAmount(order.getAmount());
vo.setStatus(order.getStatus());
vo.setCreateTime(order.getCreateTime());
return vo;
}
/**
* 發(fā)送積分更新重試MQ
*/
private void sendRetryUpdatePointsMQ(Long userId, int points) {
RetryUpdatePointsMQ mq = new RetryUpdatePointsMQ();
mq.setUserId(userId);
mq.setPoints(points);
mq.setRetryCount(0); // 重試次數(shù)
mq.setNextRetryTime(System.currentTimeMillis() + 5 * 60 * 1000); // 5分鐘后重試
rabbitTemplate.convertAndSend("retry-update-points-exchange", "retry.update.points", mq);
}
}5.3 方案亮點(diǎn)和避坑點(diǎn)
亮點(diǎn):
- 核心流程和非核心流程分離:核心流程(校驗(yàn)、扣庫存、創(chuàng)建訂單)同步執(zhí)行,保證即時(shí)性和數(shù)據(jù)一致性;非核心流程(日志、積分、通知、銷量)異步執(zhí)行,減少接口響應(yīng)時(shí)間。
- 異常處理不影響主流程:每個(gè)異步任務(wù)都用exceptionally處理異常,即使某個(gè)非核心流程失?。ū热缤ㄖl(fā)送失敗),也不會導(dǎo)致下單接口報(bào)錯,用戶能正常下單。
- 失敗重試機(jī)制:比如積分更新失敗,發(fā)送 MQ 消息,用定時(shí)任務(wù)或 MQ 的重試機(jī)制(比如 RabbitMQ 的死信隊(duì)列)進(jìn)行重試,保證數(shù)據(jù)最終一致性。
避坑點(diǎn):
- 異步任務(wù)不要用默認(rèn)線程池:前面已經(jīng)講過,默認(rèn)線程池線程數(shù)少,容易堆積任務(wù),這里用自定義的orderAsyncExecutor,參數(shù)適配 IO 密集型任務(wù)。
- 異步任務(wù)不要依賴主線程的變量:比如下單接口里的dto對象,在異步任務(wù)里不要直接用,因?yàn)橹骶€程可能已經(jīng)把dto回收了,導(dǎo)致數(shù)據(jù)不一致。應(yīng)該用order對象里的字段(比如order.getUserId()),而不是dto.getUserId()。
- 異步任務(wù)要考慮冪等性:比如同步商品銷量,萬一異步任務(wù)執(zhí)行了兩次,會導(dǎo)致銷量統(tǒng)計(jì)錯誤。所以goodsService.syncSalesCount方法要做冪等處理,比如用 “訂單 ID + 商品 ID” 作為唯一鍵,避免重復(fù)統(tǒng)計(jì)。
六、進(jìn)階:分布式場景下的異步處理
前面講的都是 “本地異步”,也就是在同一個(gè)服務(wù)里的異步任務(wù)。如果遇到 “跨服務(wù)的異步任務(wù)”,比如:
- 訂單服務(wù)創(chuàng)建訂單后,需要通知庫存服務(wù)、支付服務(wù)、物流服務(wù)。
- 用戶服務(wù)更新用戶信息后,需要同步到搜索服務(wù)、推薦服務(wù)。
這時(shí)候光靠CompletableFuture就不夠了,因?yàn)榭绶?wù)的任務(wù)沒法用本地線程池執(zhí)行,而且如果服務(wù)掛了,本地異步任務(wù)會丟失。
這時(shí)候就需要分布式異步方案,核心是用 “消息隊(duì)列(MQ)” 來實(shí)現(xiàn),比如 RabbitMQ、Kafka、RocketMQ。
6.1 分布式異步方案設(shè)計(jì)
以 “訂單創(chuàng)建后通知多服務(wù)” 為例,方案如下:
- 訂單服務(wù):創(chuàng)建訂單成功后,發(fā)送一條 “訂單創(chuàng)建成功” 的 MQ 消息(比如發(fā)送到 RabbitMQ 的order.created隊(duì)列)。
- 其他服務(wù):
- 庫存服務(wù)監(jiān)聽order.created隊(duì)列,收到消息后更新庫存狀態(tài)。
- 支付服務(wù)監(jiān)聽order.created隊(duì)列,收到消息后創(chuàng)建支付單。
- 物流服務(wù)監(jiān)聽order.created隊(duì)列,收到消息后創(chuàng)建物流單。
- 重試和死信隊(duì)列:如果某個(gè)服務(wù)處理消息失?。ū热缥锪鞣?wù)暫時(shí)不可用),MQ 會自動重試,重試幾次還失敗的話,把消息放到死信隊(duì)列,后續(xù)人工處理。
6.2 代碼示例(用 RabbitMQ)
訂單服務(wù)發(fā)送 MQ 消息:
@Service
publicclass OrderServiceImpl implements OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public Order createOrder(OrderCreateDTO dto) {
// 1. 扣庫存
stockService.deductStock(dto.getGoodsList());
// 2. 創(chuàng)建訂單
Order order = orderMapper.insert(buildOrder(dto));
// 3. 發(fā)送MQ消息(分布式異步)
sendOrderCreatedMQ(order);
return order;
}
/**
* 發(fā)送訂單創(chuàng)建成功的MQ消息
*/
private void sendOrderCreatedMQ(Order order) {
OrderCreatedMQ mq = new OrderCreatedMQ();
mq.setOrderId(order.getId());
mq.setUserId(order.getUserId());
mq.setAmount(order.getAmount());
mq.setCreateTime(new Date());
try {
// 發(fā)送消息到order.created隊(duì)列
rabbitTemplate.convertAndSend(
"order-exchange", // 交換機(jī)
"order.created", // 路由鍵
mq,
message -> {
// 設(shè)置消息過期時(shí)間:30分鐘(超過30分鐘沒處理,就放到死信隊(duì)列)
message.getMessageProperties().setExpiration("1800000");
// 設(shè)置消息唯一ID,用于冪等處理
message.getMessageProperties().setMessageId(order.getId().toString());
return message;
}
);
log.info("發(fā)送訂單創(chuàng)建MQ消息成功,orderId={}", order.getId());
} catch (Exception e) {
log.error("發(fā)送訂單創(chuàng)建MQ消息失敗,orderId={}", order.getId(), e);
// 消息發(fā)送失敗,可以記錄到本地表,用定時(shí)任務(wù)重試
mqRetryService.saveRetryRecord("order.created", mq);
}
}
}物流服務(wù)監(jiān)聽 MQ 消息:
@Service
publicclass LogisticsConsumer {
@Autowired
private LogisticsService logisticsService;
/**
* 監(jiān)聽訂單創(chuàng)建消息,創(chuàng)建物流單
*/
@RabbitListener(
queues = "order.created.logistics.queue", // 物流服務(wù)的隊(duì)列
containerFactory = "rabbitListenerContainerFactory"
)
public void handleOrderCreated(Message message, @Payload OrderCreatedMQ mq) {
String messageId = message.getMessageProperties().getMessageId();
log.info("收到訂單創(chuàng)建消息,準(zhǔn)備創(chuàng)建物流單,messageId={}, orderId={}", messageId, mq.getOrderId());
try {
// 1. 冪等處理:先查有沒有處理過這個(gè)消息
if (logisticsService.existsByOrderId(mq.getOrderId())) {
log.info("物流單已存在,跳過處理,orderId={}", mq.getOrderId());
return;
}
// 2. 創(chuàng)建物流單
logisticsService.createLogisticsOrder(mq);
// 3. 手動確認(rèn)消息(如果用的是手動確認(rèn)模式)
Channel channel = message.getMessageProperties().getChannel();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("處理訂單創(chuàng)建消息失敗,messageId={}, orderId={}", messageId, mq.getOrderId(), e);
// 處理失敗,手動拒絕消息,讓MQ重試(重試次數(shù)由MQ配置)
try {
Channel channel = message.getMessageProperties().getChannel();
// requeue=false:不重新放回原隊(duì)列,讓MQ放到死信隊(duì)列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException ex) {
log.error("拒絕消息失敗,messageId={}", messageId, ex);
}
}
}
}6.3 分布式異步的注意點(diǎn)
- 消息冪等性:必須保證同一個(gè)消息不會被重復(fù)處理,比如用 “訂單 ID” 作為唯一鍵,處理前先查有沒有處理過。
- 消息可靠性:確保消息不會丟失,比如:
- 生產(chǎn)者發(fā)送消息后,MQ 要返回確認(rèn)(ack)。
- 消費(fèi)者處理完消息后,手動確認(rèn)(ack)。
- 消息發(fā)送失敗時(shí),記錄到本地重試表,用定時(shí)任務(wù)重試。
- 消息過期時(shí)間:設(shè)置消息過期時(shí)間,避免無效消息一直占用 MQ 資源,比如超過 30 分鐘沒處理的訂單消息,就放到死信隊(duì)列。
七、總結(jié):這套 “萬能” 方案的核心是什么?
看到這里,你可能會問:“你說這是‘萬能’方案,真的能覆蓋所有場景嗎?”
其實(shí)沒有絕對的 “萬能”,但這套方案能覆蓋 80% 以上的 Java 異步場景,它的核心是 “分層設(shè)計(jì)”:
- 本地異步層:用CompletableFuture做本地異步任務(wù)的編排,解決 “多任務(wù)并行”“鏈?zhǔn)秸{(diào)用”“異常處理”“超時(shí)控制” 的問題,配合自定義線程池,保證線程資源可控。
- 分布式異步層:用 MQ 做跨服務(wù)的異步通信,解決 “服務(wù)解耦”“消息可靠傳遞”“失敗重試” 的問題,配合冪等設(shè)計(jì)和死信隊(duì)列,保證數(shù)據(jù)最終一致性。
- 基礎(chǔ)保障層:包括線程池合理配置、異步任務(wù)冪等處理、異常日志記錄、失敗重試機(jī)制,這些是異步方案能穩(wěn)定運(yùn)行的基礎(chǔ)。
最后給你幾個(gè)實(shí)戰(zhàn)建議:
- 不要過度異步:只有非核心流程才用異步,核心流程(比如下單、支付)必須同步,保證數(shù)據(jù)一致性和即時(shí)性。
- 優(yōu)先用本地異步:能在一個(gè)服務(wù)里解決的,就不用分布式異步(MQ),因?yàn)?MQ 會增加系統(tǒng)復(fù)雜度。
- 監(jiān)控異步任務(wù):給線程池加監(jiān)控,比如用 Spring Boot Actuator 監(jiān)控線程池的活躍線程數(shù)、隊(duì)列長度、拒絕次數(shù);給 MQ 加監(jiān)控,監(jiān)控消息發(fā)送成功率、消費(fèi)成功率、死信隊(duì)列長度。
- 多做壓測:異步方案上線前,一定要做壓測,看看線程池會不會滿、MQ 會不會堆積,確保在高并發(fā)下能穩(wěn)定運(yùn)行。
掌握了這套方案,下次再遇到 “接口太慢”“線程不夠用”“跨服務(wù)異步” 的問題,你就能游刃有余地解決,再也不用被老板催著優(yōu)化性能了~




























