高并發(fā)高性能的定時(shí)器實(shí)現(xiàn)
前言
我們經(jīng)常都會(huì)碰到延遲任務(wù),定時(shí)任務(wù)這種需求。在網(wǎng)絡(luò)連接的場(chǎng)景中,常常會(huì)出現(xiàn)一些超時(shí)控制。隨著連接數(shù)量的增加,這些超時(shí)任務(wù)的數(shù)量往往也是很龐大的。實(shí)現(xiàn)對(duì)大量任務(wù)的超時(shí)管理并不是一個(gè)容易的事情。
幾種定時(shí)任務(wù)的實(shí)現(xiàn)
java.util.Timer
JDK 在 1.3 的時(shí)候引入了Timer數(shù)據(jù)結(jié)構(gòu)用于實(shí)現(xiàn)定時(shí)任務(wù)。Timer的實(shí)現(xiàn)思路比較簡(jiǎn)單,其內(nèi)部有兩個(gè)主要屬性:
- TaskQueue:定時(shí)任務(wù)抽象類TimeTask的列表。
- TimerThread:用于執(zhí)行定時(shí)任務(wù)的線程。
- private final TaskQueue queue = new TaskQueue();
- private final TimerThread thread = new TimerThread(queue);
Timer結(jié)構(gòu)還定義了一個(gè)抽象類TimerTask并且繼承了Runnable接口。業(yè)務(wù)系統(tǒng)實(shí)現(xiàn)了這個(gè)抽象類的run方法用于提供具體的延時(shí)任務(wù)邏輯。
TaskQueue內(nèi)部采用大頂堆的方式,依據(jù)任務(wù)的觸發(fā)時(shí)間進(jìn)行排序。而TimerThread則以死循環(huán)的方式從TaskQueue獲取隊(duì)列頭,等待隊(duì)列頭的任務(wù)的超時(shí)時(shí)間到達(dá)后觸發(fā)該任務(wù),并且將任務(wù)從隊(duì)列中移除。
Timer的數(shù)據(jù)結(jié)構(gòu)和算法都很容易理解。所有的超時(shí)任務(wù)都首先進(jìn)入延時(shí)隊(duì)列。后臺(tái)超時(shí)線程不斷的從延遲隊(duì)列中獲取任務(wù)并且等待超時(shí)時(shí)間到達(dá)后執(zhí)行任務(wù)。延遲隊(duì)列采用大頂堆排序,在延遲任務(wù)的場(chǎng)景中有三種操作,分別是:添加任務(wù),提取隊(duì)列頭任務(wù),查看隊(duì)列頭任務(wù)。
查看隊(duì)列頭任務(wù)的事件復(fù)雜度是 O(1) 。而添加任務(wù)和提取隊(duì)列頭任務(wù)的時(shí)間復(fù)雜度都是 O(Logn) 。當(dāng)任務(wù)數(shù)量較大時(shí),添加和刪除的開(kāi)銷也是比較大的。此外,由于Timer內(nèi)部只有一個(gè)處理線程,如果有一個(gè)延遲任務(wù)的處理消耗了較多的時(shí)間,會(huì)對(duì)應(yīng)的延遲后續(xù)任務(wù)的處理。
代碼如下:
- public static void main(String[] args) {
- Timer timer = new Timer();
- // 延遲 1秒 執(zhí)行任務(wù)
- timer.schedule(
- new java.util.TimerTask() {
- @Override
- public void run() {
- System.out.println("延遲 1秒 執(zhí)行任務(wù)"+System.currentTimeMillis());
- }
- }
- ,1000);
- timer.schedule(
- new java.util.TimerTask() {
- @Override
- public void run() {
- System.out.println("延遲 2秒 執(zhí)行任務(wù)"+System.currentTimeMillis());
- }
- }
- ,2000);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- timer.cancel();
- }
ScheduledThreadPoolExecutor
由于Timer只有一個(gè)線程用來(lái)處理延遲任務(wù),在任務(wù)數(shù)量很多的時(shí)候顯然是不足夠的。在 JDK1.5 引入線程池接口ExecutorService后,也對(duì)應(yīng)的提供了一個(gè)用于處理延時(shí)任務(wù)的ScheduledExecutorService子類接口。該接口內(nèi)部也一樣使用了一個(gè)使用小頂堆進(jìn)行排序的延遲隊(duì)列存放任務(wù)。線程池中的線程會(huì)在這個(gè)隊(duì)列上等待直到有任務(wù)可以提取。
整體來(lái)說(shuō),ScheduledExecutorService 區(qū)別于 Timer 的地方就在于前者依賴了線程池來(lái)執(zhí)行任務(wù),而任務(wù)本身會(huì)判斷是什么類型的任務(wù),需要重復(fù)執(zhí)行的在任務(wù)執(zhí)行結(jié)束后會(huì)被重新添加到任務(wù)隊(duì)列。
而對(duì)于后者來(lái)說(shuō),它只依賴一個(gè)線程不停的去獲取隊(duì)列首部的任務(wù)并嘗試執(zhí)行它,無(wú)論是效率上、還是安全性上都比不上前者。
ScheduledExecutorService的實(shí)現(xiàn)上有一些特殊,只有一個(gè)線程能夠提取到延遲隊(duì)列頭的任務(wù),并且根據(jù)任務(wù)的超時(shí)時(shí)間進(jìn)行等待。在這個(gè)等待期間,其他的線程是無(wú)法獲取任務(wù)的。這樣的實(shí)現(xiàn)是為了避免多個(gè)線程同時(shí)獲取任務(wù),導(dǎo)致超時(shí)時(shí)間未到達(dá)就任務(wù)觸發(fā)或者在等待任務(wù)超時(shí)時(shí)間時(shí)有新的任務(wù)被加入而無(wú)法響應(yīng)。
由于ScheduledExecutorService可以使用多個(gè)線程,這樣也緩解了因?yàn)閭€(gè)別任務(wù)執(zhí)行時(shí)間長(zhǎng)導(dǎo)致的后續(xù)任務(wù)被阻塞的情況。不過(guò)延遲隊(duì)列也是一樣采用小頂堆的排序方式,因此添加任務(wù)和刪除任務(wù)的時(shí)間復(fù)雜度都是 O(Logn) 。在任務(wù)數(shù)量很大的情況下,性能表現(xiàn)比較差。
代碼如下:
- public class ScheduledThreadPoolServiceTest {
- // 參數(shù)代表可以同時(shí)執(zhí)行的定時(shí)任務(wù)個(gè)數(shù)
- private ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
- /**
- * schedule:延時(shí)2秒執(zhí)行一次任務(wù)
- */
- public void task0() {
- service.schedule(() -> {
- System.out.println("task0-start");
- sleep(2);
- System.out.println("task0-end");
- }, 2, TimeUnit.SECONDS);
- }
- /**
- * scheduleAtFixedRate:2秒后,每間隔4秒執(zhí)行一次任務(wù)
- * 注意,如果任務(wù)的執(zhí)行時(shí)間(例如6秒)大于間隔時(shí)間,則會(huì)等待任務(wù)執(zhí)行結(jié)束后直接開(kāi)始下次任務(wù)
- */
- public void task1() {
- service.scheduleAtFixedRate(() -> {
- System.out.println("task1-start");
- sleep(2);
- System.out.println("task1-end");
- }, 2, 4, TimeUnit.SECONDS);
- }
- /**
- * scheduleWithFixedDelay:2秒后,每次延時(shí)4秒執(zhí)行一次任務(wù)
- * 注意,這里是等待上次任務(wù)執(zhí)行結(jié)束后,再延時(shí)固定時(shí)間后開(kāi)始下次任務(wù)
- */
- public void task2() {
- service.scheduleWithFixedDelay(() -> {
- System.out.println("task2-start");
- sleep(2);
- System.out.println("task2-end");
- }, 2, 4, TimeUnit.SECONDS);
- }
- private void sleep(long time) {
- try {
- TimeUnit.SECONDS.sleep(time);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public static void main(String[] args) {
- ScheduledThreadPoolServiceTest test = new ScheduledThreadPoolServiceTest();
- System.out.println("main start");
- test.task0();
- //test.task1();
- // test.task2();
- test.sleep(10);
- System.out.println("main end");
- }
- }
DelayQueue
Java 中還有個(gè)延遲隊(duì)列 DelayQueue,加入延遲隊(duì)列的元素都必須實(shí)現(xiàn) Delayed 接口。延遲隊(duì)列內(nèi)部是利用 PriorityQueue 實(shí)現(xiàn)的,所以還是利用優(yōu)先隊(duì)列!Delayed 接口繼承了Comparable 因此優(yōu)先隊(duì)列是通過(guò) delay 來(lái)排序的。
Redis sorted set
Redis的數(shù)據(jù)結(jié)構(gòu)Zset,同樣可以實(shí)現(xiàn)延遲隊(duì)列的效果,主要利用它的score屬性,redis通過(guò)score來(lái)為集合中的成員進(jìn)行從小到大的排序。zset 內(nèi)部是用跳表實(shí)現(xiàn)的。
跳表數(shù)據(jù)結(jié)構(gòu)的示意圖:
總體上,跳躍表刪除操作的時(shí)間復(fù)雜度是O(logN)。
有沒(méi)有更高效的數(shù)據(jù)結(jié)構(gòu)?
Timer 、ScheduledThreadPool 、 DelayQueue,總結(jié)的說(shuō)下它們都是通過(guò)優(yōu)先隊(duì)列來(lái)獲取最早需要執(zhí)行的任務(wù),因此插入和刪除任務(wù)的時(shí)間復(fù)雜度都為O(logn),并且 Timer 、ScheduledThreadPool 的周期性任務(wù)是通過(guò)重置任務(wù)的下一次執(zhí)行時(shí)間來(lái)完成的。
但是由于新增任務(wù)和提取任務(wù)的時(shí)間復(fù)雜度都是 O(Logn) ,在任務(wù)數(shù)量很大,比如幾萬(wàn),十幾萬(wàn)的時(shí)候,性能的開(kāi)銷就變得很巨大。
問(wèn)題就出在時(shí)間復(fù)雜度上,插入刪除時(shí)間復(fù)雜度是O(logn),那么假設(shè)頻繁插入刪除次數(shù)為 m,總的時(shí)間復(fù)雜度就是O(mlogn)
那么,是否存在新增任務(wù)和提取任務(wù)比 O(Log2n) 復(fù)雜度更低的數(shù)據(jù)結(jié)構(gòu)呢?答案是存在的。在論文《Hashed and Hierarchical Timing Wheels》中設(shè)計(jì)了一種名為時(shí)間輪( Timing Wheels )的數(shù)據(jù)結(jié)構(gòu),這種結(jié)構(gòu)在處理延遲任務(wù)時(shí),其新增任務(wù)和刪除任務(wù)的時(shí)間復(fù)雜度降低到了 O(1) 。
時(shí)間輪算法
基本原理
見(jiàn)名知意,時(shí)間輪的數(shù)據(jù)結(jié)構(gòu)很類似于我們鐘表上的數(shù)據(jù)指針。
時(shí)間輪用環(huán)形數(shù)組實(shí)現(xiàn),數(shù)組的每個(gè)元素可以稱為槽,和 HashMap一樣稱呼。
槽的內(nèi)部用雙向鏈表存著待執(zhí)行的任務(wù),添加和刪除的鏈表操作時(shí)間復(fù)雜度都是 O(1),槽位本身也指代時(shí)間精度,比如一秒掃一個(gè)槽,那么這個(gè)時(shí)間輪的最高精度就是 1 秒。
也就是說(shuō)延遲 1.2 秒的任務(wù)和 1.5 秒的任務(wù)會(huì)被加入到同一個(gè)槽中,然后在 1 秒的時(shí)候遍歷這個(gè)槽中的鏈表執(zhí)行任務(wù)。
任務(wù)插入
當(dāng)有一個(gè)延遲任務(wù)要插入時(shí)間輪時(shí),首先計(jì)算其延遲時(shí)間與單位時(shí)間的余值,從指針指向的當(dāng)前槽位移動(dòng)余值的個(gè)數(shù)槽位,就是該延遲任務(wù)需要被放入的槽位。
舉個(gè)例子,時(shí)間輪有8個(gè)槽位,編號(hào)為 0 ~ 7 。指針當(dāng)前指向槽位 2 。新增一個(gè)延遲時(shí)間為 4 秒的延遲任務(wù),4 % 8 = 4,因此該任務(wù)會(huì)被插入 4 + 2 = 6,也就是槽位6的延遲任務(wù)隊(duì)列。
時(shí)間槽位的實(shí)現(xiàn)
時(shí)間輪的槽位實(shí)現(xiàn)可以采用循環(huán)數(shù)組的方式達(dá)成,也就是讓指針在越過(guò)數(shù)組的邊界后重新回到起始下標(biāo)。概括來(lái)說(shuō),可以將時(shí)間輪的算法描述為:
用隊(duì)列來(lái)存儲(chǔ)延遲任務(wù),同一個(gè)隊(duì)列中的任務(wù),其延遲時(shí)間相同。用循環(huán)數(shù)組的方式來(lái)存儲(chǔ)元素,數(shù)組中的每一個(gè)元素都指向一個(gè)延遲任務(wù)隊(duì)列。
有一個(gè)當(dāng)前指針指向數(shù)組中的某一個(gè)槽位,每間隔一個(gè)單位時(shí)間,指針就移動(dòng)到下一個(gè)槽位。被指針指向的槽位的延遲隊(duì)列,其中的延遲任務(wù)全部被觸發(fā)。
在時(shí)間輪中新增一個(gè)延遲任務(wù),將其延遲時(shí)間除以單位時(shí)間得到的余值,從當(dāng)前指針開(kāi)始,移動(dòng)余值對(duì)應(yīng)個(gè)數(shù)的槽位,就是延遲任務(wù)被放入的槽位。
基于這樣的數(shù)據(jù)結(jié)構(gòu),插入一個(gè)延遲任務(wù)的時(shí)間復(fù)雜度就下降到 O(1) 。而當(dāng)指針指向到一個(gè)槽位時(shí),該槽位連接的延遲任務(wù)隊(duì)列中的延遲任務(wù)全部被觸發(fā)。
延遲任務(wù)的觸發(fā)和執(zhí)行不應(yīng)該影響指針向后移動(dòng)的時(shí)間精確性。因此一般情況下,用于移動(dòng)指針的線程只負(fù)責(zé)任務(wù)的觸發(fā),任務(wù)的執(zhí)行交由其他的線程來(lái)完成。比如,可以將槽位上的延遲任務(wù)隊(duì)列放入到額外的線程池中執(zhí)行,然后在槽位上新建一個(gè)空白的新的延遲任務(wù)隊(duì)列用于后續(xù)任務(wù)的添加。
關(guān)于擴(kuò)容
那假設(shè)現(xiàn)在要加入一個(gè)50秒后執(zhí)行的任務(wù)怎么辦?這槽好像不夠啊?難道要加槽嘛?和HashMap一樣擴(kuò)容?
假設(shè)要求精度為 1 秒,要能支持延遲時(shí)間為 1 天的延遲任務(wù),時(shí)間輪的槽位數(shù)需要 60 × 60 × 24 = 86400 。這就需要消耗更多的內(nèi)存。顯然,單純?cè)黾硬畚粩?shù)并不是一個(gè)好的解決方案。
常見(jiàn)有兩種方式:
通過(guò)增加輪次。50 % 8 + 1 = 3,即應(yīng)該放在槽位是 3,下標(biāo)是 2 的位置。然后 (50 - 1) / 8 = 6,即輪數(shù)記為 6。也就是說(shuō)當(dāng)循環(huán) 6 輪之后掃到下標(biāo)的 2 的這個(gè)槽位會(huì)觸發(fā)這個(gè)任務(wù)。Netty 中的 HashedWheelTimer 使用的就是這種方式。
通過(guò)多層次。這個(gè)和我們的手表就更像了,像我們秒針走一圈,分針走一格,分針走一圈,時(shí)針走一格。
多層次時(shí)間輪就是這樣實(shí)現(xiàn)的。假設(shè)上圖就是第一層,那么第一層走了一圈,第二層就走一格。
可以得知第二層的一格就是8秒,假設(shè)第二層也是 8 個(gè)槽,那么第二層走一圈,第三層走一格,可以得知第三層一格就是 64 秒。
那么一格三層,每層8個(gè)槽,一共 24 個(gè)槽時(shí)間輪就可以處理最多延遲 512 秒的任務(wù)。
而多層次時(shí)間輪還會(huì)有降級(jí)的操作,假設(shè)一個(gè)任務(wù)延遲 500 秒執(zhí)行,那么剛開(kāi)始加進(jìn)來(lái)肯定是放在第三層的,當(dāng)時(shí)間過(guò)了 436 秒后,此時(shí)還需要 64 秒就會(huì)觸發(fā)任務(wù)的執(zhí)行,而此時(shí)相對(duì)而言它就是個(gè)延遲 64 秒后的任務(wù),因此它會(huì)被降低放在第二層中,第一層還放不下它。
再過(guò)個(gè) 56 秒,相對(duì)而言它就是個(gè)延遲 8 秒后執(zhí)行的任務(wù),因此它會(huì)再被降級(jí)放在第一層中,等待執(zhí)行。
降級(jí)是為了保證時(shí)間精度一致性。Kafka內(nèi)部用的就是多層次的時(shí)間輪算法。
降級(jí)過(guò)程:
本文轉(zhuǎn)載自微信公眾號(hào)「小汪哥寫(xiě)代碼」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系小汪哥寫(xiě)代碼公眾號(hào)。