如何優(yōu)雅地中斷 Java 線程
我們通過并發(fā)編程提升了系統(tǒng)的吞吐量,同時我們也希望并發(fā)運行的線程能夠及時停止并做好資源歸納,所以筆者就借此文來談談Java并發(fā)編程中線程中斷的藝術。
一、詳解Java中斷的哲學
1. 何時觸發(fā)中斷阻塞
按照操作系統(tǒng)對于線程的任務調(diào)度管理來說,當觸發(fā)以下幾種情況時線程就會阻塞而處于BLOCKED、WAITING、TIMED_WAITING幾種狀態(tài): 第一種是線程執(zhí)行IO請求,在等待IO資源返回,觸發(fā)阻塞,此時線程就處于BLOCKED狀態(tài),例如服務端server執(zhí)行serverSocket.accept()等待就緒的客戶端接入:
第二種則是等待條件為真期間,線程因此掛起等待notify通知或者通過sleep休眠,進而處于WAITING或者TIMED_WAITING:
new Thread(() -> ThreadUtil.sleep(3600), "t-0").start();
因為并發(fā)互斥原因,線程需要等待其它線程釋放監(jiān)視鎖而進入BLOCKED阻塞態(tài):
2. Java是如何響應中斷的
在操作系統(tǒng)中,線程的中斷方式一般分為以下兩種:
- 搶占式:當線程需要中斷時,直接強制讓線程立刻停止手里的任務
- 協(xié)作式:當線程需要中斷時,通過標識告知線程需要被中斷,線程輪詢檢查時看到這個標識就會直接中斷
而Java線程則是采用協(xié)作式中斷,即調(diào)用interrupt時其底層僅僅是將線程設置為可中斷狀態(tài),等到線程主動檢查到線程標識被設置為中斷時,則觸發(fā)InterruptedException:
對應的我們以Linux為例給出JDK底層關于線程中斷函數(shù)interrupt的實現(xiàn),即位于os_linux.cpp的interrupt方法,可以看到其底層本質(zhì)上就是定位到java線程對應的os線程并將其中斷標識設置為true:
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
//設置os線程中斷標識為true
osthread->set_interrupted(true);
//......
}
//......
}
同時我們也給出處于sleep休眠狀態(tài)的線程響應中斷的源碼,同樣是以Linux為例的線程封裝類os_linux.cpp下的sleep函數(shù),可以看到進行休眠時其底層在明確知曉線程可被中斷的時候,就會在for循環(huán)中知曉休眠并定期檢查可中斷狀態(tài):
int os::sleep(Thread* thread, jlong millis, bool interruptible) {
//......
if (interruptible) {
jlong prevtime = javaTimeNanos();
for (;;) {
//循環(huán)中感知到中斷直接返回OS_INTRPT標識
if (os::is_interrupted(thread, true)) {
return OS_INTRPT;
}
}
} else {
//......
}
}
3. 線程中斷的守則
通常來說我們對線程中斷響應度越高,就越容易處理并優(yōu)雅的完成兜底動作,一般來說,在處理線程中斷時一般會出現(xiàn)如下兩種情況:
- 當前代碼層面對象實例不具備處理該中斷異常
- 處于線程內(nèi)部的run方法感知到中斷無法向上拋出
針對情況1,本質(zhì)上就是權(quán)責上的轉(zhuǎn)移,如果當前業(yè)務層面不具備處理此類異常的能力,那么就將異常向上層拋出傳遞給上層使用者:
public void sleep(int seconds) throws InterruptedException {
TimeUnit.SECONDS.sleep(seconds);
}
而情況2則相對麻煩一些,如果類似于Runnable 這種無法向上拋出的內(nèi)置接口類的實現(xiàn),我們則可以主動去捕獲中斷中斷異常,并將在完成必要的資源清理工作后,將當前線程打斷從而讓高層棧幀感知到這個異常中斷:
class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//執(zhí)行當前線程資源清理
//打斷當前線程引發(fā)更高層線程響應此中斷
Thread.currentThread().interrupt();
}
}
}
二、Java線程中斷處理的一些實踐
1. 基于標識取消任務
我們先來說說基于自定義標識的方式中斷線程,即非java內(nèi)置方法層面的協(xié)作式標識來停止線程,通過任務運行時輪詢檢測,一旦線程輪詢檢查看到中斷標識設置為true,則直接結(jié)束運行:
對應的我們給出自定義協(xié)作式中斷的實現(xiàn),整體思路為:
- 采用cancelled標識中斷狀態(tài),并用volatile保證可見性
- 內(nèi)置初始化一個執(zhí)行線程thread
- 對外暴露start方法,執(zhí)行線程啟動
- 對外暴露cancel方法修改線程中斷狀態(tài)
- run方法執(zhí)行業(yè)務邏輯,并定期輪詢檢查中斷標識,一旦標識被設置為true則退出循環(huán),結(jié)束線程
public class Task implements Runnable {
/**
* 使用volatile修飾保證標識修改可見
*/
privatevolatileboolean cancelled = false;
privatefinal Thread thread = new Thread(this);
public void start() {
thread.start();
}
/**
* 停止時,通過cancel請求取消
*/
public void cancel() {
cancelled = true;
}
@Override
public void run() {
//取消標識檢測,如果取消則直接結(jié)束循環(huán)
while (!cancelled) {
System.out.println("running");
ThreadUtil.sleep(1000);
}
System.out.println("task cancelled");
}
}
對應的我們也給出這種方式的使用示例,可以看到我們的測試代碼會在5s后調(diào)用task暴露的任務取消方法完成線程中斷:
//線程啟動運行5s
Task task = new Task();
task.start();
//休眠5s后將task任務對應線程中斷
new Thread(()->{
ThreadUtil.sleep(5000);
task.cancel();
}).start();
而執(zhí)行的輸出結(jié)果如下,是符合我們預期的:
running
running
running
running
running
running
task cancelled
當然這種做法也存在一定的弊端,即帶有阻塞性質(zhì)的操作,任務可能出現(xiàn)永遠無法檢查取消標志,例如我們的線程在循環(huán)往阻塞阻塞隊列blockingQueue的put添加元素,一旦隊列空間達到容器上界,當前線程就會阻塞即無法執(zhí)行到循環(huán)分支上:
對應我們也給出這段錯誤的樣例,即阻塞隊列添加操作后阻塞而走不到循環(huán)判斷:
private final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);
@Override
public void run() {
//取消標識檢測,如果取消則直接結(jié)束循環(huán)
while (!cancelled) {
System.out.println("running");
try {
queue.put(RandomUtil.randomInt(10));
} catch (InterruptedException e) {
//......
}
}
System.out.println("task cancelled");
}
測試代碼還是和上一小節(jié)一致,不多贅述,測試輸出結(jié)果如下,即第二次添加操作時發(fā)現(xiàn)隊列已滿而阻塞從而無法打斷:
running
running
2. 如何處理阻塞式的中斷
參考java內(nèi)置方法Thread.sleep(1000);或者wait()方法,其底層都會針對外部中斷操作做檢測,一旦感知到中斷就會提前返回,即執(zhí)行如下步驟:
- 清除中斷狀態(tài)
- 拋出InterruptedException讓被中斷線程感知異常
所以對于阻塞式中斷的的正確方式為:
通過合理的時機發(fā)出中斷請求,讓線程在下一個合適時候處理中斷
所以對于上述阻塞隊列操作來說,可以按照如下方式進行線程優(yōu)雅中斷:
- 對外暴露interrupt方法打斷當前線程,確保阻塞隊列插入阻塞時依然可以利用內(nèi)置方法完成線程打斷
- 線程感知中斷時不可直接拋出異常,而是利用異常捕獲將資源處理清楚,再次執(zhí)行中斷循環(huán)監(jiān)測,正常退出線程邏輯
對應我們給出改造后的代碼,可以看到我們將cancel改為調(diào)用線程的中斷方法將線程中斷,同時在感知到中斷異常時會將執(zhí)行中斷后的兜底邏輯:
/**
* 停止時,通過cancel請求取消
*/
public void cancel() {
thread.interrupt();
}
@Override
public void run() {
//取消標識檢測,如果取消則直接結(jié)束循環(huán)
while (!Thread.currentThread().isInterrupted()) {
System.out.println("運行中......");
Integer element = RandomUtil.randomInt(10);
try {
queue.put(element);
} catch (InterruptedException e) {
//處理中斷
}
}
}
3. 合理的中斷策略
筆者在上面的文章中對于拋出的異常給出了一段todo的伏筆,這里我們就來說說線程面對中斷異常后響應的哲學。一般來說,線程級或者服務級的中斷策略為:
- 盡快的退出
- 必要時完成手頭任務的清理
這也就是為什么java中各種并發(fā)包的類庫對于中斷的任務僅僅是拋出InterruptedException而不是直接處理掉中斷 ,例如ArrayBlockingQueue的put方法:
//將任務中的中斷InterruptedException 丟給調(diào)用棧的上層代碼執(zhí)行
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//上可打斷的鎖
lock.lockInterruptibly();
try {
//......
} finally {
//1. 釋放鎖
lock.unlock();
}
}
而中斷策略的響應,正確的做法應該是讓執(zhí)行該任務的線程進行按照如下原則進行處理:
- 如果不具備處理的能力,則將異常向上傳遞
- 如果無法傳遞異常則顯示拋出中斷讓上層的調(diào)用棧感知。
以我們Task生產(chǎn)者代碼為例,我們將提交給線程即哪些繼承runnable或者lambda表達式統(tǒng)稱為任務,一般來說持有這些任務的線程不一定是這些任務的執(zhí)行者,它們僅擁有對于任務狀態(tài)管理的一些權(quán)限,例如一個主線程main方法調(diào)用thread-0異步執(zhí)行阻塞隊列存取操作:
所以從任務的維度來說,執(zhí)行任務的線程應該小心的保存中斷的狀態(tài),即在面對中斷時,它們不應該對中斷進行任何的干預,而是讓擁有線程的代碼段做出正確的響應,即讓thread-0感知到中斷異常然后將狀態(tài)狀態(tài)還原向上傳遞:
對應的我們也給出阻塞存儲元素的優(yōu)雅中斷處理:
- 輪詢檢測本地中斷標識,若未中斷執(zhí)行插入
- 感知中斷,捕獲異常并打印未能處理的元素
- 完成資源兜底,主動打斷線程將狀態(tài)向上傳遞
//外部線程打斷的方法
public void cancel() {
thread.interrupt();
}
@Override
public void run() {
try {
//取消標識檢測,如果取消則直接結(jié)束循環(huán)
while (!Thread.currentThread().isInterrupted()) {
System.out.println("運行中......");
Integer element = RandomUtil.randomInt(10);
try {
queue.put(element);
} catch (InterruptedException e) {
Console.log("線程中斷,未處理資源:{}", element);
Thread.currentThread().interrupt();
}
}
} finally {
if (thread.isInterrupted()) {
System.out.println("任務已取消");
}
}
}
對應的我們也給出輸出結(jié)果,可以看到阻塞的隊列在被打斷后完成必要的資源兜底,就會將中斷狀態(tài)向上傳遞:
運行中......
運行中......
線程中斷,未處理資源:6
任務已取消
4. 時刻保留中斷的狀態(tài)
需要注意的是,執(zhí)行者僅僅傳遞中斷還是不行的,更重要的一點是:
在必要時刻,保存中斷的狀態(tài),并返回前恢復狀態(tài),而不是捕獲到isInterrupted,避免陷入無限循環(huán)的漩渦。
很多情況下當前任務不具備處理中斷的能力,例如Runnable收到中斷的請求不可拋出異常交由上層調(diào)用棧處理,那么就在收到中斷請求,按照如下步驟執(zhí)行:
- 基于本地標識保留中斷狀態(tài)
- 完成必要的收尾工作
- 在返回前打斷該線程恢復中斷狀態(tài)
例如線程0循環(huán)獲取阻塞隊列元素,在因為沒有元素而阻塞時,線程1打斷該線程,已按照時刻保留中斷的狀態(tài)守則,線程0則應該按照如下步驟執(zhí)行:
- 收到中斷,利用本地變量保留中斷狀態(tài)
- 繼續(xù)循環(huán)等待元素獲取
- 獲取到元素并返回,在返回前將當前線程打斷,讓外部感知
對應的我們給出消費者循環(huán)獲取元素并處理狀態(tài)的代碼:
privatestaticfinal BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);
public static String getNextElement() {
boolean interrupted = false;
try {
while (true) {
try {
//1. 等待結(jié)果返回而阻塞
return blockingQueue.take();
} catch (InterruptedException e) {
//2.收到異常中斷,小心保存中斷狀態(tài),繼續(xù)阻塞等待元素返回
interrupted = true;
Console.log("消費者線程中斷,待完成本次資源獲取后執(zhí)行中斷");
}
}
} finally {
//3. 返回前基于中斷標識將中斷狀態(tài)向上傳遞
if (interrupted) {
Console.log("消費者線程已中斷");
Thread.currentThread().interrupt();
}
}
}
對應的我們也給出測試代碼,即在消費者阻塞后將其打斷,并投遞元素,讓其完成優(yōu)雅中斷:
public static void main(String[] args) {
//消費者線程
Thread thread = new Thread(() -> {
String nextElement = getNextElement();
Console.log("消費者線程獲取結(jié)果:{}", nextElement);
});
thread.start();
Console.log("消費者線程啟動");
//休眠5s后將task任務對應線程中斷
new Thread(() -> {
//休眠5s后將task任務對應線程中斷
ThreadUtil.sleep(5000);
thread.interrupt();
//休眠5s后再投遞元素
ThreadUtil.sleep(5000);
try {
String element = RandomUtil.randomString(10);
Console.log("生產(chǎn)者線程投遞:{}", element);
blockingQueue.put(element);
} catch (InterruptedException e) {
//......
}
}).start();
}
輸出結(jié)果如下,可以看到消費者在收到中斷后明確保留中斷狀態(tài),并完成資源處理的工作后執(zhí)行中斷:
消費者線程啟動
消費者線程中斷,待完成本次資源獲取后執(zhí)行中斷
生產(chǎn)者線程投遞:7gmnj1rqpj
消費者線程已中斷
消費者線程獲取結(jié)果:7gmnj1rqpj
5. 超時任務取消的最優(yōu)解
如果我們現(xiàn)在需要實現(xiàn)這樣以一個函數(shù),該函數(shù)會接受外部傳入一個異步任務并提交到我們的線程池異步執(zhí)行,并具備如下要求:
- 要求在給定時間完成執(zhí)行
- 任務執(zhí)行完成后,要知曉是超時取消,還是正常執(zhí)行完成返回,即任務正確執(zhí)行則返回true,反之返回false
- 任務執(zhí)行過程中可被中斷
所以對于該需求,要做到如下幾點:
- 可以感知任務執(zhí)行完成并返回true
- 可以感知任務執(zhí)行超時,并返回false
- 任務可中斷,直接拋出讓上層代碼解決
對應我們給出如下代碼,可以看到我們采用submit獲取異步任務的Future對象,利用Future實現(xiàn)帶有時限的阻塞獲取,一旦超時則直接拋出超時異常,并在函數(shù)返回前的finally語句塊調(diào)用cancel取消任務,需要注意的是這個cancel方法并不會一味的取消任務:
- 如果任務已完成,cancel就會返回false
- 如果任務因為超時等原因調(diào)用cancel,那么任務則還是活躍的,調(diào)用cancel可以取消并直接返回true
所以基于cancel這個特點,我們直接取反,即可實現(xiàn)正確執(zhí)行返回true,超時返回false:
private staticfinal ExecutorService executor = ThreadUtil.newExecutor(10);
public static boolean get(Runnable r, int timeout) throws InterruptedException {
Future<?> future = executor.submit(r);
try {
future.get(timeout, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Console.error("任務執(zhí)行超時");
} catch (ExecutionException e) {
thrownew RuntimeException(e);
} finally {
/*
1. 設置為true,如果任務是運行中,則取消任務,如果已經(jīng)取消,則沒有任務效果
2. 如果任務已經(jīng)完成,則返回false,反之返回true
*/
return !future.cancel(true);
}
}
對應的我們也給出測試代碼:
public static void main(String[] args) {
try {
boolean isTimeOut = get(() -> {
ThreadUtil.sleep(5000);
Console.log("任務執(zhí)行完成");
}, 1);
Console.log("任務是否正常執(zhí)行:{}", isTimeOut);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
輸出結(jié)果如下,可以看到任務執(zhí)行超時后直接打斷休眠,任務執(zhí)行到已完成的輸出,然后執(zhí)行超時取消,如果取反得到false:
任務執(zhí)行超時
任務執(zhí)行完成
任務是否正常執(zhí)行:false
6. 處理系統(tǒng)層面阻塞IO
有時候阻塞并非來自阻塞式并發(fā)包的調(diào)用,而是例如硬件層面文件IO或者網(wǎng)絡層面的socket IO,這種API涉及內(nèi)核態(tài)調(diào)用,通過interrupt我們也只能修改中斷表示,無法直接將其中斷。
所以我們只能通過間接的手段干預其資源關閉來做到中斷,無論是socket還是文件IO,本質(zhì)上都是針對系統(tǒng)或者網(wǎng)卡IO數(shù)據(jù)的阻塞讀取,所以我們可以直接通過關閉文件IO流或者socket套接字來間接打斷其資源讀取。
對應的我們以文件IO為例給出代碼示例,可以看到我們通過繼承thread重寫其中斷方法,當我們需要打斷系統(tǒng)資源時,直接關閉其流通道讓工作線程感知到這一點,然后通過原生interrupt修改中斷狀態(tài):
public class IOThread extends Thread {
privatefinal BufferedReader utf8Reader;
public IOThread(String path) {
utf8Reader = FileUtil.getUtf8Reader(path);
}
@Override
public synchronized void start() {
while (true) {
try {
String line = utf8Reader.readLine();
Console.log(line);
} catch (IOException e) {
//保存IO上下文狀態(tài)
thrownew RuntimeException(e);
}
}
}
@Override
public void interrupt() {
try {
//強制關閉IO讓其感知中斷
utf8Reader.close();
} catch (IOException e) {
thrownew RuntimeException(e);
} finally {
super.interrupt();
}
}
}
對應的我們也給出使用示例,這段代碼會在中斷線程調(diào)用interrupt關閉流通道直接直接將IOUtil 線程打斷:
IOThread ioThread = new IOThread("F:\\test.txt");
Thread thread = new Thread(() -> {
ThreadUtil.sleep(10_000);
ioThread.interrupt();
});
thread.start();
ioThread.start();