偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

圖解ReentrantLock的條件變量Condition機(jī)制

開發(fā) 前端
想必大家都使用過wait()和notify()這兩個(gè)方法吧,這兩個(gè)方法主要用于多線程間的協(xié)同處理,即控制線程之間的等待、通知、切換及喚醒。而RenentrantLock也支持這樣條件變量的能力,而且相對于synchronized 更加強(qiáng)大,能夠支持多個(gè)條件變量。

概述

想必大家都使用過wait()和notify()這兩個(gè)方法吧,這兩個(gè)方法主要用于多線程間的協(xié)同處理,即控制線程之間的等待、通知、切換及喚醒。而RenentrantLock也支持這樣條件變量的能力,而且相對于synchronized 更加強(qiáng)大,能夠支持多個(gè)條件變量。

ReentrantLock條件變量使用

ReentrantLock類API

  • Condition newCondition(): 創(chuàng)建條件變量對象

Condition類API

  • void await(): 當(dāng)前線程從運(yùn)行狀態(tài)進(jìn)入等待狀態(tài),同時(shí)釋放鎖,該方法可以被中斷
  • void awaitUninterruptibly():當(dāng)前線程從運(yùn)行狀態(tài)進(jìn)入等待狀態(tài),該方法不能夠被中斷
  • void signal(): 喚醒一個(gè)等待在 Condition 條件隊(duì)列上的線程
  • void signalAll(): 喚醒阻塞在條件隊(duì)列上的所有線程
