Redis源碼學(xué)習(xí)之事件驅(qū)動(dòng)
Redis基于多路復(fù)用技術(shù)實(shí)現(xiàn)了一套簡(jiǎn)單的事件驅(qū)動(dòng)庫,代碼在ae.h、ae.c以及ae_epoll.c、ae_evport.c和ae_kqueue.c、ae_select.c這幾個(gè)文件中。其中ae表示的是antirez eventloop的意思。
Redis里面包含兩種事件類型:FileEvent和TimeEvent。
Redis采用IO多路復(fù)用技術(shù),所有的事件都是在一個(gè)線程中進(jìn)行處理。Redis的事件驅(qū)動(dòng)模型可以以以下為代碼進(jìn)行表示:
- int main(int argc,char **argv)
- {
- while(true) {
- // 等待事件到來:wait4Event();
- // 處理事件:processEvent()
- }
- }
在一個(gè)死循環(huán)中等待事件的到來,然后對(duì)事件進(jìn)行處理,以此往復(fù)。這就是一個(gè)最經(jīng)典的網(wǎng)絡(luò)編程模型。
1.基本數(shù)據(jù)結(jié)構(gòu)
aeEventLoop
aeEventLoop是Redis中事件驅(qū)動(dòng)模型的核心,封裝了整個(gè)事件循環(huán),其中每個(gè)字段解釋如下:
- maxfd:已經(jīng)接受的最大的文件描述符。
- setsize:當(dāng)前循環(huán)中所能容納的文件描述符的數(shù)量。
- timeEventNextId:下一個(gè)時(shí)間事件的ID.
- lastTime:上一次被訪問的時(shí)間,用來檢測(cè)系統(tǒng)時(shí)鐘是否被修改。
- events:指針,指向保存所有注冊(cè)的事件的數(shù)組首地址。
- fired:指針,保存所有已經(jīng)買被觸發(fā)的事件的數(shù)組首地址。
- timeEventHead:Redis用一個(gè)鏈表來存儲(chǔ)所有的時(shí)間事件,timeEventHead是指向這個(gè)鏈表的首節(jié)點(diǎn)指針。
- stop:停止整個(gè)事件循環(huán)。
- apiData:指針,指向epoll結(jié)構(gòu)。
- beforeSleep:函數(shù)指針。每次實(shí)現(xiàn)循環(huán)的時(shí)候,在阻塞直到時(shí)間到來之前,會(huì)先調(diào)用這個(gè)函數(shù)。
aeFileEvent和aeTimeEvent
這兩個(gè)結(jié)構(gòu)分別表示文件事件和時(shí)間事件,定義如下
- typedef struct aeFileEvent {
- int mask; /* one of AE_(READABLE|WRITABLE) */
- aeFileProc *rfileProc; // 函數(shù)指針,寫事件處理
- aeFileProc *wfileProc; // 函數(shù)指針,讀事件處理
- void *clientData; // 具體的數(shù)據(jù)
- } aeFileEvent;
其中mask表示文件事件類型掩碼,可以是AE_READABLE表示是可讀事件,AE_WRITABLE為可寫事件。aeFileProc是函數(shù)指針。
- /* Time event structure */
- typedef struct aeTimeEvent {
- long long id; // 事件ID
- long when_sec; // 事件觸發(fā)的時(shí)間:s
- long when_ms; // 事件觸發(fā)的時(shí)間:ms
- aeTimeProc *timeProc; // 函數(shù)指針
- aeEventFinalizerProc *finalizerProc; // 函數(shù)指針:在對(duì)應(yīng)的aeTieEvent節(jié)點(diǎn)被刪除前調(diào)用,可以理解為aeTimeEvent的析構(gòu)函數(shù)
- void *clientData; // 指針,指向具體的數(shù)據(jù)
- struct aeTimeEvent *next; // 指向下一個(gè)時(shí)間事件指針
- } aeTimeEvent;
aeFiredEvent
aeFiredEvent結(jié)構(gòu)表示一個(gè)已經(jīng)被觸發(fā)的事件,結(jié)果如下:
- /* A fired event */
- typedef struct aeFiredEvent {
- int fd; // 事件被觸發(fā)的文件描述符
- int mask; // 被觸發(fā)事件的掩碼,表示被觸發(fā)事件的類型
- } aeFiredEvent;
fd表示事件發(fā)生在哪個(gè)文件描述符上面,mask用來表示具體事件的類型。
aeApiState
Redis底層采用IO多路復(fù)用技術(shù)實(shí)現(xiàn)高并發(fā),具體實(shí)現(xiàn)可以采用kqueue、select、epoll等技術(shù)。對(duì)于Linux來說,epoll的性能要優(yōu)于select,所以以epoll為例來進(jìn)行分析。
- typedef struct aeApiState {
- int epfd;
- struct epoll_event *events;
- } aeApiState;
aeApiState封裝了跟epoll相關(guān)的數(shù)據(jù),epfd保存epoll_create()返回的文件描述符。
具體實(shí)現(xiàn)細(xì)節(jié)
事件循環(huán)啟動(dòng):aeMain()
事件驅(qū)動(dòng)的啟動(dòng)代碼位于ae.c的aeMain()函數(shù)中,代碼如下:
從aeMain()方法中可以看到,整個(gè)事件驅(qū)動(dòng)是在一個(gè)while()循環(huán)中不停地執(zhí)行aeProcessEvents()方法,在這個(gè)方法中執(zhí)行從客戶端發(fā)送過來的請(qǐng)求。
初始化:aeCreateEventLoop()
aeEventLoop的初始化是在aeCreateEventLoop()方法中進(jìn)行的,這個(gè)方法是在server.c中的initServer()中調(diào)用的。實(shí)現(xiàn)如下:
在這個(gè)方法中主要就是給aeEventLoop對(duì)象分配內(nèi)存然后并進(jìn)行初始化。其中關(guān)鍵的地方有:
1、調(diào)用aeApiCreate()初始化epoll相關(guān)的數(shù)據(jù)。aeApiCreate()實(shí)現(xiàn)如下:
在aeApiCreate()方法中主要完成以下三件事:
- 分配aeApiState結(jié)構(gòu)需要的內(nèi)存。
- 調(diào)用epoll_create()方法生成epoll的文件描述符,并保存在aeApiState.epfd字段中。
- 把第一步分配的aeApiState的內(nèi)存地址保存在EventLoop->apidata字段中。
2、初始化events中的mask字段為為AE_NONE。
生成fileEvent:aeCreateFileEvent()
Redis使用aeCreateFileEvent()來生成fileEvent,代碼如下:
aeCreateFileEvent()方法主要做了以下三件事:
- 檢查新增的fd是否超過所能容納最大值。
- 調(diào)用aeApiAddEvent()方法把對(duì)應(yīng)的fd以mask模式添加到epoll監(jiān)聽器中。
- 設(shè)置相應(yīng)的字段值。
其中最關(guān)鍵的步驟是第二步,aeApiAddEvent()方法如下:
生成timeEvent:aeCreateTimeEvent()
aeCreateTimeEvent()方法主要是用來生成timeEvent節(jié)點(diǎn),其實(shí)現(xiàn)比較簡(jiǎn)單,代碼如下所示:
處理timeEevnt:processTimeEvents()
Redis在processTimeEvents()方法中來處理所有的timeEvent,實(shí)現(xiàn)如下:
- static int processTimeEvents(aeEventLoop *eventLoop) {
- int processed = 0;
- aeTimeEvent *te, *prev;
- long long maxId;
- time_t now = time(NULL);
- /**
- * 如果系統(tǒng)時(shí)間被調(diào)整到將來某段時(shí)間然后又被設(shè)置回正確的時(shí)間,
- * 這種情況下鏈表中的timeEvent有可能會(huì)被隨機(jī)的延遲執(zhí)行,因
- * 此在這個(gè)情況下把所有的timeEvent的觸發(fā)時(shí)間設(shè)置為0表示及執(zhí)行
- */
- if (now < eventLoop->lastTime) {
- te = eventLoop->timeEventHead;
- while(te) {
- te->when_sec = 0;
- te = te->next;
- }
- }
- eventLoop->lastTime = now; // 設(shè)置上次運(yùn)行時(shí)間為now
- prev = NULL;
- te = eventLoop->timeEventHead;
- maxId = eventLoop->timeEventNextId-1;
- while(te) {
- long now_sec, now_ms;
- long long id;
- /**
- * 刪除已經(jīng)被標(biāo)志位 刪除 的時(shí)間事件
- */
- if (te->id == AE_DELETED_EVENT_ID) {
- aeTimeEvent *next = te->next;
- if (prev == NULL)
- eventLoop->timeEventHead = te->next;
- else
- prev->next = te->next;
- if (te->finalizerProc)
- // 在時(shí)間事件節(jié)點(diǎn)被刪除前調(diào)用finlizerProce()方法
- te->finalizerProc(eventLoop, te->clientData);
- zfree(te);
- te = next;
- continue;
- }
- if (te->id > maxId) {
- /**
- * te->id > maxId 表示當(dāng)前te指向的timeEvent為當(dāng)前循環(huán)中新添加的,
- * 對(duì)于新添加的節(jié)點(diǎn)在本次循環(huán)中不作處理。
- * PS:為什么會(huì)出現(xiàn)這種情況呢?有可能是在timeProc()里面會(huì)注冊(cè)新的timeEvent節(jié)點(diǎn)?
- * 對(duì)于當(dāng)前的Redis版本中不會(huì)出現(xiàn)te->id > maxId這種情況
- */
- te = te->next;
- continue;
- }
- aeGetTime(&now_sec, &now_ms);
- if (now_sec > te->when_sec ||
- (now_sec == te->when_sec && now_ms >= te->when_ms))
- {
- // 如果當(dāng)前時(shí)間已經(jīng)超過了對(duì)應(yīng)的timeEvent節(jié)點(diǎn)設(shè)置的觸發(fā)時(shí)間,
- // 則調(diào)用timeProc()方法執(zhí)行對(duì)應(yīng)的任務(wù)
- int retval;
- id = te->id;
- retval = te->timeProc(eventLoop, id, te->clientData);
- processed++;
- if (retval != AE_NOMORE) {
- // 要執(zhí)行多次,則計(jì)算下次執(zhí)行時(shí)間
- aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
- } else {
- // 如果只需要執(zhí)行一次,則把id設(shè)置為-1,再下次循環(huán)中刪除
- te->id = AE_DELETED_EVENT_ID;
- }
- }
- prev = te;
- te = te->next;
- }
- return processed;
- }
在這個(gè)方法中會(huì)
- 判斷系統(tǒng)時(shí)間有沒有調(diào)整過,如果調(diào)整過,則會(huì)把timeEvent鏈表中的所有的timeEvent的觸發(fā)時(shí)間設(shè)置為0,表示立即執(zhí)行。
- 對(duì)timeEvent鏈表進(jìn)行遍歷,對(duì)于每個(gè)timeEvent節(jié)點(diǎn),如果有:
- 返回為AE_NOMORE,表示當(dāng)前timeEvent節(jié)點(diǎn)屬于一次性事件,標(biāo)記該節(jié)點(diǎn)ID為AE_DELETED_EVENT_ID,表示刪除節(jié)點(diǎn),該節(jié)點(diǎn)將會(huì)在下一輪的循環(huán)中被刪除。
- 返回不是AE_NOMORE,表示當(dāng)前timeEvent節(jié)點(diǎn)屬于周期性事件,需要多次執(zhí)行,調(diào)用aeAddMillisecondsToNow()方法設(shè)置下次被執(zhí)行時(shí)間。
- 如果已經(jīng)被標(biāo)記為刪除(AE_DELETED_EVENT_ID),則立即釋放對(duì)應(yīng)節(jié)點(diǎn)內(nèi)存,遍歷下個(gè)節(jié)點(diǎn)。
- 如果id大于maxId,則表示當(dāng)前節(jié)點(diǎn)為本次循環(huán)中新增節(jié)點(diǎn),咋本次循環(huán)中不錯(cuò)處理,繼續(xù)下個(gè)節(jié)點(diǎn)。
- 如果當(dāng)前節(jié)點(diǎn)的觸發(fā)時(shí)間大于當(dāng)前時(shí)間,則調(diào)用對(duì)應(yīng)節(jié)點(diǎn)的timeProc()方法執(zhí)行任務(wù)。根據(jù)timeProc()方法的返回,又分為兩種情況:
處理所有事件:aeProcessEvents()
Redis中所有的事件,包括timeEvent和fileEvent都是在aeProcessEvents()方法中進(jìn)行處理的,剛方法實(shí)現(xiàn)如下:
- /* Process every pending time event, then every pending file event
- * (that may be registered by time event callbacks just processed).
- * Without special flags the function sleeps until some file event
- * fires, or when the next time event occurs (if any).
- *
- * If flags is 0, the function does nothing and returns.
- * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
- * if flags has AE_FILE_EVENTS set, file events are processed.
- * if flags has AE_TIME_EVENTS set, time events are processed.
- * if flags has AE_DONT_WAIT set the function returns ASAP until all
- * the events that's possible to process without to wait are processed.
- *
- * The function returns the number of events processed. */
- int aeProcessEvents(aeEventLoop *eventLoop, int flags)
- {
- int processed = 0, numevents;
- /**
- * 既沒有時(shí)間事件也沒有文件事件,則直接返回
- */
- if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
- /**
- * -1 == eventloop->maxfd 表示還么有任何aeFileEvent被添加到epoll
- * 事件循環(huán)中進(jìn)行監(jiān)聽
- */
- if (eventLoop->maxfd != -1 ||
- ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
- int j;
- aeTimeEvent *shortest = NULL;
- struct timeval tv, *tvp;
- /**
- * 如果有aeFileEvent需要處理,就先要從所有待處理的
- * aeTimeEvent事件中找到最近的將要被執(zhí)行的aeTimeEvent節(jié)點(diǎn)
- * 并結(jié)算該節(jié)點(diǎn)觸發(fā)時(shí)間
- */
- if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
- shortest = aeSearchNearestTimer(eventLoop);
- if (shortest) {
- long now_sec, now_ms;
- aeGetTime(&now_sec, &now_ms);
- tvp = &tv;
- /* How many milliseconds we need to wait for the next
- * time event to fire? */
- // 計(jì)算epoll_wait()需要等待的時(shí)間
- long long ms =
- (shortest->when_sec - now_sec)*1000 +
- shortest->when_ms - now_ms;
- if (ms > 0) {
- tvp->tv_sec = ms/1000;
- tvp->tv_usec = (ms % 1000)*1000;
- } else {
- tvp->tv_sec = 0;
- tvp->tv_usec = 0;
- }
- } else {
- // 如果flags設(shè)置了AE_DONT_WAIT,則設(shè)置epoll_wait()等待時(shí)間為0,
- // 即立刻從epoll中返回
- if (flags & AE_DONT_WAIT) {
- tv.tv_sec = tv.tv_usec = 0;
- tvp = &tv;
- } else {
- /* Otherwise we can block */
- tvp = NULL; /* wait forever */
- }
- }
- // 調(diào)用aeApiPoll()進(jìn)行阻塞等待事件的到來,等待時(shí)間為tvp
- numevents = aeApiPoll(eventLoop, tvp);
- for (j = 0; j < numevents; j++) {
- aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
- int mask = eventLoop->fired[j].mask;
- int fd = eventLoop->fired[j].fd;
- int rfired = 0;
- /* note the fe->mask & mask & ... code: maybe an already processed
- * event removed an element that fired and we still didn't
- * processed, so we check if the event is still valid. */
- // fe->mask && mask 的目的是確保對(duì)應(yīng)事件時(shí)候還有效
- if (fe->mask & mask & AE_READABLE) {
- rfired = 1;
- fe->rfileProc(eventLoop,fd,fe->clientData,mask);
- }
- if (fe->mask & mask & AE_WRITABLE) {
- if (!rfired || fe->wfileProc != fe->rfileProc)
- fe->wfileProc(eventLoop,fd,fe->clientData,mask);
- }
- processed++;
- }
- }
- /* Check time events */
- if (flags & AE_TIME_EVENTS)
- // 處理aeTimeEvent
- processed += processTimeEvents(eventLoop);
- return processed; /* return the number of processed file/time events */
- }
該方法的入?yún)lag表示要處理哪些事件,可以取以下幾個(gè)值 :
- AE_ALL_EVENTS:timeEvent和fileEvent都會(huì)處理。
- AE_FILE_EVENTS:只處理fileEvent。
- AE_TIME_EVENTS:只處理timeEvent。
- AE_DONT_WAIT:要么立馬返回,要么處理完那些不需要等待的事件之后再立馬返回。
aeProcessEvents()方法會(huì)做下面幾件事:
- 判斷傳入的flag的值,如果既不包含AE_TIME_EVENTS也不包含AE_FILE_EVENTS則直接返回。
- 計(jì)算如果有aeFileEvent事件需要進(jìn)行處理,則先計(jì)算epoll_wait()方法需要阻塞等待的時(shí)間,計(jì)算方式如下:
- 先從aeTimeEvent事件鏈表中找到最近的需要被觸發(fā)的aeTimeEvent節(jié)點(diǎn)并計(jì)算需要被觸發(fā)的時(shí)間,該被觸發(fā)時(shí)間則為epoll_wait()需要等待的時(shí)間。
- 如果沒有找到最近的aeTimeEvent節(jié)點(diǎn),表示沒有aeTimeEvent節(jié)點(diǎn)被加入鏈表,則判斷傳入的flags是否包含AE_DONT_WAIT選項(xiàng),則設(shè)置epoll_wait()需要等待時(shí)間為0,表示立即返回。
- 如果沒有設(shè)置AE_DONT_WAIT,則設(shè)置需要等待時(shí)間為NULL,表示epoll_wait()一直阻塞等待知道有fileEvent事件到來。
- 調(diào)用aeApiPoll()方法阻塞等待事件的到來,阻塞時(shí)間為第二步中計(jì)算的時(shí)間。aeApiPoll()實(shí)現(xiàn)見文末:
- aeApiPoll()會(huì)做下面幾件事:
- 根據(jù)傳入的tvp計(jì)算需要阻塞的時(shí)間,然后調(diào)用epoll_wait()進(jìn)行阻塞等待。
- 有事件到來之后先計(jì)算對(duì)應(yīng)事件的類型。
- 把事件發(fā)生的fd以及對(duì)應(yīng)的類型mask拷貝到fired數(shù)組中。
- aeApiPoll()會(huì)做下面幾件事:
- 從aeApiPoll()方法返回之后,所有事件已經(jīng)就緒了的fd以及對(duì)應(yīng)事件的類型mask已經(jīng)保存在eventLoop->fired[]數(shù)組中。依次遍歷fired數(shù)組,根據(jù)mask類型,執(zhí)行對(duì)應(yīng)的frileProc()或者wfileProce()方法。
- 如果傳入的flags中有AE_TIME_EVENTS,則調(diào)用processTimeEvents()執(zhí)行所有已經(jīng)到時(shí)間了的timeEvent。
本系列

































