偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

如何優(yōu)雅地中斷 Java 線程

開發(fā)
本文針對Java并發(fā)編程的中斷的使用技巧、狀態(tài)保存、合理的中斷時機和不同場景的中斷方式進行了深入的剖析的講解,希望對你有幫助。

我們通過并發(fā)編程提升了系統(tǒng)的吞吐量,同時我們也希望并發(fā)運行的線程能夠及時停止并做好資源歸納,所以筆者就借此文來談談Java并發(fā)編程中線程中斷的藝術。

一、詳解Java中斷的哲學

1. 何時觸發(fā)中斷阻塞

按照操作系統(tǒng)對于線程的任務調(diào)度管理來說,當觸發(fā)以下幾種情況時線程就會阻塞而處于BLOCKED、WAITING、TIMED_WAITING幾種狀態(tài): 第一種是線程執(zhí)行IO請求,在等待IO資源返回,觸發(fā)阻塞,此時線程就處于BLOCKED狀態(tài),例如服務端server執(zhí)行serverSocket.accept()等待就緒的客戶端接入:

第二種則是等待條件為真期間,線程因此掛起等待notify通知或者通過sleep休眠,進而處于WAITING或者TIMED_WAITING:

new Thread(() -> ThreadUtil.sleep(3600), "t-0").start();

因為并發(fā)互斥原因,線程需要等待其它線程釋放監(jiān)視鎖而進入BLOCKED阻塞態(tài):

2. Java是如何響應中斷的

在操作系統(tǒng)中,線程的中斷方式一般分為以下兩種:

  • 搶占式:當線程需要中斷時,直接強制讓線程立刻停止手里的任務
  • 協(xié)作式:當線程需要中斷時,通過標識告知線程需要被中斷,線程輪詢檢查時看到這個標識就會直接中斷

而Java線程則是采用協(xié)作式中斷,即調(diào)用interrupt時其底層僅僅是將線程設置為可中斷狀態(tài),等到線程主動檢查到線程標識被設置為中斷時,則觸發(fā)InterruptedException:

對應的我們以Linux為例給出JDK底層關于線程中斷函數(shù)interrupt的實現(xiàn),即位于os_linux.cpp的interrupt方法,可以看到其底層本質(zhì)上就是定位到java線程對應的os線程并將其中斷標識設置為true:

void os::interrupt(Thread* thread) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();

if (!osthread->interrupted()) {
    //設置os線程中斷標識為true
    osthread->set_interrupted(true);
    //......
  }

//......

}

同時我們也給出處于sleep休眠狀態(tài)的線程響應中斷的源碼,同樣是以Linux為例的線程封裝類os_linux.cpp下的sleep函數(shù),可以看到進行休眠時其底層在明確知曉線程可被中斷的時候,就會在for循環(huán)中知曉休眠并定期檢查可中斷狀態(tài):

int os::sleep(Thread* thread, jlong millis, bool interruptible) {
   //......

if (interruptible) {
    jlong prevtime = javaTimeNanos();

    for (;;) {
      //循環(huán)中感知到中斷直接返回OS_INTRPT標識
      if (os::is_interrupted(thread, true)) {
        return OS_INTRPT;
      }


    }
  } else {
    //......
  }
}

3. 線程中斷的守則

通常來說我們對線程中斷響應度越高,就越容易處理并優(yōu)雅的完成兜底動作,一般來說,在處理線程中斷時一般會出現(xiàn)如下兩種情況:

  • 當前代碼層面對象實例不具備處理該中斷異常
  • 處于線程內(nèi)部的run方法感知到中斷無法向上拋出

針對情況1,本質(zhì)上就是權(quán)責上的轉(zhuǎn)移,如果當前業(yè)務層面不具備處理此類異常的能力,那么就將異常向上層拋出傳遞給上層使用者:

public void sleep(int seconds) throws InterruptedException {
        TimeUnit.SECONDS.sleep(seconds);
    }

而情況2則相對麻煩一些,如果類似于Runnable 這種無法向上拋出的內(nèi)置接口類的實現(xiàn),我們則可以主動去捕獲中斷中斷異常,并將在完成必要的資源清理工作后,將當前線程打斷從而讓高層棧幀感知到這個異常中斷:

