偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

并發(fā)編程之定時任務(wù)&定時線程池原理解析

開發(fā) 前端
線程池的具體實現(xiàn)有兩種,分別是ThreadPoolExecutor 默認(rèn)線程池和ScheduledThreadPoolExecutor 定時線程池,上一篇已經(jīng)分析過ThreadPoolExecutor原理與使用了,本篇我們來重點分析下ScheduledThreadPoolExecutor的原理與使用。

[[356766]]

 前言

線程池的具體實現(xiàn)有兩種,分別是ThreadPoolExecutor 默認(rèn)線程池和ScheduledThreadPoolExecutor 定時線程池,上一篇已經(jīng)分析過ThreadPoolExecutor原理與使用了,本篇我們來重點分析下ScheduledThreadPoolExecutor的原理與使用。

《并發(fā)編程之Executor線程池原理與源碼解讀》

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 與 ThreadPoolExecutor 線程池的概念有些區(qū)別,它是一個支持任務(wù)周期性調(diào)度的線程池。

ScheduledThreadPoolExecutor 繼承 ThreadPoolExecutor,同時通過實現(xiàn) ScheduledExecutorSerivce 來擴展基礎(chǔ)線程池的功能,使其擁有了調(diào)度能力。其整個調(diào)度的核心在于內(nèi)部類 DelayedWorkQueue ,一個有序的延時隊列。

定時線程池類的類結(jié)構(gòu)圖如下:

ScheduledThreadPoolExecutor 的出現(xiàn),很好的彌補了傳統(tǒng) Timer 的不足,具體對比看下表:

TimerScheduledThreadPoolExecutor線程單線程多線程多任務(wù)任務(wù)之間相互影響任務(wù)之間不影響調(diào)度時間絕對時間相對時間異常單任務(wù)異常,后續(xù)任務(wù)受影響無影響

工作原理

它用來處理延時任務(wù)或定時任務(wù)

它接收SchduledFutureTask類型的任務(wù),是線程池調(diào)度任務(wù)的最小單位,有三種提交任務(wù)的方式:

  1. schedule,特定時間延時后執(zhí)行一次任務(wù)
  2. scheduledAtFixedRate,固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時間無關(guān),周期是固定的)
  3. scheduledWithFixedDelay,固定延時執(zhí)行任務(wù)(與任務(wù)執(zhí)行時間有關(guān),延時從上一次任務(wù)完成后開始)

它采用 DelayedWorkQueue 存儲等待的任務(wù)

  1. DelayedWorkQueue 內(nèi)部封裝了一個 PriorityQueue ,它會根據(jù) time 的先后時間排序,若 time 相同則根據(jù) sequenceNumber 排序;
  2. DelayedWorkQueue 也是一個無界隊列;

因為前面講阻塞隊列實現(xiàn)的時候,已經(jīng)對DelayedWorkQueue進(jìn)行了說明,更多內(nèi)容請查看《阻塞隊列 — DelayedWorkQueue源碼分析》

工作線程的執(zhí)行過程:

  • 工作線程會從DelayedWorkerQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
  • 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時間,再次放回DelayedWorkerQueue。

take方法是什么時候調(diào)用的呢? 在ThreadPoolExecutor中,getTask方法,工作線程會循環(huán)地從workQueue中取任務(wù)。但定時任務(wù)卻不同,因為如果一旦getTask方法取出了任務(wù)就開始執(zhí)行了,而這時可能還沒有到執(zhí)行的時間,所以在take方法中,要保證只有在到指定的執(zhí)行時間的時候任務(wù)才可以被取走。

PS:對于以上原理的理解,可以通過下面的源碼分析加深印象。

源碼分析

構(gòu)造方法

