Java 并發(fā)編程優(yōu)雅中斷
我們通過(guò)并發(fā)編程提升了系統(tǒng)的吞吐量,同時(shí)我們也希望并發(fā)運(yùn)行的線程能夠及時(shí)停止并做好資源歸納,所以筆者就借此文來(lái)談?wù)凧ava并發(fā)編程中線程中斷的藝術(shù)。

一、詳解Java中斷的哲學(xué)
1. 何時(shí)觸發(fā)中斷阻塞
按照操作系統(tǒng)對(duì)于線程的任務(wù)調(diào)度管理來(lái)說(shuō),當(dāng)觸發(fā)以下幾種情況時(shí)線程就會(huì)阻塞而處于BLOCKED、WAITING、TIMED_WAITING幾種狀態(tài): 第一種是線程執(zhí)行IO請(qǐng)求,在等待IO資源返回,觸發(fā)阻塞,此時(shí)線程就處于BLOCKED狀態(tài),例如服務(wù)端server執(zhí)行serverSocket.accept()等待就緒的客戶端接入:

第二種則是等待條件為真期間,線程因此掛起等待notify通知或者通過(guò)sleep休眠,進(jìn)而處于WAITING或者TIMED_WAITING:
new Thread(() -> ThreadUtil.sleep(3600), "t-0").start();因?yàn)椴l(fā)互斥原因,線程需要等待其它線程釋放監(jiān)視鎖而進(jìn)入BLOCKED阻塞態(tài):

2. Java是如何響應(yīng)中斷的
在操作系統(tǒng)中,線程的中斷方式一般分為以下兩種:
- 搶占式:當(dāng)線程需要中斷時(shí),直接強(qiáng)制讓線程立刻停止手里的任務(wù)
- 協(xié)作式:當(dāng)線程需要中斷時(shí),通過(guò)標(biāo)識(shí)告知線程需要被中斷,線程輪詢檢查時(shí)看到這個(gè)標(biāo)識(shí)就會(huì)直接中斷
而Java線程則是采用協(xié)作式中斷,即調(diào)用interrupt時(shí)其底層僅僅是將線程設(shè)置為可中斷狀態(tài),等到線程主動(dòng)檢查到線程標(biāo)識(shí)被設(shè)置為中斷時(shí),則觸發(fā)InterruptedException:

