阿粉寫了八千多字,就是為了把 ReentrantLock 講透
ReentrantLock 是可重入鎖
啥是可重入鎖呢?比如:線程 1 通過調(diào)用 lock() 方法獲取鎖之后,再調(diào)用 lock 時,就不會再進(jìn)行阻塞獲取鎖,而是直接增加重試次數(shù)。
還記得 synchronized 嗎?它有 monitorenter 和 monitorexit 兩種指令來保證鎖,而它們的作用可以理解為每個鎖對象擁有一個鎖計(jì)數(shù)器,也就是如果再次調(diào)用 lock() 方法,計(jì)數(shù)器會進(jìn)行加 1 操作
所以, synchronized 和 ReentrantLock 都是可重入鎖
ReentrantLock 與 synchronized 區(qū)別
既然 synchronized 和 ReentrantLock 都是可重入鎖,那 ReentrantLock 與 synchronized 有什么區(qū)別呢?
synchronized 是 Java 語言層面提供的語法,所以不需要考慮異常;ReentrantLock 是 Java 代碼實(shí)現(xiàn)的鎖,所以必須先要獲取鎖,然后再正確釋放鎖
synchronized 在獲取鎖時必須一直等待沒有額外的嘗試機(jī)制;ReentrantLock 可以嘗試獲取鎖(這一點(diǎn)等下分析源碼時會看到)
ReentrantLock 支持獲取鎖時的公平和非公平選擇
不 BB 了,直接上源碼
lock & NonfairSync & FairSync 詳解
- public void lock() {
- sync.lock();
- }
其中, sync 是 ReentrantLock 的靜態(tài)內(nèi)部類,它繼承 AQS 來實(shí)現(xiàn)重入鎖的邏輯, Sync 有兩個具體實(shí)現(xiàn)類: NonfairSync 和 FairSync
NonfairSync
先來看一下 NonfairSync :
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = 7316153563782823691L;
- /**
- * Performs lock. Try immediate barge, backing up to normal
- * acquire on failure.
- */
- // 重寫 Sync 的 lock 方法
- final void lock() {
- // 先不管其他,上來就先 CAS 操作,嘗試搶占一下鎖
- if (compareAndSetState(0, 1))
- // 如果搶占成功,就獲得了鎖
- setExclusiveOwnerThread(Thread.currentThread());
- else
- // 沒有搶占成功,調(diào)用 acquire() 方法,走里面的邏輯
- acquire(1);
- }
- // 重寫了 AQS 的 tryAcquire 方法
- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);
- }
- }
FairSync
接下來看一下 FairSync :
- static final class FairSync extends Sync {
- private static final long serialVersionUID = -3000897897090466540L;
- // 重寫 Sync 的 lock 方法
- final void lock() {
- acquire(1);
- }
- /**
- * Fair version of tryAcquire. Don't grant access unless
- * recursive call or no waiters or is first.
- */
- // 重寫了 Sync 的 tryAcquire 方法
- protected final boolean tryAcquire(int acquires) {
- // 獲取當(dāng)前執(zhí)行的線程
- final Thread current = Thread.currentThread();
- // 獲取 state 的值
- int c = getState();
- // 在無鎖狀態(tài)下
- if (c == 0) {
- // 沒有前驅(qū)節(jié)點(diǎn)且替換 state 的值成功時
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- // 保存當(dāng)前獲得鎖的線程,下次再來時,就不需要嘗試競爭鎖,直接重入即可
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- // 如果是同一個線程來獲得鎖,直接增加重入次數(shù)即可
- int nextc = c + acquires;
- // nextc 小于 0 ,拋異常
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- // 獲取鎖成功
- return true;
- }
- // 獲取鎖失敗
- return false;
- }
- }
總結(jié) NonfairSync 與 FairSync
到這里,應(yīng)該就比較清楚了, Sync 有兩個具體的實(shí)現(xiàn)類,分別是:
- NonfairSync :可以搶占鎖,調(diào)用 NonfairSync 時,不管當(dāng)前隊(duì)列上有沒有其他線程在等待,上來我就先 CAS 操作一番,成功了就獲得了鎖,沒有成功就走 acquire 的邏輯;在釋放鎖資源時,走的是 Sync.nonfairTryAcquire 方法
- FairSync :所有線程按照 FIFO 來獲取鎖,在 lock 方法中,沒有 CAS 嘗試,直接就是 acquire 的邏輯;在釋放資源時,走的是自己的 tryAcquire 邏輯
接下來咱們看看 NonfairSync 和 FairSync 是如何獲取鎖的
ReentrantLock 獲取鎖
NonfairSync.lock()
在 NonfairSync 中,獲取鎖的方法是:
- final void lock() {
- // 不管別的,上來就先 CAS 操作,嘗試搶占一下鎖
- if (compareAndSetState(0, 1))
- // 如果搶占成功,就獲得了鎖
- setExclusiveOwnerThread(Thread.currentThread());
- else
- // 沒有搶占成功,調(diào)用 acquire() 方法,走里面的邏輯
- acquire(1);
- }
if 里面沒啥說的,咱們來看看 acquire() 方法
AQS.acquire()
acquire 是 AQS 的核心方法:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
在這里,會先 tryAcquire 去嘗試獲取鎖,如果獲取成功,那就返回 true ,如果失敗就通過 addWaiter 方法,將當(dāng)前線程封裝成 Node 插入到等待隊(duì)列中
先來看 tryAcquire 方法:
NonfairSync.tryAcquire(arg)
在 AQS 中 tryAcquire 方法沒有具體實(shí)現(xiàn),只是拋出了異常:
- protected boolean tryAcquire(int arg) {
- throw new UnsupportedOperationException();
- }
NonfairSync 中的 tryAcquire() 方法,才是我們想要看的:
- final boolean nonfairTryAcquire(int acquires) {
- // 獲取當(dāng)前執(zhí)行的線程
- final Thread current = Thread.currentThread();
- // 獲取 state 的值
- int c = getState();
- // 當(dāng) state 為 0 是,說明此時為無鎖狀態(tài)
- if (c == 0) {
- // CAS 替換 state 的值,如果 CAS 成功,則獲取鎖成功
- if (compareAndSetState(0, acquires)) {
- // 保存當(dāng)前獲得鎖的線程,當(dāng)該線程再次獲得鎖時,直接重入即可
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- // 判斷是否是同一個線程來競爭鎖
- else if (current == getExclusiveOwnerThread()) {
- // 如果是,直接增加重入次數(shù)
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- // 獲取鎖成功
- return true;
- }
- // 獲取鎖失敗
- return false;
- }
有沒有一種似曾相識的趕腳?在 FairSync 那里,分析過 90% 的代碼(好像說分析過 99% 的代碼也不過分),只是 FairSync 多了一個判斷就是,是否有前驅(qū)節(jié)點(diǎn)
tryAcquire 分析完畢了,接下來看 addWaiter 方法
AQS.addWaiter
如果 tryAcquire() 方法獲取鎖成功,那就直接執(zhí)行線程的任務(wù)就可以了,執(zhí)行完畢釋放鎖
如果獲取鎖失敗,就會調(diào)用 addWaiter 方法,將當(dāng)前線程插入到等待隊(duì)列中,插入的邏輯大概是這樣的:
- 將當(dāng)前線程封裝成 Node 節(jié)點(diǎn)
- 當(dāng)前鏈表中 tail 節(jié)點(diǎn)(也就是下面的 pred )是否為空,如果不為空,則 CAS 操作將當(dāng)前線程的 node 添加到 AQS 隊(duì)列
- 如果為空,或者 CAS 操作失敗,則調(diào)用 enq 方法,再次自旋插入
咱們看具體的代碼實(shí)現(xiàn):
- private Node addWaiter(Node mode) {
- // 生成該線程所對應(yīng)的 Node 節(jié)點(diǎn)
- Node node = new Node(Thread.currentThread(), mode);
- // 將 Node 插入隊(duì)列中
- Node pred = tail;
- // 如果 pred 不為空
- if (pred != null) {
- node.prev = pred;
- // 使用 CAS 操作,如果成功就返回
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 如果 pred == null 或者 CAS 操作失敗,則調(diào)用 enq 方法再次自旋插入
- enq(node);
- return node;
- }
- // 自旋 CAS 插入等待隊(duì)列
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- // 必須初始化,使用 CAS 操作進(jìn)行初始化
- if (compareAndSetHead(new Node()))
- // 初始化狀態(tài)時,頭尾節(jié)點(diǎn)指向同一節(jié)點(diǎn)
- tail = head;
- } else {
- node.prev = t;
- // 如果剛開始就是初始化好的,直接 CAS 操作,將 Node 插入到隊(duì)尾即可
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
AQS.acquireQueued
通過 addWaiter 將當(dāng)前線程加入到隊(duì)列中之后,會走 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法
acquireQueued 方法實(shí)現(xiàn)的主要邏輯是:
- 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn) p
- 如果節(jié)點(diǎn) p 為 head 節(jié)點(diǎn),說明當(dāng)前節(jié)點(diǎn)為第二個節(jié)點(diǎn),那么它就可以嘗試獲取鎖,調(diào)用 tryAcquire 方法嘗試進(jìn)行獲取
- 調(diào)用 tryAcquire 方法獲取鎖成功之后,就將 head 指向自己,原來的節(jié)點(diǎn) p 就需要從隊(duì)列中刪除
- 如果獲取鎖失敗,則調(diào)用 shouldParkAfterFailedAcquire 或者 parkAndCheckInterrupt 方法來決定后面操作
最后,通過 cancelAcquire 方法取消獲得鎖 看具體的代碼實(shí)現(xiàn):
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- // 如果 Node 的前驅(qū)節(jié)點(diǎn) p 是 head,說明 Node 是第二個節(jié)點(diǎn),那么它就可以嘗試獲取鎖
- if (p == head && tryAcquire(arg)) {
- // 如果鎖獲取成功,則將 head 指向自己
- setHead(node);
- // 鎖獲取成功之后,將 next 指向 null ,即將節(jié)點(diǎn) p 從隊(duì)列中移除
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 節(jié)點(diǎn)進(jìn)入等待隊(duì)列后,調(diào)用 shouldParkAfterFailedAcquire 或者 parkAndCheckInterrupt 方法
- // 進(jìn)入阻塞狀態(tài),即只有頭結(jié)點(diǎn)的線程處于活躍狀態(tài)
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
shouldParkAfterFailedAcquire
線程獲取鎖失敗之后,會通過調(diào)用 shouldParkAfterFailedAcquire 方法,來決定這個線程要不要掛起
shouldParkAfterFailedAcquire 方法實(shí)現(xiàn)的主要邏輯:
- 首先判斷 pred 的狀態(tài)是否為 SIGNAL ,如果是,則直接掛起即可
- 如果 pred 的狀態(tài)大于 0 ,說明該節(jié)點(diǎn)被取消了,那么直接從隊(duì)列中移除即可
- 如果 pred 的狀態(tài)不是 SIGNAL 也不大于 0 ,進(jìn)行 CAS 操作修改節(jié)點(diǎn)狀態(tài)為 SIGNAL ,返回 false ,也就是不需要掛起
看一下代碼是如何實(shí)現(xiàn)的:
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- // 獲取 pred 的狀態(tài)
- int ws = pred.waitStatus;
- // 如果狀態(tài)為 SIGNAL ,那么直接返回 true ,掛起線程即可
- if (ws == Node.SIGNAL)
- return true;
- // 如果狀態(tài)大于 0 ,說明線程被取消
- if (ws > 0) {
- // 從鏈表中移除被 cancel 的線程,使用循環(huán)來保證移除成功
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- // CAS 操作修改 pred 節(jié)點(diǎn)狀態(tài)為 SIGNAL
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- // 不需要掛起線程
- return false;
- }
到這里,關(guān)于 NonfairSync 的獲取鎖就結(jié)束了
接下來咱們看看 FairSync 的獲取鎖和它有什么不同
FairSync.lock()
在 FairSync.lock() 方法中是這樣的:
- final void lock() {
- acquire(1);
- }
因?yàn)?FairSync 是公平鎖,所以不存在 CAS 操作去競爭,直接就是調(diào)用 acquire 方法
接下來的邏輯就和上面一樣了,這里我就不重復(fù)了
咱們瞅瞅 ReentrantLock 是怎么釋放鎖的
ReentrantLock 釋放鎖
在 ReentrantLock 釋放鎖時,調(diào)用的是 sync.release() 方法:
- public void unlock() {
- sync.release(1);
- }
點(diǎn)進(jìn)去發(fā)現(xiàn)調(diào)用的是 AQS 的 release 方法
AQS.release()
AQS 的 release 方法比較好理解,就直接看源碼了:
- public final boolean release(int arg) {
- // 如果釋放鎖成功
- if (tryRelease(arg)) {
- // 獲取 AQS 隊(duì)列的頭結(jié)點(diǎn)
- Node h = head;
- // 如果頭結(jié)點(diǎn)不為空,且狀態(tài) != 0
- if (h != null && h.waitStatus != 0)
- // 調(diào)用 unparkSuccessor 方法喚醒后續(xù)節(jié)點(diǎn)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
ReentrantLock.tryRelease()
在 AQS 中的 tryRelease 方法,只是拋出了異常而已,說明具體實(shí)現(xiàn)是由子類 ReentrantLock 來實(shí)現(xiàn)的
就直接看 ReentrantLock 中的 tryRelease 方法了
在 ReentrantLock 中實(shí)現(xiàn) tryRelease 方法主要邏輯是:
- 首先,如果是同一個線程獲取的同一個鎖,那么它有可能被重入多次,所以需要獲取到要釋放線程的重入次數(shù)即 getState() 然后判斷,該線程是否為獲取到鎖的線程,只有獲取到鎖的線程,才有釋放鎖一說
- 進(jìn)行 unlock 釋放鎖,即:將 state 的值減到 0 ,才算是釋放掉了鎖,此時才能將 owner 置為 null 同時返回 true
看一下具體實(shí)現(xiàn):
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;
- // 判斷當(dāng)前線程是否為獲取到鎖的線程,如果不是則拋出異常
- // 只有獲取到鎖的線程才釋放鎖
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- // 次數(shù)為 0 ,說釋放鎖完畢
- if (c == 0) {
- free = true;
- // 釋放之后,當(dāng)前線程置為 null
- setExclusiveOwnerThread(null);
- }
- // 更新重入次數(shù)
- setState(c);
- return free;
- }
AQS.unparkSuccessor
釋放鎖成功之后,接下來要做的就是喚醒后面的進(jìn)程,這個方法是在 AQS 中實(shí)現(xiàn)的
主要邏輯是:
- 獲取當(dāng)前節(jié)點(diǎn)狀態(tài),如果小于 0 ,則置為 0
- 獲取當(dāng)前節(jié)點(diǎn)的下一個節(jié)點(diǎn),如果不為空,直接喚醒
- 如果為空,或者節(jié)點(diǎn)狀態(tài)大于 0 ,則尋找下一個狀態(tài)小于 0 的節(jié)點(diǎn)
代碼的具體實(shí)現(xiàn)
- private void unparkSuccessor(Node node) {
- // 獲取當(dāng)前節(jié)點(diǎn)的狀態(tài)
- int ws = node.waitStatus;
- // 如果節(jié)點(diǎn)狀態(tài)小于 0 ,則進(jìn)行 CAS 操作設(shè)置為 0
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- // 獲取當(dāng)前節(jié)點(diǎn)的下一個節(jié)點(diǎn) s
- Node s = node.next;
- // 如果 s 為空,則從尾部節(jié)點(diǎn)開始,或者s.waitStatus 大于 0 ,說明節(jié)點(diǎn)被取消
- // 從尾節(jié)點(diǎn)開始,尋找到距離 head 節(jié)點(diǎn)最近的一個 waitStatus <= 0 的節(jié)點(diǎn)
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- // next 節(jié)點(diǎn)不為空,直接喚醒即可
- LockSupport.unpark(s.thread);
- }
為什么要從尾節(jié)點(diǎn)開始尋找距離 head 節(jié)點(diǎn)最近的一個 waitStatus <= 0 的節(jié)點(diǎn)呢?
這是因?yàn)樵?enq() 構(gòu)建節(jié)點(diǎn)的方法中,最后是 t.next = node (忘了就再往上翻翻看),設(shè)置原來的 tail 的 next 節(jié)點(diǎn)指向新的節(jié)點(diǎn)
如果在 CAS 操作之后, t.next = node 操作之前,有其他線程調(diào)用 unlock 方法從 head 開始向后遍歷,因?yàn)榇藭r t.next = node 還沒有執(zhí)行結(jié)束,意味著鏈表的關(guān)系還沒有建立好,這樣就會導(dǎo)致遍歷的時候到 t 節(jié)點(diǎn)這里發(fā)生中斷,因?yàn)榇藭r tail 還沒有指向新的尾節(jié)點(diǎn)
如果從后向前遍歷的話,就不會存在這樣的問題
接下來下一個線程就被喚醒了,然后程序會把它當(dāng)成新的節(jié)點(diǎn)開始執(zhí)行
而原來執(zhí)行結(jié)束的線程,則會將它從隊(duì)列中移除,然后開始循環(huán)循環(huán)
這篇文章終于講完了,阿粉的頭發(fā)都快禿了
參考:JDK 源碼( 1.8 )

