ScheduledThreadPoolExecutor有四個構(gòu)造形式:

  1. public ScheduledThreadPoolExecutor(int corePoolSize) { 
  2.  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  3.   new DelayedWorkQueue()); 
  4.  
  5. public ScheduledThreadPoolExecutor(int corePoolSize, 
  6.                                        ThreadFactory threadFactory) { 
  7.  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  8.      new DelayedWorkQueue(), threadFactory); 
  9.  
  10. public ScheduledThreadPoolExecutor(int corePoolSize, 
  11.                                        RejectedExecutionHandler handler) { 
  12.  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  13.   new DelayedWorkQueue(), handler); 
  14.  
  15. public ScheduledThreadPoolExecutor(int corePoolSize, 
  16.                                        ThreadFactory threadFactory, 
  17.                                        RejectedExecutionHandler handler) { 
  18.  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  19.      new DelayedWorkQueue(), threadFactory, handler); 

 當(dāng)然我們也可以使用工具類Executors的newScheduledThreadPool的方法,快速創(chuàng)建。注意這里使用的DelayedWorkQueue。

ScheduledThreadPoolExecutor沒有提供帶有最大線程數(shù)的構(gòu)造函數(shù)的,默認(rèn)是Integer.MAX_VALUE,說明其可以無限制的開啟任意線程執(zhí)行任務(wù),在大量任務(wù)系統(tǒng),應(yīng)注意這一點,避免內(nèi)存溢出。

核心方法