@Test
public void testCondition() throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
//創(chuàng)建新的條件變量
Condition condition = lock.newCondition();
Thread thread0 = new Thread(() -> {
lock.lock();
try {
System.out.println("線程0獲取鎖");
// sleep不會釋放鎖
Thread.sleep(500);
//進(jìn)入休息室等待
System.out.println("線程0釋放鎖,進(jìn)入等待");
condition.await();
System.out.println("線程0被喚醒了");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
thread0.start();
//叫醒
Thread thread1 = new Thread(() -> {
lock.lock();
try {
System.out.println("線程1獲取鎖");
//喚醒
condition.signal();
System.out.println("線程1喚醒線程0");
} finally {
lock.unlock();
System.out.println("線程1釋放鎖");
}
});
thread1.start();

thread0.join();
thread1.join();
}

運(yùn)行結(jié)果:

圖片

  • condition的wait和notify必須在lock范圍內(nèi)
  • 實(shí)現(xiàn)條件變量的等待和喚醒,他們必須是同一個(gè)condition。
  • 線程1執(zhí)行conidtion.notify()后,并沒有釋放鎖,需要等釋放鎖后,線程0重新獲取鎖成功后,才能繼續(xù)向下執(zhí)行。

圖解實(shí)現(xiàn)原理

await過程

  1. 線程0(Thread-0)一開始獲取鎖,exclusiveOwnerThread字段是Thread-0, 如下圖中的深藍(lán)色節(jié)點(diǎn)

圖片

  1. Thread-0調(diào)用await方法,Thread-0封裝成Node進(jìn)入ConditionObject的隊(duì)列,因?yàn)榇藭r(shí)只有一個(gè)節(jié)點(diǎn),所有firstWaiter和lastWaiter都指向Thread-0,會釋放鎖資源,NofairSync中的state會變成0,同時(shí)exclusiveOwnerThread設(shè)置為null。如下圖所示。

圖片

  1. 線程1(Thread-1)被喚醒,重新獲取鎖,如下圖的深藍(lán)色節(jié)點(diǎn)所示。

圖片

  1. Thread-0被park阻塞,如下圖灰色節(jié)點(diǎn)所示:

圖片

源碼如下:

下面是await()方法的整體流程,其中LockSupport.park(this)進(jìn)行阻塞當(dāng)前線程,后續(xù)喚醒,也會在這個(gè)程序點(diǎn)恢復(fù)執(zhí)行。

public final void await() throws InterruptedException {
// 判斷當(dāng)前線程是否是中斷狀態(tài),是就直接給個(gè)中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 將調(diào)用 await 的線程包裝成 Node,添加到條件隊(duì)列并返回
Node node = addConditionWaiter();
// 完全釋放節(jié)點(diǎn)持有的鎖,因?yàn)槠渌€程喚醒當(dāng)前線程的前提是【持有鎖】
int savedState = fullyRelease(node);

// 設(shè)置打斷模式為沒有被打斷,狀態(tài)碼為 0
int interruptMode = 0;

// 如果該節(jié)點(diǎn)還沒有轉(zhuǎn)移至 AQS 阻塞隊(duì)列, park 阻塞,等待進(jìn)入阻塞隊(duì)列
while (!isOnSyncQueue(node)) {
// 阻塞當(dāng)前線程,待會
LockSupport.park(this);
// 如果被打斷,退出等待隊(duì)列,對應(yīng)的 node 【也會被遷移到阻塞隊(duì)列】尾部,狀態(tài)設(shè)置為 0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 邏輯到這說明當(dāng)前線程退出等待隊(duì)列,進(jìn)入【阻塞隊(duì)列】

// 嘗試槍鎖,釋放了多少鎖就【重新獲取多少鎖】,獲取鎖成功判斷打斷模式
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;

// node 在條件隊(duì)列時(shí) 如果被外部線程中斷喚醒,會加入到阻塞隊(duì)列,但是并未設(shè) nextWaiter = null
if (node.nextWaiter != null)
// 清理?xiàng)l件隊(duì)列內(nèi)所有已取消的 Node
unlinkCancelledWaiters();
// 條件成立說明掛起期間發(fā)生過中斷
if (interruptMode != 0)
// 應(yīng)用打斷模式
reportInterruptAfterWait(interruptMode);
}
  • 將線程封裝成Node, 加入到ConditionObject隊(duì)列尾部,此時(shí)節(jié)點(diǎn)的等待狀態(tài)時(shí)-2。
private Node addConditionWaiter() {
// 獲取當(dāng)前條件隊(duì)列的尾節(jié)點(diǎn)的引用,保存到局部變量 t 中
Node t = lastWaiter;
// 當(dāng)前隊(duì)列中不是空,并且節(jié)點(diǎn)的狀態(tài)不是 CONDITION(-2),說明當(dāng)前節(jié)點(diǎn)發(fā)生了中斷
if (t != null && t.waitStatus != Node.CONDITION) {
// 清理?xiàng)l件隊(duì)列內(nèi)所有已取消的 Node
unlinkCancelledWaiters();
// 清理完成重新獲取 尾節(jié)點(diǎn) 的引用
t = lastWaiter;
}
// 創(chuàng)建一個(gè)關(guān)聯(lián)當(dāng)前線程的新 node, 設(shè)置狀態(tài)為 CONDITION(-2),添加至隊(duì)列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node; // 空隊(duì)列直接放在隊(duì)首【不用CAS因?yàn)閳?zhí)行線程是持鎖線程,并發(fā)安全】
else
t.nextWaiter = node; // 非空隊(duì)列隊(duì)尾追加
lastWaiter = node; // 更新隊(duì)尾的引用
return node;
}
  • 清理?xiàng)l件隊(duì)列中的cancel類型的節(jié)點(diǎn),比如中斷、超時(shí)等會導(dǎo)致節(jié)點(diǎn)轉(zhuǎn)換為Cancel
// 清理?xiàng)l件隊(duì)列內(nèi)所有已取消(不是CONDITION)的 node,【鏈表刪除的邏輯】
private void unlinkCancelledWaiters(){
// 從頭節(jié)點(diǎn)開始遍歷【FIFO】
Node t = firstWaiter;
// 指向正常的 CONDITION 節(jié)點(diǎn)
Node trail = null;
// 等待隊(duì)列不空
while (t != null) {
// 獲取當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
Node next = t.nextWaiter;
// 判斷 t 節(jié)點(diǎn)是不是 CONDITION 節(jié)點(diǎn),條件隊(duì)列內(nèi)不是 CONDITION 就不是正常的
if (t.waitStatus != Node.CONDITION) {
// 不是正常節(jié)點(diǎn),需要 t 與下一個(gè)節(jié)點(diǎn)斷開
t.nextWaiter = null;
// 條件成立說明遍歷到的節(jié)點(diǎn)還未碰到過正常節(jié)點(diǎn)
if (trail == null)
// 更新 firstWaiter 指針為下個(gè)節(jié)點(diǎn)
firstWaiter = next;
else
// 讓上一個(gè)正常節(jié)點(diǎn)指向 當(dāng)前取消節(jié)點(diǎn)的 下一個(gè)節(jié)點(diǎn),【刪除非正常的節(jié)點(diǎn)】
trail.nextWaiter = next;
// t 是尾節(jié)點(diǎn)了,更新 lastWaiter 指向最后一個(gè)正常節(jié)點(diǎn)
if (next == null)
lastWaiter = trail;
} else {
// trail 指向的是正常節(jié)點(diǎn)
trail = t;
}
// 把 t.next 賦值給 t,循環(huán)遍歷
t = next;
}
}
  • fullyRelease方法將r讓Thread-0釋放鎖, 這個(gè)時(shí)候Thread-1就會去競爭鎖
// 線程可能重入,需要將 state 全部釋放
final int fullyRelease(Node node) {
// 完全釋放鎖是否成功,false 代表成功
boolean failed = true;
try {
// 獲取當(dāng)前線程所持有的 state 值總數(shù)
int savedState = getState();
// release -> tryRelease 解鎖重入鎖
if (release(savedState)) {
// 釋放成功
failed = false;
// 返回解鎖的深度
return savedState;
} else {
// 解鎖失敗拋出異常
throw new IllegalMonitorStateException();
}
} finally {
// 沒有釋放成功,將當(dāng)前 node 設(shè)置為取消狀態(tài)
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
  • 判斷節(jié)點(diǎn)是否在AQS阻塞對列中,不在條件對列中
final boolean isOnSyncQueue(Node node) {
// node 的狀態(tài)是 CONDITION,signal 方法是先修改狀態(tài)再遷移,所以前驅(qū)節(jié)點(diǎn)為空證明還【沒有完成遷移】
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 說明當(dāng)前節(jié)點(diǎn)已經(jīng)成功入隊(duì)到阻塞隊(duì)列,且當(dāng)前節(jié)點(diǎn)后面已經(jīng)有其它 node,因?yàn)闂l件隊(duì)列的 next 指針為 null
if (node.next != null)
return true;
// 說明【可能在阻塞隊(duì)列,但是是尾節(jié)點(diǎn)】
// 從阻塞隊(duì)列的尾節(jié)點(diǎn)開始向前【遍歷查找 node】,如果查找到返回 true,查找不到返回 false
return findNodeFromTail(node);
}

signal過程

  1. Thread-1執(zhí)行signal方法喚醒條件隊(duì)列中的第一個(gè)節(jié)點(diǎn),即Thread-0,條件隊(duì)列置空

圖片

  1. Thread-0的節(jié)點(diǎn)的等待狀態(tài)變更為0, 重新加入到AQS隊(duì)列尾部。

圖片

  1. 后續(xù)就是Thread-1釋放鎖,其他線程重新?lián)屾i。

源碼如下:

  • signal()方法是喚醒的入口方法
public final void signal() {
// 判斷調(diào)用 signal 方法的線程是否是獨(dú)占鎖持有線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 獲取條件隊(duì)列中第一個(gè) Node
Node first = firstWaiter;
// 不為空就將第該節(jié)點(diǎn)【遷移到阻塞隊(duì)列】
if (first != null)
doSignal(first);
}
  • 調(diào)用doSignal()方法喚醒節(jié)點(diǎn)
// 喚醒 - 【將沒取消的第一個(gè)節(jié)點(diǎn)轉(zhuǎn)移至 AQS 隊(duì)列尾部】
private void doSignal(Node first){
do {
// 成立說明當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)是 null,當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)了,隊(duì)列中只有當(dāng)前一個(gè)節(jié)點(diǎn)了
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 將等待隊(duì)列中的 Node 轉(zhuǎn)移至 AQS 隊(duì)列,不成功且還有節(jié)點(diǎn)則繼續(xù)循環(huán)
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// signalAll() 會調(diào)用這個(gè)函數(shù),喚醒所有的節(jié)點(diǎn)
private void doSignalAll(Node first){
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
// 喚醒所有的節(jié)點(diǎn),都放到阻塞隊(duì)列中
} while (first != null);
}
  • 調(diào)用transferForSignal()方法,先將節(jié)點(diǎn)的 waitStatus 改為 0,然后加入 AQS 阻塞隊(duì)列尾部,將 Thread-3 的 waitStatus 改為 -1。
// 如果節(jié)點(diǎn)狀態(tài)是取消, 返回 false 表示轉(zhuǎn)移失敗, 否則轉(zhuǎn)移成功
final boolean transferForSignal(Node node) {
// CAS 修改當(dāng)前節(jié)點(diǎn)的狀態(tài),修改為 0,因?yàn)楫?dāng)前節(jié)點(diǎn)馬上要遷移到阻塞隊(duì)列了
// 如果狀態(tài)已經(jīng)不是 CONDITION, 說明線程被取消(await 釋放全部鎖失敗)或者被中斷(可打斷 cancelAcquire)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 返回函數(shù)調(diào)用處繼續(xù)尋找下一個(gè)節(jié)點(diǎn)
return false;

// 【先改狀態(tài),再進(jìn)行遷移】
// 將當(dāng)前 node 入阻塞隊(duì)列,p 是當(dāng)前節(jié)點(diǎn)在阻塞隊(duì)列的【前驅(qū)節(jié)點(diǎn)】
Node p = enq(node);
int ws = p.waitStatus;

// 如果前驅(qū)節(jié)點(diǎn)被取消或者不能設(shè)置狀態(tài)為 Node.SIGNAL,就 unpark 取消當(dāng)前節(jié)點(diǎn)線程的阻塞狀態(tài),
// 讓 thread-0 線程競爭鎖,重新同步狀態(tài)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

總結(jié)

本文講解了ReentrantLock中條件變量的使用和原理實(shí)現(xiàn),希望對大家有幫助。

責(zé)任編輯:武曉燕 來源: JAVA旭陽
相關(guān)推薦

2011-11-23 10:09:19

Java線程機(jī)制

2024-07-05 08:32:36

2022-01-14 07:56:38

Checkpoint機(jī)制Flink

2025-05-08 08:31:33

2025-04-14 08:31:20

2022-12-26 00:00:04

公平鎖非公平鎖

2023-10-11 08:22:33

線程AQScondition

2018-07-12 15:30:03

HTTP緩存機(jī)制

2016-12-08 10:19:18

Android事件分發(fā)機(jī)制

2022-11-02 15:35:35

Condition代碼線程

2022-06-20 08:03:17

KafkaJava NIO

2023-05-18 08:38:13

Java鎖機(jī)制

2023-04-12 08:00:34

Dubbo分布式服務(wù)

2021-12-09 08:31:01

ReentrantLoAQS

2020-09-26 21:43:59

Linux系統(tǒng)編程條件變量

2020-10-08 18:58:46

條件變量開發(fā)線程

2023-03-15 08:30:37

2023-07-06 08:06:47

LockCondition公平鎖

2013-07-31 11:09:05

C++11

2022-09-23 08:02:42

Kafka消息緩存
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號