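Multithreaded Synchronization for Interviews: Questions You Need to Think Through
Plenty of articles already cover how ReentrantLock is implemented, so this one only sketches its Java-level implementation and concentrates on how a thread is blocked after it fails to acquire the lock. To keep the length manageable, synchronized is left for the next article.
How Java Locks Are Implemented
ReentrantLock is a commonly used lock implementation in the JDK. Its logic is built mainly on AQS, as are most of the synchronizers in the java.util.concurrent (JUC) package. What follows is a brief outline of how AQS works; its implementation details and applications will get a dedicated article later.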
AQS
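AQS is short for the class AbstractQueuedSynchronizer. In the JUC package, ReentrantLock, CyclicBarrier and CountDownLatch all rely on it (CyclicBarrier indirectly, through a ReentrantLock).
Roughly, it works like this:
- AQS maintains an int field named state and a doubly linked list. state represents the synchronization state, and the list holds the threads waiting for the lock.
 - To lock, it first calls tryAcquire. If that fails, the thread is inserted into the list and LockSupport.park() is called to block the current thread.
 - To unlock, it calls LockSupport.unpark() to wake the thread of the first waiting node in the list. The woken thread then goes through the lock-acquisition path again.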
 
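tryAcquire is a protected hook that each synchronizer overrides (the default in AQS simply throws UnsupportedOperationException), so its behavior depends on the implementing class. The difference between what we usually call fair and non-fair locks lies in how this method is implemented.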
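To make the state-plus-queue idea concrete, here is a minimal sketch of a non-reentrant mutex built on AQS. The class name SimpleMutex and its helper methods are made up for illustration; only the AbstractQueuedSynchronizer calls are real JDK API. It is not how ReentrantLock itself is written, just the smallest synchronizer that uses the same hooks.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class, not part of the JDK.
class SimpleMutex {
    // state == 0: unlocked, state == 1: locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // One CAS attempt. If it fails, acquire() enqueues the caller and parks it.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // release() then unparks the next queued thread, if any
            return true;
        }

        boolean locked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    void unlock()      { sync.release(1); }
    boolean tryLock()  { return sync.tryAcquire(1); }
    boolean isLocked() { return sync.locked(); }

    public static void main(String[] args) {
        SimpleMutex m = new SimpleMutex();
        m.lock();
        System.out.println("locked after lock(): " + m.isLocked());
        m.unlock();
        System.out.println("locked after unlock(): " + m.isLocked());
    }
}
```

Because tryAcquire here never checks the owner, a second tryLock from the same thread fails, which is exactly the behavior the reentrant branch in FairSync.tryAcquire exists to avoid.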
ReentrantLock
ReentrantLock comes in fair and non-fair variants; we only look at the fair one. ReentrantLock.lock ends up in ReentrantLock.FairSync.lock:
FairSync.java
- static final class FairSync extends Sync {
 - final void lock() {
 - acquire(1);
 - }
 - /**
 - * Fair version of tryAcquire. Don't grant access unless
 - * recursive call or no waiters or is first.
 - */
 - protected final boolean tryAcquire(int acquires) {
 - final Thread current = Thread.currentThread();
 - int c = getState();
 - if (c == 0) {
 - if (!hasQueuedPredecessors() &&
 - compareAndSetState(0, acquires)) {
 - setExclusiveOwnerThread(current);
 - return true;
 - }
 - }
 - else if (current == getExclusiveOwnerThread()) {
 - int nextc = c + acquires;
 - if (nextc < 0)
 - throw new Error("Maximum lock count exceeded");
 - setState(nextc);
 - return true;
 - }
 - return false;
 - }
 - }
 
AbstractQueuedSynchronizer.java
- public final void acquire(int arg) {
 - if (!tryAcquire(arg) &&
 - acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 - selfInterrupt();
 - }
 
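As shown, FairSync.lock calls the AQS acquire method, and acquire first calls tryAcquire to try to take the lock. tryAcquire returns true in two cases:
- state == 0 (no thread holds the lock), no thread is queued ahead of the current one (this is what makes it fair), and the CAS on state succeeds.
 - The current thread already holds the lock and this call is a reentrant acquisition.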
 
If tryAcquire fails, addWaiter enqueues the thread and acquireQueued blocks it, eventually through LockSupport.park().
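The reentrant branch of tryAcquire can be observed from the outside through the hold count. The sketch below uses only public ReentrantLock API; the class and method names are invented for the example.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class FairReentrancyDemo {
    // Returns the hold counts seen while nested twice, nested once, and fully released.
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair policy
        int[] counts = new int[3];
        lock.lock();                                  // state 0 -> 1 via CAS
        try {
            lock.lock();                              // same owner: state 1 -> 2
            try {
                counts[0] = lock.getHoldCount();
            } finally {
                lock.unlock();                        // state 2 -> 1
            }
            counts[1] = lock.getHoldCount();
        } finally {
            lock.unlock();                            // state 1 -> 0, lock is free again
        }
        counts[2] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        int[] c = holdCounts();
        System.out.println(c[0] + " " + c[1] + " " + c[2]);
    }
}
```

Each lock() must be paired with an unlock(); the lock only becomes available to other threads when the count drops back to 0.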
LockSupport.park
In my view, a key step toward really understanding locks is understanding how the system blocks a thread.
LockSupport.java
- public static void park(Object blocker) {
 - Thread t = Thread.currentThread();
 - setBlocker(t, blocker);
 - UNSAFE.park(false, 0L);
 - setBlocker(t, null);
 - }
 
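The blocker argument of park is the synchronization object responsible for this block; when AQS calls park, that object is the AQS instance itself. Just as the synchronized keyword needs an object (the current instance or the current class when it is applied to a method), blocker is the object LockSupport associates with the wait. It is recorded for monitoring and diagnostic tools and does not affect blocking itself.
park calls the native method UNSAFE.park. The first argument says whether the second is an absolute time, and the second is the maximum time to block (0 with a relative time means no timeout).
The implementation is shown below with only the core code kept; see unsafe.cpp for the full source.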
- Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time){
 - ...
 - thread->parker()->park(isAbsolute != 0, time);
 - ...
 - }
 
Parker::park lives in os_linux.cpp (implementations for other operating systems are in the corresponding os_xxx files).
- void Parker::park(bool isAbsolute, jlong time) {
 - ...
 - // Get the current thread
 - Thread* thread = Thread::current();
 - assert(thread->is_Java_thread(), "Must be JavaThread");
 - JavaThread *jt = (JavaThread *)thread;
 - // If the current thread has its interrupted flag set, return immediately
 - if (Thread::is_interrupted(thread, false)) {
 - return;
 - }
 - if (time > 0) {
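 - // unpackTime fills the absTime struct according to isAbsolute: if isAbsolute is true, time is an absolute time in milliseconds; otherwise it is a relative time in nanoseconds
 - // absTime.tv_sec holds the seconds part of the deadline
 - // absTime.tv_nsec holds the nanoseconds part of the deadline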
 - unpackTime(&absTime, isAbsolute, time);
 - }
 - // Try to take the mutex with pthread_mutex_trylock; return if interrupted or if the mutex is busy
 - if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
 - return;
 - }
 - // _counter is the number of available permits, the same idea as the permit variable described for ReentrantLock. unpark sets _counter to 1.
 - // _counter > 0 means unpark has already been called, so there is no need to block
 - int status ;
 - if (_counter > 0) { // no wait needed
 - _counter = 0;
 - // Release the mutex
 - status = pthread_mutex_unlock(_mutex);
 - return;
 - }
 - // Set the thread state to CONDVAR_WAIT
 - OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
 - ...
 - // Wait
 - _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
 - pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
 - ...
 - // Release the mutex
 - status = pthread_mutex_unlock(_mutex) ;
 - }
 
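Parker::park blocks the thread with the POSIX function pthread_cond_timedwait, which requires the caller to hold the mutex first. The main flow of park is therefore:
- Call pthread_mutex_trylock to try to take the mutex; return immediately if that fails
 - Call pthread_cond_timedwait to wait
 - Call pthread_mutex_unlock to release the mutex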
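Two of the early-return paths in Parker::park are visible from Java: a permit left by an earlier unpark, and a pending interrupt. The sketch below exercises both on the current thread using only LockSupport and Thread API; the class and method names are invented for the example.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of the JDK.
class ParkPermitDemo {
    // unpark first, then park: the permit is already available, so park does not block.
    static boolean parkAfterUnparkReturns() {
        LockSupport.unpark(Thread.currentThread()); // permit becomes available
        LockSupport.park();                         // consumes the permit and returns
        return true;                                // reached only because park returned
    }

    // park with the interrupt flag set returns immediately and leaves the flag set.
    static boolean parkWhenInterruptedReturns() {
        Thread.currentThread().interrupt();
        LockSupport.park();
        return Thread.interrupted(); // reads the flag and clears it for later code
    }

    public static void main(String[] args) {
        System.out.println("returned after unpark: " + parkAfterUnparkReturns());
        System.out.println("flag still set after park: " + parkWhenInterruptedReturns());
    }
}
```

Since park does not report why it returned and may also return spuriously, callers such as AQS always re-check their own condition in a loop after it returns.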
 
In addition, before blocking the current thread, the OSThreadWaitState constructor sets the thread state to CONDVAR_WAIT. The thread-state enum inside the JVM is:
- enum ThreadState {
 - ALLOCATED, // Memory has been allocated but not initialized
 - INITIALIZED, // The thread has been initialized but yet started
 - RUNNABLE, // Has been started and is runnable, but not necessarily running
 - MONITOR_WAIT, // Waiting on a contended monitor lock
 - CONDVAR_WAIT, // Waiting on a condition variable
 - OBJECT_WAIT, // Waiting on an Object.wait() call
 - BREAKPOINTED, // Suspended at breakpoint
 - SLEEPING, // Thread.sleep()
 - ZOMBIE // All done, but not reclaimed yet
 - };
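The enum above is internal to HotSpot and cannot be read directly from Java. What Java code can observe is Thread.State, which the Javadoc documents as WAITING for a thread blocked in LockSupport.park. A small sketch, with invented class and method names, that polls for that state:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of the JDK.
class ParkedStateDemo {
    // Starts a thread that parks, and returns the state observed while it is parked.
    static Thread.State observeParkedState() {
        AtomicBoolean released = new AtomicBoolean(false);
        Object blocker = new Object();
        Thread t = new Thread(() -> {
            // Loop because park may return spuriously.
            while (!released.get()) {
                LockSupport.park(blocker);
            }
        });
        t.start();

        // Poll until the thread is seen parked, giving up after five seconds.
        Thread.State seen = t.getState();
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (seen != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.yield();
            seen = t.getState();
        }

        // Let the thread finish.
        released.set(true);
        LockSupport.unpark(t);
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observeParkedState());
    }
}
```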
 
timedwait on Linux
From the above we know that LockSupport.park is ultimately implemented with the POSIX function pthread_cond_timedwait. Let's go one level deeper and see how pthread_mutex_trylock, pthread_cond_timedwait and pthread_mutex_unlock are implemented.
On Linux, the relevant code is in glibc.
pthread_mutex_trylock
Start with trylock. The code is in glibc's pthread_mutex_trylock.c; the function is long, so only the main part is shown.
- // pthread_mutex_t is the POSIX mutex structure
 - int
 - __pthread_mutex_trylock (mutex)
 - pthread_mutex_t *mutex;
 - {
 - int oldval;
 - pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
 - switch (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex),
 - PTHREAD_MUTEX_TIMED_NP))
 - {
 - case PTHREAD_MUTEX_ERRORCHECK_NP:
 - case PTHREAD_MUTEX_TIMED_NP:
 - case PTHREAD_MUTEX_ADAPTIVE_NP:
 - /* Normal mutex. */
 - if (lll_trylock (mutex->__data.__lock) != 0)
 - break;
 - /* Record the ownership. */
 - mutex->__data.__owner = id;
 - ++mutex->__data.__nusers;
 - return 0;
 - }
 - }
 - // The following code is from lowlevellock.h
 - #define __lll_trylock(futex) \
 - (atomic_compare_and_exchange_val_acq (futex, 1, 0) != 0)
 - #define lll_trylock(futex) __lll_trylock (&(futex))
 
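A mutex defaults to type PTHREAD_MUTEX_NORMAL (the same as PTHREAD_MUTEX_TIMED_NP), so lll_trylock is called first. lll_trylock is really a CAS: if mutex->__data.__lock == 0 it is changed to 1 and 0 is returned; otherwise a non-zero value is returned.
On success, the owner field of the mutex is set to the current thread.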
pthread_mutex_unlock
pthread_mutex_unlock.c
- int
 - internal_function attribute_hidden
 - __pthread_mutex_unlock_usercnt (mutex, decr)
 - pthread_mutex_t *mutex;
 - int decr;
 - {
 - if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)
 - == PTHREAD_MUTEX_TIMED_NP)
 - {
 - /* Always reset the owner field. */
 - normal:
 - mutex->__data.__owner = 0;
 - if (decr)
 - /* One less user. */
 - --mutex->__data.__nusers;
 - /* Unlock. */
 - lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex));
 - return 0;
 - }
 - }
 
pthread_mutex_unlock clears the owner of the mutex and calls lll_unlock:
lowlevellock.h
- #define __lll_unlock(futex, private) \
 - ((void) ({ \
 - int *__futex = (futex); \
 - int __val = atomic_exchange_rel (__futex, 0); \
 - \
 - if (__builtin_expect (__val > 1, 0)) \
 - lll_futex_wake (__futex, 1, private); \
 - }))
 - #define lll_unlock(futex, private) __lll_unlock(&(futex), private)
 - #define lll_futex_wake(ftx, nr, private) \
 - ({ \
 - DO_INLINE_SYSCALL(futex, 3, (long) (ftx), \
 - __lll_private_flag (FUTEX_WAKE, private), \
 - (int) (nr)); \
 - _r10 == -1 ? -_retval : _retval; \
 - })
 
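lll_unlock has two steps:
- Set the futex to 0 and fetch its previous value (a user-space operation)
 - If the previous value was > 1, the lock was contended, meaning some thread has gone to sleep via FUTEX_WAIT, so the FUTEX_WAKE system call is made to wake a sleeping thread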
 
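FUTEX_WAKE was analyzed in the previous article. The core of the futex mechanism is that acquiring the lock first attempts a CAS on an int variable (a user-space operation). If the original value is 0, the CAS succeeds and the thread owns the lock; otherwise the current thread is put on a wait queue, and threads on the wait queue are not scheduled by the kernel (a kernel-space operation).
A futex variable takes one of three values: 0 means the lock is free, 1 means a thread holds it, and 2 means it is contended. It is initialized to 0. trylock uses a CAS to change it to 1 (see the trylock function above). lll_lock changes it to 1 when there is no contention, and to 2 otherwise.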
- #define __lll_lock(futex, private) \
 - ((void) ({ \
 - int *__futex = (futex); \
 - if (__builtin_expect (atomic_compare_and_exchange_bool_acq (__futex, \
 - 1, 0), 0)) \
 - { \
 - if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
 - __lll_lock_wait_private (__futex); \
 - else \
 - __lll_lock_wait (__futex, private); \
 - } \
 - }))
 - #define lll_lock(futex, private) __lll_lock (&(futex), private)
 - void
 - __lll_lock_wait_private (int *futex)
 - {
 - // On first entry futex == 1, so this if is skipped
 - if (*futex == 2)
 - lll_futex_wait (futex, 2, LLL_PRIVATE);
 - // Set futex to 2 here and call futex_wait to make the current thread wait
 - while (atomic_exchange_acq (futex, 2) != 0)
 - lll_futex_wait (futex, 2, LLL_PRIVATE);
 - }
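The 0/1/2 transitions can be traced with a small model. The sketch below is written in Java with AtomicInteger purely to mirror the state arithmetic of lll_trylock, one iteration of the __lll_lock_wait_private loop, and lll_unlock. It is an assumption-laden model, not glibc code: it never sleeps or makes a futex system call, and every class and method name in it is invented.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the futex word only; it does no real blocking or waking.
class FutexStateModel {
    static final int FREE = 0;       // nobody holds the lock
    static final int LOCKED = 1;     // held, no known waiters
    static final int CONTENDED = 2;  // held, waiters may be sleeping

    private final AtomicInteger futex = new AtomicInteger(FREE);

    // Models lll_trylock: CAS 0 -> 1. Returns true if the lock was taken.
    boolean tryLock() {
        return futex.compareAndSet(FREE, LOCKED);
    }

    // Models one pass of the slow-path loop: exchange in 2.
    // Returns true if the old value was 0 (lock acquired);
    // false means the real code would now call futex_wait.
    boolean lockSlowPathStep() {
        return futex.getAndSet(CONTENDED) == FREE;
    }

    // Models lll_unlock: exchange in 0.
    // Returns true if the old value was > 1, where the real code calls futex_wake.
    boolean unlock() {
        return futex.getAndSet(FREE) > LOCKED;
    }

    int value() {
        return futex.get();
    }

    public static void main(String[] args) {
        FutexStateModel f = new FutexStateModel();
        System.out.println("tryLock: " + f.tryLock() + ", value=" + f.value());
        System.out.println("slow path acquired: " + f.lockSlowPathStep() + ", value=" + f.value());
        System.out.println("unlock needs wake: " + f.unlock() + ", value=" + f.value());
    }
}
```

One consequence the model makes visible: a thread that acquires the lock through the slow path leaves the value at 2, so its later unlock requests a wake even if no one is waiting any more. That is a harmless extra wake rather than a missed one.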
 
pthread_cond_timedwait
pthread_cond_timedwait blocks a thread and makes it wait. The code is in glibc's pthread_cond_timedwait.c. It is fairly long; skim it first, then read it again after going through the analysis below.
- int
 - __pthread_cond_timedwait (cond, mutex, abstime)
 - pthread_cond_t *cond;
 - pthread_mutex_t *mutex;
 - const struct timespec *abstime;
 - {
 - struct _pthread_cleanup_buffer buffer;
 - struct _condvar_cleanup_buffer cbuffer;
 - int result = 0;
 - /* Catch invalid parameters. */
 - if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
 - return EINVAL;
 - int pshared = (cond->__data.__mutex == (void *) ~0l)
 - ? LLL_SHARED : LLL_PRIVATE;
 - // 1. Acquire the cond lock
 - lll_lock (cond->__data.__lock, pshared);
 - // 2. Release the mutex
 - int err = __pthread_mutex_unlock_usercnt (mutex, 0);
 - if (err)
 - {
 - lll_unlock (cond->__data.__lock, pshared);
 - return err;
 - }
 - /* We have one new user of the condvar. */
 - // Each wait (pthread_cond_timedwait/pthread_cond_wait) increments __total_seq by 1
 - ++cond->__data.__total_seq;
 - // The variable that futex_wait is performed on
 - ++cond->__data.__futex;
 - // Tracks how many threads are still using this cond; pthread_cond_destroy has to wait for all of them to finish
 - cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;
 - /* Remember the mutex we are using here. If there is already a
 - different address store this is a bad user bug. Do not store
 - anything for pshared condvars. */
 - // Remember the mutex
 - if (cond->__data.__mutex != (void *) ~0l)
 - cond->__data.__mutex = mutex;
 - /* Prepare structure passed to cancellation handler. */
 - cbuffer.cond = cond;
 - cbuffer.mutex = mutex;
 - /* Before we block we enable cancellation. Therefore we have to
 - install a cancellation handler. */
 - __pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);
 - /* The current values of the wakeup counter. The "woken" counter
 - must exceed this value. */
 - // Record, as of before futex_wait, __wakeup_seq (number of signal calls on this cond plus number of timeouts) and __broadcast_seq (number of broadcast calls on this cond)
 - unsigned long long int val;
 - unsigned long long int seq;
 - val = seq = cond->__data.__wakeup_seq;
 - /* Remember the broadcast counter. */
 - cbuffer.bc_seq = cond->__data.__broadcast_seq;
 - while (1)
 - {
 - // 3. Compute the relative time to wait
 - struct timespec rt;
 - {
 - #ifdef __NR_clock_gettime
 - INTERNAL_SYSCALL_DECL (err);
 - int ret;
 - ret = INTERNAL_VSYSCALL (clock_gettime, err, 2,
 - (cond->__data.__nwaiters
 - & ((1 << COND_NWAITERS_SHIFT) - 1)),
 - &rt);
 - # ifndef __ASSUME_POSIX_TIMERS
 - if (__builtin_expect (INTERNAL_SYSCALL_ERROR_P (ret, err), 0))
 - {
 - struct timeval tv;
 - (void) gettimeofday (&tv, NULL);
 - /* Convert the absolute timeout value to a relative timeout. */
 - rt.tv_sec = abstime->tv_sec - tv.tv_sec;
 - rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000;
 - }
 - else
 - # endif
 - {
 - /* Convert the absolute timeout value to a relative timeout. */
 - rt.tv_sec = abstime->tv_sec - rt.tv_sec;
 - rt.tv_nsec = abstime->tv_nsec - rt.tv_nsec;
 - }
 - #else
 - /* Get the current time. So far we support only one clock. */
 - struct timeval tv;
 - (void) gettimeofday (&tv, NULL);
 - /* Convert the absolute timeout value to a relative timeout. */
 - rt.tv_sec = abstime->tv_sec - tv.tv_sec;
 - rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000;
 - #endif
 - }
 - if (rt.tv_nsec < 0)
 - {
 - rt.tv_nsec += 1000000000;
 - --rt.tv_sec;
 - }
 - /* --- end of relative wait time computation --- */
 - // Has the deadline already passed?
 - /* Did we already time out? */
 - if (__builtin_expect (rt.tv_sec < 0, 0))
 - {
 - // Woken by a broadcast. Open question: why is __wakeup_seq not checked here?
 - if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
 - goto bc_out;
 - goto timeout;
 - }
 - unsigned int futex_val = cond->__data.__futex;
 - // 4. Release the cond lock and get ready to wait
 - lll_unlock (cond->__data.__lock, pshared);
 - /* Enable asynchronous cancellation. Required by the standard. */
 - cbuffer.oldtype = __pthread_enable_asynccancel ();
 - // 5. Call futex_wait
 - /* Wait until woken by signal or broadcast. */
 - err = lll_futex_timed_wait (&cond->__data.__futex,
 - futex_val, &rt, pshared);
 - /* Disable asynchronous cancellation. */
 - __pthread_disable_asynccancel (cbuffer.oldtype);
 - // 6. Re-acquire the cond lock, because cond data is about to be read and modified again
 - lll_lock (cond->__data.__lock, pshared);
 - // A changed __broadcast_seq means some thread called broadcast
 - if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
 - goto bc_out;
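 - // Check whether we were woken by signal; signal increments __wakeup_seq
 - // The second condition, cond->__data.__woken_seq != val, exists because
 - // two threads A and B may be waiting: one signal wakes A, and B then wakes up because of a timeout
 - // For B the first condition also holds at this point, so the caller would receive a result other than timeout
 - // Hence __woken_seq (threads already woken on this cond) is compared against __wakeup_seq (signal calls + timeouts)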
 - val = cond->__data.__wakeup_seq;
 - if (val != seq && cond->__data.__woken_seq != val)
 - break;
 - /* Not woken yet. Maybe the time expired? */
 - if (__builtin_expect (err == -ETIMEDOUT, 0))
 - {
 - timeout:
 - /* Yep. Adjust the counters. */
 - ++cond->__data.__wakeup_seq;
 - ++cond->__data.__futex;
 - /* The error value. */
 - result = ETIMEDOUT;
 - break;
 - }
 - }
 - // One thread has woken up, so increment __woken_seq
 - ++cond->__data.__woken_seq;
 - bc_out:
 - //
 - cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;
 - /* If pthread_cond_destroy was called on this variable already,
 - notify the pthread_cond_destroy caller all waiters have left
 - and it can be successfully destroyed. */
 - if (cond->__data.__total_seq == -1ULL
 - && cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
 - lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);
 - // 9. Done modifying cond data; release the cond lock
 - lll_unlock (cond->__data.__lock, pshared);
 - /* The cancellation handling is back to normal, remove the handler. */
 - __pthread_cleanup_pop (&buffer, 0);
 - // 10. Re-acquire the mutex
 - err = __pthread_mutex_cond_lock (mutex);
 - return err ?: result;
 - }
 
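Even with the comments, most people will not follow the code above on a first read. To summarize: two locks are involved, the mutex and the cond lock. In addition, the caller must call pthread_mutex_lock(&mutex) before pthread_cond_timedwait and pthread_mutex_unlock(&mutex) after it.
Using pthread_cond_timedwait therefore breaks down into these steps:
- Lock the mutex (before calling pthread_cond_timedwait)
 - Lock the cond lock
 - Release the mutex
 - Modify cond data
 - Release the cond lock
 - Call futex_wait
 - Re-acquire the cond lock
 - Compare the cond data to decide whether this thread was woken normally or by a timeout, and whether it needs to wait again
 - Modify cond data
 - Release the cond lock
 - Re-acquire the mutex
 - Release the mutex (after pthread_cond_timedwait returns)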
 
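At this point you may have a few questions: why are two locks needed? What are the mutex and the cond lock each for?
The mutex
Before getting to the role of the mutex, recall how Object.wait is used in Java: it must be called inside a synchronized block. What would go wrong if Object.wait could run without synchronized?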
- Object condObj=new Object();
 - volatile int flag = 0;
 - public void waitTest(){
 - if(flag == 0){
 - condObj.wait();
 - }
 - }
 - public void notifyTest(){
 - flag=1;
 - condObj.notify();
 - }
 
In the code above, thread A calls waitTest and sees flag == 0, so it is about to call wait and go to sleep. At that moment thread B runs, calls notifyTest, sets flag to 1 and calls notify. Note that A has not called wait yet, so notify wakes nobody. A then resumes and calls wait, and since nobody will ever wake it, A waits forever.
- Object condObj=new Object();
 - volatile int flag = 0;
 - public void waitTest(){
 - synchronized(condObj){
 - if(flag == 0){
 - condObj.wait();
 - }
 - }
 - }
 - public void notifyTest(){
 - synchronized(condObj){
 - flag=1;
 - condObj.notify();
 - }
 - }
 
With the lock protecting both sides, flag is guaranteed to still be 0 at the moment condObj.wait is called, so the thread cannot end up waiting forever.
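For reference, here is a compilable version of the synchronized variant, wrapped in a made-up class WaitNotifyDemo with a small driver. It differs from the snippet above in two deliberate ways: flag is a plain field because the monitor already guards it, and the check is a while loop rather than an if, which is the usual guard against spurious wakeups.

```java
// Hypothetical demo class wrapping the waitTest/notifyTest pair.
class WaitNotifyDemo {
    private final Object condObj = new Object();
    private int flag = 0; // guarded by condObj's monitor

    void waitTest() throws InterruptedException {
        synchronized (condObj) {
            // Checking flag and starting to wait happen under one lock,
            // so notifyTest cannot slip in between the two.
            while (flag == 0) {
                condObj.wait(); // releases the monitor while asleep, re-takes it on wake
            }
        }
    }

    void notifyTest() {
        synchronized (condObj) {
            flag = 1;
            condObj.notify();
        }
    }

    // Runs one waiter and one notifier; returns true if the waiter finished.
    static boolean run() {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.waitTest();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.notifyTest(); // safe whether it runs before or after the waiter checks flag
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + run());
    }
}
```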
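Back to pthread_cond_timedwait: the reason it needs the mutex is now obvious. The mutex makes checking the wait condition and starting the wait atomic.
Whether it is glibc's pthread_cond_timedwait/pthread_cond_signal, Java's Object.wait/Object.notify, or the JDK AQS Condition.await/Condition.signal, every condition mechanism has to be used while holding a lock. The fundamental reason is to guarantee that the condition being waited on has not been changed by the time the thread goes to sleep.
Pay attention to when the mutex is released. In the pthread_cond_timedwait flow above, the mutex is released in step 2, before futex_wait puts the thread to sleep. Why release it before sleeping? The reason is simple: if the thread slept while holding the mutex, no other thread could ever change the condition and call signal to wake it, because that thread has to acquire the mutex first.
After being woken, the thread re-acquires the mutex in step 10 to preserve the lock semantics (think about what would happen if it did not).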
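The same protocol shows up at the Java level with ReentrantLock and Condition. The sketch below, with invented class and method names, waits with a timeout and checks after every wake-up that the lock is held again, mirroring the release-before-sleep and re-acquire-after-wake steps. It uses only public JUC API and is an illustration, not a reproduction of the glibc logic.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the JDK.
class TimedAwaitDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock

    // Returns true if flag was seen set, false on timeout or interrupt.
    boolean awaitFlag(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!flag) {
                if (nanos <= 0L) {
                    return false; // timed out
                }
                // awaitNanos releases the lock while waiting and returns the time left.
                nanos = ready.awaitNanos(nanos);
                // By the time awaitNanos returns, the lock has been re-acquired.
                if (!lock.isHeldByCurrentThread()) {
                    throw new IllegalStateException("lock not held after wake-up");
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signal(); // signal while holding the lock, release afterwards
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        TimedAwaitDemo demo = new TimedAwaitDemo();
        System.out.println("before setFlag: " + demo.awaitFlag(20));
        demo.setFlag();
        System.out.println("after setFlag: " + demo.awaitFlag(20));
    }
}
```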
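The cond lock
The role of the cond lock is simple: it keeps cond->__data thread-safe. pthread_cond_timedwait modifies cond->__data, for example incrementing __total_seq (the total number of waits performed on this cond) and __nwaiters (the number of threads currently waiting on it), so every read or write of cond->__data has to happen under the cond lock.
What I could not work out at first was this: the mutex could also protect modifications of cond->__data, as long as it were released a little later. Why release the mutex first and take the cond lock to get thread safety? Is it to keep the mutex's critical section small?
The answer is in the comment by @11800222:
The mutex cannot protect modifications of cond->__data, because a thread calling signal does not hold the mutex around the critical section in which it modifies cond.
The pthread_cond_wait/signal pair can sleep and wake using the cond lock alone. wait takes the mutex as an argument because the mutex has to be released before sleeping, yet there must be no unlocked gap before the sleep; the solution is to release the mutex only after the cond lock has been taken. signal does not need to release the mutex first: it signals while holding the mutex and releases it afterwards.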
How a Sleeping Thread Is Woken
The wake-up code is fairly simple; it mainly calls lll_futex_wake.
- int
 - __pthread_cond_signal (cond)
 - pthread_cond_t *cond;
 - {
 - int pshared = (cond->__data.__mutex == (void *) ~0l)
 - ? LLL_SHARED : LLL_PRIVATE;
 - // Lock, because cond data is about to be modified
 - lll_lock (cond->__data.__lock, pshared);
 - /* Are there any waiters to be woken? */
 - if (cond->__data.__total_seq > cond->__data.__wakeup_seq)
 - {
 - // __wakeup_seq is the number of signal calls plus the number of timeouts
 - ++cond->__data.__wakeup_seq;
 - ++cond->__data.__futex;
 - ...
 - // Wake a waiting thread
 - lll_futex_wake (&cond->__data.__futex, 1, pshared);
 - }
 - /* We are done. */
 - lll_unlock (cond->__data.__lock, pshared);
 - return 0;
 - }
 
End
This article gave a brief introduction to how ReentrantLock is implemented in Java and a detailed analysis of pthread_cond_timedwait, the mechanism underneath LockSupport.park.