對(duì)應(yīng)的我們以Linux為例給出JDK底層關(guān)于線程中斷函數(shù)interrupt的實(shí)現(xiàn),即位于os_linux.cpp的interrupt方法,可以看到其底層本質(zhì)上就是定位到j(luò)ava線程對(duì)應(yīng)的os線程并將其中斷標(biāo)識(shí)設(shè)置為true:
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
//設(shè)置os線程中斷標(biāo)識(shí)為true
osthread->set_interrupted(true);
//......
}
//......
}同時(shí)我們也給出處于sleep休眠狀態(tài)的線程響應(yīng)中斷的源碼,同樣是以Linux為例的線程封裝類os_linux.cpp下的sleep函數(shù),可以看到進(jìn)行休眠時(shí)其底層在明確知曉線程可被中斷的時(shí)候,就會(huì)在for循環(huán)中知曉休眠并定期檢查可中斷狀態(tài):
int os::sleep(Thread* thread, jlong millis, bool interruptible) {
//......
if (interruptible) {
jlong prevtime = javaTimeNanos();
for (;;) {
//循環(huán)中感知到中斷直接返回OS_INTRPT標(biāo)識(shí)
if (os::is_interrupted(thread, true)) {
return OS_INTRPT;
}
}
} else {
//......
}
}3. 線程中斷的守則
通常來(lái)說(shuō)我們對(duì)線程中斷響應(yīng)度越高,就越容易處理并優(yōu)雅的完成兜底動(dòng)作,一般來(lái)說(shuō),在處理線程中斷時(shí)一般會(huì)出現(xiàn)如下兩種情況:
- 當(dāng)前代碼層面對(duì)象實(shí)例不具備處理該中斷異常
- 處于線程內(nèi)部的run方法感知到中斷無(wú)法向上拋出
針對(duì)情況1,本質(zhì)上就是權(quán)責(zé)上的轉(zhuǎn)移,如果當(dāng)前業(yè)務(wù)層面不具備處理此類異常的能力,那么就將異常向上層拋出傳遞給上層使用者:
public void sleep(int seconds) throws InterruptedException {
TimeUnit.SECONDS.sleep(seconds);
}而情況2則相對(duì)麻煩一些,如果類似于Runnable這種無(wú)法向上拋出的內(nèi)置接口類的實(shí)現(xiàn),我們則可以主動(dòng)去捕獲中斷中斷異常,并將在完成必要的資源清理工作后,將當(dāng)前線程打斷從而讓高層棧幀感知到這個(gè)異常中斷:
class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//執(zhí)行當(dāng)前線程資源清理
//打斷當(dāng)前線程引發(fā)更高層線程響應(yīng)此中斷
Thread.currentThread().interrupt();
}
}
}二、Java線程中斷處理的一些實(shí)踐
1. 基于標(biāo)識(shí)取消任務(wù)
我們先來(lái)說(shuō)說(shuō)基于自定義標(biāo)識(shí)的方式中斷線程,即非java內(nèi)置方法層面的協(xié)作式標(biāo)識(shí)來(lái)停止線程,通過(guò)任務(wù)運(yùn)行時(shí)輪詢檢測(cè),一旦線程輪詢檢查看到中斷標(biāo)識(shí)設(shè)置為true,則直接結(jié)束運(yùn)行:

對(duì)應(yīng)的我們給出自定義協(xié)作式中斷的實(shí)現(xiàn),整體思路為:
- 采用cancelled標(biāo)識(shí)中斷狀態(tài),并用volatile保證可見(jiàn)性
- 內(nèi)置初始化一個(gè)執(zhí)行線程thread
- 對(duì)外暴露start方法,執(zhí)行線程啟動(dòng)
- 對(duì)外暴露cancel方法修改線程中斷狀態(tài)
- run方法執(zhí)行業(yè)務(wù)邏輯,并定期輪詢檢查中斷標(biāo)識(shí),一旦標(biāo)識(shí)被設(shè)置為true則退出循環(huán),結(jié)束線程
public class Task implements Runnable {
/**
* 使用volatile修飾保證標(biāo)識(shí)修改可見(jiàn)
*/
private volatile boolean cancelled = false;
private final Thread thread = new Thread(this);
public void start() {
thread.start();
}
/**
* 停止時(shí),通過(guò)cancel請(qǐng)求取消
*/
public void cancel() {
cancelled = true;
}
@Override
public void run() {
//取消標(biāo)識(shí)檢測(cè),如果取消則直接結(jié)束循環(huán)
while (!cancelled) {
System.out.println("running");
ThreadUtil.sleep(1000);
}
System.out.println("task cancelled");
}
}對(duì)應(yīng)的我們也給出這種方式的使用示例,可以看到我們的測(cè)試代碼會(huì)在5s后調(diào)用task暴露的任務(wù)取消方法完成線程中斷:
//線程啟動(dòng)運(yùn)行5s
Task task = new Task();
task.start();
//休眠5s后將task任務(wù)對(duì)應(yīng)線程中斷
new Thread(()->{
ThreadUtil.sleep(5000);
task.cancel();
}).start();而執(zhí)行的輸出結(jié)果如下,是符合我們預(yù)期的:
running
running
running
running
running
running
task cancelled當(dāng)然這種做法也存在一定的弊端,即帶有阻塞性質(zhì)的操作,任務(wù)可能出現(xiàn)永遠(yuǎn)無(wú)法檢查取消標(biāo)志,例如我們的線程在循環(huán)往阻塞阻塞隊(duì)列blockingQueue的put添加元素,一旦隊(duì)列空間達(dá)到容器上界,當(dāng)前線程就會(huì)阻塞即無(wú)法執(zhí)行到循環(huán)分支上:

