線程池ThreadPoolExecutor源碼深度解析
一、引言
二、為什么使用線程池?
三、JDK線程池的架構(gòu)設計
1. JUC并發(fā)包下的Executor框架的uml類圖
2. ThreadPoolExecutor的參數(shù)解析
3. 運行機制詳解
4. 線程池的生命周期
四、JDK內(nèi)置線程池的問題
五、線程池中的問題與最佳實踐
1. invokeAll 超時機制無效?
2. submit()的異常消失了?
3. 異常處理實踐
4. 拒絕策略實踐
5. 池隔離實踐
六、總結(jié)
一、引 言
為什么進行源碼角度的深度解析?
大家在項目中到處都在使用線程池做一些性能接口層次的優(yōu)化,原先串行的多個遠程調(diào)用,因為rt過高,通過線程池批量異步優(yōu)化,從而降低rt。還有像RocketMQ中broker啟動時,同時通過ScheduledThreadPoolExecutor線程池執(zhí)行其他組件的定時任務,每隔一段時間處理相關(guān)的任務。線程池廣泛的應用在外面各種實際開發(fā)場景中,我們很多同學可能在項目里只是簡單的copy了一些前人的代碼參數(shù)并不知道其中的含義,從而導致生產(chǎn)級別的bug。所以本篇文章,旨在幫助還不熟悉或者想要熟悉線程池的同學,分享我自己在學習線程池源碼上的一些內(nèi)容來更簡單、快速的掌握線程池。
二、為什么使用線程池?
并發(fā)編程中,對于常見的操作系統(tǒng),線程都是執(zhí)行任務的基本單元,如果每次執(zhí)行任務時都創(chuàng)建新的線程,任務執(zhí)行完畢又進行銷毀,會出現(xiàn)以下的問題:
- 資源開銷:比如在Linux系統(tǒng)中,頻繁的創(chuàng)建和銷毀線程,一個是頻繁的進行一個系統(tǒng)調(diào)用,另外是一些內(nèi)存和CPU資源調(diào)度的占用。雖然有一些寫時復制的策略防止lwp的創(chuàng)建時的內(nèi)存占用,但是實際寫入還是會申請系統(tǒng)內(nèi)存的,何況一些頁表等本身就有內(nèi)存占用。
- 性能瓶頸:線程的創(chuàng)建需要系統(tǒng)調(diào)用,如果只是簡單的計算任務,可能耗時還沒創(chuàng)建的rt高,這里反而降低了系統(tǒng)的吞吐量。
- 缺乏資源管理:無限制的創(chuàng)建線程會導致內(nèi)存溢出,java.lang.OutOfMemoryError: unable to create native thread,這里主要因為Java的線程其實Linux中是lwp線程,需要通過JNI進行系統(tǒng)調(diào)用創(chuàng)建,每個線程默認需要1MB的棧空間,很容易導致無休止的創(chuàng)建線程導致內(nèi)存溢出,另外就是頻繁的系統(tǒng)調(diào)用,導致的上下文切換,占用了過多的CPU,反而起到了相反的作用。
- 功能受限:手動管理線程難以實現(xiàn)更高級的功能,如定時任務、周期任務、任務管理、并發(fā)任務數(shù)的控制等。
通過上面的問題,我們其實可以清晰的感知到這些問題都是歸攏到資源沒有得到合理的分配和控制導致的,線程池出現(xiàn)的核心宗旨其實就是對資源的合理分配和控制。除了線程池,其實更多的也接觸過數(shù)據(jù)庫連接池、netty的對象池等池化技術(shù),這些池化思想其實都是為了更好的降低資源的消耗以及更好的進行資源管理。
三、JDK線程池的架構(gòu)設計
JUC并發(fā)包下的Executor框架的uml類圖
圖片
- Executor:任務執(zhí)行的頂層接口,主要是分離任務提交與執(zhí)行邏輯,支持同步/異步執(zhí)行,遵循Java內(nèi)存模型的 happen-before規(guī)則。
- ExecutorService:繼承Executor接口,提供了更完善的生命周期管理能力,通過Future對象提供任務取消、狀態(tài)查詢、結(jié)果獲取能力實現(xiàn)了任務監(jiān)控。
- AbstractExecutorService:常見的設計模式為了簡化線程池的開發(fā),通常通過父類進行一些基礎的默認實現(xiàn)讓子類繼承。
- ScheduledExecutorService:ExecutorService的擴展接口,支持延遲執(zhí)行和周期性任務調(diào)度。
- ThreadPoolExecutor:是ExecutorService接口最核心和最常用的實現(xiàn)類,它提供了高度可配置的線程池,允許我們精細控制線程池的各種行為。
- ScheduledThreadPoolExecutor:是ScheduledExecutorService接口的實現(xiàn)類,它繼承自ThreadPoolExecutor,專門用于處理定時和周期任務。
- Executors:一個靜態(tài)工廠模式的工具類,提供了一系列靜態(tài)方法來創(chuàng)建各種常見配置的線程池,newFixedThreadPool(), newCachedThreadPool(),等,簡化了創(chuàng)建線程池的使用但是會帶來一些問題,很多開發(fā)規(guī)范里都不建議大家直接使用。JDK內(nèi)置的線程池如果我們不熟悉里面的參數(shù)很有可能導致出乎自己意料的結(jié)果,池大小設置、阻塞隊列選擇等等都是有考究的,這一點后續(xù)會進行一些詳細說明。生產(chǎn)環(huán)境中建議謹慎使用或直接使用ThreadPoolExecutor構(gòu)造函數(shù)自定義。
ThreadPoolExecutor的參數(shù)解析
圖片
- corePoolSize 核心線程數(shù):
線程池中還未退出的alive的核心線程數(shù)量。
雖然線程處于空閑狀態(tài)(其實是阻塞在阻塞隊列中),除非顯示設置了allowCoreThreadTimeOut=true,否則這些線程不會從自己的run方法中退出被回收。
添加新任務時,如果當前工作線程小于coreSize,此時即使存在空閑的core線程,線程池也會通過addWorker方法創(chuàng)建一個新的線程。
- maximumPoolSize 最大線程數(shù):
線程池可以創(chuàng)建的最大線程數(shù)。
如果是有界隊列,當隊列滿時,仍然有任務進來,此時線程池會創(chuàng)建小于最大線程數(shù)的線程來完成任務,空閑。
如果是無界隊列,那么永遠不會出現(xiàn)第二點的情況,除了內(nèi)存異常,否則會一直保持核心線程數(shù),多余的任務會一直往隊列中加入。
- keepAliveTime 線程空閑存活時間
線程數(shù)超過corePoolSize后創(chuàng)建的線程我們理解為非核心線程,對于這類線程,他的回收機制在于我們設置的keepAliveTime,線程會限期阻塞在隊列中獲取任務,如果超時未獲取就會進行清理并退出。
另外如果設置allowCoreThreadTimeOut=true,所謂的核心線程在空閑時間達到keepAliveTime時也會被回收。
- unit 時間單位
keepAliveTime參數(shù)的時間單位,TimeUnit中時分秒等。
- workQueue 任務隊列
阻塞隊列,核心線程數(shù)滿時,新加入的任務,會先添加到阻塞隊列中等待線程獲取任務并執(zhí)行。
常用的BlockingQueue實現(xiàn)有:
1)ArrayBlockingQueue:數(shù)組實現(xiàn)的先進先出原則的有界阻塞隊列,構(gòu)造方法必須指定容量。
2)LinkedBlockingQueue:鏈表實現(xiàn)的阻塞隊列,構(gòu)造傳入容量則有界,未傳則是無界隊列,此時設置的最大線程數(shù)其實就不會有作用了。
3)SynchronousQueue:一個不存儲元素的阻塞隊列。每個put操作必須等待一個take操作,反之亦然。它相當于一個傳遞通道,非常適合傳遞性需求,吞吐量高,但要求maximumPoolSize足夠大。
4)PriorityBlockingQueue:二叉堆實現(xiàn)的優(yōu)先級阻塞隊列,構(gòu)造時可自行調(diào)整排序行為(小頂堆或大頂堆)。
5)DelayQueue:支持延時的無界阻塞隊列,主要用于周期性的任務,我們可以直接通過它來實現(xiàn)一些簡單的延遲任務需求,復雜的周期性任務建議使用ScheduledThreadPoolExecutor。
- threadFactory 線程工廠
用于創(chuàng)建新線程的工廠。通過自定義ThreadFactory,我們可以為線程池中的線程設置更有意義的名稱、設置守護線程狀態(tài)、設置線程優(yōu)先級、指定UncaughtExceptionHandler等。
Executors.defaultThreadFactory()是默認實現(xiàn)。
- handler 拒絕策略
ThreadPoolExecutor.AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。
JDK內(nèi)置了四種拒絕策略:
1)ThreadPoolExecutor.AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。
2)ThreadPoolExecutor.CallerRunsPolicy:提交任務的線程,直接執(zhí)行任務。變相的背壓機制,可以降低任務往線程中加入。
3)ThreadPoolExecutor.DiscardPolicy:直接丟棄被拒絕的任務,不做任何通知,需容忍數(shù)據(jù)丟失。
4)ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列中最舊的任務,然后重試提交當前任務,需容忍數(shù)據(jù)丟失。
5)實現(xiàn)RejectedExecutionHandler接口自定義拒絕策略,在實際生產(chǎn)應用中推薦使用,可以做一些打印觀察日志的操作,告警、兜底的相關(guān)處理等。
運行機制詳解
新任務通過execute()方法提交給ThreadPoolExecutor時,其處理流程如下:
判斷核心線程數(shù):如果當前運行的線程數(shù)小于corePoolSize,則創(chuàng)建新線程(即使有空閑的核心線程)來執(zhí)行任務。
嘗試入隊:如果當前運行的線程數(shù)大于或等于corePoolSize,則嘗試將任務添加到workQueue中。
- 如果workQueue.offer()成功(隊列未滿),任務入隊等待執(zhí)行。
嘗試創(chuàng)建非核心線程:如果workQueue.offer()失敗(隊列已滿):
- 判斷當前運行的線程數(shù)是否小于maximumPoolSize;
- 如果是,則創(chuàng)建新的非核心線程來執(zhí)行任務。
執(zhí)行拒絕策略:
如果當前運行的線程數(shù)也達到了maximumPoolSize(即核心線程和非核心線程都已用盡,且隊列也滿了),則執(zhí)行RejectedExecutionHandler所定義的拒絕策略。
參考網(wǎng)絡中的經(jīng)典執(zhí)行圖:
圖片
這個圖能很好的表明運行原理,但是忽略了很多細節(jié),比如所謂的緩沖執(zhí)行是在什么條件下去走的呢?直接執(zhí)行又是什么邏輯下執(zhí)行呢?最后的任務拒絕又是怎么回事?帶著這些疑問點,我們直接來進行一個源碼級別的分析:
execute核心流程的源碼分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//線程池狀態(tài) 高3位表示線程狀態(tài) 低29位代表線程數(shù)量
int c = ctl.get();
//判斷當前線程池線程數(shù)量是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//作為核心線程數(shù)進行線程的創(chuàng)建,并且創(chuàng)建成功線程會將command的任務執(zhí)行--》對應圖上的直接執(zhí)行
if (addWorker(command, true))
return;
c = ctl.get();
}
//創(chuàng)建核心線程失敗或者當前線程數(shù)量超過核心線程數(shù)
//當前線程池是否還在運行狀態(tài),嘗試將任務添加到阻塞隊列 --》對應圖上的緩沖執(zhí)行
//BlockingQueue隊列的頂級抽象定義了offer不是進行阻塞添加而是立即返回,添加失敗直接返回false,區(qū)別于put
if (isRunning(c) && workQueue.offer(command)) {
//重新獲取線程池標志位
int recheck = ctl.get();
//如果線程此時不在運行狀態(tài)中,那么將任務刪除
if (! isRunning(recheck) && remove(command))
//刪除任務成功,走拒絕策略拒絕掉當前任務
reject(command);
else if (workerCountOf(recheck) == 0)
//如果線程池中的工作線程都沒有的時候,這里需要創(chuàng)建一個線程去執(zhí)行添加到隊列中的任務
//防止因為并發(fā)的原因工作線程都被終止掉了,此時任務在阻塞隊列里等著,缺沒有工作線程
addWorker(null, false);
}
//到這里那就是添加隊列失敗,或者線程池狀態(tài)異常,但是這里仍然嘗試進行創(chuàng)建一個worker
//如果創(chuàng)建失敗,也是走拒絕策略拒絕當前任務
else if (!addWorker(command, false))
reject(command);
}接下來我們仔細看看addWorker這個方法具體是在做什么:
//核心邏輯其實就是在無限循環(huán)創(chuàng)建一個worker,創(chuàng)建失敗直接返回,創(chuàng)建成功,則將worker執(zhí)行
// 因為worker有thread的成員變量,最終添加worker成功,會啟動線程的start方法
//start方法最終會回調(diào)到外層的runWorker方法,改方法會不停的從阻塞隊列里以阻塞的take方式
//獲取任務,除非達到能被終止的條件,此時當前線程會終止
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//不停的重試添加worker的計數(shù),只有添加成功的才會進行后續(xù)的worker啟動
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//重試期間,如果其他線程導致線程池狀態(tài)不一致了。重新回到第一個循環(huán)進行check判斷
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//這里加鎖一個是workers.add時需要加鎖,另外是防止其他線程已經(jīng)在嘗試修改線程池狀態(tài)了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將worker的引用添加到workers的hashSet中
workers.add(w);
int s = workers.size();
//更新線程池此時最大的線程數(shù)
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加成功,就啟動worker中的線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//這里添加失敗的話,需要把線程池的count數(shù)進行--,并且要把worker引用從hashSer中移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}線程池的生命周期
在介紹運行機制原理的源碼分析時,其實是有提到線程池狀態(tài)這個概念的。介紹這個狀態(tài)其實也是讓大家更方便的去管理線程池,比如我們關(guān)閉線程池時,怎么去優(yōu)雅的關(guān)閉,使用不同的方法可能會有不同的效果,我們需要根據(jù)自己的業(yè)務場景去酌情分析、權(quán)衡使用。
//線程池的狀態(tài)和計數(shù)采用一個Integer變量設置的
//這里之所以用一個變量來儲存狀態(tài)和數(shù)量,其實很有講究的,因為我們在上面的運行原理上可以看到
//源碼中有大量的進行狀態(tài)以及數(shù)量的判斷,如果分開采用變量的記錄的話,在維護二者一致性方面
//可能就需要加鎖的維護成本了,而且計算中都是位移運算也是非常高效的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程池的大小由ctl低29位表示,現(xiàn)成狀態(tài)由ctl高3位表示
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 線程池的狀態(tài)通過簡單的位移就能計算出來,狀態(tài)只能從低到高流轉(zhuǎn),不能逆向
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 這里是獲取線程狀態(tài)以及獲取線程數(shù)量的簡單高效的位移方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }接下來結(jié)合源碼詳細介紹下線程池的5種狀態(tài)以及分別有什么不同的表現(xiàn)行為?
先說下結(jié)論:
RUNNING 這個就是線程池運行中狀態(tài),我們可以添加任務也可以處理阻塞隊列任務
SHUTDOWN 不能添加新的任務,但是會將阻塞隊列中任務執(zhí)行完畢
STOP 不能添加新的任務,執(zhí)行中的線程也會被打斷,也不會處理阻塞隊列的任務
TIDYING 所有線程都被終止,并且workCount=0時會被置為的狀態(tài)
TERMINATED 調(diào)用完鉤子方法terminated()被置為的狀態(tài)shutdown狀態(tài)源碼分析:
//線程池關(guān)閉
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//循環(huán)cas設置線程池狀態(tài),直到成功或狀態(tài)已經(jīng)state>=SHUTDOWN
advanceRunState(SHUTDOWN);
//這個是真正得出結(jié)論的地方
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//打斷空閑的線程,如何判斷線程是否空閑還是運行?
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//worker的線程沒有被打斷過,并且能獲取到worker的aqs獨占鎖
if (!t.isInterrupted() && w.tryLock()) {
try {
//打斷當前線程,如果線程在阻塞隊列中阻塞,此時會被中斷
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}STOP狀態(tài)分析
//循環(huán)cas修改線程池狀態(tài)為stop。打斷所有線程,取出阻塞隊列的所有任務
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查線程的權(quán)限
checkShutdownAccess();
//將狀態(tài)case為stop
advanceRunState(STOP);
//打斷所有worker不管是不是正在執(zhí)行任務
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
//這里獲取鎖之后。打斷了所有的線程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}TIDYING、TERMINATED 狀態(tài)分析
//這個方法在每個線程退出時都會進行調(diào)用,如果是運行中、或者狀態(tài)大于等于TIDYING或者shutdown但是隊列不為空都
//直接返回,如果不滿足以上條件,并且線程數(shù)不為0的話,打斷一個空閑線程
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//此時到這里,狀態(tài)要么為STOP。要么是shutdown并且隊列為空了
// 獲取一個鎖,嘗試cas修改狀態(tài)為TIDYING
//調(diào)用terminated()的鉤子方法,
//修改線程池為終態(tài)TERMINATED,并且喚醒阻塞在termination隊列上的線程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}四、JDK內(nèi)置線程池的問題
java.util.concurrent.Executors工廠類提供了一些靜態(tài)方法,方便我們快速創(chuàng)建幾種預設配置的線程池:
- Executors.newFixedThreadPool(int nThreads):
創(chuàng)建一個固定大小的線程池。corePoolSize和maximumPoolSize都等于nThreads。
keepAliveTime為0L(因為線程數(shù)不會超過corePoolSize,所以此參數(shù)無效,除非allowCoreThreadTimeOut為true)。
使用無界的LinkedBlockingQueue作為工作隊列。
問題:由于使用無界隊列,當任務提交速度遠大于處理速度時,隊列會持續(xù)增長,可能導致內(nèi)存溢出(OOM)。此時maximumPoolSize參數(shù)實際上是無效的,線程數(shù)永遠不會超過nThreads。
- Executors.newSingleThreadExecutor():
創(chuàng)建一個只有一個工作線程的線程池。corePoolSize和maximumPoolSize都為1。
同樣使用無界的LinkedBlockingQueue。
保證所有任務按照提交順序(FIFO)執(zhí)行。
問題:與newFixedThreadPool類似,無界隊列可能導致OOM。
- Executors.newCachedThreadPool():
創(chuàng)建一個可緩存的線程池。
corePoolSize為0。
maximumPoolSize為Integer.MAX_VALUE (幾乎是無界的)。
keepAliveTime為60秒。
使用SynchronousQueue作為工作隊列。這種隊列不存儲元素,任務提交后必須有空閑線程立即接收,否則會創(chuàng)建新線程(如果未達到maximumPoolSize)。
問題:如果任務提交速度過快,會創(chuàng)建大量線程(理論上可達Integer.MAX_VALUE個),可能耗盡系統(tǒng)資源,導致OOM以及頻繁的上下文切換。
- Executors.newSingleThreadScheduledExecutor()、Executors.newScheduledThreadPool(int corePoolSize):
創(chuàng)建用于調(diào)度任務的線程池。
內(nèi)部使用ScheduledThreadPoolExecutor實現(xiàn),其任務隊列是DelayedWorkQueue (一種特殊的PriorityQueue)。
newSingleThreadScheduledExecutor的corePoolSize為1,maximumPoolSize為Integer.MAX_VALUE(但由于隊列是DelayedWorkQueue,通常不會無限增長線程,除非有大量同時到期的任務且處理不過來)。
newScheduledThreadPool可以指定corePoolSize。
問題:雖然DelayedWorkQueue本身是無界的,但ScheduledThreadPoolExecutor在任務執(zhí)行邏輯上與普通ThreadPoolExecutor有所不同。主要風險仍然是如果corePoolSize設置不當,且大量任務同時到期并執(zhí)行緩慢,可能導致任務積壓。
某一線互聯(lián)網(wǎng)Java開發(fā)手冊
五、線程池中的問題與最佳實踐
invokeAll 超時機制無效?
ExecutorService.invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)方法會提交一組Callable任務,并等待所有任務完成,或者直到超時。如果超時發(fā)生,它會嘗試取消(中斷)所有尚未完成的任務,然后返回一個List<Future>。
失效場景分析:
- 任務不響應中斷(最常見):任務內(nèi)部捕獲 InterruptedException 后靜默處理,或執(zhí)行不檢查中斷狀態(tài)的阻塞操作(如循環(huán)計算):
Callable<String> task = () -> {
while (true) {
//缺少此檢查將導致超時失效
if (Thread.interrupted()) break;
// 耗時計算...
}
return "done";
};- 使用非響應中斷的API:任務調(diào)用了不響應 interrupt() 的第三方庫或JNI代碼(如某些IO操作)
Callable<Integer> task = () -> {
Files.copy(in, path); // 某些NIO操作不響應中斷
return 1;
};- 任務依賴外部資源阻塞:任務因外部資源(如數(shù)據(jù)庫連接、網(wǎng)絡請求)阻塞且未設置超時。
Callable<Result> task = () -> {
//未設查詢超時時間
return jdbcTemplate.query("SELECT * FROM large_table");
};- 線程池配置缺陷:核心線程數(shù)過大或隊列無界,導致 invokeAll 超時前任務無法全部啟動,任務堆積在隊列,invokeAll 超時后仍有大量任務未執(zhí)行。
new ThreadPoolExecutor(
100, 100, // 核心線程數(shù)過大
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>() // 無界隊列
);invokeAll超時失效demo:
import java.util.*;
import java.util.concurrent.*;
public class InvokeAllTimeoutDemo {
// 模擬耗時任務(可配置是否響應中斷)
static class Task implements Callable<String> {
private final int id;
private final long durationMs;
private final boolean respectInterrupt;
Task(int id, long durationMs, boolean respectInterrupt) {
this.id = id;
this.durationMs = durationMs;
this.respectInterrupt = respectInterrupt;
}
@Override
public String call() throws Exception {
System.out.printf("Task %d started%n", id);
long start = System.currentTimeMillis();
// 模擬工作(檢查中斷狀態(tài))
while (System.currentTimeMillis() - start < durationMs) {
if (respectInterrupt && Thread.interrupted()) {
throw new InterruptedException("Task " + id + " interrupted");
}
// 不響應中斷的任務會繼續(xù)執(zhí)行
}
System.out.printf("Task %d completed%n", id);
return "Result-" + id;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
List<Callable<String>> tasks = Arrays.asList(
new Task(1, 2000, true), // 2秒,響應中斷
new Task(2, 10000, false) // 10秒,不響應中斷
);
System.out.println("Invoking with 3s timeout...");
try {
//設置3秒超時
List<Future<String>> futures = executor.invokeAll(tasks, 3, TimeUnit.SECONDS);
for (Future<String> f : futures) {
// 明確處理取消狀態(tài)
if (f.isCancelled()) {
System.out.println("Task was cancelled");
} else {
try {
System.out.println("Result: " + f.get(100, TimeUnit.MILLISECONDS));
} catch (TimeoutException | ExecutionException e) {
System.out.println("Task failed: " + e.getCause());
}
}
}
} finally {
executor.shutdownNow();
System.out.println("Executor shutdown");
}
}
}當我們使用invokeAll(tasks, timeout) 提交多個任務時,如果出現(xiàn)某個任務對中斷不響應或者響應不及時,那我們即使設置了超時時間,不響應中斷的任務2仍在后臺運行(即使調(diào)用了 shutdownNow())
submit()的異常消失了?
使用ExecutorService.submit()提交任務時,任務執(zhí)行過程中如果拋出未捕獲的異常(無論是受檢異常還是運行時異常),這個異常會被Future的包裝類如FutureTask重寫的run()方法捕獲并封裝在返回的Future包裝對象的成員變量中。
- 不顯示調(diào)用Future.get(),該異常我們就無法感知,好像沒有發(fā)生過一樣。線程池的工作線程本身通常會有一個默認的未捕獲異常處理器,可能會打印堆棧到控制臺,但你的主業(yè)務邏輯不會知道。
- 顯示調(diào)用Future.get(),拋出聲明式的ExecutionException,其cause屬性才是原始的任務異常。
- 如果調(diào)用Future.get(long timeout, TimeUnit unit)超時,向外拋出聲明式的TimeoutException。此時任務可能仍在后臺執(zhí)行,可能錯過了內(nèi)部的異常。
submit()異常消失demo:
public class ThreadPoolExceptionDemo {
public static void main(String[] args) {
// 創(chuàng)建單線程線程池(便于觀察異常)
ExecutorService executor = Executors.newSingleThreadExecutor();
// 場景1:Callable拋出異常(通過Future.get()捕獲)
Future<String> future1 = executor.submit(() -> {
System.out.println("[Callable] 開始執(zhí)行");
Thread.sleep(100);
throw new RuntimeException("Callable故意拋出的異常");
});
try {
System.out.println("Callable結(jié)果: " + future1.get());
} catch (ExecutionException e) {
System.err.println("捕獲到Callable異常: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 場景2:Runnable拋出異常(同樣通過Future.get()捕獲)
Future<?> future2 = executor.submit(() -> {
System.out.println("[Runnable] 開始執(zhí)行");
throw new IllegalArgumentException("Runnable故意拋出的異常");
});
try {
future2.get(); // Runnable成功時返回null
} catch (ExecutionException e) {
System.err.println("捕獲到Runnable異常: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 場景3:未處理的任務異常(需設置異常處理器)
executor.submit(() -> {
System.out.println("[未捕獲的任務] 開始執(zhí)行");
throw new IllegalStateException("這個異常會被默認處理器處理");
});
executor.shutdown();
}
}異常處理實踐
圖片
- Callable/Runnable catch處理異常:
不要捕獲Throwable或Exception然后靜默處理(只打日志)。如果確實需要捕獲,請考慮是否應該重新拋出(包裝成業(yè)務允許的受檢異?;蜻\行時異常)。
禁止靜默處理 InterruptedException:
1)在JDK的JUC底層源碼中,我們可以看到很多聲明了InterruptedException的方法,基本上都是對這類方法catch異常,要么繼續(xù)往外拋出,或者處理完相關(guān)資源后,重置中斷狀態(tài),絕對不要靜默處理。
2)如果方法沒有聲明InterruptedException如Runnable.run(),在catch InterruptedException后最好調(diào)用Thread.currentThread().interrupt()來恢復中斷標記。
正確處理中斷:callable在耗時的loop任務處理中,如果出現(xiàn)了中斷異常,因為Java代碼中中斷只是一種協(xié)作方式,其并沒真的終止線程,所以一般都是需要我們進行一個中斷標志的傳遞,如線程池中的shutdownNow()就依賴次機制處理。
- submit()執(zhí)行的任務,謹慎處理Future:
使用帶過期時間的future.get(long timeOut)獲取結(jié)果,并要對該方法進行try cache防止其他異常拋出。
多個任務并行處理時,如果有下個請求依賴上個請求,務必使用get()讓主線程等待這一結(jié)果執(zhí)行完成后,流轉(zhuǎn)到下一個異步任務。
- 實現(xiàn)線程Thread的UncaughtExceptionHandler屬性,在自定義的TheadFactory中通過set方法賦值:execute()方法執(zhí)行時,對于沒有捕獲的異常使用線程組的兜底統(tǒng)一處理機制。
//自定義當前線程組創(chuàng)建線程的統(tǒng)一異常處理,類似于controller的統(tǒng)一異常處理機制
ThreadFactory myThreadFactory = new ThreadFactory() {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
private final String threadNamePrefix = "myThreadFactory-";
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r,threadNamePrefix + atomicInteger.getAndIncrement());
t.setUncaughtExceptionHandler((thread, throwable) -> {
//異常的統(tǒng)一處理,日志打印、兜底處理、監(jiān)控、資源釋放等
System.err.println("線程[" + thread.getName() + "]異常: " + throwable);});
return t;
}
};
//構(gòu)造方法時使用自定義的線程工廠
ExecutorService executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory,
handler
);- 使用自定義線程池時建議重寫鉤子方法afterExecute(Runnable r, Throwable t):這個hook方法是用來解決當前任務線程發(fā)生的異常,默認是空實現(xiàn),我們可以重寫他,比如進行兜底的線程繼續(xù)執(zhí)行,打印日志記錄,以及同步失敗使用兜底異步處理等等方式。還要注意釋放應用中的資源,比如文件鎖的占用等,最好手動釋放掉,避免底層操作系統(tǒng)線程對這類資源釋放失敗導致長期占用,最后只能重啟Java進程的尷尬地步。
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//需要特別注意任務是否為submit提交,如果是execute提交的任務,那這里很直接的知道任務是否發(fā)生異常以及后續(xù)去怎么處理
if(r instanceof Future){
if(((Future<?>) r).isDone() || ((Future<?>) r).isCancelled()){
//繼續(xù)使用主線程完成任務,一般不建議,最好使用兜底方式:例如異步發(fā)消息,由后續(xù)的消費組統(tǒng)一處理異常的任務
}
}else if( t != null){
//execute異常處理
}
}
}
//FutureTask 把run方法進行了重寫,并且catch住了異常,所以說afterExecute的t 如果是submit提交的方式
//那么t基本上就是null
public void run() {
//....
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
//...
}afterExecute可以借鑒的示例:
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.slf4j.*;
public class RobustThreadPool extends ThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(RobustThreadPool.class);
private final AtomicLong failureCounter = new AtomicLong();
private final RetryPolicy retryPolicy; // 重試策略
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
public RobustThreadPool(int corePoolSize, int maxPoolSize,
BlockingQueue<Runnable> workQueue,
RetryPolicy retryPolicy) {
super(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, workQueue);
this.retryPolicy = retryPolicy;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
logger.debug("開始執(zhí)行任務: {}", r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 1. 異常分類處理
if(r instanceof Future){
if(((Future<?>) r).isDone()){
//錯誤記錄以及異常處理
failureCounter.incrementAndGet();
handleFailure(r, t, costTime);
}
}else if( t != null){
//execute異常處理
failureCounter.incrementAndGet();
handleFailure(r, t, costTime);
}
// 2. 資源清理
cleanThreadLocals();
}
private void handleFailure(Runnable r, Throwable t) {
// 1. 異常類型識別
if (t instanceof OutOfMemoryError) {
logger.error("JVM內(nèi)存不足,終止任務: {}", t.getMessage());
System.exit(1); // 嚴重錯誤直接終止
}
// 2. 可重試異常處理
else if (isRetryable(t)) {
int retryCount = retryPolicy.getCurrentRetryCount(r);
if (retryCount < retryPolicy.getMaxRetries()) {
logger.warn("任務第{}次失敗,準備重試...",
retryCount + 1, t);
retryPolicy.retry(r, this);
} else {
logger.error("任務超過最大重試次數(shù)({}),轉(zhuǎn)入死信隊列",
retryPolicy.getMaxRetries(), t);
DeadLetterQueue.add(r, t);
}
}
// 3. 不可重試異常
else {
logger.error("不可恢復任務失敗", t);
Metrics.recordFailure(t.getClass()); // 上報監(jiān)控
}
}
private boolean isRetryable(Throwable t) {
return t instanceof IOException ||
t instanceof TimeoutException ||
(t.getCause() != null && isRetryable(t.getCause()));
}
private void cleanThreadLocals() {
// 清理可能的內(nèi)存泄漏
try {
ThreadLocal<?>[] holders = { /* 其他ThreadLocal */};
for (ThreadLocal<?> holder : holders) {
holder.remove();
}
} catch (Exception e) {
logger.warn("清理ThreadLocal失敗", e);
}
}
// 重試策略嵌套類
public static class RetryPolicy {
private final int maxRetries;
private final long retryDelayMs;
private final Map<Runnable, AtomicInteger> retryMap = new ConcurrentHashMap<>();
public RetryPolicy(int maxRetries, long retryDelayMs) {
this.maxRetries = maxRetries;
this.retryDelayMs = retryDelayMs;
}
public void retry(Runnable task, Executor executor) {
retryMap.computeIfAbsent(task, k -> new AtomicInteger()).incrementAndGet();
if (retryDelayMs > 0) {
executor.execute(() -> {
try {
Thread.sleep(retryDelayMs);
} catch (InterruptedException ignored) {}
executor.execute(task);
});
} else {
executor.execute(task);
}
}
public int getCurrentRetryCount(Runnable task) {
return retryMap.getOrDefault(task, new AtomicInteger()).get();
}
public int getMaxRetries() {
return maxRetries;
}
}
}異常處理小結(jié):要特別注意使用future.get()方法時,我們一定要注意設置超時時間,防止主線程無限期的阻塞避免邊緣的業(yè)務查詢影響了主業(yè)務造成得不償失的效果,另外我們需要注意一個點就是submit()方法的提交任務時,afterExecute(Runnable r, Throwable t)中的t恒為null,如果是execute方法提交的任務,那么就是直接獲取的任務執(zhí)行的異常,對于submit提交的任務異常其被封裝到了Futrure 包裝對象中,一般需要我們再次判斷任務時執(zhí)行完畢還是異常或被取消了,如果發(fā)生了異常,F(xiàn)uture.get()會拋出封裝的ExecutionException異常,當然還可能是取消異常以及中斷異常。invokeAll和invokeAny我們需要對返回的Future結(jié)果檢查可能拋出的異常,對于callable 前面一再強調(diào)了要對InterruptedException不要靜默處理,因為線程的中斷標記只是一個協(xié)作方式,他并沒有停止當前線程的運行,我們需要根據(jù)自身的場景對發(fā)生的中斷進行快速響應以及傳遞中斷標志。
拒絕策略實踐
先帶大家回顧一下策略是如何觸發(fā)執(zhí)行的流程:
//添加任務,當不滿足條件時會執(zhí)行拒絕方法reject
public void execute(Runnable command) {
//...
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
//這里就是拒絕的入口。handler是有構(gòu)造方法傳入
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//....
//指定拒絕策略
this.handler = handler;
}AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。
public static class AbortPolicy implements RejectedExecutionHandler {
//直接拋出RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}優(yōu)點:快速失敗,立即暴露系統(tǒng)過載問題、避免任務靜默丟失,便于監(jiān)控系統(tǒng)捕獲
缺點:需要調(diào)用方顯式處理異常,增加代碼復雜度,可能中斷主業(yè)務流程
適用場景:適用于那些對任務丟失非常敏感,配合熔斷機制使用的快速失敗場景
CallerRunsPolicy:提交任務的線程,直接執(zhí)行任務
public static class CallerRunsPolicy implements RejectedExecutionHandler {
//直接在提交任務的線程中執(zhí)行任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}優(yōu)點:任務都會被執(zhí)行,不會丟任務,并且由于主線程執(zhí)行任務,天然的流量控制,避免了大量的任務進入線程池。
缺點:調(diào)用線程可能被阻塞,導致上游服務雪崩。不適合高并發(fā)場景(可能拖垮整個調(diào)用鏈)。
適用場景:適用于處理能力不高,并且資源過載能夠平滑過渡,同時不丟失任務的場景。如:低并發(fā)、高可靠性的后臺任務(如日志歸檔)、允許同步執(zhí)行的批處理系統(tǒng)。
DiscardPolicy:直接丟棄被拒絕的任務,不做任何通知。
public static class DiscardPolicy implements RejectedExecutionHandler {
//空實現(xiàn)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}優(yōu)點:實現(xiàn)簡單,無額外性能開銷。避免異常傳播影響主流程
缺點:數(shù)據(jù)靜默丟失,可能會掩蓋系統(tǒng)容量問題
適用場景:邊緣業(yè)務的監(jiān)控上報數(shù)據(jù),統(tǒng)計類的uv、pv統(tǒng)計任務
DiscardOldestPolicy:丟棄隊列中最舊的任務,然后重試提交當前任務。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
//丟棄隊列中最舊的任務,重試當前任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}優(yōu)點:優(yōu)先保證新任務執(zhí)行,避免隊列堆積導致內(nèi)存溢出。
缺點:可能丟失關(guān)鍵舊任務、任務執(zhí)行順序無法保證。
適用場景:適用于可容忍部分數(shù)據(jù)丟失,并且實時性要求高于歷史數(shù)據(jù)的場景,比如:行情推送。
通過上線的介紹,我們可以看到JDK內(nèi)置策略基本上只使用于簡單處理的場景,在生產(chǎn)實踐中一般推薦我們自定義拒絕策略,進行相關(guān)的業(yè)務處理。
1. 自定義RejectedExecutionHandler:
/**
* 帶監(jiān)控統(tǒng)計的拒絕策略處理器
*/
public class MetricsRejectedExecutionHandler implements RejectedExecutionHandler {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsRejectedExecutionHandler.class);
// 統(tǒng)計被拒絕的任務數(shù)量
private final AtomicLong rejectedCount = new AtomicLong(0);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 1. 采集線程池關(guān)鍵指標
int poolSize = executor.getPoolSize();
int activeThreads = executor.getActiveCount();
int corePoolSize = executor.getCorePoolSize();
int maxPoolSize = executor.getMaximumPoolSize();
int queueSize = executor.getQueue().size();
long completedTasks = executor.getCompletedTaskCount();
// 2. 遞增拒絕計數(shù)器
long totalRejected = rejectedCount.incrementAndGet();
// 3. 輸出警告日志(包含完整指標)
logger.warn("""
任務被拒絕執(zhí)行!線程池狀態(tài):
|- 活躍線程數(shù)/當前線程數(shù): {}/{}
|- 核心/最大線程數(shù): {}/{}
|- 隊列大小: {}
|- 已完成任務數(shù): {}
|- 歷史拒絕總數(shù): {}
|- 被拒絕任務: {}
""",
activeThreads, poolSize,
corePoolSize, maxPoolSize,
queueSize,
completedTasks,
totalRejected,
r.getClass().getName());
// 4. 可選:降級處理(如存入數(shù)據(jù)庫等待重試)
// fallbackToDatabase(r);
// 5. 拋出RejectedExecutionException(保持默認行為)
throw new RejectedExecutionException("Task " + r.toString() + " rejected");
}
// 獲取累計拒絕次數(shù)(用于監(jiān)控)
public long getRejectedCount() {
return rejectedCount.get();
}
}- 記錄日志并告警:所有的異常處理中,最常見簡單的方式無外乎,先記錄個日志,然后有告警系統(tǒng)的進行相關(guān)的某書、某信以及短信等的告警信息推送,方便開發(fā)人員以及運維人員的及時發(fā)現(xiàn)問題并介入處理。
- 兜底處理機制:一般常見的就是通過異步的方式提交到MQ,然后統(tǒng)一進行兜底處理。
- 帶超時和重試的拒絕:可以嘗試等待一小段時間,或者重試幾次提交,如果仍然失敗,再執(zhí)行最終的拒絕邏輯(如告警、持久化或拋異常)。
- 動態(tài)調(diào)整策略:根據(jù)系統(tǒng)的負載或任務類型,動態(tài)的執(zhí)行兜底策略機制,就如前面寫的源碼示例方式。
2. 根據(jù)自身業(yè)務場景選擇合適的拒絕策略:
- 核心業(yè)務,不容丟失:如果任務非常重要,不能丟失,可以考慮:
CallerRunsPolicy:調(diào)用線程承擔任務執(zhí)行壓力,是否可支撐;
自定義策略:嘗試持久化到MQ或DB,然后由專門的消費組補償任務處理;
AbortPolicy:如果希望系統(tǒng)快速失敗并由上層進行重試或熔斷。
- 非核心業(yè)務,可容忍部分丟失:
DiscardOldestPolicy:新任務更重要時,如行情推送;
DiscardPolicy:邊緣業(yè)務場景,比如一些pv統(tǒng)計等,丟失了無所謂;
及時的進行監(jiān)控查看,了解任務的丟失情況。
3. 結(jié)合線程池參數(shù)綜合考慮:
- 拒絕策略的選擇也與線程池的隊列類型(有界/無界)、隊列容量、maximumPoolSize等參數(shù)密切相關(guān)。
- 如果使用無界隊列LinkedBlockingQueue的無參構(gòu)造,只有機器內(nèi)存不夠時才會進行拒絕策略,不過這種極端場景已經(jīng)不是影響線程池本身,內(nèi)存不夠可能導致Java進程被操作系統(tǒng)直接kill可能。
- 如果使用有界隊列,需要權(quán)衡隊列的大小,核心場景甚至可以動態(tài)追蹤阻塞隊列大小,以及動態(tài)調(diào)整隊列大小來保證核心業(yè)務的正常流轉(zhuǎn)。
- 充分測試和監(jiān)控:無論選擇哪種策略,都必須在壓測環(huán)境中充分測試其行為,并在線上環(huán)境建立完善的監(jiān)控體系,監(jiān)控線程池的各項指標(活躍線程數(shù)、隊列長度、任務完成數(shù)、任務拒絕數(shù)等)。當拒絕發(fā)生時,應有相應的告警通知。
拒絕策略小結(jié):
策略的選擇跟我們大多數(shù)的系統(tǒng)設計哲學是保持一致的,都是在應對不同的場景中,做出一定的trade off。最好的策略需要根據(jù)業(yè)務場景、系統(tǒng)容忍度、資源等方面的綜合考量,一個黃金的實踐原則:拒絕事件做好監(jiān)控告警、根據(jù)業(yè)務SLA定義策略,是否可丟失,快速失敗等,定期的進行壓力測試,驗證策略的有效性。
池隔離實踐
核心思想:根據(jù)任務的資源類型 、優(yōu)先級和業(yè)務特性 ,劃分多個獨立的線程池,避免不同性質(zhì)的任務相互干擾。
1. 隔離維度:
- 資源類型:CPU密集型 vs I/O密集型任務
- 執(zhí)行時長:短時任務(毫秒級) vs 長時任務(分鐘級)
- 實時性要求:高實時性 vs 可延遲(最終一致即可)
- 業(yè)務重要性:支付交易(高優(yōu)先級) vs 日志清理(低優(yōu)先級)
- 是否依賴外部資源:例如,訪問特定數(shù)據(jù)庫、調(diào)用特定第三方API的任務可以歸為一類。
2. 不同業(yè)務場景線程池獨立使用:在不同的業(yè)務場景下,為自己的特定業(yè)務,創(chuàng)建獨立的線程池。
- 線程命名:通過ThreadFactory為每個線程池及其線程設置有意義的名稱,例如netty-io-compress-pool-%d,excel-export-pool-%d, 主要方便區(qū)別不同的業(yè)務場景以及問題排查。
- 參數(shù)調(diào)優(yōu):不同的業(yè)務場景設置不同的參數(shù)。
corePoolSize, maximumPoolSize:CPU密集型的計算任務可以設置小點減少上下文的切換,I/O密集型可以較大,在io阻塞等待期間,多去處理其他任務。
阻塞隊列blockQueue:選擇合適的隊列類型,以及設置合理的隊列大小。
RejectedExecutionHandler:有內(nèi)置的四種的策略以及自定義策略選擇,一般建議做好日志、監(jiān)控以及兜底的處理。
3. 自定義Executor避免線程池共用
// 創(chuàng)建CPU密集型任務線程池(線程數(shù)=CPU核心數(shù))
ExecutorService cpuIntensiveExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // 核心線程數(shù)=CPU核心數(shù)
Runtime.getRuntime().availableProcessors(), // 最大線程數(shù)=CPU核心數(shù)
30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new ThreadFactoryBuilder()
.setNameFormat("cpu-pool-%d")
.setPriority(Thread.MAX_PRIORITY) // 提高優(yōu)先級
.build(),
new ThreadPoolExecutor.AbortPolicy() // 直接拒絕
);
// 使用示例
CompletableFuture.supplyAsync(() -> {
// 矩陣計算等CPU密集型任務
double[][] result = matrixMultiply(largeMatrixA, largeMatrixB);
return result;
}, cpuIntensiveExecutor)
.thenAccept(result -> {
System.out.println("計算結(jié)果維度: " + result.length + "x" + result[0].length);
});線程池隔離小結(jié):
專池專用的本質(zhì)是通過物理隔離實現(xiàn):
- 資源保障 :關(guān)鍵業(yè)務獨占線程資源
- 故障隔離 :避免級聯(lián)雪崩
- 性能優(yōu)化 :針對任務類型最大化吞吐量
最終呈現(xiàn)的效果是像專業(yè)廚房的分區(qū)(切配區(qū)/炒菜區(qū)/面點區(qū))一樣,讓每個線程池專注處理同類任務,提升整體效率和可靠性。
六、總結(jié)
線程池是Java并發(fā)編程的核心組件,通過復用線程減少資源開銷,提升系統(tǒng)吞吐量。其核心設計包括線程復用機制 、任務隊列和拒絕策略 ,通過ThreadPoolExecutor的參數(shù)(核心線程數(shù)、最大線程數(shù)、隊列容量等)實現(xiàn)靈活的資源控制。線程池的生命周期由RUNNING、SHUTDOWN等狀態(tài)管理,確保任務有序執(zhí)行或終止。
內(nèi)置線程池(如Executors.newCachedThreadPool)雖便捷,但存在內(nèi)存溢出或無界隊列堆積的風險,需謹慎選擇。invokeAll的超時失效和submit提交任務的異常消失是常見陷阱需通過正確處理中斷和檢查Future.get()規(guī)避。
最佳實踐包括:
- 異常處理:通過afterExecute來對發(fā)生的異常進行兜底處理,任務細粒度的try catch或UncaughtExceptionH捕獲異常處理防止線程崩潰退出;
- 拒絕策略:根據(jù)業(yè)務選擇拒絕策略或自定義降級邏輯,生產(chǎn)級應用建議盡量自定義處理;
- 線程隔離 :按任務類型(CPU/I/O)或優(yōu)先級劃分線程池,避免資源競爭。
合理使用線程池能顯著提升性能,但需結(jié)合業(yè)務場景精細調(diào)參,確保穩(wěn)定性和可維護性,希望這篇文章能給大家?guī)硪恍┥a(chǎn)實踐上的指導,減少一些因為不熟悉線程池相關(guān)原理生產(chǎn)誤用導致的一些問題。




