class Task implements Runnable {

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //執(zhí)行當前線程資源清理
                //打斷當前線程引發(fā)更高層線程響應此中斷
                Thread.currentThread().interrupt();
            }
        }
    }

二、Java線程中斷處理的一些實踐

1. 基于標識取消任務

我們先來說說基于自定義標識的方式中斷線程,即非java內(nèi)置方法層面的協(xié)作式標識來停止線程,通過任務運行時輪詢檢測,一旦線程輪詢檢查看到中斷標識設置為true,則直接結(jié)束運行:

對應的我們給出自定義協(xié)作式中斷的實現(xiàn),整體思路為:

  • 采用cancelled標識中斷狀態(tài),并用volatile保證可見性
  • 內(nèi)置初始化一個執(zhí)行線程thread
  • 對外暴露start方法,執(zhí)行線程啟動
  • 對外暴露cancel方法修改線程中斷狀態(tài)
  • run方法執(zhí)行業(yè)務邏輯,并定期輪詢檢查中斷標識,一旦標識被設置為true則退出循環(huán),結(jié)束線程
public class Task implements Runnable {
    /**
     * 使用volatile修飾保證標識修改可見
     */
    privatevolatileboolean cancelled = false;

    privatefinal Thread thread = new Thread(this);

    public void start() {
        thread.start();
    }

    /**
     * 停止時,通過cancel請求取消
     */
    public void cancel() {
        cancelled = true;

    }

   
    @Override
    public void run() {
        //取消標識檢測,如果取消則直接結(jié)束循環(huán)
        while (!cancelled) {
            System.out.println("running");
            ThreadUtil.sleep(1000);
        }
        System.out.println("task cancelled");
    }
}

對應的我們也給出這種方式的使用示例,可以看到我們的測試代碼會在5s后調(diào)用task暴露的任務取消方法完成線程中斷:

//線程啟動運行5s
        Task task = new Task();
        task.start();
        //休眠5s后將task任務對應線程中斷
        new Thread(()->{
            ThreadUtil.sleep(5000);
            task.cancel();
        }).start();

而執(zhí)行的輸出結(jié)果如下,是符合我們預期的:

running
running
running
running
running
running
task cancelled

當然這種做法也存在一定的弊端,即帶有阻塞性質(zhì)的操作,任務可能出現(xiàn)永遠無法檢查取消標志,例如我們的線程在循環(huán)往阻塞阻塞隊列blockingQueue的put添加元素,一旦隊列空間達到容器上界,當前線程就會阻塞即無法執(zhí)行到循環(huán)分支上:

對應我們也給出這段錯誤的樣例,即阻塞隊列添加操作后阻塞而走不到循環(huán)判斷:

private final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100);

    @Override
    public void run() {
        //取消標識檢測,如果取消則直接結(jié)束循環(huán)
        while (!cancelled) {
            System.out.println("running");
            try {
                queue.put(RandomUtil.randomInt(10));
            } catch (InterruptedException e) {
                //......
            }
        }
        System.out.println("task cancelled");
    }

測試代碼還是和上一小節(jié)一致,不多贅述,測試輸出結(jié)果如下,即第二次添加操作時發(fā)現(xiàn)隊列已滿而阻塞從而無法打斷:

running
running

2. 如何處理阻塞式的中斷

參考java內(nèi)置方法Thread.sleep(1000);或者wait()方法,其底層都會針對外部中斷操作做檢測,一旦感知到中斷就會提前返回,即執(zhí)行如下步驟:

  • 清除中斷狀態(tài)
  • 拋出InterruptedException讓被中斷線程感知異常

所以對于阻塞式中斷的的正確方式為:

通過合理的時機發(fā)出中斷請求,讓線程在下一個合適時候處理中斷

所以對于上述阻塞隊列操作來說,可以按照如下方式進行線程優(yōu)雅中斷:

  • 對外暴露interrupt方法打斷當前線程,確保阻塞隊列插入阻塞時依然可以利用內(nèi)置方法完成線程打斷
  • 線程感知中斷時不可直接拋出異常,而是利用異常捕獲將資源處理清楚,再次執(zhí)行中斷循環(huán)監(jiān)測,正常退出線程邏輯

