偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

高并發(fā)高性能的定時(shí)器實(shí)現(xiàn)

開(kāi)發(fā) 架構(gòu)
我們經(jīng)常都會(huì)碰到延遲任務(wù),定時(shí)任務(wù)這種需求。在網(wǎng)絡(luò)連接的場(chǎng)景中,常常會(huì)出現(xiàn)一些超時(shí)控制。隨著連接數(shù)量的增加,這些超時(shí)任務(wù)的數(shù)量往往也是很龐大的。實(shí)現(xiàn)對(duì)大量任務(wù)的超時(shí)管理并不是一個(gè)容易的事情。

[[413551]]

前言

我們經(jīng)常都會(huì)碰到延遲任務(wù),定時(shí)任務(wù)這種需求。在網(wǎng)絡(luò)連接的場(chǎng)景中,常常會(huì)出現(xiàn)一些超時(shí)控制。隨著連接數(shù)量的增加,這些超時(shí)任務(wù)的數(shù)量往往也是很龐大的。實(shí)現(xiàn)對(duì)大量任務(wù)的超時(shí)管理并不是一個(gè)容易的事情。

幾種定時(shí)任務(wù)的實(shí)現(xiàn)

java.util.Timer

JDK 在 1.3 的時(shí)候引入了Timer數(shù)據(jù)結(jié)構(gòu)用于實(shí)現(xiàn)定時(shí)任務(wù)。Timer的實(shí)現(xiàn)思路比較簡(jiǎn)單,其內(nèi)部有兩個(gè)主要屬性:

  • TaskQueue:定時(shí)任務(wù)抽象類TimeTask的列表。
  • TimerThread:用于執(zhí)行定時(shí)任務(wù)的線程。
  1. private final TaskQueue queue = new TaskQueue();   
  2.     private final TimerThread thread = new TimerThread(queue); 

Timer結(jié)構(gòu)還定義了一個(gè)抽象類TimerTask并且繼承了Runnable接口。業(yè)務(wù)系統(tǒng)實(shí)現(xiàn)了這個(gè)抽象類的run方法用于提供具體的延時(shí)任務(wù)邏輯。

TaskQueue內(nèi)部采用大頂堆的方式,依據(jù)任務(wù)的觸發(fā)時(shí)間進(jìn)行排序。而TimerThread則以死循環(huán)的方式從TaskQueue獲取隊(duì)列頭,等待隊(duì)列頭的任務(wù)的超時(shí)時(shí)間到達(dá)后觸發(fā)該任務(wù),并且將任務(wù)從隊(duì)列中移除。

Timer的數(shù)據(jù)結(jié)構(gòu)和算法都很容易理解。所有的超時(shí)任務(wù)都首先進(jìn)入延時(shí)隊(duì)列。后臺(tái)超時(shí)線程不斷的從延遲隊(duì)列中獲取任務(wù)并且等待超時(shí)時(shí)間到達(dá)后執(zhí)行任務(wù)。延遲隊(duì)列采用大頂堆排序,在延遲任務(wù)的場(chǎng)景中有三種操作,分別是:添加任務(wù),提取隊(duì)列頭任務(wù),查看隊(duì)列頭任務(wù)。

查看隊(duì)列頭任務(wù)的事件復(fù)雜度是 O(1) 。而添加任務(wù)和提取隊(duì)列頭任務(wù)的時(shí)間復(fù)雜度都是 O(Logn) 。當(dāng)任務(wù)數(shù)量較大時(shí),添加和刪除的開(kāi)銷也是比較大的。此外,由于Timer內(nèi)部只有一個(gè)處理線程,如果有一個(gè)延遲任務(wù)的處理消耗了較多的時(shí)間,會(huì)對(duì)應(yīng)的延遲后續(xù)任務(wù)的處理。

