萬(wàn)字長(zhǎng)文阿粉帶你解析 ThreadPoolExecutor
本文轉(zhuǎn)載自微信公眾號(hào)「Java極客技術(shù)」,作者鴨血粉絲。轉(zhuǎn)載本文請(qǐng)聯(lián)系Java極客技術(shù)公眾號(hào)。
為什么要用線程池
你有沒(méi)有這樣的疑惑,為什么要用線程池呢?可能你會(huì)說(shuō),我可以復(fù)用已經(jīng)創(chuàng)建的線程呀;線程是個(gè)重量級(jí)對(duì)象,為了避免頻繁創(chuàng)建和銷毀,使用線程池來(lái)管理最好了。
沒(méi)毛病,各位都很懂哈~
不過(guò)使用線程池還有一個(gè)重要的點(diǎn):可以控制并發(fā)的數(shù)量。如果并發(fā)數(shù)量太多了,導(dǎo)致消耗的資源增多,直接把服務(wù)器給搞趴下了,肯定也是不行的
繞不過(guò)去的幾個(gè)參數(shù)
提到 ThreadPoolExecutor 那么你的小腦袋肯定會(huì)想到那么幾個(gè)參數(shù),咱們來(lái)瞅瞅源碼(我就直接放有 7 個(gè)參數(shù)的那個(gè)方法了):
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler)
咱們分別來(lái)看:
- corePoolSize :
核心線程數(shù),在線程池中有兩種線程,核心線程和非核心線程。在線程池中的核心線程,就算是它什么都不做,也會(huì)一直在線程池中,除非設(shè)置了 allowCoreThreadTimeOut 參數(shù)
- maximumPoolSize:
線程池能夠創(chuàng)建的最大線程數(shù)。這個(gè)值 = 核心線程數(shù) + 非核心線程數(shù)
- keepAliveTime & unit :
線程池是可以撤銷線程的,那么什么時(shí)候撤銷呢?一個(gè)線程如果在一段時(shí)間內(nèi),都沒(méi)有執(zhí)行任務(wù),那說(shuō)明這個(gè)線程很閑啊,那是不是就可以把它撤銷掉了?
所以呢,如果一個(gè)線程不是核心線程,而且在 keepAliveTime & unit 這段時(shí)間內(nèi),還沒(méi)有干活,那么很抱歉,只能請(qǐng)你走人了 核心線程就算是很閑,也不會(huì)將它從線程池中清除,沒(méi)辦法誰(shuí)讓它是 core 線程呢~
- workQueue :
工作隊(duì)列,這個(gè)隊(duì)列維護(hù)的是等待執(zhí)行的 Runnable 任務(wù)對(duì)象
常用的幾個(gè)隊(duì)列:LinkedBlockingQueue , ArrayBlockingQueue , SynchronousQueue , DelayQueue
大廠的編碼規(guī)范,相信各位都知道,并不建議使用 Executors ,最重要的一個(gè)原因就是:Executors 提供的很多方法默認(rèn)使用的都是無(wú)界的 LinkedBlockingQueue ,在高負(fù)載情況下,無(wú)界隊(duì)列很容易就導(dǎo)致 OOM ,而 OOM 會(huì)讓所有請(qǐng)求都無(wú)法處理,所以在使用時(shí),強(qiáng)烈建議使用有界隊(duì)列,因?yàn)槿绻闶褂玫氖怯薪珀?duì)列的話,當(dāng)線程數(shù)量太多時(shí),它會(huì)走拒絕策略
- threadFactory :
創(chuàng)建線程的工廠,用來(lái)批量創(chuàng)建線程的。如果不指定的話,就會(huì)創(chuàng)建一個(gè)默認(rèn)的線程工廠
- handler :
拒絕處理策略。在 workQueue 那里說(shuō)了,如果使用的是有界隊(duì)列,那么當(dāng)線程數(shù)量大于最大線程數(shù)的時(shí)候,拒絕處理策略就起到作用了
常用的有四種處理策略:
- AbortPolicy :默認(rèn)的拒絕策略,會(huì)丟棄任務(wù)并拋出 RejectedExecutionException 異常- CallerRunsPolicy :提交任務(wù)的線程,自己去執(zhí)行這個(gè)任務(wù)- DiscardOldestPolicy :直接丟棄新來(lái)的任務(wù),也沒(méi)有任何異常拋出- DiscardOldestPolicy :丟棄最老的任務(wù),然后將新任務(wù)加入到工作隊(duì)列中
默認(rèn)拒絕策略是 AbortPolicy ,會(huì) throw RejectedExecutionException 異常,但是這是一個(gè)運(yùn)行時(shí)異常,對(duì)于運(yùn)行時(shí)異常編譯器不會(huì)強(qiáng)制 catch 它,所以就會(huì)比較容易忽略掉錯(cuò)誤。
所以,如果線程池處理的任務(wù)非常重要,盡量自定義自己的拒絕策略
線程池的幾個(gè)狀態(tài)
在源碼中,能夠清楚地看到線程池有 5 種狀態(tài):
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
同時(shí),使用 AtomicInteger 修飾的變量 ctl 來(lái)控制線程池的狀態(tài),而 ctl 保存了 2 個(gè)變量:一個(gè)是 rs 即 runState ,線程池的運(yùn)行狀態(tài);一個(gè)是 wc 即 workerCount ,線程池中活動(dòng)線程的數(shù)量
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static int ctlOf(int rs, int wc) { return rs | wc; }
- 線程池創(chuàng)建之后就處于 RUNNING 狀態(tài)
- 調(diào)用 shutdown() 方法之后處于 SHUTDOWN 狀態(tài),此時(shí)線程池不再接受新的任務(wù),清除一些空閑 worker ,等待阻塞隊(duì)列的任務(wù)完成
- 調(diào)用 shutdownNow() 方法后處于 STOP 狀態(tài),此時(shí)線程池不再接受新的任務(wù),中斷所有的線程,阻塞隊(duì)列中沒(méi)有被執(zhí)行的任務(wù)也會(huì)被全部丟棄
- 當(dāng)線程池中執(zhí)行的任務(wù)為空時(shí),也就是此時(shí) ctl 的值為 0 時(shí),線程池會(huì)變?yōu)?TIDYING 狀態(tài),接下來(lái)會(huì)執(zhí)行 terminated() 方法
- 執(zhí)行完 terminated() 方法之后,線程池的狀態(tài)就由 TIDYING 轉(zhuǎn)到 TERMINATED 狀態(tài)
懵了?別急,有張圖呢~
線程池處理任務(wù)
execute
做到線程復(fù)用,肯定要先 execute 起來(lái)吧
線程池處理任務(wù)的核心方法是 execute ,大概思路就是:
- 如果 command 為 null ,沒(méi)啥說(shuō)的,直接拋出異常就完事兒了
- 如果當(dāng)前線程數(shù)小于 corePoolSize ,會(huì)新建一個(gè)核心線程執(zhí)行任務(wù)
- 如果當(dāng)前線程數(shù)不小于 corePoolSize ,就會(huì)將任務(wù)放到隊(duì)列中等待,如果任務(wù)排隊(duì)成功,仍然需要檢查是否應(yīng)該添加線程,所以需要重新檢查狀態(tài),并且在必要時(shí)回滾排隊(duì);如果線程池處于 running 狀態(tài),但是此時(shí)沒(méi)有線程,就會(huì)創(chuàng)建線程
- 如果沒(méi)有辦法給任務(wù)排隊(duì),說(shuō)明這個(gè)時(shí)候,緩存隊(duì)列滿了,而且線程數(shù)達(dá)到了 maximumPoolSize 或者是線程池關(guān)閉了,系統(tǒng)沒(méi)辦法再響應(yīng)新的請(qǐng)求,此時(shí)會(huì)執(zhí)行拒絕策略
來(lái)瞅瞅源碼具體是如何處理的:
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- // 當(dāng)前線程數(shù)小于 corePoolSize 時(shí),調(diào)用 addWorker 創(chuàng)建核心線程來(lái)執(zhí)行任務(wù)
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 當(dāng)前線程數(shù)不小于 corePoolSize ,就將任務(wù)添加到 workQueue 中
- if (isRunning(c) && workQueue.offer(command)) {
- // 獲取到當(dāng)前線程的狀態(tài),賦值給 recheck ,是為了重新檢查狀態(tài)
- int recheck = ctl.get();
- // 如果 isRunning 返回 false ,那就 remove 掉這個(gè)任務(wù),然后執(zhí)行拒絕策略,也就是回滾重新排隊(duì)
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 線程池處于 running 狀態(tài),但是沒(méi)有線程,那就創(chuàng)建線程執(zhí)行任務(wù)
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 如果放入 workQueue 失敗,嘗試通過(guò)創(chuàng)建非核心線程來(lái)執(zhí)行任務(wù)
- // 如果還是失敗,說(shuō)明線程池已經(jīng)關(guān)閉或者已經(jīng)飽和,會(huì)拒絕執(zhí)行該任務(wù)
- else if (!addWorker(command, false))
- reject(command);
- }
在上面源碼中,判斷了兩次線程池的狀態(tài),為什么要這么做呢?
這是因?yàn)樵诙嗑€程環(huán)境下,線程池的狀態(tài)是時(shí)刻發(fā)生變化的,可能剛獲取線程池狀態(tài)之后,這個(gè)狀態(tài)就立刻發(fā)生了改變.如果沒(méi)有二次檢查的話,線程池處于非 RUNNING 狀態(tài)時(shí), command 就永遠(yuǎn)不會(huì)執(zhí)行
有點(diǎn)兒懵?阿粉都懂你,一張圖走起~
addWorker
從上面能夠看出來(lái),主要是 addWorker 方法
addWorker 主要是用來(lái)創(chuàng)建核心線程的,它主要的實(shí)現(xiàn)邏輯是:
- 判斷線程數(shù)量有沒(méi)有超過(guò)規(guī)定的數(shù)量,如果超過(guò)了就返回 false
- 如果沒(méi)有超過(guò),就會(huì)創(chuàng)建 worker 對(duì)象,并初始化一個(gè) Thread 對(duì)象,然后啟動(dòng)這個(gè)線程對(duì)象
接下來(lái)瞅瞅源碼:
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- // 線程池狀態(tài) >= SHUTDOWN 時(shí),不再接受新的任務(wù),直接返回 false
- // 如果 rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() 同樣不接受新的任務(wù),返回 false
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- // wc >= CAPACITY 說(shuō)明線程數(shù)不夠,所以就返回 false
- // wc >= (core ? corePoolSize : maximumPoolSize) 是在做判斷
- // 如果 core 為 true ,說(shuō)明要?jiǎng)?chuàng)建的線程是核心線程,接下來(lái)判斷 wc 是否大于 核心線程數(shù) ,如果大于返回 false
- // 如果 core 為 false ,說(shuō)明要?jiǎng)?chuàng)建的線程是非核心線程,接下來(lái)判斷 wc 是否大于 最大線程數(shù) ,如果大于返回 false
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- // CAS 操作增加 workerCount 的值,如果成功跳出循環(huán)
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- // 判斷線程池狀態(tài)有沒(méi)有變化,如果有變化,則重試
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- // workerCount 增加成功之后開(kāi)始走下面的代碼
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 創(chuàng)建一個(gè) worker 對(duì)象
- w = new Worker(firstTask);
- // 實(shí)例化一個(gè) Thread 對(duì)象
- final Thread t = w.thread;
- if (t != null) {
- // 接下來(lái)的操作需要加鎖進(jìn)行
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int rs = runStateOf(ctl.get());
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- // 將任務(wù)線程添加到線程池中
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- // 啟動(dòng)任務(wù)線程,開(kāi)始執(zhí)行任務(wù)
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- // 如果任務(wù)線程啟動(dòng)失敗調(diào)用 addWorkerFailed
- // addWorkerFailed 方法里面主要做了兩件事:將該線程從線程池中移除;將 workerCount 的值減 1
- addWorkerFailed(w);
- }
- return workerStarted;
- }
Worker 類
在 addWorker 中,主要是由 Worker 類去做一些相應(yīng)處理, worker 繼承 AQS ,實(shí)現(xiàn) Runnable 接口
線程池維護(hù)的是 HashSet,一個(gè)由 worker 對(duì)象組成的 HashSet
- private final HashSet<Worker> workers = new HashSet<Worker>();
worker 繼承 AQS 主要是利用 AQS 獨(dú)占鎖機(jī)制,來(lái)標(biāo)識(shí)線程是否空閑;另外, worker 還實(shí)現(xiàn)了 Runnable 接口,所以它本身就是一個(gè)線程任務(wù),在構(gòu)造方法中創(chuàng)建了一個(gè)線程,線程的任務(wù)就是自己 this。thread = getThreadFactory().newThread(this);
咱們瞅瞅里面的源碼:
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- /**
- * This class will never be serialized, but we provide a
- * serialVersionUID to suppress a javac warning.
- */
- private static final long serialVersionUID = 6138294804551838833L;
- // 處理任務(wù)的線程
- final Thread thread;
- // worker 傳入的任務(wù)
- Runnable firstTask;
- /** Per-thread task counter */
- volatile long completedTasks;
- /**
- * Creates with given first task and thread from ThreadFactory.
- * @param firstTask the first task (null if none)
- */
- Worker(Runnable firstTask) {
- // 將 state 設(shè)為 -1 ,避免 worker 在執(zhí)行前被中斷
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- // 創(chuàng)建一個(gè)線程,來(lái)執(zhí)行任務(wù)
- this.thread = getThreadFactory().newThread(this);
- }
- /** Delegates main run loop to outer runWorker */
- public void run() {
- runWorker(this);
- }
- // Lock methods
- //
- // The value 0 represents the unlocked state.
- // The value 1 represents the locked state.
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
- void interruptIfStarted() {
- Thread t;
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
runWorker
worker 類在執(zhí)行 run 方法時(shí),實(shí)際上調(diào)用的是 runWorker 方法
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- // 允許中斷
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- // 判斷 task 是否為空,如果不為空直接執(zhí)行
- // 如果 task 為空,調(diào)用 getTask() 方法,從 workQueue 中取出新的 task 執(zhí)行
- while (task != null || (task = getTask()) != null) {
- // 加鎖,防止被其他線程中斷
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- // 檢查線程池的狀態(tài),如果線程池處于 stop 狀態(tài),則需要中斷當(dāng)前線程
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 執(zhí)行 beforeExecute
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- // 執(zhí)行任務(wù)
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- // 執(zhí)行 afterExecute 方法
- afterExecute(task, thrown);
- }
- } finally {
- // 將 task 設(shè)置為 null ,循環(huán)操作
- task = null;
- w.completedTasks++;
- // 釋放鎖
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
在 runWorker 方法中,首先會(huì)去執(zhí)行創(chuàng)建這個(gè) worker 時(shí)就有的任務(wù),當(dāng)執(zhí)行完這個(gè)任務(wù)之后, worker 并不會(huì)被銷毀,而是在 while 循環(huán)中, worker 會(huì)不斷的調(diào)用 getTask 方法從阻塞隊(duì)列中獲取任務(wù)然后調(diào)用 task。run() 來(lái)執(zhí)行任務(wù),這樣就達(dá)到了復(fù)用線程的目的。通過(guò)循環(huán)條件 while (task != null || (task = getTask()) != null) 可以看出,只要 getTask 方法返回值不為 null ,就會(huì)一直循環(huán)下去,這個(gè)線程也就會(huì)一直在執(zhí)行,從而達(dá)到了線程復(fù)用的目的
getTask
咱們來(lái)看看 getTask 方法的實(shí)現(xiàn):
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- // allowCoreThreadTimeOut 變量默認(rèn)為 false ,也就是核心線程就算是空閑也不會(huì)被銷毀
- // 如果為 true ,核心線程在 keepAliveTime 內(nèi)是空閑的,就會(huì)被銷毀
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- // 如果運(yùn)行線程數(shù)大于最大線程數(shù),但是緩存隊(duì)列已經(jīng)空了,此時(shí)遞減 worker 數(shù)量
- // 如果有設(shè)置允許線程超時(shí)或者線程數(shù)量超過(guò)了核心線程數(shù)量,并且線程在規(guī)定時(shí)間內(nèi)沒(méi)有 poll 到任務(wù)并且隊(duì)列為空,此時(shí)也遞減 worker 數(shù)量
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 如果 timed 為 true ,會(huì)調(diào)用 workQueue 的 poll 方法
- // 超時(shí)時(shí)間為 keepAliveTime ,如果超過(guò) keepAliveTime 時(shí)長(zhǎng)的話, poll 就會(huì)返回 null
- // 如果返回為 null ,在 runWorker 中
- // while (task != null || (task = getTask()) != null) 循環(huán)條件被打破,從而跳出循環(huán),此時(shí)線程執(zhí)行完畢
- // 如果 timed 為 false ( allowCoreThreadTimeOut 為 false ,并且 wc > corePoolSize 為 false )
- // 會(huì)調(diào)用 workQueue 的 take 方法阻塞到當(dāng)前
- // 當(dāng)隊(duì)列中有任務(wù)加入時(shí),線程被喚醒, take 方法返回任務(wù),開(kāi)始執(zhí)行
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
源碼分析到這里就差不多清楚了
線程復(fù)用主要體現(xiàn)在 runWorker 方法中的 while 循環(huán)中,在 while 循環(huán)里面, worker 會(huì)不斷的調(diào)用 getTask 方法,而在 getTask 方法里,如果任務(wù)隊(duì)列中沒(méi)有了任務(wù),此時(shí)如果線程是核心線程則會(huì)一直卡在 workQueue。take 方法,這個(gè)時(shí)候會(huì)被阻塞并掛起,不會(huì)占用 CPU 資源,直到拿到任務(wù)然后返回 true , 此時(shí) runWorker 中得到這個(gè)任務(wù)來(lái)繼續(xù)執(zhí)行任務(wù),從而實(shí)現(xiàn)了線程復(fù)用
呦,沒(méi)想到吧,一不小心就看完了