細(xì)說(shuō)Kestrel.scala中的PersistentQueue
本文是Scala代碼實(shí)例之Kestrel的第五部分,繼續(xù)講述PersistentQueue處理消息隊(duì)列并發(fā)請(qǐng)求的方式。
回顧一下之前我們讀過(guò)的兩個(gè)文件,Kestrel.scala, QueueCollection.scala。Kestrel.scala是啟動(dòng)文件,并且通過(guò)一個(gè)actor,保持整個(gè)項(xiàng)目不會(huì)因?yàn)闆](méi)有線程運(yùn)行而退出,同時(shí)注冊(cè)了一個(gè)acceptor,當(dāng)建立起新的鏈接的時(shí)候,訪問(wèn) KestrelHandler.scala(這個(gè)稍后我們?cè)僮x)。QueueCollection.scala,維護(hù)一個(gè)PersistentQueue的隊(duì)列,如果訪問(wèn)的queue_name不存在,則創(chuàng)建一個(gè),如果存在,就對(duì)相應(yīng)的QueueCollection進(jìn)行操作。如果留心的話,我們還可以看到QueueCollection在啟動(dòng)的時(shí)候,queue_name的來(lái)源是一個(gè)文件目錄。
我們就從這個(gè)入口繼續(xù)往下,看看PersistentQueue是如何處理消息隊(duì)列的并發(fā)請(qǐng)求的:
在前幾篇文章里面,我們?cè)?jīng)提到過(guò)PersistentQueue有兩個(gè)“類”,一個(gè)是object PersistentQueue,一個(gè)是class PersistentQueue。而object在scala是一個(gè)單例模式,也就是singleton。也可以看做是只有static類型的java類?,F(xiàn)在讓我們關(guān)注一下,看看class PersistentQueue和object Persistent之間的關(guān)系是怎樣的。
剛開(kāi)始的一段代碼有點(diǎn)嚇人:
- class OverlaySetting[T](base: => T) {
- @volatile private var local: Option[T] = None
- def set(value: Option[T]) = local = value
- def apply() = local.getOrElse(base)
- }
我們先跳過(guò)去,直接往下看,看到這里:
- def overlay[T](base: => T) = new OverlaySetting(base)
- // attempting to add an item after the queue reaches this size (in items) will fail.
- val maxItems = overlay(PersistentQueue.maxItems)
- // attempting to add an item after the queue reaches this size (in bytes) will fail.
- val maxSize = overlay(PersistentQueue.maxSize)
- ……
如果我們不細(xì)究overlay的內(nèi)容,這段代碼其實(shí)就是把object PersisitentQueue中的變量賦值給class PersistentQueue中,那么overlay究竟做了什么呢?其實(shí),overlay是將變量做了一個(gè)封裝,封裝在一個(gè)叫做OverlaySetting的類里面。這個(gè)類,根據(jù)我們之前對(duì)scala語(yǔ)法的了解,可以知道,它是一個(gè)OverlaySetting[T]的類,并且在創(chuàng)建的時(shí)候,需要帶入方法,方法沒(méi)有參數(shù),但是有一個(gè)返回值,類型就是T。(關(guān)于class類的語(yǔ)法規(guī)則,可以參考http://programming-scala.labs.oreilly.com/ch05.html#Constructors,不過(guò)里面的例子比OverlaySetting還復(fù)雜……-_-|||)
這個(gè)類在每次創(chuàng)建對(duì)象的時(shí)候,都會(huì)被賦值。我們也看到只有在使用apply方法的時(shí)候才會(huì)被調(diào)用(不過(guò)我沒(méi)有太想明白,如何通過(guò)函數(shù)的返回值來(lái)確定模板中的類型T,也許這就是Scala這種更加靈活的編譯算法,可以在new對(duì)象的時(shí)候,通過(guò)審查變量類型來(lái)獲取T的吧,畢竟Scala是一個(gè)靜態(tài)語(yǔ)言,如果是動(dòng)態(tài)語(yǔ)言就不太成為一個(gè)問(wèn)題了)。
這里面還存在一個(gè)Scala概念,就是方法=變量。當(dāng)然在很多動(dòng)態(tài)語(yǔ)言里面就已經(jīng)這么做了。在Scala里面,我們可以把def看作是val的一種特殊寫(xiě)法,def聲明的方法,也可以用 def func_name() = {} 這樣的語(yǔ)法規(guī)則,跟val基本就是一回事了。當(dāng)然,這一改變?cè)赟cala里面并不簡(jiǎn)單是一個(gè)語(yǔ)法規(guī)則的問(wèn)題,更進(jìn)一步的,所有的變量也都是類,所以我們可以把一個(gè)變量,看做一個(gè)類,也可以看做類的建構(gòu)函數(shù),返回的就是類本身……有點(diǎn)繞,不過(guò)這樣理解,就比較好理解為什么可以用常量,當(dāng)作沒(méi)有參數(shù)的方法調(diào)用了。
說(shuō)了那么多,結(jié)論很簡(jiǎn)單,maxSize是一個(gè)OverlaySetting[LONG]的類,如果maxSize沒(méi)有設(shè)置過(guò),那么返回的就是object PersistentQueue里面的maxSize。LONG類型。
在主程序體里面,我們看到了Journal類,然后是調(diào)用 configure 方法,這個(gè)方法印證了我們的對(duì)OverlaySetting的解釋,它從配置文件里面把參數(shù)都讀出來(lái)賦值給class PersistentQueue里面的那些常量,用的是set。這里是一個(gè)Scala的語(yǔ)法細(xì)節(jié),它省略了一些不必要的”.”和”()”。
休息一下。我們開(kāi)始討論在PersistentQueue里面的Actor
……
休息完畢
Scala中,消息傳遞的方式有一個(gè)特殊的語(yǔ)法結(jié)構(gòu):“Object ! MessageType” 就好像在源代碼里面出現(xiàn)的:“w.actor ! ItemArrived?!?,(關(guān)于Scala的Actor,詳細(xì)的語(yǔ)法說(shuō)明在http://programming-scala.labs.oreilly.com/ch09.html可以看到,建議先看一下,好對(duì)actor有一個(gè)比較深入的了解)
我們發(fā)現(xiàn)PersistentQueue中Actor的實(shí)現(xiàn),跟語(yǔ)法說(shuō)明里面的很不一樣,在語(yǔ)法說(shuō)明里面的Actor都是作為一個(gè)獨(dú)立的線程出現(xiàn)的,而在PersistentQueue中,你甚至看不見(jiàn)一個(gè)對(duì)Actor的重載,但我們可以發(fā)現(xiàn)與Actor相關(guān)的幾個(gè)地方,一個(gè)是Waiter的定義,它是一個(gè)case class,并且有一個(gè)成員變量叫做actor,類型是Actor:
- private case class Waiter(actor: Actor)
- ……
- private val waiters = new mutable.ArrayBuffer[Waiter]
- ……
- val w = Waiter(Actor.self)
- waiters += w
- ……
需要注意:之前我們提過(guò)一個(gè)Scala的語(yǔ)法規(guī)則,那就是類后面的建構(gòu)函數(shù)的參數(shù),就是類中的成員變量?。ú贿^(guò)這是在解釋,為什么在建構(gòu)函數(shù)里面會(huì)有private關(guān)鍵字時(shí)提到的……)所以,我們知道了一點(diǎn),就是每一個(gè)Waiter內(nèi)部都有一個(gè)actor,這些actor通過(guò)Actor.self共享了一個(gè)線程,當(dāng)然也和其他的PersistentQueue共享了一個(gè)Actor。這是有點(diǎn)讓人不習(xí)慣,因?yàn)檫@么要緊的一個(gè)線程的創(chuàng)建,竟然可以出現(xiàn)得那么隱蔽。甚至連一個(gè)大括號(hào)都沒(méi)有。
接下來(lái),我們來(lái)看看Actor是怎么在PersistentQueue里面工作了——這有點(diǎn)難,因?yàn)樗臋C(jī)制有點(diǎn)復(fù)雜,不是簡(jiǎn)單的象語(yǔ)法說(shuō)明里面的那樣,是一個(gè)完整的獨(dú)立的函數(shù),而是在一些函數(shù)中,突然切入進(jìn)來(lái),分享了Actor.self的一部分線程資源,就像下面代碼一樣:
- ……
- f operateReact(op: => Option[QItem], timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit = {
- operateOrWait(op, timeoutAbsolute) match {
- case (item, None) =>
- f(item)
- case (None, Some(w)) =>
- Actor.self.reactWithin((timeoutAbsolute - Time.now) max 0) {
- case ItemArrived => operateReact(op, timeoutAbsolute)(f)
- case TIMEOUT => synchronized {
- waiters -= w
- // race: someone could have done an add() between the timeout and grabbing the lock.
- Actor.self.reactWithin(0) {
- case ItemArrived => f(op)
- case TIMEOUT => f(op)
- }
- }
- }
- case _ => throw new RuntimeException()
- }
- ……
其中:
- Actor.self.reactWithin(0) {
- case ItemArrived => f(op)
- case TIMEOUT => f(op)
- }
就是Actor的一個(gè)語(yǔ)法,在一段時(shí)間里面等待消息,如果有消息就如何……,如果沒(méi)有消息(TIMEOUT),就如何……。但是在整個(gè)函數(shù)里面套用了兩層 Actor.self.reactWithin,有點(diǎn)讓人要暈菜的感覺(jué),再加上之前有一個(gè)match…case的結(jié)構(gòu),調(diào)用了operateOrWait(op, timeoutAbsolute)方法。要了解整個(gè)消息處理的機(jī)制,就需要把這三個(gè)部分聯(lián)系起來(lái)看了。
先簡(jiǎn)單看一下operateOrWait函數(shù),比較容易理解:
- private def operateOrWait(op: => Option[QItem], timeoutAbsolute: Long): (Option[QItem], Option[Waiter]) = synchronized {
- val item = op
- if (!item.isDefined && !closed && !paused && timeoutAbsolute > 0) {
- val w = Waiter(Actor.self)
- waiters += w
- (None, Some(w))
- } else {
- (item, None)
- }
- }
返回值是一個(gè)map,包括兩個(gè)被Option封裝的類型QItem和Waiter,從QItem.scala中可以知道(代碼很簡(jiǎn)單),QItem就是把原始數(shù)據(jù)打了一個(gè)包,而Waiter之前我們也已經(jīng)說(shuō)過(guò)了。程序體中的判斷是這樣的:如果item,也就是op這個(gè)參數(shù)沒(méi)有定義,并且PersistentQueue也沒(méi)有停止,關(guān)閉,而且處理時(shí)間AbsoluteTime不是0,那么就創(chuàng)建一個(gè)Waiter,返回(None, Some[Waiter]);如果不滿足這些條件,那么就直接返回(op, None)。簡(jiǎn)單的說(shuō),就是如果系統(tǒng)還能等,就讓他等待正常一段時(shí)間然后操作,如果不能等,就直接返回操作指令。返回值只有兩種類型。
然后再看operateReact,如果返回的是時(shí)間參數(shù)是None(詳細(xì)的可以參考 actor .. case 的語(yǔ)法,地址是:http://programming-scala.labs.oreilly.com/ch03.html#MatchingOnCaseClasses),那么就直接執(zhí)行f(op)的函數(shù),把op這個(gè)方法,作為參數(shù)傳遞給f函數(shù)。如果返回的是一個(gè)時(shí)間戳,Some(w),那么我們就等待AbsoluteTime 到 Time.now()這段時(shí)間,如果在這段事件里面有ItemArrived事件發(fā)生,那么就處理一下,直到Time.now 等于或者大于 AbsoluteTime,那就會(huì)得到一個(gè)TIMEOUT,然后就退出了。(有一個(gè)異常的情況,需要清空一下事件隊(duì)列,通過(guò)reactWithin(0){})
這么理解這段actor還是不太清晰,那么讓我們回到上一層的調(diào)用??纯催@個(gè)f(op)到底是什么,然后我們看到了:
- def removeReact(timeoutAbsolute: Long, transaction: Boolean)(f: Option[QItem] => Unit): Unit = {
- operateReact(remove(transaction), timeoutAbsolute)(f)
- }
我們就知道op其實(shí)是一個(gè)remove的操作,并且返回remove得到的QItem對(duì)象。再往上一層到QueueCollection,我們看到:
- q.removeReact(if (timeout == 0) timeout else Time.now + timeout, transaction) {
- case None =>
- queueMisses.incr
- f(None)
- case Some(item) =>
- queueHits.incr
- f(Some(item))
- }
f方法的操作,如果之前的remove返回的是一個(gè)None,則記錄queueMess(未命中)添加1,如果返回的是一個(gè)QItem的值,那么就記錄queueHits(命中)添加1,并且,對(duì)這個(gè)QItem進(jìn)行操作(注意:這里的f是QueueCollection中remove帶入的那個(gè)方法,而不是前面提到的removeReact里面提到的f。
從QueueCollection的remove調(diào)用到***層PersistentQueue的operateReact調(diào)用,我們大致可以了解這么曲折的調(diào)用關(guān)系解決了一個(gè)什么問(wèn)題——從消息隊(duì)列里面獲取QItem。
回顧一下QueueCollection其他的代碼,我們發(fā)現(xiàn),只有waiter.size > 0的時(shí)候,有新的QItem添加,才會(huì)發(fā)出ItemArrived事件。也就是說(shuō),只有有一個(gè)獲取消息隊(duì)列的進(jìn)程存在的時(shí)候,才會(huì)觸發(fā)ItemArrived事件。獲取消息隊(duì)列,則通過(guò)使用reactWithin,允許在一個(gè)規(guī)定的時(shí)間內(nèi),連續(xù)處理一系列的ItemArrived事件??碤ueueCollection的remove方法,我們還可以知道,當(dāng)啟動(dòng)q.removeReact之前,首先會(huì)調(diào)用q.peek來(lái)檢查,隊(duì)列是不是為空,如果不是空的話,就直接返回隊(duì)列里面最前面的那個(gè)元素。所以我們可以把這個(gè)消息隊(duì)列理解成——如果消息隊(duì)列為空的情況下,讓獲取消息隊(duì)列的Client等待一段時(shí)間的機(jī)制,以降低反復(fù)進(jìn)行SOCKET連接帶來(lái)的不必要的耗損。
這個(gè)機(jī)制,可以讓我們比較好地理解,為什么Kestrel提示說(shuō),如果運(yùn)行多個(gè)獨(dú)立的進(jìn)程來(lái)處理消息隊(duì)列的時(shí)候,會(huì)讓這個(gè)消息隊(duì)列的處理變成一個(gè)缺乏時(shí)序,但是處理并發(fā)能力很強(qiáng)的集群。每個(gè)連接對(duì)應(yīng)的是一個(gè)Waiter,但是當(dāng)ItemArrived觸發(fā)的時(shí)候,只可能有其中的一個(gè)reactWithin得到了這個(gè)事件,發(fā)送給對(duì)應(yīng)的那個(gè)線程處理這個(gè)消息。
我現(xiàn)在手上的是Kestrel-1.1.2版本的代碼,走讀這部分代碼的時(shí)候,其實(shí)發(fā)現(xiàn)作者在寫(xiě)這段代碼的時(shí)候,多了一些冗余的內(nèi)容——比如說(shuō)removeReceive方法,從而看出作者在使用Scala的特性中,也是逐步地把代碼優(yōu)化成如今的樣子。畢竟Scala和Java之間的差別很大,如果做到Type Less, Do More。是需要一個(gè)逐步積累的過(guò)程,誰(shuí)都不是天生就能把Scala寫(xiě)得很好的,更何況是需要性能非常高的時(shí)候。
【相關(guān)閱讀】