代碼如下:

  1. public static void main(String[] args) { 
  2.         Timer timer = new Timer(); 
  3.         // 延遲 1秒 執(zhí)行任務(wù) 
  4.         timer.schedule( 
  5.                 new java.util.TimerTask() { 
  6.                     @Override 
  7.                     public void run() { 
  8.                         System.out.println("延遲 1秒 執(zhí)行任務(wù)"+System.currentTimeMillis()); 
  9.                     } 
  10.                 } 
  11.         ,1000); 
  12.  
  13.         timer.schedule( 
  14.                 new java.util.TimerTask() { 
  15.                     @Override 
  16.                     public void run() { 
  17.                         System.out.println("延遲 2秒 執(zhí)行任務(wù)"+System.currentTimeMillis()); 
  18.                     } 
  19.                 } 
  20.                 ,2000); 
  21.         try { 
  22.             Thread.sleep(5000); 
  23.         } catch (InterruptedException e) { 
  24.             e.printStackTrace(); 
  25.         } 
  26.         timer.cancel(); 
  27.     } 

ScheduledThreadPoolExecutor

由于Timer只有一個(gè)線程用來(lái)處理延遲任務(wù),在任務(wù)數(shù)量很多的時(shí)候顯然是不足夠的。在 JDK1.5 引入線程池接口ExecutorService后,也對(duì)應(yīng)的提供了一個(gè)用于處理延時(shí)任務(wù)的ScheduledExecutorService子類接口。該接口內(nèi)部也一樣使用了一個(gè)使用小頂堆進(jìn)行排序的延遲隊(duì)列存放任務(wù)。線程池中的線程會(huì)在這個(gè)隊(duì)列上等待直到有任務(wù)可以提取。

整體來(lái)說(shuō),ScheduledExecutorService 區(qū)別于 Timer 的地方就在于前者依賴了線程池來(lái)執(zhí)行任務(wù),而任務(wù)本身會(huì)判斷是什么類型的任務(wù),需要重復(fù)執(zhí)行的在任務(wù)執(zhí)行結(jié)束后會(huì)被重新添加到任務(wù)隊(duì)列。

而對(duì)于后者來(lái)說(shuō),它只依賴一個(gè)線程不停的去獲取隊(duì)列首部的任務(wù)并嘗試執(zhí)行它,無(wú)論是效率上、還是安全性上都比不上前者。

ScheduledExecutorService的實(shí)現(xiàn)上有一些特殊,只有一個(gè)線程能夠提取到延遲隊(duì)列頭的任務(wù),并且根據(jù)任務(wù)的超時(shí)時(shí)間進(jìn)行等待。在這個(gè)等待期間,其他的線程是無(wú)法獲取任務(wù)的。這樣的實(shí)現(xiàn)是為了避免多個(gè)線程同時(shí)獲取任務(wù),導(dǎo)致超時(shí)時(shí)間未到達(dá)就任務(wù)觸發(fā)或者在等待任務(wù)超時(shí)時(shí)間時(shí)有新的任務(wù)被加入而無(wú)法響應(yīng)。

由于ScheduledExecutorService可以使用多個(gè)線程,這樣也緩解了因?yàn)閭€(gè)別任務(wù)執(zhí)行時(shí)間長(zhǎng)導(dǎo)致的后續(xù)任務(wù)被阻塞的情況。不過(guò)延遲隊(duì)列也是一樣采用小頂堆的排序方式,因此添加任務(wù)和刪除任務(wù)的時(shí)間復(fù)雜度都是 O(Logn) 。在任務(wù)數(shù)量很大的情況下,性能表現(xiàn)比較差。