對應我們給出改造后的代碼,可以看到我們將cancel改為調(diào)用線程的中斷方法將線程中斷,同時在感知到中斷異常時會將執(zhí)行中斷后的兜底邏輯:

/**
     * 停止時,通過cancel請求取消
     */
    public void cancel() {
        thread.interrupt();
    }


    @Override
    public void run() {

            //取消標識檢測,如果取消則直接結(jié)束循環(huán)
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("運行中......");
                Integer element = RandomUtil.randomInt(10);
                try {
                    queue.put(element);
                } catch (InterruptedException e) {
                   //處理中斷
                }

            }
        

    }

3. 合理的中斷策略

筆者在上面的文章中對于拋出的異常給出了一段todo的伏筆,這里我們就來說說線程面對中斷異常后響應的哲學。一般來說,線程級或者服務級的中斷策略為:

  • 盡快的退出
  • 必要時完成手頭任務的清理

這也就是為什么java中各種并發(fā)包的類庫對于中斷的任務僅僅是拋出InterruptedException而不是直接處理掉中斷 ,例如ArrayBlockingQueue的put方法:

//將任務中的中斷InterruptedException 丟給調(diào)用棧的上層代碼執(zhí)行
public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //上可打斷的鎖
        lock.lockInterruptibly();
        try {
           //......
        } finally {
        //1. 釋放鎖
            lock.unlock();
        }
    }

而中斷策略的響應,正確的做法應該是讓執(zhí)行該任務的線程進行按照如下原則進行處理:

  • 如果不具備處理的能力,則將異常向上傳遞
  • 如果無法傳遞異常則顯示拋出中斷讓上層的調(diào)用棧感知。

以我們Task生產(chǎn)者代碼為例,我們將提交給線程即哪些繼承runnable或者lambda表達式統(tǒng)稱為任務,一般來說持有這些任務的線程不一定是這些任務的執(zhí)行者,它們僅擁有對于任務狀態(tài)管理的一些權(quán)限,例如一個主線程main方法調(diào)用thread-0異步執(zhí)行阻塞隊列存取操作:

所以從任務的維度來說,執(zhí)行任務的線程應該小心的保存中斷的狀態(tài),即在面對中斷時,它們不應該對中斷進行任何的干預,而是讓擁有線程的代碼段做出正確的響應,即讓thread-0感知到中斷異常然后將狀態(tài)狀態(tài)還原向上傳遞:

對應的我們也給出阻塞存儲元素的優(yōu)雅中斷處理:

  • 輪詢檢測本地中斷標識,若未中斷執(zhí)行插入
  • 感知中斷,捕獲異常并打印未能處理的元素
  • 完成資源兜底,主動打斷線程將狀態(tài)向上傳遞
//外部線程打斷的方法
 public void cancel() {
        thread.interrupt();
    }


    @Override
    public void run() {
        try {
            //取消標識檢測,如果取消則直接結(jié)束循環(huán)
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("運行中......");
                Integer element = RandomUtil.randomInt(10);
                try {
                    queue.put(element);
                } catch (InterruptedException e) {
                    Console.log("線程中斷,未處理資源:{}", element);
                    Thread.currentThread().interrupt();
                }

            }
        } finally {
            if (thread.isInterrupted()) {
                System.out.println("任務已取消");
            }
        }
        
    }

對應的我們也給出輸出結(jié)果,可以看到阻塞的隊列在被打斷后完成必要的資源兜底,就會將中斷狀態(tài)向上傳遞:

運行中......
運行中......
線程中斷,未處理資源:6
任務已取消

4. 時刻保留中斷的狀態(tài)

需要注意的是,執(zhí)行者僅僅傳遞中斷還是不行的,更重要的一點是:

在必要時刻,保存中斷的狀態(tài),并返回前恢復狀態(tài),而不是捕獲到isInterrupted,避免陷入無限循環(huán)的漩渦。

很多情況下當前任務不具備處理中斷的能力,例如Runnable收到中斷的請求不可拋出異常交由上層調(diào)用棧處理,那么就在收到中斷請求,按照如下步驟執(zhí)行:

  • 基于本地標識保留中斷狀態(tài)
  • 完成必要的收尾工作
  • 在返回前打斷該線程恢復中斷狀態(tài)