核心方法主要介紹ScheduledThreadPoolExecutor的調(diào)度方法,其他方法與 ThreadPoolExecutor 一致。調(diào)度方法均由 ScheduledExecutorService 接口定義:

  1. public interface ScheduledExecutorService extends ExecutorService { 
  2.     // 特定時間延時后執(zhí)行一次Runnable 
  3.     public ScheduledFuture<?> schedule(Runnable command, 
  4.                                        long delay, TimeUnit unit); 
  5.     // 特定時間延時后執(zhí)行一次Callable 
  6.     public <V> ScheduledFuture<V> schedule(Callable<V> callable, 
  7.                                            long delay, TimeUnit unit); 
  8.     // 固定周期執(zhí)行任務(wù)(與任務(wù)執(zhí)行時間無關(guān),周期是固定的) 
  9.     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
  10.                                                   long initialDelay, 
  11.                                                   long period, 
  12.                                                   TimeUnit unit); 
  13.      // 固定延時執(zhí)行任務(wù)(與任務(wù)執(zhí)行時間有關(guān),延時從上一次任務(wù)完成后開始) 
  14.     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
  15.                                                      long initialDelay, 
  16.                                                      long delay, 
  17.                                                      TimeUnit unit); 

 我們再來看一下接口的實現(xiàn),具體是怎么來實現(xiàn)線程池任務(wù)的提交。因為最終都回調(diào)用 delayedExecute 提交任務(wù)。所以,我們這里只分析schedule方法,該方法是指任務(wù)在指定延遲時間到達(dá)后觸發(fā),只會執(zhí)行一次。源代碼如下:

  1. public ScheduledFuture<?> schedule(Runnable command, 
  2.                                    long delay, 
  3.                                    TimeUnit unit) { 
  4.     //參數(shù)校驗 
  5.     if (command == null || unit == null
  6.         throw new NullPointerException(); 
  7.     //這里是一個嵌套結(jié)構(gòu),首先把用戶提交的任務(wù)包裝成ScheduledFutureTask 
  8.     //然后在調(diào)用decorateTask進(jìn)行包裝,該方法是留給用戶去擴展的,默認(rèn)是個空方法 
  9.     RunnableScheduledFuture<?> t = decorateTask(command, 
  10.         new ScheduledFutureTask<Void>(command, null
  11.                                       triggerTime(delay, unit))); 
  12.    //包裝好任務(wù)以后,就進(jìn)行提交了 
  13.  delayedExecute(t); 
  14.     return t; 

 delayedExecute 任務(wù)提交方法:

  1. private void delayedExecute(RunnableScheduledFuture<?> task) { 
  2.     //如果線程池已經(jīng)關(guān)閉,則使用拒絕策略把提交任務(wù)拒絕掉 
  3.  if (isShutdown()) 
  4.         reject(task); 
  5.     else { 
  6.   //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊列 
  7.         super.getQueue().add(task);//使用用的DelayedWorkQueue 
  8.   //如果當(dāng)前狀態(tài)無法執(zhí)行任務(wù),則取消 
  9.         if (isShutdown() && 
  10.             !canRunInCurrentRunState(task.isPeriodic()) && 
  11.             remove(task)) 
  12.             task.cancel(false); 
  13.         else 
  14.          //這里是增加一個worker線程,避免提交的任務(wù)沒有worker去執(zhí)行 
  15.          //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列 
  16.            ensurePrestart(); 
  17.     } 

 我們可以看到提交到線程池的任務(wù)都包裝成了 ScheduledFutureTask,繼續(xù)往下我們再來研究下。

ScheduledFutureTask

從ScheduledFutureTask類的定義可以看出,ScheduledFutureTask類是ScheduledThreadPoolExecutor類的私有內(nèi)部類,繼承了FutureTask類,并實現(xiàn)了RunnableScheduledFuture接口。也就是說,ScheduledFutureTask具有FutureTask類的所有功能,并實現(xiàn)了RunnableScheduledFuture接口的所有方法。ScheduledFutureTask類的定義如下所示:

  1. private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> 

ScheduledFutureTask類繼承圖如下:


成員變量

SchduledFutureTask接收的參數(shù)(成員變量):

  1. // 任務(wù)開始的時間 
  2.  
  3. private long time
  4.  
  5. // 任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號 
  6.  
  7. private final long sequenceNumber; 
  8.  
  9. // 任務(wù)執(zhí)行的時間間隔 
  10.  
  11. private final long period; 
  12.  
  13. //ScheduledFutureTask對象,實際指向當(dāng)前對象本身 
  14.  
  15. RunnableScheduledFuture outerTask = this; 
  16.  
  17. //當(dāng)前任務(wù)在延遲隊列中的索引,能夠更加方便的取消當(dāng)前任務(wù) 
  18.  
  19. int heapIndex; 

 解析:

  • sequenceNumber:任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號,可以根據(jù)這個序列號確定唯一的一個任務(wù),如果在定時任務(wù)中,如果一個任務(wù)是周期性執(zhí)行的,但是它們的sequenceNumber的值相同,則被視為是同一個任務(wù)。
  • time:下一次執(zhí)行任務(wù)的時間。
  • period:任務(wù)的執(zhí)行周期。
  • outerTask:ScheduledFutureTask對象,實際指向當(dāng)前對象本身。此對象的引用會被傳入到周期性執(zhí)行任務(wù)的ScheduledThreadPoolExecutor類的reExecutePeriodic方法中。
  • heapIndex:當(dāng)前任務(wù)在延遲隊列中的索引,這個索引能夠更加方便的取消當(dāng)前任務(wù)。

構(gòu)造方法

ScheduledFutureTask類繼承了FutureTask類,并實現(xiàn)了RunnableScheduledFuture接口。在ScheduledFutureTask類中提供了如下構(gòu)造方法。

  1. ScheduledFutureTask(Runnable r, V result, long ns) { 
  2.  super(r, result); 
  3.  this.time = ns; 
  4.  this.period = 0; 
  5.  this.sequenceNumber = sequencer.getAndIncrement(); 
  6.  
  7. ScheduledFutureTask(Runnable r, V result, long ns, long period) { 
  8.  super(r, result); 
  9.  this.time = ns; 
  10.  this.period = period; 
  11.  this.sequenceNumber = sequencer.getAndIncrement(); 
  12.  
  13. ScheduledFutureTask(Callable<V> callable, long ns) { 
  14.  super(callable); 
  15.  this.time = ns; 
  16.  this.period = 0; 
  17.  this.sequenceNumber = sequencer.getAndIncrement(); 

 FutureTask的構(gòu)造方法如下:

  1. public FutureTask(Runnable runnable, V result) { 
  2.  this.callable = Executors.callable(runnable, result); 
  3.  this.state = NEW;       // ensure visibility of callable 

 通過源碼可以看到,在ScheduledFutureTask類的構(gòu)造方法中,首先會調(diào)用FutureTask類的構(gòu)造方法為FutureTask類的callable和state成員變量賦值,接下來為ScheduledFutureTask類的time、period和sequenceNumber成員變量賦值。理解起來比較簡單。

getDelay方法

我們先來看getDelay方法的源碼,如下所示:

  1. //獲取下次執(zhí)行任務(wù)的時間距離當(dāng)前時間的納秒數(shù) 
  2. public long getDelay(TimeUnit unit) { 
  3.  return unit.convert(time - now(), NANOSECONDS); 

 getDelay方法比較簡單,主要用來獲取下次執(zhí)行任務(wù)的時間距離當(dāng)前系統(tǒng)時間的納秒數(shù)。

compareTo方法

ScheduledFutureTask類在類的結(jié)構(gòu)上實現(xiàn)了Comparable接口,compareTo方法主要是對Comparable接口定義的compareTo方法的實現(xiàn)。源碼如下所示:

  1. public int compareTo(Delayed other) { 
  2.  if (other == this)  
  3.   return 0; 
  4.  if (other instanceof ScheduledFutureTask) { 
  5.   ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; 
  6.   long diff = time - x.time
  7.   if (diff < 0) 
  8.    return -1; 
  9.   else if (diff > 0) 
  10.    return 1; 
  11.   else if (sequenceNumber < x.sequenceNumber) 
  12.    return -1; 
  13.   else 
  14.    return 1; 
  15.  } 
  16.  long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); 
  17.  return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; 

 這段代碼看上去好像是對各種數(shù)值類型數(shù)據(jù)的比較,本質(zhì)上是對延遲隊列中的任務(wù)進(jìn)行排序。排序規(guī)則為:

  • 首先比較延遲隊列中每個任務(wù)下次執(zhí)行的時間,下次執(zhí)行時間距離當(dāng)前時間短的任務(wù)會排在前面;
  • 如果下次執(zhí)行任務(wù)的時間相同,則會比較任務(wù)的sequenceNumber值,sequenceNumber值小的任務(wù)會排在前面。

isPeriodic方法

isPeriodic方法的源代碼如下所示:

  1. //判斷是否是周期性任務(wù) 
  2. public boolean isPeriodic() { 
  3.  return period != 0; 

 這個方法主要是用來判斷當(dāng)前任務(wù)是否是周期性任務(wù)。這里只要判斷運行任務(wù)的執(zhí)行周期不等于0就能確定為周期性任務(wù)了。因為無論period的值是大于0還是小于0,當(dāng)前任務(wù)都是周期性任務(wù)。

setNextRunTime方法

setNextRunTime方法的作用主要是設(shè)置當(dāng)前任務(wù)下次執(zhí)行的時間,源碼如下所示:

  1. private void setNextRunTime() { 
  2.  long p = period; 
  3.  //固定頻率,上次執(zhí)行任務(wù)的時間加上任務(wù)的執(zhí)行周期 
  4.  if (p > 0) 
  5.   time += p; 
  6.  //相對固定的延遲執(zhí)行,當(dāng)前系統(tǒng)時間加上任務(wù)的執(zhí)行周期 
  7.  else 
  8.   time = triggerTime(-p); 

 這里再一次證明了使用isPeriodic方法判斷當(dāng)前任務(wù)是否為周期性任務(wù)時,只要判斷period的值是否不等于0就可以了。

  • 因為如果當(dāng)前任務(wù)時固定頻率執(zhí)行的周期性任務(wù),會將周期period當(dāng)作正數(shù)來處理;
  • 如果是相對固定的延遲執(zhí)行當(dāng)前任務(wù),則會將周期period當(dāng)作負(fù)數(shù)來處理。

這里我們看到在setNextRunTime方法中,調(diào)用了ScheduledThreadPoolExecutor類的triggerTime方法。接下來我們看下triggerTime方法的源碼。

ScheduledThreadPoolExecutor類的triggerTime方法

triggerTime方法用于獲取延遲隊列中的任務(wù)下一次執(zhí)行的具體時間。源碼如下所示。

  1. private long triggerTime(long delay, TimeUnit unit) { 
  2.  return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); 
  3.  
  4. long triggerTime(long delay) { 
  5.  return now() + 
  6.   ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); 

 這兩個triggerTime方法的代碼比較簡單,就是獲取下一次執(zhí)行任務(wù)的具體時間。有一點需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,則直接返回delay,否則需要處理溢出的情況。

我們看到在triggerTime方法中處理防止溢出的邏輯使用了ScheduledThreadPoolExecutor類的overflowFree方法,接下來,我們就看看ScheduledThreadPoolExecutor類的overflowFree方法的實現(xiàn)。

ScheduledThreadPoolExecutor類的overflowFree方法

overflowFree方法的源代碼如下所示:

  1. private long overflowFree(long delay) { 
  2.  //獲取隊列中的節(jié)點 
  3.  Delayed head = (Delayed) super.getQueue().peek(); 
  4.  //獲取的節(jié)點不為空,則進(jìn)行后續(xù)處理 
  5.  if (head != null) { 
  6.   //從隊列節(jié)點中獲取延遲時間 
  7.   long headDelay = head.getDelay(NANOSECONDS); 
  8.   //如果從隊列中獲取的延遲時間小于0,并且傳遞的delay 
  9.   //值減去從隊列節(jié)點中獲取延遲時間小于0 
  10.   if (headDelay < 0 && (delay - headDelay < 0)) 
  11.    //將delay的值設(shè)置為Long.MAX_VALUE + headDelay 
  12.    delay = Long.MAX_VALUE + headDelay; 
  13.  } 
  14.  //返回延遲時間 
  15.  return delay; 

 通過對overflowFree方法的源碼分析,可以看出overflowFree方法本質(zhì)上就是為了限制隊列中的所有節(jié)點的延遲時間在Long.MAX_VALUE值之內(nèi),防止在compareTo方法中溢出。

cancel方法

cancel方法的作用主要是取消當(dāng)前任務(wù)的執(zhí)行,源碼如下所示:

  1. public boolean cancel(boolean mayInterruptIfRunning) { 
  2.  //取消任務(wù),返回任務(wù)是否取消的標(biāo)識 
  3.  boolean cancelled = super.cancel(mayInterruptIfRunning); 
  4.  //如果任務(wù)已經(jīng)取消 
  5.  //并且需要將任務(wù)從延遲隊列中刪除 
  6.  //并且任務(wù)在延遲隊列中的索引大于或者等于0 
  7.  if (cancelled && removeOnCancel && heapIndex >= 0) 
  8.   //將當(dāng)前任務(wù)從延遲隊列中刪除 
  9.   remove(this); 
  10.  //返回是否成功取消任務(wù)的標(biāo)識 
  11.  return cancelled; 

 這段代碼理解起來相對比較簡單,首先調(diào)用取消任務(wù)的方法,并返回任務(wù)是否已經(jīng)取消的標(biāo)識。如果任務(wù)已經(jīng)取消,并且需要移除任務(wù),同時,任務(wù)在延遲隊列中的索引大于或者等于0,則將當(dāng)前任務(wù)從延遲隊列中移除。最后返回任務(wù)是否成功取消的標(biāo)識。

run方法

run方法可以說是ScheduledFutureTask類的核心方法,是對Runnable接口的實現(xiàn),源碼如下所示:

  1. public void run() { 
  2.  //當(dāng)前任務(wù)是否是周期性任務(wù) 
  3.  boolean periodic = isPeriodic(); 
  4.  //線程池當(dāng)前運行狀態(tài)下不能執(zhí)行周期性任務(wù) 
  5.  if (!canRunInCurrentRunState(periodic)) 
  6.   //取消任務(wù)的執(zhí)行 
  7.   cancel(false); 
  8.  //如果不是周期性任務(wù) 
  9.  else if (!periodic) 
  10.   //則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù) 
  11.   ScheduledFutureTask.super.run(); 
  12.  //如果是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù) 
  13.  //如果任務(wù)執(zhí)行成功 
  14.  else if (ScheduledFutureTask.super.runAndReset()) { 
  15.   //設(shè)置下次執(zhí)行任務(wù)的時間 
  16.   setNextRunTime(); 
  17.   //重復(fù)執(zhí)行任務(wù) 
  18.   reExecutePeriodic(outerTask); 
  19.  } 

 整理一下方法的邏輯:

  1. 首先判斷當(dāng)前任務(wù)是否是周期性任務(wù)。如果線程池當(dāng)前運行狀態(tài)下不能執(zhí)行周期性任務(wù),則取消任務(wù)的執(zhí)行,否則執(zhí)行步驟2;
  2. 如果當(dāng)前任務(wù)不是周期性任務(wù),則直接調(diào)用FutureTask類的run方法執(zhí)行任務(wù),會設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟3;
  3. 如果當(dāng)前任務(wù)是周期性任務(wù),則調(diào)用FutureTask類的runAndReset方法執(zhí)行任務(wù),不會設(shè)置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟4;
  4. 如果任務(wù)執(zhí)行成功,則設(shè)置下次執(zhí)行任務(wù)的時間,同時,將任務(wù)設(shè)置為重復(fù)執(zhí)行。

這里,調(diào)用了FutureTask類的run方法和runAndReset方法,并且調(diào)用了ScheduledThreadPoolExecutor類的reExecutePeriodic方法。接下來,我們分別看下這些方法的實現(xiàn)。

FutureTask類的run方法

FutureTask類的run方法源碼如下所示:

  1. public void run() { 
  2.     //狀態(tài)如果不是NEW,說明任務(wù)或者已經(jīng)執(zhí)行過,或者已經(jīng)被取消,直接返回 
  3.     //狀態(tài)如果是NEW,則嘗試把當(dāng)前執(zhí)行線程保存在runner字段中 
  4.     //如果賦值失敗則直接返回 
  5.     if (state != NEW || 
  6.         !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) 
  7.         return
  8.     try { 
  9.         Callable<V> c = callable; 
  10.         if (c != null && state == NEW) { 
  11.             V result; 
  12.             boolean ran; 
  13.             try { 
  14.                 //執(zhí)行任務(wù) 
  15.                 result = c.call(); 
  16.                 ran = true
  17.             } catch (Throwable ex) { 
  18.                 result = null
  19.                 ran = false
  20.                 //任務(wù)異常 
  21.                 setException(ex); 
  22.             } 
  23.             if (ran) 
  24.                 //任務(wù)正常執(zhí)行完畢 
  25.                 set(result); 
  26.         } 
  27.     } finally { 
  28.  
  29.         runner = null
  30.         int s = state; 
  31.         //如果任務(wù)被中斷,執(zhí)行中斷處理 
  32.         if (s >= INTERRUPTING) 
  33.             handlePossibleCancellationInterrupt(s); 
  34.     } 

 代碼的整體邏輯為:

  • 判斷當(dāng)前任務(wù)的state是否等于NEW,如果不為NEW則說明任務(wù)或者已經(jīng)執(zhí)行過,或者已經(jīng)被取消,直接返回;
  • 如果狀態(tài)為NEW則接著會通過unsafe類把任務(wù)執(zhí)行線程引用CAS的保存在runner字段中,如果保存失敗,則直接返回;
  • 執(zhí)行任務(wù);如果任務(wù)執(zhí)行發(fā)生異常,則調(diào)用setException()方法保存異常信息。

FutureTask類的runAndReset方法

方法的源碼如下所示:

  1. protected boolean runAndReset() { 
  2.  if (state != NEW || 
  3.   !UNSAFE.compareAndSwapObject(this, runnerOffset, 
  4.           null, Thread.currentThread())) 
  5.   return false
  6.  boolean ran = false
  7.  int s = state; 
  8.  try { 
  9.   Callable<V> c = callable; 
  10.   if (c != null && s == NEW) { 
  11.    try { 
  12.     c.call(); // don't set result 
  13.     ran = true
  14.    } catch (Throwable ex) { 
  15.     setException(ex); 
  16.    } 
  17.   } 
  18.  } finally { 
  19.   // runner must be non-null until state is settled to 
  20.   // prevent concurrent calls to run() 
  21.   runner = null
  22.   // state must be re-read after nulling runner to prevent 
  23.   // leaked interrupts 
  24.   s = state; 
  25.   if (s >= INTERRUPTING) 
  26.    handlePossibleCancellationInterrupt(s); 
  27.  } 
  28.  return ran && s == NEW; 

 FutureTask類的runAndReset方法與run方法的邏輯基本相同,只是runAndReset方法會重置當(dāng)前任務(wù)的執(zhí)行狀態(tài)。

ScheduledThreadPoolExecutor類的reExecutePeriodic方法

reExecutePeriodic重復(fù)執(zhí)行任務(wù)方法,源代碼如下所示:

  1. void reExecutePeriodic(RunnableScheduledFuture<?> task) { 
  2.  //線程池當(dāng)前狀態(tài)下能夠執(zhí)行任務(wù) 
  3.  if (canRunInCurrentRunState(true)) { 
  4.   //與ThreadPoolExecutor不同,這里直接把任務(wù)加入延遲隊列 
  5.         super.getQueue().add(task);//使用用的DelayedWorkQueue 
  6.   //線程池當(dāng)前狀態(tài)下不能執(zhí)行任務(wù),并且成功移除任務(wù) 
  7.   if (!canRunInCurrentRunState(true) && remove(task)) 
  8.    //取消任務(wù) 
  9.    task.cancel(false); 
  10.   else 
  11.    //這里是增加一個worker線程,避免提交的任務(wù)沒有worker去執(zhí)行 
  12.             //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列   
  13.    ensurePrestart(); 
  14.  } 

 總體來說reExecutePeriodic方法的邏輯比較簡單,需要注意的是:調(diào)用reExecutePeriodic方法的時候已經(jīng)執(zhí)行過一次任務(wù),所以,并不會觸發(fā)線程池的拒絕策略;傳入reExecutePeriodic方法的任務(wù)一定是周期性的任務(wù)。

DelayedWorkQueue

ScheduledThreadPoolExecutor之所以要自己實現(xiàn)阻塞的工作隊列,是因為 ScheduleThreadPoolExecutor 要求的工作隊列有些特殊。

DelayedWorkQueue是一個基于堆的數(shù)據(jù)結(jié)構(gòu),類似于DelayQueue和PriorityQueue。在執(zhí)行定時任務(wù)的時候,每個任務(wù)的執(zhí)行時間都不同,所以DelayedWorkQueue的工作就是按照執(zhí)行時間的升序來排列,執(zhí)行時間距離當(dāng)前時間越近的任務(wù)在隊列的前面(注意:這里的順序并不是絕對的,堆中的排序只保證了子節(jié)點的下次執(zhí)行時間要比父節(jié)點的下次執(zhí)行時間要大,而葉子節(jié)點之間并不一定是順序的)。

堆結(jié)構(gòu)如下圖:

 可見,DelayedWorkQueue是一個基于最小堆結(jié)構(gòu)的隊列。堆結(jié)構(gòu)可以使用數(shù)組表示,可以轉(zhuǎn)換成如下的數(shù)組:

并發(fā)編程之定時任務(wù)&定時線程池原理解析

 在這種結(jié)構(gòu)中,可以發(fā)現(xiàn)有如下特性: 假設(shè)“第一個元素” 在數(shù)組中的索引為 0 的話,則父結(jié)點和子結(jié)點的位置關(guān)系如下:

  • 索引為 的左孩子的索引是 (2∗i+1);
  • 索引為 的右孩子的索引是 (2∗i+2);
  • 索引為 的父結(jié)點的索引是 floor((i−1)/2);

為什么要使用DelayedWorkQueue呢?

  • 定時任務(wù)執(zhí)行時需要取出最近要執(zhí)行的任務(wù),所以任務(wù)在隊列中每次出隊時一定要是當(dāng)前隊列中執(zhí)行時間最靠前的,所以自然要使用優(yōu)先級隊列。
  • DelayedWorkQueue是一個優(yōu)先級隊列,它可以保證每次出隊的任務(wù)都是當(dāng)前隊列中執(zhí)行時間最靠前的,由于它是基于堆結(jié)構(gòu)的隊列,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時的最壞時間復(fù)雜度是 O(logN)。

因為前面講阻塞隊列實現(xiàn)的時候,已經(jīng)對DelayedWorkQueue進(jìn)行了說明,更多內(nèi)容請查看《阻塞隊列 — DelayedWorkQueue源碼分析》

總結(jié)

  1. 與Timer執(zhí)行定時任務(wù)比較,相比Timer,ScheduledThreadPoolExecutor有說明優(yōu)點?(文章前面分析過)
  2. ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也是一個線程池,也有 coorPoolSize 和 workQueue,但是 ScheduledThreadPoolExecutor特殊的地方在于,自己實現(xiàn)了優(yōu)先工作隊列 DelayedWorkQueue ;
  3. ScheduledThreadPoolExecutor 實現(xiàn)了 ScheduledExecutorService,所以就有了任務(wù)調(diào)度的方法,如 schedule 、 scheduleAtFixedRate 、 scheduleWithFixedDelay ,同時注意他們之間的區(qū)別;
  4. 內(nèi)部類 ScheduledFutureTask 繼承者FutureTask,實現(xiàn)了任務(wù)的異步執(zhí)行并且可以獲取返回結(jié)果。同時實現(xiàn)了Delayed接口,可以通過getDelay方法獲取將要執(zhí)行的時間間隔;
  5. 周期任務(wù)的執(zhí)行其實是調(diào)用了FutureTask的 runAndReset 方法,每次執(zhí)行完不設(shè)置結(jié)果和狀態(tài)。
  6. DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu),它是一個基于最小堆結(jié)構(gòu)的優(yōu)先隊列,并且每次出隊時能夠保證取出的任務(wù)是當(dāng)前隊列中下次執(zhí)行時間最小的任務(wù)。同時注意一下優(yōu)先隊列中堆的順序,堆中的順序并不是絕對的,但要保證子節(jié)點的值要比父節(jié)點的值要大,這樣就不會影響出隊的順序。

總體來說,ScheduedThreadPoolExecutor的重點是要理解下次執(zhí)行時間的計算,以及優(yōu)先隊列的出隊、入隊和刪除的過程,這兩個是理解ScheduedThreadPoolExecutor的關(guān)鍵。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

 

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2020-12-08 08:53:53

編程ThreadPoolE線程池

2022-03-28 08:31:29

線程池定時任務(wù)

2024-02-28 09:54:07

線程池配置

2020-12-21 07:31:23

實現(xiàn)單機JDK

2024-12-27 08:24:55

2017-01-10 13:39:57

Python線程池進(jìn)程池

2023-01-16 08:49:20

RocketMQ定時任務(wù)源代

2009-10-28 10:05:29

Ubuntucrontab定時任務(wù)

2012-02-07 13:31:14

SpringJava

2010-03-10 15:47:58

crontab定時任務(wù)

2022-11-09 09:01:08

并發(fā)編程線程池

2016-10-21 11:04:07

JavaScript異步編程原理解析

2024-11-04 16:01:01

2010-01-07 13:38:41

Linux定時任務(wù)

2024-05-13 09:49:30

.NETQuartz庫Cron表達(dá)式

2023-11-07 07:47:35

Topic線程PUSH

2021-12-16 14:25:03

Linux定時任務(wù)

2023-12-19 08:09:06

Python定時任務(wù)Cron表達(dá)式

2022-08-15 15:43:29

Linuxcron

2016-12-27 19:29:14

Linux命令定時任務(wù)
點贊
收藏

51CTO技術(shù)棧公眾號