代碼如下:

  1. public class ScheduledThreadPoolServiceTest { 
  2.     // 參數(shù)代表可以同時(shí)執(zhí)行的定時(shí)任務(wù)個(gè)數(shù) 
  3.     private ScheduledExecutorService service = Executors.newScheduledThreadPool(3); 
  4.     /** 
  5.      * schedule:延時(shí)2秒執(zhí)行一次任務(wù) 
  6.      */ 
  7.     public void task0() { 
  8.         service.schedule(() -> { 
  9.             System.out.println("task0-start"); 
  10.             sleep(2); 
  11.             System.out.println("task0-end"); 
  12.         }, 2, TimeUnit.SECONDS); 
  13.     } 
  14.  
  15.     /** 
  16.      * scheduleAtFixedRate:2秒后,每間隔4秒執(zhí)行一次任務(wù) 
  17.      * 注意,如果任務(wù)的執(zhí)行時(shí)間(例如6秒)大于間隔時(shí)間,則會(huì)等待任務(wù)執(zhí)行結(jié)束后直接開(kāi)始下次任務(wù) 
  18.      */ 
  19.     public void task1() { 
  20.         service.scheduleAtFixedRate(() -> { 
  21.             System.out.println("task1-start"); 
  22.             sleep(2); 
  23.             System.out.println("task1-end"); 
  24.         }, 2, 4, TimeUnit.SECONDS); 
  25.     } 
  26.  
  27.     /** 
  28.      * scheduleWithFixedDelay:2秒后,每次延時(shí)4秒執(zhí)行一次任務(wù) 
  29.      * 注意,這里是等待上次任務(wù)執(zhí)行結(jié)束后,再延時(shí)固定時(shí)間后開(kāi)始下次任務(wù) 
  30.      */ 
  31.     public void task2() { 
  32.         service.scheduleWithFixedDelay(() -> { 
  33.             System.out.println("task2-start"); 
  34.             sleep(2); 
  35.             System.out.println("task2-end"); 
  36.         }, 2, 4, TimeUnit.SECONDS); 
  37.     } 
  38.  
  39.     private void sleep(long time) { 
  40.         try { 
  41.             TimeUnit.SECONDS.sleep(time); 
  42.         } catch (InterruptedException e) { 
  43.             e.printStackTrace(); 
  44.         } 
  45.     } 
  46.  
  47.     public static void main(String[] args) { 
  48.         ScheduledThreadPoolServiceTest test = new ScheduledThreadPoolServiceTest(); 
  49.         System.out.println("main start"); 
  50.         test.task0(); 
  51.         //test.task1(); 
  52.        // test.task2(); 
  53.         test.sleep(10); 
  54.         System.out.println("main end"); 
  55.     } 

DelayQueue

Java 中還有個(gè)延遲隊(duì)列 DelayQueue,加入延遲隊(duì)列的元素都必須實(shí)現(xiàn) Delayed 接口。延遲隊(duì)列內(nèi)部是利用 PriorityQueue 實(shí)現(xiàn)的,所以還是利用優(yōu)先隊(duì)列!Delayed 接口繼承了Comparable 因此優(yōu)先隊(duì)列是通過(guò) delay 來(lái)排序的。

Redis sorted set

Redis的數(shù)據(jù)結(jié)構(gòu)Zset,同樣可以實(shí)現(xiàn)延遲隊(duì)列的效果,主要利用它的score屬性,redis通過(guò)score來(lái)為集合中的成員進(jìn)行從小到大的排序。zset 內(nèi)部是用跳表實(shí)現(xiàn)的。

跳表數(shù)據(jù)結(jié)構(gòu)的示意圖:

總體上,跳躍表刪除操作的時(shí)間復(fù)雜度是O(logN)。

有沒(méi)有更高效的數(shù)據(jù)結(jié)構(gòu)?

Timer 、ScheduledThreadPool 、 DelayQueue,總結(jié)的說(shuō)下它們都是通過(guò)優(yōu)先隊(duì)列來(lái)獲取最早需要執(zhí)行的任務(wù),因此插入和刪除任務(wù)的時(shí)間復(fù)雜度都為O(logn),并且 Timer 、ScheduledThreadPool 的周期性任務(wù)是通過(guò)重置任務(wù)的下一次執(zhí)行時(shí)間來(lái)完成的。

但是由于新增任務(wù)和提取任務(wù)的時(shí)間復(fù)雜度都是 O(Logn) ,在任務(wù)數(shù)量很大,比如幾萬(wàn),十幾萬(wàn)的時(shí)候,性能的開(kāi)銷就變得很巨大。

問(wèn)題就出在時(shí)間復(fù)雜度上,插入刪除時(shí)間復(fù)雜度是O(logn),那么假設(shè)頻繁插入刪除次數(shù)為 m,總的時(shí)間復(fù)雜度就是O(mlogn)

那么,是否存在新增任務(wù)和提取任務(wù)比 O(Log2n) 復(fù)雜度更低的數(shù)據(jù)結(jié)構(gòu)呢?答案是存在的。在論文《Hashed and Hierarchical Timing Wheels》中設(shè)計(jì)了一種名為時(shí)間輪( Timing Wheels )的數(shù)據(jù)結(jié)構(gòu),這種結(jié)構(gòu)在處理延遲任務(wù)時(shí),其新增任務(wù)和刪除任務(wù)的時(shí)間復(fù)雜度降低到了 O(1) 。

時(shí)間輪算法

基本原理

見(jiàn)名知意,時(shí)間輪的數(shù)據(jù)結(jié)構(gòu)很類似于我們鐘表上的數(shù)據(jù)指針。

時(shí)間輪用環(huán)形數(shù)組實(shí)現(xiàn),數(shù)組的每個(gè)元素可以稱為槽,和 HashMap一樣稱呼。

槽的內(nèi)部用雙向鏈表存著待執(zhí)行的任務(wù),添加和刪除的鏈表操作時(shí)間復(fù)雜度都是 O(1),槽位本身也指代時(shí)間精度,比如一秒掃一個(gè)槽,那么這個(gè)時(shí)間輪的最高精度就是 1 秒。

也就是說(shuō)延遲 1.2 秒的任務(wù)和 1.5 秒的任務(wù)會(huì)被加入到同一個(gè)槽中,然后在 1 秒的時(shí)候遍歷這個(gè)槽中的鏈表執(zhí)行任務(wù)。

任務(wù)插入

當(dāng)有一個(gè)延遲任務(wù)要插入時(shí)間輪時(shí),首先計(jì)算其延遲時(shí)間與單位時(shí)間的余值,從指針指向的當(dāng)前槽位移動(dòng)余值的個(gè)數(shù)槽位,就是該延遲任務(wù)需要被放入的槽位。

舉個(gè)例子,時(shí)間輪有8個(gè)槽位,編號(hào)為 0 ~ 7 。指針當(dāng)前指向槽位 2 。新增一個(gè)延遲時(shí)間為 4 秒的延遲任務(wù),4 % 8 = 4,因此該任務(wù)會(huì)被插入 4 + 2 = 6,也就是槽位6的延遲任務(wù)隊(duì)列。

時(shí)間槽位的實(shí)現(xiàn)

時(shí)間輪的槽位實(shí)現(xiàn)可以采用循環(huán)數(shù)組的方式達(dá)成,也就是讓指針在越過(guò)數(shù)組的邊界后重新回到起始下標(biāo)。概括來(lái)說(shuō),可以將時(shí)間輪的算法描述為:

用隊(duì)列來(lái)存儲(chǔ)延遲任務(wù),同一個(gè)隊(duì)列中的任務(wù),其延遲時(shí)間相同。用循環(huán)數(shù)組的方式來(lái)存儲(chǔ)元素,數(shù)組中的每一個(gè)元素都指向一個(gè)延遲任務(wù)隊(duì)列。

有一個(gè)當(dāng)前指針指向數(shù)組中的某一個(gè)槽位,每間隔一個(gè)單位時(shí)間,指針就移動(dòng)到下一個(gè)槽位。被指針指向的槽位的延遲隊(duì)列,其中的延遲任務(wù)全部被觸發(fā)。

在時(shí)間輪中新增一個(gè)延遲任務(wù),將其延遲時(shí)間除以單位時(shí)間得到的余值,從當(dāng)前指針開(kāi)始,移動(dòng)余值對(duì)應(yīng)個(gè)數(shù)的槽位,就是延遲任務(wù)被放入的槽位。

基于這樣的數(shù)據(jù)結(jié)構(gòu),插入一個(gè)延遲任務(wù)的時(shí)間復(fù)雜度就下降到 O(1) 。而當(dāng)指針指向到一個(gè)槽位時(shí),該槽位連接的延遲任務(wù)隊(duì)列中的延遲任務(wù)全部被觸發(fā)。

延遲任務(wù)的觸發(fā)和執(zhí)行不應(yīng)該影響指針向后移動(dòng)的時(shí)間精確性。因此一般情況下,用于移動(dòng)指針的線程只負(fù)責(zé)任務(wù)的觸發(fā),任務(wù)的執(zhí)行交由其他的線程來(lái)完成。比如,可以將槽位上的延遲任務(wù)隊(duì)列放入到額外的線程池中執(zhí)行,然后在槽位上新建一個(gè)空白的新的延遲任務(wù)隊(duì)列用于后續(xù)任務(wù)的添加。

關(guān)于擴(kuò)容

那假設(shè)現(xiàn)在要加入一個(gè)50秒后執(zhí)行的任務(wù)怎么辦?這槽好像不夠啊?難道要加槽嘛?和HashMap一樣擴(kuò)容?

假設(shè)要求精度為 1 秒,要能支持延遲時(shí)間為 1 天的延遲任務(wù),時(shí)間輪的槽位數(shù)需要 60 × 60 × 24 = 86400 。這就需要消耗更多的內(nèi)存。顯然,單純?cè)黾硬畚粩?shù)并不是一個(gè)好的解決方案。

