Linux內(nèi)核中的同步與互斥
先看進程間的互斥。在linux內(nèi)核中主要通過semaphore機制和spin_lock機制實現(xiàn)。主要的區(qū)別是在semaphore機制中,進不了臨界區(qū)時會進行進程的切換,而spin_lock剛執(zhí)行忙等(在SMP中)。
先看內(nèi)核中的semaphore機制。前提是對引用計數(shù)count增減的原子性操作。內(nèi)核用ato
mic_t的數(shù)據(jù)結(jié)構(gòu)和在它上面的一系列操作如atomic_add()、atomic_sub()等等實現(xiàn)。(定義在atomic.h中)
semaphone機制主要通過up()和down()兩個操作實現(xiàn)。
semaphone的結(jié)構(gòu)為
- struct semaphore {
- atomic_t count;
- int sleepers;
- wait_queue_head_t wait;
- };
相應(yīng)的down()函數(shù)為
- static inline void down(struct semaphore*sem)
- {
/* 1 */sem->count--; //為原子操作
- if(sem->count<0)
- {
- struct task_struct *tsk = current;
- DECLARE_WAITQUEUE(wait, tsk);
- tsk->state = TASK_UNINTERRUPTIBLE;
- add_wait_queue_exclusive(&sem->wait, &wait);
- spin_lock_irq(&semaphore_lock);
/* 2 */ sem->sleepers++;
- for (;;) {
- int sleepers = sem->sleepers;
- /*
- * Add "everybody else" into it. They aren't
- * playing, because we own the spinlock.
- */
/* 3 */ if (!atomic_add_negative(sleepers - 1, &sem->count)) {
/* 4 */ sem->sleepers = 0; //這時sem->count=0
- break;
- }
/* 4 */ sem->sleepers = 1; /* us - see -1 above */ // 這時sem
- ->count
- =-1
- spin_unlock_irq(&semaphore_lock);
- schedule();
- tsk->state = TASK_UNINTERRUPTIBLE;
- spin_lock_irq(&semaphore_lock);
- }
- spin_unlock_irq(&semaphore_lock);
- remove_wait_queue(&sem->wait, &wait);
- tsk->state = TASK_RUNNING;
- wake_up(&sem->wait);
- }
- }
相應(yīng)的up()函數(shù)為
- void up(struct semaphore*sem)
- {
sem->count++; //為原子操作
- if(sem->count<=0)
- {
//喚醒等待隊列中的一個符合條件的進程(因為每個進程都加了TASK_EXCLUSIVE標(biāo)志)
。
};
假設(shè)開始時,count=1;sleepers=0
#p#
當(dāng)進程A執(zhí)行down()時,引用計數(shù)count--,如果這時它的值大于等于0,則從down()中直接返回。如果count少于0,則A的state改為TASK_INTERRUPTIBLE后進入這個信號量的等待隊列中,同時使sleepers++;然后重新計算count=sleepers - 1 + count,若這時引用計數(shù)仍小于0(一般情況下應(yīng)為-1,因為count = - sleepers,不過在SMP結(jié)構(gòu)中,期間別的進程可能執(zhí)行了up()和down()從而使得引用計數(shù)的值可能變化),則執(zhí)行進程切換。
當(dāng)進程A又獲得機會運行時,它先執(zhí)行wake_up(&sem->wait)操作,喚醒等待隊列里的一個進程,接著它進入臨界區(qū),從臨界區(qū)出來時執(zhí)行up()操作,使sem>count++,(如果進程A是從down()中直接返回,因為這時等待隊列一定為空,所以它不用執(zhí)行wake_up()操作,直接進入臨界區(qū),在從臨界區(qū)出來時一樣執(zhí)行up()操作,使 sem->count++)。這時如果count的值小于等于0,這表明在它在臨界區(qū)期間又有一個進程(可能就是它進入臨界區(qū)時喚醒的那個進程)進入睡眠了,則執(zhí)行wake_up()操作,反之,如果count的值已經(jīng)大于0,這表明在它在臨界區(qū)期間沒有別的進程(包括在它進入臨界區(qū)時被它喚醒過的那個進程)進入睡眠,那么它就可以直接返回了。
從被喚醒的那個進程看看,如果在喚醒它的進程沒執(zhí)行up()之前它就得到了運行機會,這時它又重新計算count=sleepers - 1 + count=-1;從而sleepers被賦值1;這時它又必須進行調(diào)度讓出運行的機會給別的進程,自己去睡眠。這正是發(fā)生在喚醒它的進程在臨界區(qū)時運行的時候。
如果是在喚醒它的進程執(zhí)行了up()操作后它才得到了運行機會,而且在喚醒它的進程在臨界區(qū)期間時沒別的進程執(zhí)行down(),則count的值在進程執(zhí)行up()之前依然為0,這時在up()里面就不必要再執(zhí)行wake_up()函數(shù)了。
可以通過一個例子來說明具體的實現(xiàn)。設(shè)開始sem->count=sem->sleepers=0。也就是有鎖但無等待隊列 (一個進程已經(jīng)在運行中)。先后分別進行3個down()操作,和3個up()操作,如下:
為了闡述方便,只保留了一些會改變sleepers和count值的步驟,并且遵循從左到右依次進行的原則。
down1:
count(0->-1),sleepers(0->1),sleepers-1+count(-1),count(-1),sleepers(1),調(diào)度
down2:
count(-1->-2),sleepers(1->2),sleepers-1+count(-1),count(-1),sleepers(1),調(diào)度
down3:
count(-1->-2),sleepers(1->2),sleepers-1+count(-1),count(-1),sleepers(1),調(diào)度
up1:
count(-1->0),喚醒一個睡眠進程(設(shè)為1),(進程1得到機會運行)sleepers-1+count
(0),count(0),sleepers(0),break,
喚醒另一個睡眠進程(設(shè)為2),
?。ㄟM程2得到機會運行)sleepers-1+count(-1),count(-1),sleepers(1),調(diào)度(沒達到
條件,又得睡覺)
也可能是這樣的:
up1`:
count(-1->0),喚醒一個睡眠進程(設(shè)為1),(進程1得到機會運行)sleepers-1+count
(0),count(0),sleepers(0),break,
喚醒另一個睡眠進程(設(shè)為2),
進程2在以后才得到機會運行)
up2:
count(-1->0),(因為count<=0)喚醒一個睡眠進程(設(shè)為2),
進程2得到機會運行)sleepers-+count(0) , count(0) , sleepers(0) ,break,
喚醒另一個睡眠進程(設(shè)為3),
進程3得到機會運行)sleepers-1+count(-1),count(-1),sleepers(1),調(diào)度(沒達到條
件,又得睡覺)
對應(yīng)上面的1`:
up2`:
count(0->1),(因為count>0,所以直接返回)
進程2得到機會運行)sleepers-1+count(0),count(0),sleepers(0),break,
喚醒另一個睡眠進程,(設(shè)為3)
up3:
count(-1->0),(因為count<=0)喚醒一個睡眠進程(設(shè)為3),
進程3得到機會運行)sleepers-1+count(0),count(0),sleepers(0),break,
喚醒另一個睡眠進程(這時隊列里沒進程了)
進程3運行結(jié)束,執(zhí)行up(), 使count =1 ,這時變成沒鎖狀態(tài) )
對應(yīng)上邊的2`:
up3`:
count(0->1),(因為count>0,所以直接返回)
進程3得到機會運行)sleepers-1+count(0),count(0),sleepers(0),break,
喚醒另一個睡眠進程(這時隊列里沒進程了)
進程3運行結(jié)束,執(zhí)行up(), 使count =1 ,這時變成沒鎖狀態(tài) )
當(dāng)然,還有另一種情況,就是up()操作和down()操作是交*出現(xiàn)的,
一般的規(guī)律就是,如果進程在臨界區(qū)期間又有進程(無論是哪個進程,新來的還是剛被喚醒的那個)進入睡眠,就會令count的值從0變?yōu)?1,從而進程在從臨界區(qū)出來執(zhí)行up()里就必須執(zhí)行一次wake_up(),以確保所有的進程都能被喚醒,因為多喚醒幾個是沒關(guān)系的。如果進程在臨界區(qū)期間沒有別的進程進入睡眠,則從臨界區(qū)出來執(zhí)行up()時就用不著去執(zhí)行wake_up()了(當(dāng)然,執(zhí)行了也沒什么影響,不過多余罷了)。
而為什么要把wake_up()和count++分開呢,可以從上面的up1看出來,例如,進程2第一次得到機會運行時,本來這時喚醒它的進程還沒執(zhí)行up()的,但有可能其它進程執(zhí)行了 up()了,所以真有可能會發(fā)現(xiàn)count==1的情況,這時它就真的不用睡覺了,令count=sl eepers=0,就可以接著往下執(zhí)行了。
還可看出一點,一般的,( count ,sleepers)的值的取值范圍為(n ,0)[n>0] 和(0
,0
)和 (1 ,-1)。
#p#
下邊看看spin_lock機制。
Spin_lock采用的方式是讓一個進程運行,另外的進程忙等待,由于在只有一個cpu
的機
器(UP)上微觀上只有一個進程在運行。所以在UP中,spin_lock和spin_unlock就都是空
的了。
在SMP中,spin_lock()和spin_unlock()定義如下。
- typedef struct {
- volatile unsigned int lock;
- } spinlock_t;
static inline void spin_lock(spinlock_t *lock)
- {
- __asm__ __volatile__(
- "\n1:\t"
- "lock ; decb %0\n\t"
- "js 2f\n" //lock->lock< 0 ,jmp 2 forward
- ".section .text.lock,\"ax\"\n"
- "2:\t"
- "cmpb $0,%0\n\t" //wait lock->lock==1
- "rep;nop\n\t"
- "jle 2b\n\t"
- "jmp 1b\n"
- ".previous"
- :"=m" (lock->lock) : : "memory");
- }
static inline void spin_unlock(spinlock_t *lock)
- {
- __asm__ __volatile__(
- "movb $1,%0"
- :"=m" (lock->lock) : : "memory"); //lock->lock=1
- }
一般是如此使用:
- #define SPIN_LOCK_UNLOCKED (spinlock_t) { 1 }
- spinlock_t xxx_lock = SPIN_LOCK_UNLOCKED;
- spin_lock_(&xxx_lock)
- ...
- critical section
- ...
- spin_unlock (&xxx_lock)
可以看出,它和semaphore機制解決的都是兩個進程的互斥問題,都是讓一個進程退出臨界區(qū)后另一個進程才進入的方法,不過sempahore機制實行的是讓進程暫時讓出CPU,進入等待隊列等待的策略,而spin_lock實行的卻是卻進程在原地空轉(zhuǎn),等著另一個進程結(jié)束的策略。
下邊考慮中斷對臨界區(qū)的影響。要互斥的還有進程和中斷服務(wù)程序之間。當(dāng)一個進程在執(zhí)行一個臨界區(qū)的代碼時,可能發(fā)生中斷,而中斷函數(shù)可能就會調(diào)用這個臨界區(qū)的代碼,不讓它進入的話就會產(chǎn)生死鎖。這時一個有效的方法就是關(guān)中斷了。
- #define local_irq_save(x) __asm__ __volatile__("pushfl ; popl %0 ;
- cli":
- "=g" (x): /* no input */ :"memory")
- #define local_irq_restore(x) __asm__ __volatile__("pushl %0 ; popfl": /*
- no
- output */ :"g" (x):"memory")
- #define local_irq_disable() __asm__ __volatile__("cli": : :"memory")
- #define local_irq_enable() __asm__ __volatile__("sti": : :"memory")
- #define cpu_bh_disable(cpu) do { local_bh_count(cpu)++; barrier(); } while (
- 0)
- #define cpu_bh_enable(cpu) do { barrier(); local_bh_count(cpu)--; } while
- (0
- )
- #define local_bh_disable() cpu_bh_disable(smp_processor_id())
- #define local_bh_enable() cpu_bh_enable(smp_processor_id())
對于UP來說,上面已經(jīng)是足夠了,不過對于SMP來說,還要防止來自其它cpu的影響,這
時解決的方法就可以把上面的spin_lock機制也綜合進來了。
- #define spin_lock_irqsave(lock, flags) do {
- local_irq_save(flags); sp
- in_lock(lock); } while (0)
- #define spin_lock_irq(lock) do { local_irq_disable();
- spin_lock(lo
- ck); } while (0)
- #define spin_lock_bh(lock) do { local_bh_disable();
- spin_lock(loc
- k); } while (0)
- #define spin_unlock_irqrestore(lock, flags) do { spin_unlock(lock); local_i
- rq_restore(flags); } while (0)
- #define spin_unlock_irq(lock) do { spin_unlock(lock);
- local_irq_enable();
- } while (0)
- #define spin_unlock_bh(lock) do { spin_unlock(lock);
- local_bh_enable();
- } while (0)
前面說過,對于UP來說,spin_lock()是空的,所以以上的定義就一起適用于UP 和SM
P的情形了。
而read_lock_irqsave(lock, flags) , read_lock_irq(lock),
read_lock_bh(lock) 和
write_lock_irqsave(lock, flags) , write_lock_irq(lock),
write_lock_bh(lock
) 就是spin_lock的一個小小的變型而己了。
一口能看完的都是大蝦,希望本文能對你們有很大的幫助。
【編輯推薦】
- Linux就這樣被黑客入侵
- Linux Kernel 2.6.37發(fā)布 Ext4性能提升
- 如何學(xué)好Linux 十一大建議
- 從頭學(xué)習(xí)Linux基礎(chǔ)的8大建議
- 眼見為實 近看Sandy Bridge內(nèi)核真身
- 2010年度報告:是誰在編寫Linux內(nèi)核?
- Linux內(nèi)核編譯之高手教程