關(guān)于 ForkJoinPool 線程池詳解!
ForkJoinPool是Java 7 引入的一種線程池實(shí)現(xiàn),專門用于支持“大規(guī)模并行”任務(wù)的執(zhí)行。那么,它和普通的線程池(ThreadPoolExecutor)有什么本質(zhì)的區(qū)別呢?這篇文章我們將深入探討 Fork/Join 框架的工作原理。

一、Fork/Join 框架簡介:
Fork/Join 框架是一種并行計(jì)算框架,設(shè)計(jì)目的是提高具有遞歸性質(zhì)任務(wù)的執(zhí)行速度。典型的任務(wù)是將問題逐步分解成較小的任務(wù),直到每一個子任務(wù)足夠簡單可以直接解決,然后再將結(jié)果聚合起來。
工作原理
Fork/Join 框架基于"工作竊取"算法 (Work Stealing Algorithm),該算法的核心思想是每個工作線程有自己的任務(wù)隊(duì)列(雙端隊(duì)列, Deque)。當(dāng)一個線程完成了自己隊(duì)列中的任務(wù)時,便會竊取其他線程隊(duì)列中的任務(wù)執(zhí)行,這樣就不會因?yàn)槟硞€線程在等待而浪費(fèi) CPU 資源。
具體的工作原理如下:
- 任務(wù)拆分:框架會將任務(wù)遞歸地拆分成更小的任務(wù),分別放入不同的隊(duì)列。
 - 工作竊取:每個線程都嘗試從隊(duì)列中取任務(wù)執(zhí)行。當(dāng)一個線程完成了自己的任務(wù)隊(duì)列后會嘗試隨機(jī)從其他隊(duì)列拿任務(wù)繼續(xù)執(zhí)行,保證 CPU 資源盡可能地不閑置。
 - 任務(wù)合并:線程在執(zhí)行完任務(wù)后,會嘗試合并(Join)這些任務(wù)的結(jié)果,直到獲得最終結(jié)果。
 
二、ForkJoin Pool 核心組件
ForkJoin 框架是由以下 3個重要組件組成的:
- ForkJoinPool
 - ForkJoinTask
 - RecursiveTask & RecursiveAction
 
1.ForkJoinPool
ForkJoinPool 是整個框架的核心,它是一個線程池,負(fù)責(zé)調(diào)度和分發(fā)任務(wù)。內(nèi)部雖然類似于 ThreadPoolExecutor,但是與普通線程池有顯著的不同:
- 工作竊取機(jī)制:每個工作者線程會有自己的任務(wù)隊(duì)列,并且工作者線程可以相互“偷竊”任務(wù)。
 - 任務(wù)分解與合并:該池在運(yùn)行時會遞歸地分割大任務(wù),并使其盡量并行化。
 - 最優(yōu)并發(fā)級別:默認(rèn)情況下,它與 CPU 核心線程數(shù)量相同,確保最大限度地利用多核 CPU。
 
ForkJoinPool 具有兩種模式:
- 普通模式:適用于簡單任務(wù)的并行拆分和合并。
 - 自定義模式:通過提供特定的策略,可以更靈活地控制任務(wù)執(zhí)行的過程與行為。
 
2.ForkJoinTask
ForkJoinTask是 Fork/Join 框架中的基礎(chǔ)任務(wù)對象。ForkJoinTask 是一個抽象類,它提供了 fork 和 join 這兩個關(guān)鍵的操作。在具體使用過程中,一般情況下我們不會直接使用它,而是使用它的兩個子類:
- RecursiveTask: 適用于有返回值的任務(wù)。
 - RecursiveAction: 適用于無返回值的任務(wù)。
 
fork() 和 join()
- fork(): 將任務(wù)提交給線程池,讓線程池執(zhí)行任務(wù)。
 - join(): 等待任務(wù)執(zhí)行完成,并獲取任務(wù)結(jié)果。
 
下面,我們通過一個簡單的示例進(jìn)行說明:
假設(shè)有一個遞歸算法任務(wù),可以通過 RecursiveTask 實(shí)現(xiàn)這樣一個任務(wù):
class FibonacciTask extends RecursiveTask<Integer> {
    private final int n;
    FibonacciTask(int n) {
        this.n = n;
    }
    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        FibonacciTask f1 = new FibonacciTask(n - 1);
        FibonacciTask f2 = new FibonacciTask(n - 2);
        f1.fork(); // 異步執(zhí)行
        return f2.compute() + f1.join(); // 等待結(jié)果并合并
    }
}
public class ForkJoinExample {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        FibonacciTask task = new FibonacciTask(10);
        System.out.println(pool.invoke(task)); // 輸出 Fibonacci(10) 的結(jié)果:55
    }
}在上面的例子中,F(xiàn)ibonacciTask是一個遞歸計(jì)算斐波那契數(shù)列的任務(wù),使用了fork()將遞歸任務(wù)分解并提交給ForkJoinPool,然后通過join()合并結(jié)果。
3.RecursiveTask & RecursiveAction
- RecursiveTask: 適合有返回值的遞歸任務(wù)。
 - RecursiveAction: 適合無返回值的遞歸任務(wù),比如可以用于文件或者目錄的遍歷操作,在這種場景中任務(wù)只是執(zhí)行不需要有返回結(jié)果。
 
