Java高并發(fā)編程基礎(chǔ)之AQS
引言
曾經(jīng)有一道比較比較經(jīng)典的面試題“你能夠說說java的并發(fā)包下面有哪些常見的類?”大多數(shù)人應(yīng)該都可以說出 CountDownLatch、CyclicBarrier、Sempahore多線程并發(fā)三大利器。這三大利器都是通過AbstractQueuedSynchronizer抽象類(下面簡(jiǎn)寫AQS)來實(shí)現(xiàn)的,所以學(xué)習(xí)三大利器之前我們有必要先來學(xué)習(xí)下AQS。
AQS是一種提供了原子式管理同步狀態(tài)、阻塞和喚醒線程功能以及隊(duì)列模型的簡(jiǎn)單框架”
AQS結(jié)構(gòu)
說到同步我們?nèi)绾蝸肀WC同步?大家第一印象肯定是加鎖了,說到鎖的話大家肯定首先會(huì)想到的是Synchronized。Synchronized大家應(yīng)該基本上都會(huì)使用,加鎖和釋放鎖都是jvm 來幫我們實(shí)現(xiàn)的,我們只需要簡(jiǎn)單的加個(gè) Synchronized關(guān)鍵字就可以了。用起來超級(jí)方便。但是有沒有一種情況我們?cè)O(shè)置一個(gè)鎖的超時(shí)時(shí)間Synchronized就有點(diǎn)實(shí)現(xiàn)不了,這時(shí)候我們就可以用ReentrantLock來實(shí)現(xiàn),ReentrantLock是通過aqs來實(shí)現(xiàn)的,今天我們就通過ReentrantLock來學(xué)習(xí)一下aqs。
CAS && 公平鎖和非公平鎖
AQS里面用到了大量的CAS學(xué)習(xí)AQS之前我們還是有必要簡(jiǎn)單的先了解下CAS、公平鎖和非公平鎖。
CAS
- CAS 全稱是 compare and swap,是一種用于在多線程環(huán)境下實(shí)現(xiàn)同步功能的機(jī)制。CAS 操作包含三個(gè)操作數(shù) :內(nèi)存位置、預(yù)期數(shù)值和新值。CAS 的實(shí)現(xiàn)邏輯是將內(nèi)存位置處的數(shù)值與預(yù)期數(shù)值相比較,若相等,則將內(nèi)存位置處的值替換為新值。若不相等,則不做任何操作,這個(gè)操作是個(gè)原子性操作,java里面的AtomicInteger等類都是通過cas來實(shí)現(xiàn)的。
公平鎖和非公平鎖
- 公平鎖:多個(gè)線程按照申請(qǐng)鎖的順序去獲得鎖,線程會(huì)直接進(jìn)入隊(duì)列去排隊(duì),隊(duì)列中第一個(gè)才能獲得到鎖。優(yōu)點(diǎn):等待鎖的線程不會(huì)餓死,每個(gè)線程都可以獲取到鎖。缺點(diǎn):整體吞吐效率相對(duì)非公平鎖要低,等待隊(duì)列中除第一個(gè)線程以外的所有線程都會(huì)阻塞,CPU喚醒阻塞線程的開銷比非公平鎖大。
- 非公平鎖:多個(gè)線程去獲取鎖的時(shí)候,會(huì)直接去嘗試獲取,獲取不到,再去進(jìn)入等待隊(duì)列,如果能獲取到,就直接獲取到鎖。優(yōu)點(diǎn):可以減少CPU喚醒線程的開銷,整體的吞吐效率會(huì)高點(diǎn),CPU也不必喚醒所有線程,會(huì)減少喚起線程的數(shù)量。缺點(diǎn):處于等待隊(duì)列中的線程可能會(huì)餓死,或者等很久才會(huì)獲得鎖。文字有點(diǎn)拗口,我們來個(gè)實(shí)際的例子說明下。比如我們?nèi)ナ程镁筒偷臅r(shí)候都要排隊(duì),大家都按照先來后到的順序排隊(duì)打飯,這就是公平鎖。如果等到你準(zhǔn)備拿盤子打飯的時(shí)候 直接蹦出了一個(gè)五大三粗的胖子插隊(duì)到你前面,你看打不贏他只能忍氣吞聲讓他插隊(duì),等胖子打完飯了又來個(gè)小個(gè)子也來插你隊(duì),這時(shí)候你沒法忍了,直接大吼一聲讓他滾,這個(gè) 小個(gè)子只能屁顛屁顛到隊(duì)尾去排隊(duì)了這就是非公平鎖。我們先來看看AQS有哪些屬性
- // 頭節(jié)點(diǎn)
- private transient volatile Node head;
- // 阻塞的尾節(jié)點(diǎn),每個(gè)新的節(jié)點(diǎn)進(jìn)來,都插入到最后,也就形成了一個(gè)鏈表
- private transient volatile Node tail;
- // 這個(gè)是最重要的,代表當(dāng)前鎖的狀態(tài),0代表沒有被占用,大于 0 代表有線程持有當(dāng)前鎖
- // 這個(gè)值可以大于 1,是因?yàn)殒i可以重入,每次重入都加上 1
- private volatile int state;
- // 代表當(dāng)前持有獨(dú)占鎖的線程,舉個(gè)最重要的使用例子,因?yàn)殒i可以重入
- // reentrantLock.lock()可以嵌套調(diào)用多次,所以每次用這個(gè)來判斷當(dāng)前線程是否已經(jīng)擁有了鎖
- // if (currentThread == getExclusiveOwnerThread()) {state++}
- private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
下面我們來寫一個(gè)demo分析下lock 加鎖和釋放鎖的過程
- final void lock() {
- // 上來先試試直接把狀態(tài)置位1,如果此時(shí)沒人獲取鎖就直接
- if (compareAndSetState(0, 1))
- // 爭(zhēng)搶成功則修改獲得鎖狀態(tài)的線程
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
cas嘗試失敗,說明已經(jīng)有人再持有鎖,所以進(jìn)入acquire方法
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
tryAcquire方法,看名字大概能猜出什么意思,就是試一試。tryAcquire實(shí)際上是調(diào)用了父類Sync的nonfairTryAcquire方法
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- // 獲取下當(dāng)前鎖的狀態(tài)
- int c = getState();
- // 這個(gè)if 邏輯跟前面一進(jìn)來就獲取鎖的邏輯一樣都是通過cas嘗試獲取下鎖
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- // 進(jìn)入這個(gè)判斷說明 鎖重入了 狀態(tài)需要進(jìn)行+1
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- // 如果鎖的重入次數(shù)大于int的最大值,直接就拋出異常了,正常情況應(yīng)該不存在這種情況,不過jdk還是嚴(yán)謹(jǐn)?shù)?nbsp;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- // 返回false 說明嘗試獲取鎖失敗了,失敗了就要進(jìn)行acquireQueued方法了
- return false;
- }
tryAcquire方法如果獲取鎖失敗了,那么肯定就要排隊(duì)等待獲取鎖。排隊(duì)的線程需要待在哪里等待獲取鎖?這個(gè)就跟我們線程池執(zhí)行任務(wù)一樣,線程池把任務(wù)都封裝成一個(gè)work,然后當(dāng)線程處理任務(wù)不過來的時(shí)候,就把任務(wù)放到隊(duì)列里面。AQS同樣也是類似的,把排隊(duì)等待獲取鎖的線程封裝成一個(gè)NODE。然后再把NODE放入到一個(gè)隊(duì)列里面。隊(duì)列如下所示,不過需要注意一點(diǎn)head是不存NODE的。
接下來我們繼續(xù)分析源碼,看下獲取鎖失敗是如何被加入隊(duì)列的。就要執(zhí)行acquireQueued方法,執(zhí)行acquireQueued方法之前需要先執(zhí)行addWaiter方法
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- // cas 加入隊(duì)列隊(duì)尾
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 尾結(jié)點(diǎn)不為空 || cas 加入尾結(jié)點(diǎn)失敗
- enq(node);
- return node;
- }
enq
接下來再看看enq方法
- // 通過自旋和CAS一定要當(dāng)前node加入隊(duì)尾
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- // 尾結(jié)點(diǎn)為空說明隊(duì)列還是空的,還沒有被初始化,所以初始化頭結(jié)點(diǎn),可以看到頭結(jié)點(diǎn)的node 是沒有綁定線程的也就是不存數(shù)據(jù)的
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
通過addWaiter方法已經(jīng)把獲取鎖的線程通過封裝成一個(gè)NODE加入對(duì)列。上述方法的一個(gè)執(zhí)行流程圖如下:
接下來就是繼續(xù)執(zhí)行acquireQueued方法
acquireQueued
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- // 通過自旋去獲取鎖 前驅(qū)節(jié)點(diǎn)==head的時(shí)候去嘗試獲取鎖,這個(gè)方法在前面已經(jīng)分析過了。
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 進(jìn)入這個(gè)if說明node的前驅(qū)節(jié)點(diǎn)不等于head 或者嘗試獲取鎖失敗了
- // 判斷是否需要掛起當(dāng)前線程
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- // 異常情況進(jìn)入cancelAcquire,在jdk11的時(shí)候這個(gè)源碼直接是catch (Throwable e){ cancelAcquire(node);} 簡(jiǎn)單明了
- if (failed)
- cancelAcquire(node);
- }
- }
setHead
這個(gè)方法每當(dāng)有一個(gè)node獲取到鎖了,就把當(dāng)前node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),可以簡(jiǎn)單的看做當(dāng)前節(jié)點(diǎn)獲取到鎖了就把當(dāng)前節(jié)點(diǎn)”移除“(變?yōu)轭^結(jié)點(diǎn))隊(duì)列。
shouldParkAfterFailedAcquire
說到這個(gè)方法我們就要先看下NODE可能會(huì)有哪些狀態(tài)在源碼里面我們可以看到總共會(huì)有四種狀態(tài)
- CANCELLED:值為1,在同步隊(duì)列中等待的線程等待超時(shí)或被中斷,需要從同步隊(duì)列中取消該Node的結(jié)點(diǎn),其結(jié)點(diǎn)的waitStatus為CANCELLED,即結(jié)束狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會(huì)再變化。
- SIGNAL:值為-1,被標(biāo)識(shí)為該等待喚醒狀態(tài)的后繼結(jié)點(diǎn),當(dāng)其前繼結(jié)點(diǎn)的線程釋放了同步鎖或被取消,將會(huì)通知該后繼結(jié)點(diǎn)的線程執(zhí)行。說白了,就是處于喚醒狀態(tài),只要前繼結(jié)點(diǎn)釋放鎖,就會(huì)通知標(biāo)識(shí)為SIGNAL狀態(tài)的后繼結(jié)點(diǎn)的線程執(zhí)行。
- CONDITION:值為-2,與Condition相關(guān),該標(biāo)識(shí)的結(jié)點(diǎn)處于等待隊(duì)列中,結(jié)點(diǎn)的線程等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。
- PROPAGATE:值為-3,與共享模式相關(guān),在共享模式中,該狀態(tài)標(biāo)識(shí)結(jié)點(diǎn)的線程處于可運(yùn)行狀態(tài)。
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- // 前驅(qū)節(jié)點(diǎn)狀態(tài) 如果這個(gè)狀態(tài)為-1 則返回true,把當(dāng)前線程掛起
- if (ws == Node.SIGNAL)
- return true;
- // 大于0,說明狀態(tài)為CANCELLED
- if (ws > 0) {
- do {
- // 刪除被取消的node(讓被取消的node成為一個(gè)沒有引用的node等著下次GC被回收)
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- // 進(jìn)入這里只能是 0,-2,-3。NODE節(jié)點(diǎn)初始化的時(shí)候waitStatus默認(rèn)值是0,所以只有這里才有修改waitStatus的地方
- // 通過cas 把前驅(qū)節(jié)點(diǎn)的狀態(tài)設(shè)置為-1,然后返回false ,外面調(diào)用這個(gè)方法的是個(gè)循環(huán),又會(huì)調(diào)用一次這個(gè)方法
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
parkAndCheckInterrupt
掛起當(dāng)前線程,并且阻塞
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this); // 掛起當(dāng)前線程,阻塞
- return Thread.interrupted();
- }
在這里插入圖片描述
解鎖
加鎖成功了,那鎖用完了就應(yīng)該釋放鎖了,釋放鎖重點(diǎn)看下unparkSuccessor這個(gè)方法就好了
- private void unparkSuccessor(Node node) {
- // 頭結(jié)點(diǎn)狀態(tài)
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- Node s = node.next;
- // s==null head的successor節(jié)點(diǎn)獲取鎖成功后,執(zhí)行了head.next=null的操作后,解鎖線程讀取了head.next,因此s==null
- // head的successor節(jié)點(diǎn)被取消(cancelAcquire)時(shí),執(zhí)行了如下操作:successor.waitStatus=1 ; successor.next = successor;
- if (s == null || s.waitStatus > 0) {
- s = null;
- // 從尾節(jié)點(diǎn)開始往前找,找到最前面的非取消的節(jié)點(diǎn) 這里沒有break 哦
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- // 喚醒線程 ,喚醒的線程會(huì)從acquireQueued去獲取鎖
- LockSupport.unpark(s.thread);
- }
釋放鎖代碼比較簡(jiǎn)單,基本都寫在代碼注釋里面了,流程如下:
這段代碼里面有一個(gè)比較經(jīng)典的面試題:如果頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為空或者頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)的狀態(tài)為取消的時(shí)候?yàn)槭裁匆獜暮笸罢?,找到最前面非取消的?jié)點(diǎn)?
- node.prev = pred; compareAndSetTail(pred, node) 這兩個(gè)地方可以看作Tail入隊(duì)的原子操作,但是此時(shí)pred.next = node;還沒執(zhí)行,如果這個(gè)時(shí)候執(zhí)行了unparkSuccessor方法,就沒辦法從前往后找了,所以需要從后往前找。
- 在產(chǎn)生CANCELLED狀態(tài)節(jié)點(diǎn)的時(shí)候,先斷開的是Next指針,Prev指針并未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node
總結(jié)
reentrantLock的獲取鎖和釋放鎖基本就講完了,里面還涉及多比較多的細(xì)節(jié),感興趣的同學(xué)可以對(duì)著源碼一行一行去debug試試。
適當(dāng)?shù)牧私鈇qs才能更好的學(xué)習(xí)CountDownLatch、CyclicBarrier、Sempahore,因?yàn)檫@三個(gè)利器都是基于aqs來實(shí)現(xiàn)的。
本文轉(zhuǎn)載自微信公眾號(hào)「java金融」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系java金融公眾號(hào)。