常見(jiàn)有兩種方式:

通過(guò)增加輪次。50 % 8 + 1 = 3,即應(yīng)該放在槽位是 3,下標(biāo)是 2 的位置。然后 (50 - 1) / 8 = 6,即輪數(shù)記為 6。也就是說(shuō)當(dāng)循環(huán) 6 輪之后掃到下標(biāo)的 2 的這個(gè)槽位會(huì)觸發(fā)這個(gè)任務(wù)。Netty 中的 HashedWheelTimer 使用的就是這種方式。

通過(guò)多層次。這個(gè)和我們的手表就更像了,像我們秒針走一圈,分針走一格,分針走一圈,時(shí)針走一格。

多層次時(shí)間輪就是這樣實(shí)現(xiàn)的。假設(shè)上圖就是第一層,那么第一層走了一圈,第二層就走一格。

可以得知第二層的一格就是8秒,假設(shè)第二層也是 8 個(gè)槽,那么第二層走一圈,第三層走一格,可以得知第三層一格就是 64 秒。

那么一格三層,每層8個(gè)槽,一共 24 個(gè)槽時(shí)間輪就可以處理最多延遲 512 秒的任務(wù)。

而多層次時(shí)間輪還會(huì)有降級(jí)的操作,假設(shè)一個(gè)任務(wù)延遲 500 秒執(zhí)行,那么剛開(kāi)始加進(jìn)來(lái)肯定是放在第三層的,當(dāng)時(shí)間過(guò)了 436 秒后,此時(shí)還需要 64 秒就會(huì)觸發(fā)任務(wù)的執(zhí)行,而此時(shí)相對(duì)而言它就是個(gè)延遲 64 秒后的任務(wù),因此它會(huì)被降低放在第二層中,第一層還放不下它。

