如何手寫一個(gè)線程池?
手寫一個(gè)異步工具類
我是小識,新來了一個(gè)公司。這個(gè)公司呢,有個(gè)特點(diǎn),就是很鼓勵(lì)大家封裝各種實(shí)用的工具類,提高開發(fā)效率。
于是我就到處看項(xiàng)目的源碼,看看有沒有什么能改進(jìn)的?果然讓我發(fā)現(xiàn)了。項(xiàng)目中到處充斥著 new Thread 類來異步執(zhí)行代碼的邏輯。
new Thread(r).start();
我們可以封裝一個(gè)異步工具類??!
第一版
說干就干,把上面的代碼簡單封裝一下,一個(gè)簡單的異步工具類就封裝好了。
public interface Executor {
void execute(Runnable r);
}
public class AsyncExecutorV1 implements Executor {
@Override
public void execute(Runnable r) {
new Thread(r).start();
}
}
于是開開心心的提交了 merge request。
第二版
正當(dāng)我滿懷期待工具類代碼能被合并的時(shí)候,沒想代碼被組長杰哥打回來了。
「杰哥」:有心封裝工具類值得鼓勵(lì),不過還可以改進(jìn)一下。
「小識」:還能再改進(jìn)?沒感覺我這個(gè)工具類還有改進(jìn)的余地啊!
「杰哥」:假如說有10000個(gè)異步任務(wù),你這創(chuàng)建10000個(gè)線程,資源耗費(fèi)太嚴(yán)重了!
「小識」:這樣啊,那我加個(gè)隊(duì)列,任務(wù)都放到隊(duì)列中,用一個(gè)線程從隊(duì)列中取任務(wù)執(zhí)行。
public class AsyncExecutorV2 implements Executor {
private BlockingQueue<Runnable> workQueue;
public AsyncExecutorV2(BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
WorkThread workThread = new WorkThread();
workThread.start();
}
@SneakyThrows
@Override
public void execute(Runnable r) {
workQueue.add(r);
}
class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
}
第三版
「小識」:杰哥,快幫我看看,還有啥改進(jìn)的沒?
「杰哥」:小伙子不錯(cuò)啊,居然能想到用隊(duì)列來緩沖任務(wù),不愧是我招進(jìn)來的人!但是用一個(gè)異步線程執(zhí)行任務(wù),你確定這個(gè)工具類比同步執(zhí)行的效率快?
「小識」:哈哈,又一個(gè)工具類翻車的案例,應(yīng)該多開幾個(gè)異步線程來執(zhí)行任務(wù),但是應(yīng)該開多少呢?
「杰哥」:誰最清楚異步工具類應(yīng)該用多少個(gè)線程來執(zhí)行呢?
「小識」:使用工具類的人。
「杰哥」:這不對了,你可以定義一個(gè)線程數(shù)量參數(shù),讓用戶來決定開多少線程。「另外你這個(gè)工具類還個(gè)問題,隊(duì)列滿了會直接拋出異常!」
「小識」:那我增加一個(gè)拒絕策略類(RejectedExecutionHandler),當(dāng)線程池滿了讓用戶決定執(zhí)行策略,比如直接拋異常,用當(dāng)前線程同步執(zhí)行任務(wù)。
public class AsyncExecutorV3 implements Executor {
private BlockingQueue<Runnable> workQueue;
private List<WorkThread> workThreadList = new ArrayList<>();
private RejectedExecutionHandler handler;
public AsyncExecutorV3(int corePoolSize,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this.workQueue = workQueue;
this.handler = handler;
for (int i = 0; i < corePoolSize; i++) {
WorkThread workThread = new WorkThread();
workThread.start();
workThreadList.add(workThread);
}
}
@SneakyThrows
@Override
public void execute(Runnable r) {
if (!workQueue.offer(r)) {
// 隊(duì)列滿了,執(zhí)行拒絕策略
handler.rejectedExecution(r);
}
}
class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
}
// 拒絕策略類
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r);
}
// 當(dāng)線程池滿了之后直接拋出異常
public class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
throw new RuntimeException("queue is full");
}
}
// 當(dāng)線程池滿了之后直接拋出異常
public class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
throw new RuntimeException("queue is full");
}
}
// 當(dāng)線程池滿了之后,用提交任務(wù)的線程同步執(zhí)行任務(wù)
public class CallerRunsPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r) {
r.run();
}
}
再次提交 merge request,終于被合并了,別的團(tuán)隊(duì)都開始使用我的工具類了!
過了幾天小亮急匆匆找到我。
「小亮」:小識,你的工具類挺好用的。但是我最近遇到了一個(gè)問題,我用了CountDownLatch批量執(zhí)行任務(wù),但是我這個(gè)任務(wù)好像卡住了,我用jstack想看看線程的執(zhí)行情況,快告訴我你異步線程的名字設(shè)置的是啥?
「小識」:哎呀,我們沒設(shè)置線程的名字,應(yīng)該用的是默認(rèn)的線程名字 Thread-n。
「小亮」:你可得給工具類加個(gè)線程名字的參數(shù)啊,不然一個(gè)一個(gè)看線程的狀態(tài)太累了,而且效率也不高。