RecursiveTask 和 RecursiveAction 都是 ForkJoinTask 的子類,設(shè)計(jì)上它們旨在有效地利用多核處理器分而治之,提升計(jì)算速度。
RecursiveAction 示例:
class ArrayTransformAction extends RecursiveAction {
    private final int[] arr;
    private final int start, end;
    ArrayTransformAction(int[] arr, int start, int end) {
        this.arr = arr;
        this.start = start;
        this.end = end;
    }
    @Override
    protected void compute() {
        if (end - start <= 10) {
            // 當(dāng)任務(wù)足夠小直接計(jì)算
            for (int i = start; i < end; i++) {
                arr[i] *= 2; // 假定簡單的任務(wù):每一個數(shù)字乘以 2
            }
        } else {
            // 任務(wù)切分
            int middle = (start + end) / 2;
            ArrayTransformAction task1 = new ArrayTransformAction(arr, start, middle);
            ArrayTransformAction task2 = new ArrayTransformAction(arr, middle, end);
            invokeAll(task1, task2); // 并行處理兩個子任務(wù)
        }
    }
}在這個例子中,ArrayTransformAction 是一個無返回值的遞歸任務(wù),利用 ForkJoinPool 執(zhí)行可以使代碼有效利用多核 CPU 并行處理任務(wù)。
三、與普通線程池對比
任務(wù)分解:
- 普通線程池(如ThreadPoolExecutor)通常用于處理相對獨(dú)立的任務(wù),每個任務(wù)通常不會再被拆分。
 - ForkJoinPool則專注于可以遞歸拆分的任務(wù)。
 
工作竊?。?/p>
- 普通線程池沒有實(shí)現(xiàn)工作竊取機(jī)制,這意味著如果一個線程完成了任務(wù),它可能會閑置。
 - ForkJoinPool通過工作竊取算法,確保線程在完成自己的任務(wù)后可以繼續(xù)從其他線程中獲取任務(wù),提高了資源利用率。
 
線程管理:
- 普通線程池可以根據(jù)配置動態(tài)調(diào)整線程的數(shù)量。
 - ForkJoinPool通常在初始化時確定線程數(shù)量,通常設(shè)置為等于或略大于可用處理器的數(shù)量。
 
適用場景:
- 普通線程池適用于需要處理大量獨(dú)立任務(wù)的場景,如Web服務(wù)器處理請求。
 - ForkJoinPool適用于需要處理大規(guī)模數(shù)據(jù)并可以分解為子任務(wù)的場景。
 
四、使用場景
Fork/Join 框架非常適合以下這些工作負(fù)載:
- 遞歸任務(wù):如斐波那契數(shù)列、歸并排序等分治算法。
 - 大規(guī)模數(shù)據(jù)處理:快速對集合、數(shù)組等進(jìn)行并行操作。
 - 圖像處理:圖像處理等數(shù)據(jù)量大的任務(wù)可以被分成多個小任務(wù)并行處理。
 
此外,F(xiàn)ork/Join 在某些場景下的效率甚至優(yōu)于類似的 MapReduce 計(jì)算框架。對 Java 并行流 (Stream API parallelism) 的支持也使用了 ForkJoin 框架,因此在 Java Stream 中進(jìn)行并行處理的場景中,底層就是通過 Fork/JoinPool 來處理的。
五、注意事項(xiàng)
對于每種線程池都有其擅長的領(lǐng)域,同時存在局限性,對于ForkJoinPool也一樣,因此,在實(shí)際使用中,我們應(yīng)該注意以下事項(xiàng):
1. 控制任務(wù)粒度
如果 Fork/Join 任務(wù)拆分得過于細(xì)小,會導(dǎo)致過多的上下文切換及不必要的線程創(chuàng)建消耗性能,通常建議其中的任務(wù)不到一個門檻便停止分裂。你可以根據(jù)任務(wù)執(zhí)行時間、負(fù)載平衡等條件,動態(tài)地設(shè)置任務(wù)分解的閾值。
2. 避免 IO 密集型任務(wù)
ForkJoin 優(yōu)化了 CPU 密集型任務(wù)。而包含大量 IO 操作的任務(wù),容易導(dǎo)致線程阻塞, Fork/Join 效率并不高。因此,對于 IO 密集型任務(wù),推薦使用傳統(tǒng)的線程池來控制線程數(shù)量和資源分配,而避免使用 Fork/Join。
3. CPU 核心數(shù)的考量
ForkJoinPool 的默認(rèn)并行度是 Runtime.getRuntime().availableProcessors(),即根據(jù) CPU 核心數(shù)來確定并行度。這符合 CPU 密集型任務(wù)的特點(diǎn)。但你也可以自定義 ForkJoinPool 的并行度。
4. 異常處理
在 Fork/Join 框架中,所有提交到池中的任務(wù)都是 ForkJoinTask 的子類,我們應(yīng)當(dāng)注意捕獲異常防止任務(wù)執(zhí)行中止。測試和異常處理可以通過提供自定義的方法鉤子來協(xié)助調(diào)試。
六、總結(jié)
這篇文章,我們詳細(xì)地分析了 ForkJoinPool線程池,F(xiàn)ork/Join 是專為遞歸分治設(shè)計(jì)的,它充分了利用了現(xiàn)代多核計(jì)算能力和工作竊取算法,為某個任務(wù)的并行化提供了高效的解決方案。但是,需要特別注意,F(xiàn)ork/Join 更適用于 CPU 密集型任務(wù),對于 IO 密集型任務(wù),其表現(xiàn)不一定理想。因此,實(shí)際工作中,對于Java提供的 ThreadPoolExecutor和ForkJoinPool線程池,一定要選擇合適的適用場景。















 
 
 










 
 
 
 