對(duì)應(yīng)我們也給出這段錯(cuò)誤的樣例,即阻塞隊(duì)列添加操作后阻塞而走不到循環(huán)判斷:
private final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);
@Override
public void run() {
//取消標(biāo)識(shí)檢測(cè),如果取消則直接結(jié)束循環(huán)
while (!cancelled) {
System.out.println("running");
try {
queue.put(RandomUtil.randomInt(10));
} catch (InterruptedException e) {
//......
}
}
System.out.println("task cancelled");
}測(cè)試代碼還是和上一小節(jié)一致,不多贅述,測(cè)試輸出結(jié)果如下,即第二次添加操作時(shí)發(fā)現(xiàn)隊(duì)列已滿而阻塞從而無(wú)法打斷:
running
running2. 如何處理阻塞式的中斷
參考java內(nèi)置方法Thread.sleep(1000);或者wait()方法,其底層都會(huì)針對(duì)外部中斷操作做檢測(cè),一旦感知到中斷就會(huì)提前返回,即執(zhí)行如下步驟:
- 清除中斷狀態(tài)
- 拋出InterruptedException讓被中斷線程感知異常
所以對(duì)于阻塞式中斷的的正確方式為:
通過(guò)合理的時(shí)機(jī)發(fā)出中斷請(qǐng)求,讓線程在下一個(gè)合適時(shí)候處理中斷
所以對(duì)于上述阻塞隊(duì)列操作來(lái)說(shuō),可以按照如下方式進(jìn)行線程優(yōu)雅中斷:
- 對(duì)外暴露interrupt方法打斷當(dāng)前線程,確保阻塞隊(duì)列插入阻塞時(shí)依然可以利用內(nèi)置方法完成線程打斷
- 線程感知中斷時(shí)不可直接拋出異常,而是利用異常捕獲將資源處理清楚,再次執(zhí)行中斷循環(huán)監(jiān)測(cè),正常退出線程邏輯
對(duì)應(yīng)我們給出改造后的代碼,可以看到我們將cancel改為調(diào)用線程的中斷方法將線程中斷,同時(shí)在感知到中斷異常時(shí)會(huì)將執(zhí)行中斷后的兜底邏輯:
/**
* 停止時(shí),通過(guò)cancel請(qǐng)求取消
*/
public void cancel() {
thread.interrupt();
}
@Override
public void run() {
//取消標(biāo)識(shí)檢測(cè),如果取消則直接結(jié)束循環(huán)
while (!Thread.currentThread().isInterrupted()) {
System.out.println("運(yùn)行中......");
Integer element = RandomUtil.randomInt(10);
try {
queue.put(element);
} catch (InterruptedException e) {
//處理中斷
}
}
}3. 合理的中斷策略
筆者在上面的文章中對(duì)于拋出的異常給出了一段todo的伏筆,這里我們就來(lái)說(shuō)說(shuō)線程面對(duì)中斷異常后響應(yīng)的哲學(xué)。一般來(lái)說(shuō),線程級(jí)或者服務(wù)級(jí)的中斷策略為:
- 盡快的退出
- 必要時(shí)完成手頭任務(wù)的清理
這也就是為什么java中各種并發(fā)包的類庫(kù)對(duì)于中斷的任務(wù)僅僅是拋出InterruptedException而不是直接處理掉中斷 ,例如ArrayBlockingQueue的put方法:
//將任務(wù)中的中斷InterruptedException 丟給調(diào)用棧的上層代碼執(zhí)行
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//上可打斷的鎖
lock.lockInterruptibly();
try {
//......
} finally {
//1. 釋放鎖
lock.unlock();
}
}而中斷策略的響應(yīng),正確的做法應(yīng)該是讓執(zhí)行該任務(wù)的線程進(jìn)行按照如下原則進(jìn)行處理:
- 如果不具備處理的能力,則將異常向上傳遞
- 如果無(wú)法傳遞異常則顯示拋出中斷讓上層的調(diào)用棧感知。
以我們Task生產(chǎn)者代碼為例,我們將提交給線程即哪些繼承runnable或者lambda表達(dá)式統(tǒng)稱為任務(wù),一般來(lái)說(shuō)持有這些任務(wù)的線程不一定是這些任務(wù)的執(zhí)行者,它們僅擁有對(duì)于任務(wù)狀態(tài)管理的一些權(quán)限,例如一個(gè)主線程main方法調(diào)用thread-0異步執(zhí)行阻塞隊(duì)列存取操作:

