面試侃集合 | DelayQueue篇
面試官:好久不見啊,上次我們聊完了PriorityBlockingQueue,今天我們?cè)賮砹牧暮退嚓P(guān)的DelayQueue吧?
Hydra:就知道你前面肯定給我挖了坑,DelayQueue也是一個(gè)無界阻塞隊(duì)列,但是和之前我們聊的其他隊(duì)列不同,不是所有類型的元素都能夠放進(jìn)去,只有實(shí)現(xiàn)了Delayed接口的對(duì)象才能放進(jìn)隊(duì)列。Delayed對(duì)象具有一個(gè)過期時(shí)間,只有在到達(dá)這個(gè)到期時(shí)間后才能從隊(duì)列中取出。
面試官:有點(diǎn)意思,那么它有什么使用場(chǎng)景呢?
Hydra:不得不說,由于DelayQueue的精妙設(shè)計(jì),使用場(chǎng)景還是蠻多的。例如在電商系統(tǒng)中,如果有一筆訂單在下單30分鐘內(nèi)沒有完成支付,那么就需要自動(dòng)取消這筆訂單。還有,如果我們緩存了一些數(shù)據(jù),并希望這些緩存在一定時(shí)間后失效的話,也可以使用延遲隊(duì)列將它從緩存中刪除。
以電商系統(tǒng)為例,可以簡(jiǎn)單看一下這個(gè)流程:
面試官:看起來和任務(wù)調(diào)度有點(diǎn)類似啊,它們之間有什么區(qū)別嗎?
Hydra:任務(wù)調(diào)度更多的偏向于定時(shí)的特性,是在指定的時(shí)間點(diǎn)或時(shí)間間隔執(zhí)行特定的任務(wù),而延遲隊(duì)列更多偏向于在指定的延遲時(shí)間后執(zhí)行任務(wù)。相對(duì)任務(wù)調(diào)度來說,上面舉的例子中的延遲隊(duì)列場(chǎng)景都具有高頻率的特性,使用定時(shí)任務(wù)來實(shí)現(xiàn)它們的話會(huì)顯得有些過于笨重了。
面試官:好了,你也白話了半天了,能動(dòng)手就別吵吵,還是先給我寫個(gè)例子吧。
Hydra:好嘞,前面說過存入隊(duì)列的元素要實(shí)現(xiàn)Delayed接口,所以我們先定義這么一個(gè)類:
- public class Task implements Delayed {
- private String name;
- private long delay,expire;
- public Task(String name, long delay) {
- this.name = name;
- this.delay = delay;
- this.expire=System.currentTimeMillis()+delay;
- }
- @Override
- public long getDelay(TimeUnit unit) {
- return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- }
- @Override
- public int compareTo(Delayed o) {
- return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
- }
- }
實(shí)現(xiàn)了Delayed接口的類必須要實(shí)現(xiàn)下面的兩個(gè)方法:
- getDelay方法用于計(jì)算對(duì)象的剩余延遲時(shí)間,判斷對(duì)象是否到期,計(jì)算方法一般使用過期時(shí)間減當(dāng)前時(shí)間。如果是0或負(fù)數(shù),表示延遲時(shí)間已經(jīng)用完,否則說明還沒有到期
- compareTo方法用于延遲隊(duì)列的內(nèi)部排序比較,這里使用當(dāng)前對(duì)象的延遲時(shí)間減去被比較對(duì)象的延遲時(shí)間
在完成隊(duì)列中元素的定義后,向隊(duì)列中加入5個(gè)不同延遲時(shí)間的對(duì)象,并等待從隊(duì)列中取出:
- public void delay() throws InterruptedException {
- DelayQueue<Task> queue=new DelayQueue<>();
- queue.offer(new Task("task1",5000));
- queue.offer(new Task("task2",1000));
- queue.offer(new Task("task3",6000));
- queue.offer(new Task("task4",100));
- queue.offer(new Task("task5",3000));
- while(true){
- Task task = queue.take();
- System.out.println(task);
- }
- }
運(yùn)行結(jié)果如下,可以看到按照延遲時(shí)間從短到長(zhǎng)的順序,元素被依次從隊(duì)列中取出。
- Task{name='task4', delay=100}
- Task{name='task2', delay=1000}
- Task{name='task5', delay=3000}
- Task{name='task1', delay=5000}
- Task{name='task3', delay=6000}
面試官:看起來應(yīng)用還是挺簡(jiǎn)單的,但今天也不能這么草草了事吧,還是說說原理吧。
Hydra:開始的時(shí)候你自己不都說了嗎,今天咱們聊的DelayQueue和前幾天聊過的PriorityBlockingQueue多少有點(diǎn)關(guān)系。DelayQueue的底層是PriorityQueue,而PriorityBlockingQueue和它的差別也沒有多少,只是在PriorityQueue的基礎(chǔ)上加上鎖和條件等待,入隊(duì)和出隊(duì)用的都是二叉堆的那一套邏輯。底層使用的有這些:
- private final transient ReentrantLock lock = new ReentrantLock();
- private final PriorityQueue<E> q = new PriorityQueue<E>();
- private Thread leader = null;
- private final Condition available = lock.newCondition();
面試官:你這樣也有點(diǎn)太糊弄我了吧,這就把我敷衍過去了?
Hydra:還沒完呢,還是先看入隊(duì)的offer方法,它的源碼如下:
- public boolean offer(E e) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- q.offer(e);
- if (q.peek() == e) {
- leader = null;
- available.signal();
- }
- return true;
- } finally {
- lock.unlock();
- }
- }
DelayQueue每次向優(yōu)先級(jí)隊(duì)列PriorityQueue中添加元素時(shí),會(huì)以元素的剩余延遲時(shí)間delay作為排序的因素,來實(shí)現(xiàn)使最先過期的元素排在隊(duì)首,以此達(dá)到在之后從隊(duì)列中取出的元素都是先取出最先到達(dá)過期的元素。
二叉堆的構(gòu)造過程我們上次講過了,就不再重復(fù)了。向隊(duì)列中添加完5個(gè)元素后,二叉堆和隊(duì)列中的結(jié)構(gòu)是這樣的:
當(dāng)每個(gè)元素在按照二叉堆的順序插入隊(duì)列后,會(huì)查看堆頂元素是否剛插入的元素,如果是的話那么設(shè)置leader線程為空,并喚醒在available上阻塞的線程。
這里先簡(jiǎn)單的介紹一下leader線程的作用,leader是等待獲取元素的線程,它的作用主要是用于減少不必要的等待,具體的使用在后面介紹take方法的時(shí)候我們細(xì)說。
面試官:也別一會(huì)了,趁熱打鐵直接講隊(duì)列的出隊(duì)方法吧。
Hydra:這還真沒法著急,在看阻塞方法take前還得先看看非阻塞的poll方法是如何實(shí)現(xiàn)的:
- public E poll() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- E first = q.peek();
- if (first == null || first.getDelay(NANOSECONDS) > 0)
- return null;
- else
- return q.poll();
- } finally {
- lock.unlock();
- }
- }
代碼非常短,理解起來非常簡(jiǎn)單,在加鎖后首先檢查堆頂元素,如果堆頂元素為空或沒有到期,那么直接返回空,否則返回堆頂元素,然后解鎖。
面試官:好了,鋪墊完了吧,該講阻塞方法的過程了吧?
Hydra:阻塞的take方法理解起來會(huì)比上面稍微困難一點(diǎn),我們還是直接看它的源碼:
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- E first = q.peek();
- if (first == null)
- available.await();
- else {
- long delay = first.getDelay(NANOSECONDS);
- if (delay <= 0)
- return q.poll();
- first = null; // don't retain ref while waiting
- if (leader != null)
- available.await();
- else {
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- available.awaitNanos(delay);
- } finally {
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if (leader == null && q.peek() != null)
- available.signal();
- lock.unlock();
- }
- }
阻塞過程中分支條件比較復(fù)雜,我們一個(gè)一個(gè)看:
- 首先獲取堆頂元素,如果為空,那么說明隊(duì)列中還沒有元素,讓當(dāng)前線程在available上進(jìn)行阻塞等待
- 如果堆頂元素不為空,那么查看它的過期時(shí)間,如果已到期,那么直接彈出堆頂元素
- 如果堆頂元素還沒有到期,那么查看leader線程是否為空,如果leader線程不為空的話,表示已經(jīng)有其他線程在等待獲取隊(duì)列的元素,直接阻塞當(dāng)前線程。
- 如果leader為空,那么把當(dāng)前線程賦值給它,并調(diào)用awaitNanos方法,在阻塞delay時(shí)間后自動(dòng)醒來。喚醒后,如果leader還是當(dāng)前線程那么把它置為空,重新進(jìn)入循環(huán),再次判斷堆頂元素是否到期。
當(dāng)有隊(duì)列中的元素完成出隊(duì)后,如果leader線程為空,并且堆中還有元素,就喚醒阻塞在available上的其他線程,并釋放持有的鎖。
面試官:我注意到一個(gè)問題,在上面的代碼中,為什么要設(shè)置first = null呢?
Hydra:假設(shè)有多個(gè)線程在執(zhí)行take方法,當(dāng)?shù)谝粋€(gè)線程進(jìn)入時(shí),堆頂元素還沒有到期,那么會(huì)將leader指向自己,然后阻塞自己一段時(shí)間。如果在這期間有其他線程到達(dá),會(huì)因?yàn)閘eader不為空阻塞自己。
當(dāng)?shù)谝粋€(gè)線程阻塞結(jié)束后,如果將堆頂元素彈出成功,那么first指向的元素應(yīng)該被gc回收掉。但是如果還被其他線程持有的話,它就不會(huì)被回收掉,所以將first置為空可以幫助完成垃圾回收。
面試官:我突然有一個(gè)發(fā)散性的疑問,定時(shí)任務(wù)線程池ScheduledThreadPoolExecutor,底層使用的也是DelayQueue嗎?
Hydra:問題很不錯(cuò),但很遺憾并不是,ScheduledThreadPoolExecutor在類中自己定義了一個(gè)DelayedWorkQueue內(nèi)部類,并沒有直接使用DelayQueue。不過如果你看一下源碼,就會(huì)看到它們實(shí)現(xiàn)的邏輯基本一致,同樣是基于二叉堆的上浮、下沉、擴(kuò)容,也同樣基于leader、鎖、條件等待等操作,只不過自己用數(shù)組又實(shí)現(xiàn)了一遍而已。說白了,看看兩個(gè)類的作者,都是Doug Lea大神,所以差異根本沒有多大。
面試官:好了,今天先到這吧,能最后再總結(jié)一下嗎?
Hydra:DelayQueue整體理解起來也沒有什么困難的點(diǎn),難的地方在前面聊優(yōu)先級(jí)隊(duì)列的時(shí)候基本已經(jīng)掃清了,新加的東西也就是一個(gè)對(duì)于leader線程的操作,使用了leader線程來減少不必要的線程等待時(shí)間。
面試官:今天的面試有點(diǎn)短啊,總是有點(diǎn)意猶未盡的感覺,看來下次得給你加點(diǎn)料了。
Hydra:…