再過(guò)個(gè) 56 秒,相對(duì)而言它就是個(gè)延遲 8 秒后執(zhí)行的任務(wù),因此它會(huì)再被降級(jí)放在第一層中,等待執(zhí)行。

降級(jí)是為了保證時(shí)間精度一致性。Kafka內(nèi)部用的就是多層次的時(shí)間輪算法。

降級(jí)過(guò)程:

本文轉(zhuǎn)載自微信公眾號(hào)「小汪哥寫(xiě)代碼」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系小汪哥寫(xiě)代碼公眾號(hào)。

 

責(zé)任編輯:武曉燕 來(lái)源: 小汪哥寫(xiě)代碼
相關(guān)推薦

2020-11-10 07:46:09

服務(wù)器高并發(fā)高性能

2023-11-01 11:13:58

Linux信號(hào)處理定時(shí)器

2021-05-24 09:28:41

軟件開(kāi)發(fā) 技術(shù)

2024-12-04 10:58:57

TomcatJetty高并發(fā)

2017-11-27 09:14:29

2023-11-06 08:32:17

FastAPIPython

2016-12-21 09:33:40

2022-06-02 12:56:25

容器網(wǎng)絡(luò)云原生

2009-11-11 10:14:10

linux定時(shí)器操作系統(tǒng)

2010-07-28 15:56:22

FlexTimer定時(shí)

2009-06-18 11:07:17

Spring fram

2018-05-13 22:23:32

2009-06-15 15:02:48

Spring定時(shí)器

2021-08-11 10:10:26

Linux定時(shí)器數(shù)組

2022-11-02 11:40:16

Flowable定時(shí)器流程

2021-09-22 16:25:17

服務(wù)器戴爾科技集團(tuán)

2023-12-11 09:50:35

Linux定時(shí)器

2021-06-28 06:00:11

systemd定時(shí)器系統(tǒng)運(yùn)維

2019-12-31 10:33:57

Netty高性能內(nèi)存

2016-09-12 14:07:14

Android 定時(shí)器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)