所以從任務(wù)的維度來(lái)說(shuō),執(zhí)行任務(wù)的線程應(yīng)該小心的保存中斷的狀態(tài),即在面對(duì)中斷時(shí),它們不應(yīng)該對(duì)中斷進(jìn)行任何的干預(yù),而是讓擁有線程的代碼段做出正確的響應(yīng),即讓thread-0感知到中斷異常然后將狀態(tài)狀態(tài)還原向上傳遞:

對(duì)應(yīng)的我們也給出阻塞存儲(chǔ)元素的優(yōu)雅中斷處理:
- 輪詢檢測(cè)本地中斷標(biāo)識(shí),若未中斷執(zhí)行插入
- 感知中斷,捕獲異常并打印未能處理的元素
- 完成資源兜底,主動(dòng)打斷線程將狀態(tài)向上傳遞
//外部線程打斷的方法
public void cancel() {
thread.interrupt();
}
@Override
public void run() {
try {
//取消標(biāo)識(shí)檢測(cè),如果取消則直接結(jié)束循環(huán)
while (!Thread.currentThread().isInterrupted()) {
System.out.println("運(yùn)行中......");
Integer element = RandomUtil.randomInt(10);
try {
queue.put(element);
} catch (InterruptedException e) {
Console.log("線程中斷,未處理資源:{}", element);
Thread.currentThread().interrupt();
}
}
} finally {
if (thread.isInterrupted()) {
System.out.println("任務(wù)已取消");
}
}
}對(duì)應(yīng)的我們也給出輸出結(jié)果,可以看到阻塞的隊(duì)列在被打斷后完成必要的資源兜底,就會(huì)將中斷狀態(tài)向上傳遞:
運(yùn)行中......
運(yùn)行中......
線程中斷,未處理資源:6
任務(wù)已取消4. 時(shí)刻保留中斷的狀態(tài)
需要注意的是,執(zhí)行者僅僅傳遞中斷還是不行的,更重要的一點(diǎn)是:
在必要時(shí)刻,保存中斷的狀態(tài),并返回前恢復(fù)狀態(tài),而不是捕獲到isInterrupted,避免陷入無(wú)限循環(huán)的漩渦。
很多情況下當(dāng)前任務(wù)不具備處理中斷的能力,例如Runnable收到中斷的請(qǐng)求不可拋出異常交由上層調(diào)用棧處理,那么就在收到中斷請(qǐng)求,按照如下步驟執(zhí)行:
- 基于本地標(biāo)識(shí)保留中斷狀態(tài)
- 完成必要的收尾工作
- 在返回前打斷該線程恢復(fù)中斷狀態(tài)
例如:線程0循環(huán)獲取阻塞隊(duì)列元素,在因?yàn)闆](méi)有元素而阻塞時(shí),線程1打斷該線程,已按照時(shí)刻保留中斷的狀態(tài)守則,線程0則應(yīng)該按照如下步驟執(zhí)行:
- 收到中斷,利用本地變量保留中斷狀態(tài)
- 繼續(xù)循環(huán)等待元素獲取
- 獲取到元素并返回,在返回前將當(dāng)前線程打斷,讓外部感知