例如線程0循環(huán)獲取阻塞隊列元素,在因為沒有元素而阻塞時,線程1打斷該線程,已按照時刻保留中斷的狀態(tài)守則,線程0則應該按照如下步驟執(zhí)行:

  • 收到中斷,利用本地變量保留中斷狀態(tài)
  • 繼續(xù)循環(huán)等待元素獲取
  • 獲取到元素并返回,在返回前將當前線程打斷,讓外部感知

對應的我們給出消費者循環(huán)獲取元素并處理狀態(tài)的代碼:

privatestaticfinal BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);

public static String getNextElement() {
        boolean interrupted = false;

        try {
            while (true) {

                try {
                    //1. 等待結(jié)果返回而阻塞
                    return blockingQueue.take();
                } catch (InterruptedException e) {
                    //2.收到異常中斷,小心保存中斷狀態(tài),繼續(xù)阻塞等待元素返回
                    interrupted = true;
                    Console.log("消費者線程中斷,待完成本次資源獲取后執(zhí)行中斷");
                }
            }
        } finally {
            //3. 返回前基于中斷標識將中斷狀態(tài)向上傳遞
            if (interrupted) {
                Console.log("消費者線程已中斷");
                Thread.currentThread().interrupt();
            }
        }

    }

對應的我們也給出測試代碼,即在消費者阻塞后將其打斷,并投遞元素,讓其完成優(yōu)雅中斷:

public static void main(String[] args) {
        //消費者線程
        Thread thread = new Thread(() -> {
            String nextElement = getNextElement();
            Console.log("消費者線程獲取結(jié)果:{}", nextElement);
        });
        thread.start();
        Console.log("消費者線程啟動");
        //休眠5s后將task任務對應線程中斷
        new Thread(() -> {
            //休眠5s后將task任務對應線程中斷
            ThreadUtil.sleep(5000);
            thread.interrupt();
            //休眠5s后再投遞元素
            ThreadUtil.sleep(5000);
            try {
                String element = RandomUtil.randomString(10);
                Console.log("生產(chǎn)者線程投遞:{}", element);
                blockingQueue.put(element);
            } catch (InterruptedException e) {
                //......
            }
        }).start();
    }

輸出結(jié)果如下,可以看到消費者在收到中斷后明確保留中斷狀態(tài),并完成資源處理的工作后執(zhí)行中斷:

消費者線程啟動
消費者線程中斷,待完成本次資源獲取后執(zhí)行中斷
生產(chǎn)者線程投遞:7gmnj1rqpj
消費者線程已中斷
消費者線程獲取結(jié)果:7gmnj1rqpj

5. 超時任務取消的最優(yōu)解

如果我們現(xiàn)在需要實現(xiàn)這樣以一個函數(shù),該函數(shù)會接受外部傳入一個異步任務并提交到我們的線程池異步執(zhí)行,并具備如下要求:

  • 要求在給定時間完成執(zhí)行
  • 任務執(zhí)行完成后,要知曉是超時取消,還是正常執(zhí)行完成返回,即任務正確執(zhí)行則返回true,反之返回false
  • 任務執(zhí)行過程中可被中斷

所以對于該需求,要做到如下幾點:

  • 可以感知任務執(zhí)行完成并返回true
  • 可以感知任務執(zhí)行超時,并返回false
  • 任務可中斷,直接拋出讓上層代碼解決

對應我們給出如下代碼,可以看到我們采用submit獲取異步任務的Future對象,利用Future實現(xiàn)帶有時限的阻塞獲取,一旦超時則直接拋出超時異常,并在函數(shù)返回前的finally語句塊調(diào)用cancel取消任務,需要注意的是這個cancel方法并不會一味的取消任務:

  • 如果任務已完成,cancel就會返回false
  • 如果任務因為超時等原因調(diào)用cancel,那么任務則還是活躍的,調(diào)用cancel可以取消并直接返回true

所以基于cancel這個特點,我們直接取反,即可實現(xiàn)正確執(zhí)行返回true,超時返回false:

private staticfinal ExecutorService executor = ThreadUtil.newExecutor(10);

    public static boolean get(Runnable r, int timeout) throws InterruptedException {
        Future<?> future = executor.submit(r);
        try {
            future.get(timeout, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            Console.error("任務執(zhí)行超時");
        } catch (ExecutionException e) {
            thrownew RuntimeException(e);
        } finally {
            /*
             1. 設置為true,如果任務是運行中,則取消任務,如果已經(jīng)取消,則沒有任務效果
             2. 如果任務已經(jīng)完成,則返回false,反之返回true
             */
            return !future.cancel(true);

        }

    }

對應的我們也給出測試代碼:

public static void main(String[] args) {
        try {
            boolean isTimeOut = get(() -> {
                ThreadUtil.sleep(5000);
                Console.log("任務執(zhí)行完成");
            }, 1);

            Console.log("任務是否正常執(zhí)行:{}", isTimeOut);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

輸出結(jié)果如下,可以看到任務執(zhí)行超時后直接打斷休眠,任務執(zhí)行到已完成的輸出,然后執(zhí)行超時取消,如果取反得到false:

任務執(zhí)行超時
任務執(zhí)行完成
任務是否正常執(zhí)行:false

6. 處理系統(tǒng)層面阻塞IO

有時候阻塞并非來自阻塞式并發(fā)包的調(diào)用,而是例如硬件層面文件IO或者網(wǎng)絡層面的socket IO,這種API涉及內(nèi)核態(tài)調(diào)用,通過interrupt我們也只能修改中斷表示,無法直接將其中斷。

所以我們只能通過間接的手段干預其資源關閉來做到中斷,無論是socket還是文件IO,本質(zhì)上都是針對系統(tǒng)或者網(wǎng)卡IO數(shù)據(jù)的阻塞讀取,所以我們可以直接通過關閉文件IO流或者socket套接字來間接打斷其資源讀取。

對應的我們以文件IO為例給出代碼示例,可以看到我們通過繼承thread重寫其中斷方法,當我們需要打斷系統(tǒng)資源時,直接關閉其流通道讓工作線程感知到這一點,然后通過原生interrupt修改中斷狀態(tài):

public class IOThread extends Thread {
    privatefinal BufferedReader utf8Reader;

    public IOThread(String path) {
        utf8Reader = FileUtil.getUtf8Reader(path);
    }

    @Override
    public synchronized void start() {
        while (true) {
            try {
                String line = utf8Reader.readLine();
                Console.log(line);
            } catch (IOException e) {
                //保存IO上下文狀態(tài)
                thrownew RuntimeException(e);
            }
        }

    }

    @Override
    public void interrupt() {
        try {
            //強制關閉IO讓其感知中斷
            utf8Reader.close();
        } catch (IOException e) {
            thrownew RuntimeException(e);
        } finally {
            super.interrupt();
        }
    }


    
}

對應的我們也給出使用示例,這段代碼會在中斷線程調(diào)用interrupt關閉流通道直接直接將IOUtil 線程打斷:

IOThread ioThread = new IOThread("F:\\test.txt");
        Thread thread = new Thread(() -> {
            ThreadUtil.sleep(10_000);
            ioThread.interrupt();
        });

        thread.start();
        ioThread.start();
責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2022-09-28 12:23:36

Promise代碼

2020-03-27 11:41:12

線程 Java中止

2024-09-26 10:51:51

2023-05-12 14:14:00

Java線程中斷

2023-07-07 07:44:41

線程中斷LockSuppor

2024-11-13 16:37:00

Java線程池

2021-01-11 12:53:28

線程Java管理

2009-04-23 09:07:03

JAVA終端線程

2018-06-24 09:27:55

線程Tomcat多線程

2015-08-03 09:54:26

Java線程Java

2017-11-22 15:11:33

Java線程停止

2025-01-26 09:35:45

2023-10-10 13:23:18

空指針異常Java

2017-12-19 10:03:44

JavaLinux代碼

2023-12-20 10:04:45

線程池Java

2024-08-06 09:43:54

Java 8工具編程

2022-09-08 08:03:30

RocketMQ線程技巧

2025-01-20 07:10:00

LambdaJavanull

2020-12-08 08:08:51

Java接口數(shù)據(jù)

2017-06-04 16:24:27

線程線程池中斷
點贊
收藏

51CTO技術棧公眾號