「小識」:我這就加。
第四版
趕緊加了一個(gè)線程名字的參數(shù),然后再次提交代碼。
「杰哥」:哎呀,沒想到我也疏忽了,沒發(fā)現(xiàn)這個(gè)問題,確實(shí)應(yīng)該加個(gè)線程名字的參數(shù),代碼的可擴(kuò)展性太重要了,改來改去可不行。
「小識」:是啊!
「杰哥」:你覺得你只加一個(gè)線程名字參數(shù),可擴(kuò)展性高嗎?如果有的團(tuán)隊(duì)想修改異步線程的優(yōu)先級,你再加個(gè)優(yōu)先級參數(shù)?
「小識」:感覺不太行,那讓用戶把線程傳給我吧!
「杰哥」:哈哈,可以,你還可以用工廠模式優(yōu)化一下,用戶傳入線程工廠類,工具類用工廠類創(chuàng)建線程。
「小識」:不愧是杰哥,這樣一來代碼更清爽了!
public class AsyncExecutorV4 implements Executor {
private BlockingQueue<Runnable> workQueue;
private List<WorkThread> workThreadList = new ArrayList<>();
private RejectedExecutionHandler handler;
public AsyncExecutorV4(int corePoolSize,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler,
ThreadFactory threadFactory) {
this.workQueue = workQueue;
this.handler = handler;
for (int i = 0; i < corePoolSize; i++) {
// 用工廠類創(chuàng)建線程
WorkThread workThread = threadFactory.newThread();
workThread.start();
workThreadList.add(workThread);
}
}
@SneakyThrows
@Override
public void execute(Runnable r) {
if (!workQueue.offer(r)) {
handler.rejectedExecution(r);
}
}
// 異步線程
public class WorkThread extends Thread {
@Override
public void run() {
while (true) {
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
task.run();
}
}
}
// 異步線程工廠類
public interface ThreadFactory {
WorkThread newThread();
}
}
代碼提交之后,小亮給線程起了一個(gè)名字,async-thread,現(xiàn)在他通過名字很快就能知道線程池中的線程在干嘛了!

大家不斷的進(jìn)行改進(jìn)
隨著這個(gè)異步工具類在公司內(nèi)部使用的越來越多,大家也提交了很多改進(jìn)的代碼。
- 按需創(chuàng)建線程,不要一開始就創(chuàng)建「corePoolSize」個(gè)線程,而是在調(diào)用者提交任務(wù)的過程中逐漸創(chuàng)建出來,最后創(chuàng)建了「corePoolSize」個(gè)就不再創(chuàng)建了。
- 提高工具的彈性,當(dāng)任務(wù)突增時(shí),隊(duì)列會被放滿,然后多余的任務(wù)有可能會被直接扔掉。當(dāng)然我們可以把「corePoolSize」設(shè)的很大,但是這樣并不優(yōu)雅,因?yàn)榇蟛糠智闆r下是用不到這么多線程的。當(dāng)任務(wù)突增時(shí),我們可以適當(dāng)增加線程,提高執(zhí)行速度,當(dāng)然創(chuàng)建的總線程數(shù)還是要限制一下的,我們把能創(chuàng)建的總數(shù)定為「maximumPoolSize」。
- 及時(shí)關(guān)閉不需要的線程,當(dāng)任務(wù)突增時(shí),線程數(shù)可能增加「maximumPoolSize」,但是大多數(shù)時(shí)間「corePoolSize」個(gè)線程就足夠用了,因此可以定義一個(gè)超時(shí)時(shí)間,當(dāng)一個(gè)線程在「keepAliveTime」時(shí)間內(nèi)沒有執(zhí)行任務(wù),就把它給關(guān)掉。
異步工具類執(zhí)行流程圖
經(jīng)過大家的不斷改進(jìn)之后,構(gòu)造函數(shù)中的參數(shù)也越來越多了,杰哥讓我寫個(gè)文檔吧,把這個(gè)異步工具類的構(gòu)造函數(shù)和執(zhí)行流程總結(jié)一下,不然新來的小伙伴看到這個(gè)工具類一臉懵可不行!
這個(gè)工具類的構(gòu)造函數(shù)目前有如下7個(gè)參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
參數(shù) | 含義 |
corePoolSize | 核心線程數(shù) |
maximumPoolSize | 最大線程數(shù) |
keepAliveTime | 非核心線程的空閑時(shí)間 |
TimeUnit | 空閑時(shí)間的單位 |
BlockingQueue<Runnable> | 任務(wù)隊(duì)列 |
ThreadFactory | 線程工廠 |
RejectedExecutionHandler | 拒絕策略 |
「執(zhí)行流程圖如下」:

對了,最后大家給這個(gè)異步工具類起了一個(gè)牛的名字,「線程池」。



