對(duì)應(yīng)的我們給出消費(fèi)者循環(huán)獲取元素并處理狀態(tài)的代碼:
private static final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
public static String getNextElement() {
boolean interrupted = false;
try {
while (true) {
try {
//1. 等待結(jié)果返回而阻塞
return blockingQueue.take();
} catch (InterruptedException e) {
//2.收到異常中斷,小心保存中斷狀態(tài),繼續(xù)阻塞等待元素返回
interrupted = true;
Console.log("消費(fèi)者線程中斷,待完成本次資源獲取后執(zhí)行中斷");
}
}
} finally {
//3. 返回前基于中斷標(biāo)識(shí)將中斷狀態(tài)向上傳遞
if (interrupted) {
Console.log("消費(fèi)者線程已中斷");
Thread.currentThread().interrupt();
}
}
}對(duì)應(yīng)的我們也給出測(cè)試代碼,即在消費(fèi)者阻塞后將其打斷,并投遞元素,讓其完成優(yōu)雅中斷:
public static void main(String[] args) {
//消費(fèi)者線程
Thread thread = new Thread(() -> {
String nextElement = getNextElement();
Console.log("消費(fèi)者線程獲取結(jié)果:{}", nextElement);
});
thread.start();
Console.log("消費(fèi)者線程啟動(dòng)");
//休眠5s后將task任務(wù)對(duì)應(yīng)線程中斷
new Thread(() -> {
//休眠5s后將task任務(wù)對(duì)應(yīng)線程中斷
ThreadUtil.sleep(5000);
thread.interrupt();
//休眠5s后再投遞元素
ThreadUtil.sleep(5000);
try {
String element = RandomUtil.randomString(10);
Console.log("生產(chǎn)者線程投遞:{}", element);
blockingQueue.put(element);
} catch (InterruptedException e) {
//......
}
}).start();
}輸出結(jié)果如下,可以看到消費(fèi)者在收到中斷后明確保留中斷狀態(tài),并完成資源處理的工作后執(zhí)行中斷:
消費(fèi)者線程啟動(dòng)
消費(fèi)者線程中斷,待完成本次資源獲取后執(zhí)行中斷
生產(chǎn)者線程投遞:7gmnj1rqpj
消費(fèi)者線程已中斷
消費(fèi)者線程獲取結(jié)果:7gmnj1rqpj5. 超時(shí)任務(wù)取消的最優(yōu)解
如果我們現(xiàn)在需要實(shí)現(xiàn)這樣以一個(gè)函數(shù),該函數(shù)會(huì)接受外部傳入一個(gè)異步任務(wù)并提交到我們的線程池異步執(zhí)行,并具備如下要求:
- 要求在給定時(shí)間完成執(zhí)行
- 任務(wù)執(zhí)行完成后,要知曉是超時(shí)取消,還是正常執(zhí)行完成返回,即任務(wù)正確執(zhí)行則返回true,反之返回false
- 任務(wù)執(zhí)行過(guò)程中可被中斷

