Kestrel中的Journal.scala類詳解
本文是Scala代碼實(shí)例之Kestrel的第六部分,講述PersistentQueue中的Journal.scala類。
在PersistentQueue之下,有一個Journal.scala的類,支撐了消息隊列的存儲問題。這是Kestrel提供的另外一個特性:通過文件系統(tǒng)保存消息隊列,避免服務(wù)重啟的時候,Kestrel的隊列丟失。
通過前面一段時間的閱讀,我們對Scala的語法已經(jīng)有一個基本的把握,所以在閱讀Journal的時候,我們就更注重實(shí)現(xiàn)的方式,而不是語法細(xì)節(jié)了。當(dāng)然Journal也沒有太多的語法細(xì)節(jié)需要講的了。當(dāng)然出了還沒有詳細(xì)說過的case class和case object。
在Journal.scala的開始部分就定義了一個abstract class類JournalItem,并且定義了它的許多子類,這些子類是用來和PersistentQueue進(jìn)行消息傳遞的。case class/case object是一種特殊的class/object,其功能是在對象里面增加了幾個功能
1. 把所有的建構(gòu)函數(shù)的var變成val,也就是變成了不可變的常量
2.自動實(shí)現(xiàn)了equal, hashCode和toString三個方法
3.當(dāng)對象出現(xiàn)在case之后的時候,會自動apply出一個對象,對象的值和創(chuàng)建的時候一樣,這個功能保證了可以和match…case語法可以寫得很簡練。
關(guān)于Case class的具體說明可以參考:CaseClasses和MatchingOnCaseClasses。關(guān)于第三條特性,還可以參考CompanionObjects。
簡單的理解,我們就把case object/case class當(dāng)作消息傳遞中需要使用的對象類型就可以了。
Journal使用了noi的FileChannel,來處理文件的讀取和存儲。核心的算法,可以只看readJournalEntry和replay兩個方法。readJournalEntry的功能是從文件中讀取數(shù)據(jù),并且根據(jù)格式組成各種case class/case object,并且同時返回字節(jié)數(shù)。而在上層的方法,比如replay,則根據(jù)得到的不同數(shù)據(jù)類型,調(diào)用更上層的函數(shù)f(case class/case object)。
我們回到PersistentQueue中看replayJournal的時候,發(fā)現(xiàn)它將調(diào)用replay后得到的一系列的case class轉(zhuǎn)義成為在PersistentQueue中需要執(zhí)行的各種命令——所以這個方法的名字叫做replay!就是回放的意思。
當(dāng)系統(tǒng)重啟的時候,打開每個queue之前都需要一段回放的時間,把文件系統(tǒng)中記錄的當(dāng)時的整個存取過程重新回放一次,通過回放來重建內(nèi)存中的隊列?;剡^來再看Journal.scala的時候,我們就更清晰的知道,文件存儲的不是當(dāng)時的隊列狀態(tài),而是每一次系統(tǒng)執(zhí)行的軌跡。所以,Journal對整個Kestrel消息隊列的開銷才會很小。
但是另外一個問題隨之而來,如果記錄所有的操作過程,那么這個文件不是只會增大,不會縮小么?為了解決這個問題,Journal.scala實(shí)現(xiàn)了一個叫做roll的機(jī)制。從PersisitentQueue中的add方法中,我們可以看到這樣的代碼:
- if (keepJournal() && !journal.inReadBehind) {
- if (journal.size > maxJournalSize() * maxJournalOverflow() && queueSize < maxJournalSize()) {
- // force re-creation of the journal.
- log.info("Rolling journal file for '%s' (qsize=%d)", name, queueSize)
- journal.roll(xidCounter, openTransactionIds map { openTransactions(_) }, queue)
- }
- if (queueSize >= maxMemorySize()) {
- log.info("Dropping to read-behind for queue '%s' (%d bytes)", name, queueSize)
- journal.startReadBehind
- }
- }
如果發(fā)生了Journal文件的尺寸太大,但是實(shí)際的Queue尺寸也沒有滿的時候,就啟動roll進(jìn)程來重新建立一個Journal文件。處理的方法也很簡單,就是把當(dāng)前內(nèi)存中的隊列直接寫入到Journal對應(yīng)的文件中,變成一連串的add。這么做,是不是一個很好的做法?只有一種意外的情況,那就是現(xiàn)存在消息隊列里面的數(shù)據(jù)很多,那么重建Journal的時間就需要很多。作者也考慮到了這個問題,所有要求queue實(shí)際的size必須小于Journal所能存儲的量的時候,才會做roll的操作,也就是說,當(dāng)隊列里面有很多的事件沒有處理的時候,就算硬盤占用得再多,也不會啟動roll方法。而解決的方法是,當(dāng)內(nèi)存中的Queue太大,大到超過了***的內(nèi)存使用限制的時候,啟動readBehind模式。
當(dāng)readBehind模式啟動之后,會對文件增加一個read句柄,每次從內(nèi)存里面remove掉消息的時候,就會嘗試從文件中讀取消息放到內(nèi)存里面。在這樣的模式下,內(nèi)存就一致保持著最滿的隊列。更多的消息就先直接存儲到文件中,直到文件中的read指針和write指針重合,也就是說所有在文件系統(tǒng)中的消息都已經(jīng)被處理完畢了,系統(tǒng)就會重新切換回正常的模式。
在這種模式下,只要硬盤的數(shù)量足夠大,我們基本上可以把這個消息隊列理解為無限長……但是在readbehind模式下,是不會進(jìn)行roll的操作的。所以——大家需要注意的是,在配置中,maxJournalSize必須要小于maxMemorySize,否則這兩個機(jī)制就會打架了。而maxJournalSize這個數(shù)值也不應(yīng)該很大,這樣就能保證每次roll的效率會很快(因為roll的效率是取決于事件占用內(nèi)存的數(shù)量,也就是maxJournalSize的),超過這個值,系統(tǒng)就不會roll。
決定是否roll。還有一個數(shù)值就是maxJournalOverflow,這是一個很好的設(shè)計,相當(dāng)于Journal文件的利用率,比如說Overflow設(shè)置為5,表示現(xiàn)在有效的消息隊列數(shù)據(jù),只有整個 Journal 文件大小的 1/5。
假設(shè)我們平均每個消息的數(shù)據(jù)占有1K,那么其他的指令信息基本可以被忽略(因為都只有幾個字節(jié)而已),所以O(shè)verflow的比例相當(dāng)于總消息數(shù)量 / 還沒有處理過的消息數(shù)量。所以這個數(shù)值的上限取決于replay的效率。也就是讀取文件的速度,比如說,我們覺得啟動的時候,讀取100M的文件,大概需要10s,是我們可以接受的,而每個消息字節(jié)平均是1K,roll一次100個消息需要100ms,是可以接受的。那么 maxJournalOverflow = 100M / 100 * 1K = 1000。不過實(shí)際情況可能是roll的次數(shù)會更多一些,因為當(dāng)內(nèi)存中的消息隊列只有10個的時候,硬盤超過10M,就會觸發(fā)roll操作了。
但是設(shè)置1000的目的是,在最壞的情況可以接受,而不是在一半的情況下效率更高。這是需要注意的。
【相關(guān)閱讀】