三分鐘帶你搞懂 AQS 原理設(shè)計(jì)
一、摘要
在之前的文章中,我們介紹了 ReentrantLock、ReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor 等并發(fā)工具類的使用方式,它們?cè)谡?qǐng)求共享資源的時(shí)候,都能實(shí)現(xiàn)線程同步的效果。
在使用方式上稍有不同,有的是獨(dú)占式,多個(gè)線程競(jìng)爭(zhēng)時(shí)只有一個(gè)線程能執(zhí)行方法,比如 ReentrantLock 等;有的是共享式,多個(gè)線程可以同時(shí)執(zhí)行方法,比如:ReadWriteLock、CountDownLatch、Semaphore 等,不同的實(shí)現(xiàn)爭(zhēng)用共享資源的方式也不同。
如果仔細(xì)閱讀源碼,會(huì)發(fā)現(xiàn)它們都是基于AbstractQueuedSynchronizer這個(gè)抽象類實(shí)現(xiàn)的,我們簡(jiǎn)稱 AQS。
AQS 是一個(gè)提供了原子式管理同步狀態(tài)、阻塞和喚醒線程功能的框架,是除了 Java 自帶的synchronized關(guān)鍵字之外的鎖實(shí)現(xiàn)機(jī)制。
可以這么說,AQS是JUC包下線程同步類的基石,也是很多面試官喜歡提問的話題,掌握AQS原理對(duì)我們深入理解線程同步技術(shù)有著非常重要的意義。
本文以ReentrantLock作為切入點(diǎn),來解讀AQS相關(guān)的知識(shí)點(diǎn),最后配上簡(jiǎn)單的應(yīng)用示例來幫助大家理解 AQS,如果有描述不對(duì)的地方,歡迎大家留言指出,不勝感激!
二、ReentrantLock
在之前的線程系列文章中,我們介紹了ReentrantLock的基本用法,它是一個(gè)可重入的互斥鎖,它具有與使用synchronized關(guān)鍵字一樣的效果,并且功能更加強(qiáng)大,編程更加靈活,支持公平鎖和非公平鎖兩種模式。
使用方式也非常簡(jiǎn)單,只需要在相應(yīng)的代碼上調(diào)用加鎖和釋放鎖方法即可,簡(jiǎn)單示例如下!
public class Counter {
// 默認(rèn)非公平鎖模式
private final Lock lock = new ReentrantLock();
public void add() {
// 加鎖
lock.lock();
try {
// 具體業(yè)務(wù)邏輯...
} finally {
// 釋放鎖
lock.unlock();
}
}
}如果閱讀lock()和unlock()方法,會(huì)發(fā)現(xiàn)它的底層都是由AQS來實(shí)現(xiàn)的。
下面,我們一起來看看這兩個(gè)方法的源碼實(shí)現(xiàn),本文源碼內(nèi)容摘取自 JDK 1.8 版本,可能不同的版本略有區(qū)別!
2.1、lock 方法源碼
public class ReentrantLock implements Lock, java.io.Serializable {
// 同步鎖實(shí)現(xiàn)類
private final Sync sync;
public ReentrantLock() {
// 默認(rèn)構(gòu)造方法為非公平鎖實(shí)現(xiàn)類
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
// true:公平鎖實(shí)現(xiàn)類,false:非公平鎖實(shí)現(xiàn)類
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
// 加鎖操作
sync.lock();
}
// 非公平鎖實(shí)現(xiàn)類
static final class NonfairSync extends Sync {
// 加鎖操作
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
// 公平鎖實(shí)現(xiàn)類
static final class FairSync extends Sync {
// 加鎖操作
final void lock() {
acquire(1);
}
}
// 公平鎖和非公平鎖,都繼承自 AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
// lock 抽象方法
abstract void lock();
}
}從源碼上可以清晰的看到,當(dāng)初始化ReentrantLock對(duì)象時(shí),需要指定鎖的模式。
默認(rèn)構(gòu)造方法是非公平鎖模式,采用的是NonfairSync內(nèi)部實(shí)現(xiàn)類;公平鎖模式下,則采用的是FairSync內(nèi)部實(shí)現(xiàn)類;這兩個(gè)內(nèi)部實(shí)現(xiàn)類都繼承了Sync抽象類;同時(shí),Sync也繼承了AbstractQueuedSynchronizer,也就是我們上文提到的AQS。
如果把lock()方法的請(qǐng)求鏈路進(jìn)行抽象,可以用如下圖進(jìn)行簡(jiǎn)要概括。
圖片
無論是非公平鎖模式還是公平鎖模式,可能最終都會(huì)調(diào)用AQS的acquire()方法,它表示通過獨(dú)占式的方式加鎖,我們繼續(xù)往下看這個(gè)方法的源碼,部分核心代碼如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 通過獨(dú)占式的方式加鎖
public final void acquire(int arg) {
// 嘗試加鎖,會(huì)回調(diào)具體的實(shí)現(xiàn)類
if (!tryAcquire(arg) &&
// 如果嘗試加鎖失敗,將當(dāng)前線程加入等待隊(duì)列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 由子類完成加鎖邏輯的實(shí)現(xiàn),支持重寫該方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
}從AQS的源碼上可以看出,acquire()方法并不進(jìn)行具體加鎖邏輯的實(shí)現(xiàn),而是通過具體的實(shí)現(xiàn)類重寫tryAcquire()方法來完成加鎖操作,如果加鎖失敗,會(huì)將當(dāng)前線程加入等待隊(duì)列。
如果是非公平鎖模式,會(huì)回調(diào)ReentrantLock類的NonfairSync.tryAcquire()方法;如果是公平鎖模式,會(huì)回調(diào)ReentrantLock類的FairSync.tryAcquire()方法,我們繼續(xù)回看ReentrantLock類的源碼。
非公平鎖NonfairSync靜態(tài)內(nèi)部實(shí)現(xiàn)類,相關(guān)的源碼如下!
// 非公平鎖實(shí)現(xiàn)類
static final class NonfairSync extends Sync {
// 加鎖操作
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 嘗試非公平方式加鎖,重寫父類 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 采用CAS方式修改線程同步狀態(tài),如果成功返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 支持當(dāng)前線程,重復(fù)獲得鎖,將state值加1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}公平鎖FairSync靜態(tài)內(nèi)部實(shí)現(xiàn)類,相關(guān)的源碼如下!
// 公平鎖實(shí)現(xiàn)類
static final class FairSync extends Sync {
// 加鎖操作
final void lock() {
acquire(1);
}
// 嘗試公平方式加鎖,重寫父類 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1)判斷等待隊(duì)列是否有線程處于等待狀態(tài),如果沒有,嘗試獲取鎖;如果有,就進(jìn)入等待隊(duì)列
// 2)采用CAS方式修改線程同步狀態(tài),如果成功返回true
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 支持當(dāng)前線程,重復(fù)獲得鎖,將state值加1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}從源碼上可以清晰的看到,無論是是公平鎖還是非公平鎖模式,都是采用compareAndSetState()方法(簡(jiǎn)稱CAS)進(jìn)行加鎖,如果成功就返回true;同時(shí)支持當(dāng)前線程重復(fù)獲得鎖,也就是之前提到的鎖可重入機(jī)制。
唯一的區(qū)別在于:公平鎖實(shí)現(xiàn)類多了一個(gè)hasQueuedPredecessors()方法判斷,它的用途是判斷等待隊(duì)列是否有線程處于等待狀態(tài),如果沒有,嘗試獲取鎖;如果有,就將當(dāng)前線程存入等待隊(duì)列,依此排隊(duì),從而保證線程通過公平方式獲取鎖的目的。
關(guān)于 CAS 實(shí)現(xiàn)原理,在之前的并發(fā)原子類文章中已經(jīng)有所介紹,通過它加上volatile修飾符可以實(shí)現(xiàn)一個(gè)無鎖的線程安全訪問操作,本文不再重復(fù)解讀,有興趣的朋友可以翻閱之前的文章。
2.2、unlock 方法源碼
public class ReentrantLock implements Lock, java.io.Serializable {
// 同步鎖實(shí)現(xiàn)類
private final Sync sync;
public void unlock() {
// 釋放鎖操作
sync.release(1);
}
}unlock()方法的釋放鎖實(shí)現(xiàn)相對(duì)來說就簡(jiǎn)單多了,整個(gè)請(qǐng)求鏈路可以用如下圖進(jìn)行簡(jiǎn)要概括。
當(dāng)調(diào)用unlock()方法時(shí),會(huì)直接跳轉(zhuǎn)到AQS的release()方法上,AQS相關(guān)的源碼如下!
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 釋放鎖操作
public final boolean release(int arg) {
// 嘗試釋放鎖
if (tryRelease(arg)) {
// 從隊(duì)列頭部中獲取一個(gè)等待線程,并進(jìn)行喚醒操作
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 由子類完成釋放鎖邏輯的實(shí)現(xiàn),支持重寫該方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
}與加鎖操作類似,AQS的release()方法并不進(jìn)行具體釋放鎖邏輯的實(shí)現(xiàn),而是通過具體的實(shí)現(xiàn)類重寫tryRelease()方法來完成釋放鎖操作,如果釋放鎖成功,會(huì)從隊(duì)列頭部中獲取一個(gè)等待線程,并進(jìn)行喚醒操作。
我們繼續(xù)回看ReentrantLock類的Sync.tryRelease()釋放鎖方法,部分核心源碼如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
// 嘗試釋放鎖
protected final boolean tryRelease(int releases) {
// 將state值進(jìn)行減1操作
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}相比加鎖過程,釋放鎖要簡(jiǎn)單的多,主要是將線程的同步狀態(tài)值進(jìn)行自減操作。
三、AQS 原理淺析
如果仔細(xì)的研究 AQS 的源碼,盡管實(shí)現(xiàn)上很復(fù)雜,但是也有規(guī)律可循。
從上到下,整個(gè)框架可以分為五層,架構(gòu)可以用如下圖來描述!(圖片來自ReentrantLock 的實(shí)現(xiàn)看 AQS 的原理及應(yīng)用 - 美團(tuán)技術(shù)團(tuán)隊(duì))
圖片
當(dāng)有自定義線程同步器接入AQS時(shí),只需要按需重寫第一層的方法即可,不需要關(guān)心底層的實(shí)現(xiàn)。
以加鎖為例,當(dāng)調(diào)用AQS的 API 層獲取鎖方法時(shí),會(huì)先嘗試進(jìn)行加鎖操作(具體邏輯由實(shí)現(xiàn)類完成),如果加鎖失敗,會(huì)進(jìn)入等待隊(duì)列處理環(huán)節(jié),這些處理邏輯同時(shí)也依賴最底層的基礎(chǔ)數(shù)據(jù)提供層來完成。
3.1、原理概述
整個(gè)AQS實(shí)現(xiàn)線程同步的核心思想,可以用如下這段話來描述!
AQS 內(nèi)部維護(hù)一個(gè)共享資源變量和線程等待隊(duì)列,如果被請(qǐng)求的共享資源空閑,那么就將當(dāng)前請(qǐng)求資源的線程設(shè)置為有效的工作線程,將共享資源設(shè)置為鎖定狀態(tài);如果共享資源被占用,就需要一定的阻塞等待喚醒機(jī)制來保證鎖分配。這個(gè)機(jī)制主要用的是 CLH 隊(duì)列的變體實(shí)現(xiàn)的,將暫時(shí)獲取不到鎖的線程加入到等待隊(duì)列中,待條件允許的時(shí)候?qū)⒕€程從隊(duì)列中取出并進(jìn)行喚醒。
CLH 隊(duì)列是一個(gè)單向鏈表隊(duì)列,對(duì)應(yīng)的還有 CLH 鎖實(shí)現(xiàn),它是一個(gè)基于邏輯隊(duì)列非線程饑餓的一種自旋公平鎖實(shí)現(xiàn),由 Craig、Landin 和 Hagersten 三位大佬發(fā)明,因此命名為 CLH 鎖。關(guān)于這方面的技術(shù)知識(shí)講解可以參閱這篇文章:多圖詳解 CLH 鎖的原理與實(shí)現(xiàn)。
而AQS中的隊(duì)列采用的是 CLH 變體的虛擬雙向隊(duì)列,通過將每一條請(qǐng)求共享資源的線程封裝成一個(gè) CLH 隊(duì)列的一個(gè)節(jié)點(diǎn)來實(shí)現(xiàn)鎖的分配。
具體實(shí)現(xiàn)原理,可以用如下圖來簡(jiǎn)單概括:
圖片
同時(shí),AQS中維護(hù)了一個(gè)共享資源變量state,通過它來實(shí)現(xiàn)線程的同步狀態(tài)控制,這個(gè)字段使用了volatile關(guān)鍵字修飾符來保證多線程下的可見性。
當(dāng)多個(gè)線程嘗試獲取鎖時(shí),會(huì)通過CAS方式來修改state值,當(dāng)state=1時(shí)表示當(dāng)前對(duì)象鎖已經(jīng)被占有(相對(duì)獨(dú)占模式來說),此時(shí)其他線程來加鎖時(shí)會(huì)失敗,加鎖失敗的線程會(huì)被放入上文說到的FIFO等待隊(duì)列中,并且線程會(huì)被掛起,等待其他獲取鎖的線程釋放鎖才能夠被喚醒。
總結(jié)下來,用大白話說就是,AQS是基于 CLH 隊(duì)列,使用volatile修飾共享變量state,線程通過CAS方式去改變state狀態(tài)值,如果成功則獲取鎖成功,失敗則進(jìn)入等待隊(duì)列,等待被喚醒的線程同步器框架。
打開 ReentrantLock、ReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore 等類的源碼實(shí)現(xiàn),你會(huì)發(fā)現(xiàn)它們的線程同步狀態(tài)都是基于AQS實(shí)現(xiàn)的,可以看成是AQS的衍生物。
下面我們一起來看看相關(guān)的源碼實(shí)現(xiàn)!
3.2、源碼淺析
3.2.1、線程同步狀態(tài)控制
AQS源碼中維護(hù)的共享資源變量state,表示同步狀態(tài)的意思,它是實(shí)現(xiàn)線程同步控制的關(guān)鍵字段,核心源碼如下:
/**
* The synchronization state.
*/
private volatile int state;針對(duì)state字段值的獲取和修改,AQS提供了三個(gè)方法,并且都采用Final修飾,意味著子類無法重寫它們,相關(guān)方法如下:
方法 | 描述 |
protected final int getState() | 獲取 |
protected final void setState(int newState) | 設(shè)置 |
protected final boolean compareAndSetState(int expect, int update) | 使用 CAS 方式更新 |
如果仔細(xì)分析源碼,state字段還有一個(gè)很大的用處,通過它可以實(shí)現(xiàn)多線程的獨(dú)占模式和共享模式。
以ReentrantLock和Semaphore類為例,它們的加鎖過程中state值的變化情況如下。
3.2.1.1、ReentrantLock 獨(dú)占模式的獲取鎖,簡(jiǎn)易流程圖如下:
圖片
ReentrantLock類部分核心源碼,實(shí)現(xiàn)邏輯如下:
public class ReentrantLock implements Lock, java.io.Serializable {
// 非公平鎖實(shí)現(xiàn)類
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加鎖操作
final void lock() {
// 將state從0設(shè)置為1,如果成功,直接獲取當(dāng)前共享資源
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 嘗試加鎖,會(huì)轉(zhuǎn)調(diào)tryAcquire方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 判斷state是否等于0
if (c == 0) {
// 嘗試state從0設(shè)置為1,如果成功,返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 支持當(dāng)前線程可重入,每調(diào)用一次,state的值加1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}3.2.1.2、Semaphore 共享模式的獲取鎖,簡(jiǎn)易流程圖如下:
圖片
Semaphore類部分核心源碼,實(shí)現(xiàn)邏輯如下:
public class Semaphore implements java.io.Serializable {
// 初始化的時(shí)候,設(shè)置線程最大并發(fā)數(shù),本質(zhì)設(shè)置的是state的值
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 非公平鎖內(nèi)部實(shí)現(xiàn)類
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
// 設(shè)置state的值
setState(permits);
}
// 通過共享方式,嘗試獲取鎖
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 嘗試獲取共享資源,會(huì)調(diào)用Sync.nonfairTryAcquireShared方法
public boolean tryAcquire() {
// 如果state的值小于0,表示無可用共享資源
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 抽象同步類
abstract static class Sync extends AbstractQueuedSynchronizer {
// 通過共享方式,嘗試獲取鎖
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 通過cas方式,設(shè)置state自減
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}3.2.2、公平鎖和非公平鎖實(shí)現(xiàn)
在上文的ReentrantLock源碼分析過程中,對(duì)于公平鎖和非公平鎖實(shí)現(xiàn),其實(shí)已經(jīng)有所解讀。
在AQS中所有的加鎖邏輯是有具有的實(shí)現(xiàn)類來完成,以ReentrantLock類為例,它的加鎖邏輯由兩個(gè)實(shí)現(xiàn)類來完成,分別是非公平鎖靜態(tài)內(nèi)部實(shí)現(xiàn)類NonfairSync和公平鎖靜態(tài)內(nèi)部實(shí)現(xiàn)類FairSync。
如上文的源碼介紹,這兩個(gè)類的的加鎖邏輯基本一致,唯一的區(qū)別在于:公平鎖實(shí)現(xiàn)類加鎖時(shí),增加了一個(gè)hasQueuedPredecessors()方法判斷,這個(gè)方法會(huì)判斷等待隊(duì)列是否有線程處于等待狀態(tài),如果沒有,嘗試獲取鎖;如果有,就進(jìn)入等待隊(duì)列。
簡(jiǎn)單的說就是,非公平鎖實(shí)現(xiàn)類的加鎖方式,如果有線程嘗試獲取鎖,直接嘗試通過CAS方式進(jìn)行搶鎖,如果搶成功了,就直接獲取鎖,沒有搶成功就進(jìn)入等待隊(duì)列;而公平鎖實(shí)現(xiàn)類的加鎖方式,會(huì)判斷等待隊(duì)列是否有線程處于等待狀態(tài),如果有則不去搶鎖,乖乖排到后面,如果沒有則嘗試搶鎖。
相對(duì)來說,非公平鎖會(huì)有更好的性能,因?yàn)樗耐掏铝勘容^大。其次,非公平鎖讓獲取鎖的時(shí)間變得更加不確定,可能會(huì)導(dǎo)致在阻塞隊(duì)列中的線程長(zhǎng)期處于饑餓狀態(tài)。
Semaphore類的公平鎖和非公平鎖實(shí)現(xiàn)也類似,擁有兩個(gè)靜態(tài)內(nèi)部實(shí)現(xiàn)類,源碼就不再解讀了,有興趣的朋友可以自行閱讀。
3.2.3、主要模板方法
從ReentrantLock的源碼實(shí)現(xiàn)中可以看出,AQS使用了模板方法設(shè)計(jì)模式,它不提供加鎖和釋放鎖的具體邏輯實(shí)現(xiàn),而是由實(shí)現(xiàn)類重寫對(duì)應(yīng)的方法來完成,這樣的好處就是實(shí)現(xiàn)更加的靈活,不同的線程同步器可以自行繼承AQS類,然后實(shí)現(xiàn)獨(dú)屬于自身的加鎖和解鎖功能。
常用的模板方法主要有以下幾個(gè):
方法 | 描述 |
protected boolean isHeldExclusively() | 判斷該線程是否正在獨(dú)占資源。只有用到 |
protected boolean tryAcquire(int arg) | 獨(dú)占方式。嘗試獲取資源, |
protected boolean tryRelease(int arg) | 獨(dú)占方式。嘗試釋放資源, |
protected int tryAcquireShared(int arg) | 共享方式。嘗試獲取資源, |
protected boolean tryReleaseShared(int arg) | 共享方式。嘗試釋放資源, |
通常自定義線程同步器,要么是獨(dú)占模式,要么是共享模式。
如果是獨(dú)占模式,重寫tryAcquire()和tryRelease()方法即可,比如 ReentrantLock 類。
如果是共享模式,重寫tryAcquireShared()和tryReleaseShared()方法即可,比如 Semaphore 類。
3.2.4、線程加入等待隊(duì)列實(shí)現(xiàn)
當(dāng)線程調(diào)用tryAcquire()方法獲取鎖失敗之后,就會(huì)調(diào)用addWaiter()方法,將當(dāng)前線程加入到等待隊(duì)列中去。
addWaiter()方法,部分核心源碼如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 將當(dāng)前線程加入等待隊(duì)列
private Node addWaiter(Node mode) {
// 以當(dāng)前線程構(gòu)造一個(gè)節(jié)點(diǎn),嘗試通過CAS方式插入到雙向鏈表的隊(duì)尾
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果插入沒有成功,則通過enq入隊(duì)
enq(node);
return node;
}
// 通過enq入隊(duì)
private Node enq(final Node node) {
// CAS 自旋方式,直到成功加入隊(duì)尾
for (;;) {
Node t = tail;
if (t == null) {
// 隊(duì)列為空,創(chuàng)建一個(gè)空結(jié)點(diǎn)作為head結(jié)點(diǎn),并將tail也指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
}我們?cè)賮砜纯碞ode類節(jié)點(diǎn)相關(guān)的屬性,部分核心源碼如下:
static final class Node {
// 當(dāng)前節(jié)點(diǎn)在隊(duì)列中的狀態(tài),狀態(tài)值枚舉含義如下:
// 0:節(jié)點(diǎn)初始化時(shí)的狀態(tài)
// 1: 表示節(jié)點(diǎn)引用線程由于等待超時(shí)或被打斷時(shí)的狀態(tài)
// -1: 表示當(dāng)隊(duì)列中加入后繼節(jié)點(diǎn)被掛起時(shí),其前驅(qū)節(jié)點(diǎn)會(huì)被設(shè)置為SIGNAL狀態(tài),表示該節(jié)點(diǎn)需要被喚醒
// -2:當(dāng)節(jié)點(diǎn)線程進(jìn)入condition隊(duì)列時(shí)的狀態(tài)
// -3:僅在釋放共享鎖releaseShared時(shí)對(duì)頭節(jié)點(diǎn)使用
volatile int waitStatus;
// 前驅(qū)節(jié)點(diǎn)
volatile Node prev;
// 后繼節(jié)點(diǎn)
volatile Node next;
//該節(jié)點(diǎn)的線程實(shí)例
volatile Thread thread;
// 指向下一個(gè)處于Condition狀態(tài)的節(jié)點(diǎn)(用于條件隊(duì)列)
Node nextWaiter;
//...
}可以很清晰的看到,每個(gè)關(guān)鍵屬性變量都加了volatile修飾符,確保多線程環(huán)境下可見。
正如上文所介紹的,Node其實(shí)是一個(gè)雙向鏈表數(shù)據(jù)結(jié)構(gòu),大致的數(shù)據(jù)結(jié)構(gòu)圖如下?。▓D片來自ReentrantLock 的實(shí)現(xiàn)看 AQS 的原理及應(yīng)用 - 美團(tuán)技術(shù)團(tuán)隊(duì))
圖片
其中第一個(gè)節(jié)點(diǎn),也叫頭節(jié)點(diǎn),為虛節(jié)點(diǎn),并不存儲(chǔ)任何線程信息,只是占位用;真正有數(shù)據(jù)的是從第二個(gè)節(jié)點(diǎn)開始,當(dāng)有線程需要加入等待隊(duì)列時(shí),會(huì)向隊(duì)尾進(jìn)行插入。
線程加入等待隊(duì)列之后,會(huì)再次調(diào)用acquireQueued()方法,嘗試進(jìn)行獲取鎖,如果成功或者中斷就退出,部分核心源碼如下:
final boolean acquireQueued(final Node node, int arg) {
// 標(biāo)記是否成功拿到鎖
boolean failed = true;
try {
// 標(biāo)記等待過程中是否中斷過
boolean interrupted = false;
// 開始自旋,要么獲取鎖,要么中斷
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果p是頭結(jié)點(diǎn),說明當(dāng)前節(jié)點(diǎn)在等待隊(duì)列的頭部,嘗試獲取鎖(頭結(jié)點(diǎn)是虛節(jié)點(diǎn))
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功,頭指針移動(dòng)到當(dāng)前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果p不是頭節(jié)點(diǎn)或者是頭節(jié)點(diǎn)但獲取鎖失敗,判斷當(dāng)前節(jié)點(diǎn)是否要進(jìn)入阻塞,如果滿足要求,就通過park讓線程進(jìn)入阻塞狀態(tài),等待被喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 如果沒有成功獲取鎖(比如超時(shí)或者被中斷),那么取消節(jié)點(diǎn)在隊(duì)列中的等待
cancelAcquire(node);
}
}線程加入等待隊(duì)列實(shí)現(xiàn),總結(jié)下來,大致步驟如下:
- 1.調(diào)用addWaiter()方法,將當(dāng)前線程封裝成一個(gè)節(jié)點(diǎn),嘗試通過CAS方式插入到雙向鏈表的隊(duì)尾,如果沒有成功,再通過自旋方式插入,直到成功為止
- 2.調(diào)用acquireQueued()方法,對(duì)在等待隊(duì)列中排隊(duì)的線程,嘗試獲取鎖操作,如果失敗,判斷當(dāng)前節(jié)點(diǎn)是否要進(jìn)入阻塞,如果滿足要求,就通過 LockSupport.park()方法讓線程進(jìn)入阻塞狀態(tài),并檢查是否被中斷,如果沒有,等待被喚醒
3.2.5、線程從等待隊(duì)列中被喚醒實(shí)現(xiàn)
當(dāng)線程調(diào)用tryRelease()方法釋放鎖成功之后,會(huì)從等待隊(duì)列的頭部開始,獲取排隊(duì)的線程,并進(jìn)行喚醒操作。
釋放鎖方法,部分核心源碼如下:
public final boolean release(int arg) {
// 嘗試釋放鎖
if (tryRelease(arg)) {
// 獲取頭部節(jié)點(diǎn)
Node h = head;
if (h != null && h.waitStatus != 0)
// 嘗試喚醒頭部節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)中的線程
unparkSuccessor(h);
return true;
}
return false;
}其中unparkSuccessor()是執(zhí)行喚醒線程的核心方法,部分核心源碼如下:
private void unparkSuccessor(Node node) {
// 獲取頭結(jié)點(diǎn) waitStatus
int ws = node.waitStatus;
// 置零當(dāng)前線程所在的結(jié)點(diǎn)狀態(tài),允許失敗
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 獲取當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)s
Node s = node.next;
// 如果下個(gè)節(jié)點(diǎn)是null或者被取消,就從隊(duì)列尾部依此尋找節(jié)點(diǎn)
if (s == null || s.waitStatus > 0) {
s = null;
// 從尾部節(jié)點(diǎn)開始向前找,找到隊(duì)列中排在最前的有效節(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 將當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)中的線程,進(jìn)行喚醒操作
if (s != null)
LockSupport.unpark(s.thread);
}線程從等待隊(duì)列中被喚醒實(shí)現(xiàn),總結(jié)下來,大致步驟如下:
- 1.當(dāng)線程調(diào)用tryRelease()方法釋放鎖成功之后,會(huì)從等待隊(duì)列獲取排隊(duì)的線程
- 2.如果隊(duì)列的頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)有效,會(huì)嘗試進(jìn)行喚醒節(jié)點(diǎn)中的線程;如果為空或者被取消,就從隊(duì)列尾部依此尋找節(jié)點(diǎn),找到隊(duì)列中排在最前的有效節(jié)點(diǎn),并嘗試進(jìn)行喚醒操作
四、簡(jiǎn)單應(yīng)用
了解完AQS基本原理之后,按照以上的知識(shí)點(diǎn),我們可以自己實(shí)現(xiàn)一個(gè)不可重入的互斥鎖線程同步類。
示例代碼如下:
public class MutexLock {
// 自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 判斷是否鎖定狀態(tài)
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 嘗試獲取資源,立即返回。成功則返回true,否則false。
@Override
protected boolean tryAcquire(int acquires) {
//state為0才設(shè)置為1,不支持重入!
if (compareAndSetState(0, 1)) {
//設(shè)置為當(dāng)前線程獨(dú)占資源
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 嘗試釋放資源,立即返回。成功則為true,否則false。
@Override
protected boolean tryRelease(int releases) {
// 判斷資源是否已被釋放
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
//釋放資源,放棄占有狀態(tài)
setState(0);
return true;
}
}
// 真正同步類的實(shí)現(xiàn)都依賴?yán)^承于AQS的自定義同步器!
private final Sync sync = new Sync();
// 獲取鎖,會(huì)阻塞等待,直到成功才返回
public void lock() {
sync.acquire(1);
}
// 釋放鎖
public void unlock() {
sync.release(1);
}
}測(cè)試類如下:
public class MutexLockTest {
private static int count =0;
private static MutexLock lock = new MutexLock();
public static void main(String[] args) throws Exception {
final int threadNum = 10;
CountDownLatch latch = new CountDownLatch(threadNum);
// 創(chuàng)建10個(gè)線程,同時(shí)對(duì)count進(jìn)行1000相加操作
for (int i = 0; i < threadNum; i++) {
new Thread(new Runnable() {
@Override
public void run() {
// 加鎖
lock.lock();
for (int j = 0; j < 1000; j++) {
count++;
}
// 釋放鎖
lock.unlock();
// 線程數(shù)減 1
latch.countDown();
}
}).start();
}
// 等待線程執(zhí)行完畢
latch.await();
System.out.println("執(zhí)行結(jié)果:" + count);
}
}輸出結(jié)果:
執(zhí)行結(jié)果:10000從日志輸出結(jié)果可以清晰的看到,執(zhí)行結(jié)果與預(yù)期值一致!
五、小結(jié)
本文從ReentrantLock源碼分析到AQS原理解析,進(jìn)行了一次知識(shí)內(nèi)容的總結(jié),從上文的分析中可以看出,AQS是JUC包下線程同步器實(shí)現(xiàn)的基石。
ReentrantLock、ReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor 等并發(fā)工具類,線程同步的實(shí)現(xiàn)都基于AQS來完成,掌握AQS原理對(duì)線程同步的理解和使用至關(guān)重要。
AQS原理是面試時(shí)熱點(diǎn)話題,希望本篇能幫助到大家!
六、參考
1.https://www.cnblogs.com/waterystone/p/4920797.html
2.https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
3.https://zhuanlan.zhihu.com/p/197840259
4.https://juejin.cn/post/7006895386103119908

