所以對(duì)于該需求,要做到如下幾點(diǎn):
- 可以感知任務(wù)執(zhí)行完成并返回true
- 可以感知任務(wù)執(zhí)行超時(shí),并返回false
- 任務(wù)可中斷,直接拋出讓上層代碼解決
對(duì)應(yīng)我們給出如下代碼,可以看到我們采用submit獲取異步任務(wù)的Future對(duì)象,利用Future實(shí)現(xiàn)帶有時(shí)限的阻塞獲取,一旦超時(shí)則直接拋出超時(shí)異常,并在函數(shù)返回前的finally語(yǔ)句塊調(diào)用cancel取消任務(wù),需要注意的是這個(gè)cancel方法并不會(huì)一味的取消任務(wù):
- 如果任務(wù)已完成,這就意味著任務(wù)已到達(dá)終態(tài),不可取消,cancel就會(huì)返回false
- 如果任務(wù)因?yàn)槌瑫r(shí)等原因調(diào)用cancel,那么任務(wù)則還是活躍的,調(diào)用cancel可以取消并直接返回true
所以基于cancel這個(gè)特點(diǎn),我們直接取反,即可實(shí)現(xiàn)正確執(zhí)行返回true,超時(shí)返回false:
private static final ExecutorService executor = ThreadUtil.newExecutor(10);
public static boolean get(Runnable r, int timeout,TimeUnit timeUnit) throws InterruptedException {
Future<?> future = executor.submit(r);
try {
future.get(timeout, timeUnit);
} catch (TimeoutException e) {
Console.error("任務(wù)執(zhí)行超時(shí)");
} catch (ExecutionException e) {
throw new RuntimeException(e);
} finally {
/*
1. cancel方法傳參為true,即任務(wù)正常完成不可取消直接返回false,反之返回true
2. 如果任務(wù)已經(jīng)完成,cancel則返回false,反之返回true
*/
return future.cancel(true);
}
}對(duì)應(yīng)的我們也給出測(cè)試代碼,可以看到筆者的代碼休眠2s,等到超時(shí)時(shí)間為1s,這也就意味著cancel最終會(huì)成功超時(shí)取消返回true:
try {
boolean isTimeOut = get(() -> {
ThreadUtil.sleep(2000);
Console.log("任務(wù)執(zhí)行完成");
}, 1,TimeUnit.SECONDS);
Console.log("任務(wù)是否超時(shí):{}", isTimeOut);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}輸出結(jié)果如下,可以看到任務(wù)執(zhí)行超時(shí)后直接打斷休眠,任務(wù)執(zhí)行到已完成的輸出,然后執(zhí)行超時(shí)取消,如果取反得到false:
任務(wù)執(zhí)行超時(shí)
任務(wù)執(zhí)行完成
任務(wù)是否超時(shí):true6. 處理系統(tǒng)層面阻塞IO
有時(shí)候阻塞并非來(lái)自阻塞式并發(fā)包的調(diào)用,而是例如硬件層面文件IO或者網(wǎng)絡(luò)層面的socket IO,這種API涉及內(nèi)核態(tài)調(diào)用,通過(guò)interrupt我們也只能修改中斷表示,無(wú)法直接將其中斷。
所以我們只能通過(guò)間接的手段干預(yù)其資源關(guān)閉來(lái)做到中斷,無(wú)論是socket還是文件IO,本質(zhì)上都是針對(duì)系統(tǒng)或者網(wǎng)卡IO數(shù)據(jù)的阻塞讀取,所以我們可以直接通過(guò)關(guān)閉文件IO流或者socket套接字來(lái)間接打斷其資源讀取。

對(duì)應(yīng)的我們以文件IO為例給出代碼示例,可以看到我們通過(guò)繼承thread重寫(xiě)其中斷方法,當(dāng)我們需要打斷系統(tǒng)資源時(shí),直接關(guān)閉其流通道讓工作線程感知到這一點(diǎn),然后通過(guò)原生interrupt修改中斷狀態(tài):
public class IOThread extends Thread {
private final BufferedReader utf8Reader;
public IOThread(String path) {
utf8Reader = FileUtil.getUtf8Reader(path);
}
@Override
public synchronized void start() {
while (true) {
try {
String line = utf8Reader.readLine();
Console.log(line);
} catch (IOException e) {
//保存IO上下文狀態(tài)
throw new RuntimeException(e);
}
}
}
@Override
public void interrupt() {
try {
//強(qiáng)制關(guān)閉IO讓其感知中斷
utf8Reader.close();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
super.interrupt();
}
}
}對(duì)應(yīng)的我們也給出使用示例,這段代碼會(huì)在中斷線程調(diào)用interrupt關(guān)閉流通道直接直接將IOUtil線程打斷:
IOThread ioThread = new IOThread("F:\\test.txt");
Thread thread = new Thread(() -> {
ThreadUtil.sleep(10_000);
ioThread.interrupt();
});
thread.start();
ioThread.start();






























