面試官:CyclicBarrier有了解過嗎?
前言
Java提供了一些非常好用的并發(fā)工具類,不需要我們重復(fù)造輪子,本節(jié)我們講解CyclicBarrier,一起來看下吧~
CyclicBarrier
這個跟我們上節(jié)講的CountDownLatch有點類似,從字面意思講是相當(dāng)于一個可循環(huán)的屏障,他與CountDownLatch不同的是它可以重復(fù)利用,下一步的操作以,依賴上一步是否完成,就像去銀行辦業(yè)務(wù)一樣,排在你前面的人辦好了才輪到你,我們繼續(xù)通過上節(jié)的例子,來改寫一下它,這里我偷個懶,實際業(yè)務(wù)中盡量用類編寫,不要直接new Thread。
public class CyclicBarrierTest { public static void main(String[] args) throws BrokenBarrierException, InterruptedException { CyclicBarrier cyclicBarrier = new CyclicBarrier(1); IntStream.range(0, 10).forEach(i -> { new Thread(() -> { try { Thread.sleep(2000); System.out.println("worker 1------> " + i); cyclicBarrier.await(); Thread.sleep(2000); System.out.println("worker 2------> " + i); cyclicBarrier.await(); Thread.sleep(2000); System.out.println("worker 3------> " + i); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); }); System.out.println("completed !"); }}
實際輸出:
completed !worker 1------> 9worker 1------> 0worker 1------> 6worker 1------> 7worker 1------> 5worker 1------> 4worker 1------> 1worker 1------> 3worker 1------> 2worker 1------> 8worker 2------> 7worker 2------> 6worker 2------> 5worker 2------> 2worker 2------> 3worker 2------> 1worker 2------> 8worker 2------> 0worker 2------> 9worker 2------> 4worker 3------> 6worker 3------> 3worker 3------> 2worker 3------> 5worker 3------> 7worker 3------> 8worker 3------> 1worker 3------> 0worker 3------> 9worker 3------> 4
可以看到在即使在多線程下,每個操作都需要上一個await任務(wù)之后執(zhí)行,使用很簡單,也很好理解。
知其然知其所以然 & 源碼剖析
下面我們就一起探究一下,它是如何做到的?
同樣的,我們先看構(gòu)造函數(shù)。
public CyclicBarrier(int parties) { this(parties, null);}public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction;}
?默認(rèn)barrierAction是null, 這個參數(shù)是Runnable參數(shù),當(dāng)最后線程達(dá)到的時候執(zhí)行的任務(wù),剛剛的例子中沒有演示,大家可以在初始化的時候傳入一個,打印一下當(dāng)前的線程名稱,這樣理解起來比較容易點,parties int型,它的意思是參與的線程數(shù)。
我們再看它的定義, 可以看到它沒有繼承任何類或?qū)崿F(xiàn)任何接口?。
public class CyclicBarrier { .... }
await
我們重點看下這個方法。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen }}
這個方法干嘛用的呢?等到所有各方都在此屏障上調(diào)用了await 。如果當(dāng)前線程不是最后到達(dá)的,則出于線程調(diào)度目的將其禁用并處于休眠狀態(tài),除了以下情況:
- 最后一個線程到達(dá);或者。
- 其他一些線程中斷當(dāng)前線程;或者。
- 其他一些線程中斷了其他等待線程之一;或者。
- 其他一些線程在等待屏障時超時;或者。
- 其他一些線程在此屏障上調(diào)用reset 。
再看dowait(), 它是一個私有方法。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 全局鎖 final ReentrantLock lock = this.lock; lock.lock(); try { // 每次使用屏障都會生成一個實例 // private Generation generation = new Generation(); final Generation g = generation; // broken字面意思破壞,如果被破壞了就拋異常 if (g.broken) throw new BrokenBarrierException(); // 線程中斷檢測 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 剩余的等待線程數(shù) int index = --count; // 最后線程到達(dá)時 if (index == 0) { // tripped // 標(biāo)記任務(wù)是否被執(zhí)行(就是傳進(jìn)入的runable參數(shù)) boolean ranAction = false; try { final Runnable command = barrierCommand; // 執(zhí)行任務(wù) if (command != null) command.run(); ranAction = true; // 完成后 進(jìn)行下一組 初始化 generation 初始化 count 并喚醒所有等待的線程 // // private void nextGeneration() { // // signal completion of last generation // trip.signalAll(); // // set up next generation // count = parties; // generation = new Generation(); // } nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // index 不為0時 進(jìn)入自旋 for (;;) { try { // 先判斷超時 沒超時就繼續(xù)等著 if (!timed) trip.await(); // 如果超出指定時間 調(diào)用 awaitNanos 超時了釋放鎖 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); // 中斷異常捕獲 } catch (InterruptedException ie) { // 判斷是否被破壞 if (g == generation && ! g.broken) { // private void breakBarrier() { // generation.broken = true; // count = parties; // trip.signalAll(); // } breakBarrier(); throw ie; } else { // 否則的話中斷當(dāng)前線程 Thread.currentThread().interrupt(); } } // 被破壞拋異常 if (g.broken) throw new BrokenBarrierException(); // 正常調(diào)用 就返回 if (g != generation) return index; // 超時了而被喚醒的情況 調(diào)用 breakBarrier() if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
如果被破壞了怎么恢復(fù)呢?來看下reset, 源碼很簡單,break之后重新生成新的實例,對應(yīng)的會重新初始化count,在dowait里index==0也調(diào)用了nextGeneration,所以說它是可以循環(huán)利用的。
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); }}
結(jié)束語
cyclicBarrier源碼相對簡單一些,下節(jié)給大家講下Phaser,它是增強版的CountDownLatch,它的實現(xiàn)相對更加復(fù)雜一點 。