你真的了解ForkJoinPool嗎?這些技巧讓你的代碼性能提升十倍!

1、線程池簡(jiǎn)介
線程池是一種常見(jiàn)的多線程編程方式,它可以有效地管理線程的創(chuàng)建、銷毀和復(fù)用,從而提高程序的性能和穩(wěn)定性。Java中提供了多種線程池實(shí)現(xiàn),包括ForkJoinPool、Executors、CompletionService等。
2、ForkJoinPool
ForkJoinPool是Java 7中新增的一種線程池實(shí)現(xiàn),它主要用于執(zhí)行大量的計(jì)算密集型任務(wù)。ForkJoinPool采用“工作竊取”算法,即當(dāng)某個(gè)線程的任務(wù)執(zhí)行完畢后,會(huì)從其他線程的任務(wù)隊(duì)列中竊取任務(wù)執(zhí)行,從而實(shí)現(xiàn)負(fù)載均衡。
以下是一個(gè)使用ForkJoinPool的示例代碼:
import java.util.concurrent.*;
public class ForkJoinPoolExample {
public static void main(String[] args) {
int n = 1000000;
int[] array = new int[n];
for (int i = 0; i < n; i++) {
array[i] = i;
}
ForkJoinPool pool = new ForkJoinPool();
int sum = pool.invoke(new SumTask(array, 0, n));
System.out.println("Sum: " + sum);
}
}
class SumTask extends RecursiveTask<Integer> {
private int[] array;
private int start;
private int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= 1000) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork();
right.fork();
return left.join() + right.join();
}
}
}以上代碼創(chuàng)建了一個(gè)ForkJoinPool,用于計(jì)算一個(gè)長(zhǎng)度為1000000的數(shù)組的元素之和。其中,SumTask是一個(gè)繼承自RecursiveTask的任務(wù)類,用于將數(shù)組分成若干個(gè)子任務(wù)進(jìn)行計(jì)算。當(dāng)子任務(wù)的數(shù)量小于等于1000時(shí),直接計(jì)算子任務(wù)的結(jié)果;否則,將子任務(wù)分成兩個(gè)部分,分別交給左右兩個(gè)子任務(wù)進(jìn)行計(jì)算,最后將兩個(gè)子任務(wù)的結(jié)果相加。
3、Executors
Executors是Java中提供的一個(gè)線程池工具類,它可以方便地創(chuàng)建各種類型的線程池。Executors提供了多個(gè)靜態(tài)方法,用于創(chuàng)建不同類型的線程池,例如newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor等。
(1)newFixedThreadPool
newFixedThreadPool是一個(gè)固定大小的線程池,它會(huì)一直保持固定數(shù)量的線程,如果有新的任務(wù)提交,但線程池中的線程都在忙碌,那么新的任務(wù)就會(huì)進(jìn)入等待隊(duì)列中等待執(zhí)行。
以下是一個(gè)使用newFixedThreadPool創(chuàng)建線程池的示例代碼:
import java.util.concurrent.*;
public class FixedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 20; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Task executed by " + Thread.currentThread().getName());
}
};
executor.execute(task);
}
executor.shutdown();
}
}以上代碼創(chuàng)建了一個(gè)固定大小為5的線程池,提交了20個(gè)任務(wù),并關(guān)閉了線程池。每個(gè)任務(wù)的執(zhí)行會(huì)輸出執(zhí)行線程的名稱。
(2)newCachedThreadPool
newCachedThreadPool是一個(gè)可緩存的線程池,它會(huì)根據(jù)需要?jiǎng)?chuàng)建新的線程,如果有線程空閑時(shí)間超過(guò)60秒,就會(huì)被回收。如果有新的任務(wù)提交,但線程池中的線程都在忙碌,那么就會(huì)創(chuàng)建新的線程來(lái)處理任務(wù)。
以下是一個(gè)使用newCachedThreadPool創(chuàng)建線程池的示例代碼:
import java.util.concurrent.*;
public class CachedThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Task executed by " + Thread.currentThread().getName());
}
};
executor.execute(task);
}
executor.shutdown();
}
}以上代碼創(chuàng)建了一個(gè)可緩存的線程池,提交了20個(gè)任務(wù),并關(guān)閉了線程池。每個(gè)任務(wù)的執(zhí)行會(huì)輸出執(zhí)行線程的名稱。
(3)newSingleThreadExecutor
newSingleThreadExecutor是一個(gè)單線程的線程池,它會(huì)保證所有任務(wù)按照順序執(zhí)行,即每個(gè)任務(wù)都會(huì)在前一個(gè)任務(wù)執(zhí)行完畢后才會(huì)執(zhí)行。
以下是一個(gè)使用newSingleThreadExecutor創(chuàng)建線程池的示例代碼:
import java.util.concurrent.*;
public class SingleThreadExecutorExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Task executed by " + Thread.currentThread().getName());
}
};
executor.execute(task);
}
executor.shutdown();
}
}以上代碼創(chuàng)建了一個(gè)單線程的線程池,提交了20個(gè)任務(wù),并關(guān)閉了線程池。每個(gè)任務(wù)的執(zhí)行會(huì)輸出執(zhí)行線程的名稱。
4、CompletionService
CompletionService是Java中提供的一個(gè)用于異步執(zhí)行任務(wù)的工具類,它可以方便地獲取已完成的任務(wù)的結(jié)果。CompletionService內(nèi)部維護(hù)了一個(gè)阻塞隊(duì)列,用于存儲(chǔ)已完成的任務(wù)的結(jié)果。
以下是一個(gè)使用CompletionService的示例代碼:
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
for (int i = 0; i < 10; i++) {
final int index = i;
Callable<Integer> task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep((long) (Math.random() * 1000));
return index;
}
};
completionService.submit(task);
}
for (int i = 0; i < 10; i++) {
Future<Integer> future = completionService.take();
System.out.println("Result: " + future.get());
}
executor.shutdown();
}
}以上代碼創(chuàng)建了一個(gè)固定大小為5的線程池,提交了10個(gè)任務(wù),并使用CompletionService獲取已完成的任務(wù)的結(jié)果。每個(gè)任務(wù)會(huì)隨機(jī)休眠一段時(shí)間,然后返回任務(wù)的編號(hào)。在主線程中,使用completionService.take()方法獲取已完成的任務(wù)的結(jié)果,并輸出任務(wù)的編號(hào)。
5、Callable和Future
Callable是Java中提供的一個(gè)接口,它類似于Runnable接口,但是可以返回執(zhí)行結(jié)果。Future是Java中提供的一個(gè)接口,它可以用于獲取異步執(zhí)行任務(wù)的結(jié)果。
以下是一個(gè)使用Callable和Future的示例代碼:
import java.util.concurrent.*;
public class CallableAndFutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Integer> task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 1 + 2;
}
};
Future<Integer> future = executor.submit(task);
System.out.println("Result: " + future.get());
executor.shutdown();
}
}以上代碼創(chuàng)建了一個(gè)單線程的線程池,提交了一個(gè)任務(wù),并使用Future獲取任務(wù)的執(zhí)行結(jié)果。任務(wù)會(huì)休眠1秒鐘,然后返回1+2的結(jié)果。在主線程中,使用future.get()方法獲取任務(wù)的執(zhí)行結(jié)果,并輸出結(jié)果。
6、Runnable和Thread
Runnable是Java中提供的一個(gè)接口,它表示一個(gè)可以被線程執(zhí)行的任務(wù)。Thread是Java中提供的一個(gè)類,它表示一個(gè)線程。
以下是一個(gè)使用Runnable和Thread的示例代碼:
public class RunnableAndThreadExample {
public static void main(String[] args) throws InterruptedException {
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Task executed by " + Thread.currentThread().getName());
}
};
Thread thread = new Thread(task);
thread.start();
thread.join();
}
}以上代碼創(chuàng)建了一個(gè)任務(wù),并使用Thread將任務(wù)提交到一個(gè)新的線程中執(zhí)行。在主線程中,使用thread.join()方法等待新線程執(zhí)行完畢。每個(gè)任務(wù)的執(zhí)行會(huì)輸出執(zhí)行線程的名稱。
7、總結(jié)
本文介紹了Java中常見(jiàn)的線程池實(shí)現(xiàn),包括ForkJoinPool、Executors、CompletionService、Callable、Future、Runnable等知識(shí)點(diǎn)的詳細(xì)講解和完整可運(yùn)行的代碼示例。線程池是Java中常見(jiàn)的多線程編程方式,它可以有效地管理線程的創(chuàng)建、銷毀和復(fù)用,從而提高程序的性能和穩(wěn)定性。在實(shí)際開(kāi)發(fā)中,需要根據(jù)具體的需求選擇合適的線程池實(shí)現(xiàn)。
































