Java 并發(fā)特性之 CountDownLatch 詳解!
CountDownLatch 是 Java 中的一個用于管理并發(fā)控制的同步輔助類,允許一個或多個線程等待其他線程完成操作,它的工作機制類似于“倒計時閂鎖”,線程會阻塞等待,直到閂鎖的計數(shù)器減少到 0,然后才能繼續(xù)執(zhí)行。這篇文章,我們將深度剖析其工作原理。

一、什么是 CountDownLatch?
CountDownLatch是java.util.concurrent 包的一部分,用于同步一個或多個線程以等待特定條件的滿足。它在創(chuàng)建時初始化一個給定的計數(shù),表示必須發(fā)生的事件數(shù)量,才能使線程繼續(xù)執(zhí)行。這個計數(shù)通過調(diào)用 countDown() 方法來遞減,等待該條件的線程調(diào)用 await() 方法來阻塞,直到計數(shù)達到零。
CountDownLatch的關(guān)鍵組件包含:
- 計數(shù):CountDownLatch 的核心概念是計數(shù)。它從創(chuàng)建鎖存器時指定的初始值開始,只能遞減,不能重置。
- await() :線程使用此方法等待計數(shù)達到零。如果當前計數(shù)大于零,這些線程將被置于等待狀態(tài)。
- countDown() :調(diào)用此方法以遞減計數(shù)。當計數(shù)達到零時,所有等待的線程將被釋放。
- 線程安全:CountDownLatch 是線程安全的,它使用內(nèi)部的 AQS(AbstractQueuedSynchronizer)來管理狀態(tài),確保計數(shù)的可見性和原子性。
二、工作原理
CountDownLatch 本質(zhì)上是一種簡化的信號量(Semaphore)。它的核心思想是設(shè)定一個計數(shù)器,當計數(shù)器值為 0 時,其他被阻塞的線程才會開始運行,線程的釋放建立在調(diào)用 countDown 方法去減少計數(shù)器次數(shù)的基礎(chǔ)上。
CountDownLatch 的典型功能包括:
- 使多個線程等待一系列事件發(fā)生。
- 讓一個線程等待完成多個步協(xié)作操作的線程。
- 在某個條件達到之前阻塞線程。
它包含了兩個核心方法:
- countDown(): 當前線程執(zhí)行完任務(wù)后,調(diào)用該方法時,計數(shù)器 -1;當計數(shù)器為 1,調(diào)用該方法可以使計數(shù)器變?yōu)?0。
- await(): 當前線程調(diào)用后,會阻塞,進入等待狀態(tài),直到計數(shù)器為 0。
通過這兩種操作,我們就可以構(gòu)建出各種靈活的并發(fā)控制邏輯。
1.簡單實現(xiàn)
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建一個計數(shù)器,設(shè)置初始的計數(shù)值為 3
CountDownLatch latch = new CountDownLatch(3);
// 創(chuàng)建三個工作線程
new Thread(() -> {
try {
Thread.sleep(1000); // 模擬任務(wù)耗時
System.out.println("Thread 1 finished");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 每個線程任務(wù)完成后,使計數(shù)器減 1
}
}).start();
new Thread(() -> {
try {
Thread.sleep(2000); // 模擬任務(wù)耗時
System.out.println("Thread 2 finished");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 每個線程任務(wù)完成后,使計數(shù)器減 1
}
}).start();
new Thread(() -> {
try {
Thread.sleep(3000); // 模擬任務(wù)耗時
System.out.println("Thread 3 finished");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 每個線程任務(wù)完成后,使計數(shù)器減 1
}
}).start();
// 主線程會在此阻塞,直到計數(shù)器減為 0
latch.await();
System.out.println("All tasks finished. Main thread proceeding.");
}
}輸出結(jié)果如下:
Thread 1 finished
Thread 2 finished
Thread 3 finished
All tasks finished. Main thread proceeding.在上述代碼中,主線程調(diào)用 latch.await(); 進入阻塞狀態(tài),等待三個工作線程完成任務(wù)后,計數(shù)器將變?yōu)?0,然后解除阻塞并進入后續(xù)邏輯。
2.底層工作原理
從底層工作原理來看,CountDownLatch 內(nèi)部維護了一個 Sync 類,這實際上是一個基于 AQS(AbstractQueuedSynchronizer, 抽象隊列同步器)的同步工具。
- State 的初始值為計數(shù)器值,也就是通過構(gòu)造函數(shù)傳遞的參數(shù)(n)。
- await 方法的實現(xiàn)就是通過驗證 state 的值是否為 0,若不為 0,則會阻塞當前線程并加入AQS等待隊列中,否則繼續(xù)向下執(zhí)行。
- countDown 方法會將 state 的值減 1,當state==0時,會喚醒所有在 AQS 阻塞隊列中的線程。
內(nèi)部實現(xiàn)機制對線程的阻塞、喚醒、隊列管理等是通過 AQS 實現(xiàn)的,AQS 的設(shè)計模式使得它能高效、安全地管理同步狀態(tài)。
3.核心代碼片段
static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}三、使用場景分析
CountDownLatch 的應(yīng)用場景比較廣泛,尤其是在處理并發(fā)問題時,這里列舉了幾個:
1.批量任務(wù)協(xié)調(diào)
有時候,不同子線程可能會同時執(zhí)行各自的任務(wù),然而主線程會等待所有子線程的執(zhí)行完畢后,才繼續(xù)執(zhí)行后續(xù)操作。
比如 Web 應(yīng)用中多個 API 的響應(yīng)聚合:假設(shè)有多個遠程服務(wù)需要調(diào)用,主線程希望在所有調(diào)用都返回結(jié)果后,再執(zhí)行后續(xù)處理,可以使用 CountDownLatch 來等待響應(yīng)的到來。示例代碼如下:
// 類似一些應(yīng)用需要同時從多個微服務(wù)中拉數(shù)據(jù),再一起處理
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
try {
// 模擬拉取和處理數(shù)據(jù)
} catch (Exception e) {
// 異常處理
} finally {
latch.countDown(); // 每個任務(wù)結(jié)束后調(diào)用
}
});
}
latch.await(); // 等待所有子任務(wù)執(zhí)行結(jié)束
System.out.println("匯總所有數(shù)據(jù).");2.并行計算
假如有這樣一個情景:計算任務(wù)很耗時,但是可以分成多個部分并行處理,然后將結(jié)果進行合并。
實現(xiàn)方式:我們先將任務(wù)分解成 n 個子任務(wù),全部執(zhí)行完畢后,將子任務(wù)的結(jié)果進行匯總分析。示例代碼如下:
CountDownLatch latch = new CountDownLatch(n);
List<Integer> results = new CopyOnWriteArrayList<>();
for (int i = 0; i < n; i++) {
new Thread(() -> {
try {
int result = // 處理部分任務(wù)
results.add(result);
} finally {
latch.countDown(); // 完成后計數(shù)減 1
}
}).start();
}
latch.await(); // 等到結(jié)果全部處理完
int finalResult = results.stream().mapToInt(Integer::intValue).sum();3.服務(wù)啟動檢查
CountDownLatch 還可以為應(yīng)用服務(wù)做“健康檢查”。例如,系統(tǒng)在完全啟動之前,需要依賴多個外部服務(wù),那么我們可以通過異步方式檢測各個服務(wù)的健康狀態(tài),只有當所有服務(wù)都正常啟動時,才允許繼續(xù)執(zhí)行下一步。
簡單的示例代碼如下:
public class ServiceStartChecker {
private final CountDownLatch latch;
public ServiceStartChecker(int serviceCount) {
latch = new CountDownLatch(serviceCount);
}
public void checkServices() throws InterruptedException {
// 啟動多個異步線程去檢查服務(wù)是否就緒
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
// 模擬服務(wù)檢查
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " is ready");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 完成后調(diào)用,減少計數(shù)器
}
}).start();
}
latch.await(); // 等待所有服務(wù)就緒
System.out.println("All services are up. System is ready.");
}
public static void main(String[] args) throws InterruptedException {
ServiceStartChecker checker = new ServiceStartChecker(3);
checker.checkServices();
}
}四、與其他并發(fā)工具對比
CountDownLatch 只是 Java 并發(fā)工具包中的一個工具,其功能與一些其他工具如 CyclicBarrier、Semaphore 等具有一定的共性和不同點。
1.CountDownLatch vs CyclicBarrier
- 用途:CountDownLatch適用于一組線程完成某一工作后進入“繼續(xù)工作”的狀態(tài),且無法進行reset重新使用。而 CyclicBarrier 更適合復用,在一組線程都到達某一屏障時統(tǒng)一放行,之后可以通過 reset 重復使用。
- 結(jié)束條件:CyclicBarrier 需要每個等待的線程都到達某個同步點才能繼續(xù);而 CountDownLatch 則更靈活,它并不關(guān)心是哪個線程調(diào)用了 countDown,只關(guān)注 countDown 是否次數(shù)到了。
2.CountDownLatch vs Semaphore
任務(wù)控制力度的差異:Semaphore 更傾向于對信號量的數(shù)量進行限流。簡單來說,Semaphore 可以限制某個操作的并發(fā)次數(shù),比如最多只允許 5 個線程同時執(zhí)行某個任務(wù)。而 CountDownLatch 只是簡單的減少計數(shù),不去限流,只是關(guān)注完成情況。
五、實際項目中的使用
在多線程爬蟲、分布式系統(tǒng)、并行數(shù)據(jù)處理等具體項目中,CountDownLatch 都能找到合適的應(yīng)用場景。
1.分布式系統(tǒng)的啟動控制
假設(shè)我們在一個分布式服務(wù)系統(tǒng)中,每個微服務(wù)間可能有復雜的依賴關(guān)系,借助 CountDownLatch,我可以構(gòu)建出一個依賴的啟動順序。
2.性能測試
在進行性能測試時,可能需要多個線程同時工作,例如使用 CountDownLatch 控制開始時間,以模擬高并發(fā)訪問場景。
CountDownLatch ready = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
new Thread(() -> {
try {
ready.await(); // 等待所有線程就緒
// 執(zhí)行模擬請求
} finally {
done.countDown();
}
}).start();
}
// 開始測試
ready.countDown();
done.await(); // 等到所有線程結(jié)束通過這樣的實踐,我們可以輕松模擬高并發(fā)性能測試和壓力測試場景。
總結(jié)
本文我們深度剖析了CountDownLatch,CountDownLatch雖然是一個簡單的并發(fā)工具,對其整體總結(jié)如下:
- 核心工作原理:CountDownLatch基于AQS機制,用于管理一個線程集合的執(zhí)行流程控制。
- 適用場景: 主要用于在處理并行操作時控制線程的執(zhí)行順序。
- 與其他工具的對比:與CyclicBarrier和信號量Semaphore分別有不同的貢獻場景。
- 多應(yīng)用場景使用:包括但不限于服務(wù)啟動依賴、并行計算結(jié)果收集、并發(fā)控制等。

























