Scala Actor:多線程的基礎(chǔ)學(xué)習(xí)
Scala Actor是Scala里多線程的基礎(chǔ),核心思想是用消息傳遞來進(jìn)行線程間的信息共享和同步。
51CTO編輯推薦:Scala編程語言專題
Scala Actor線程模型可以這樣理解:所有Actor共享一個(gè)線程池,總的線程個(gè)數(shù)可以配置,也可以根據(jù)CPU個(gè)數(shù)決定;當(dāng)一個(gè)Actor啟動(dòng)之后,Scala分配一個(gè)線程給它使用,如果使用receive模型,這個(gè)線程就一直為該Actor所有,如果使用react模型,Scala執(zhí)行完react方法后拋出異常,則該線程就可以被其它Actor使用。
下面看一些核心代碼。
- def start(): Actor = synchronized {
 - // Reset various flags.
 - //
 - // Note that we do *not* reset `trapExit`. The reason is that
 - // users should be able to set the field in the constructor
 - // and before `act` is called.
 - exitReason = 'normal
 - exiting = false
 - shouldExit = false
 - scheduler execute {
 - ActorGC.newActor(Actor.this)
 - (new Reaction(Actor.this)).run()
 - }
 - this
 - }
 
其中Reaction實(shí)現(xiàn)Runnable接口,scheduler基本相當(dāng)于是一個(gè)線程池,所以調(diào)用start方法之后會(huì)有一個(gè)線程來為該Actor服務(wù)。
使用receive模型。
- def receive[R](f: PartialFunction[Any, R]): R = {
 - assert(Actor.self == this, "receive from channel belonging to other actor")
 - this.synchronized {
 - if (shouldExit) exit() // links
 - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
 - if (null eq qel) {
 - waitingFor = f.isDefinedAt
 - isSuspended = true
 - suspendActor()
 - } else {
 - received = Some(qel.msg)
 - sessions = qel.session :: sessions
 - }
 - waitingFor = waitingForNone
 - isSuspended = false
 - }
 - val result = f(received.get)
 - sessions = sessions.tail
 - result
 
如果當(dāng)前mailbox里面沒有可以處理的消息,調(diào)用suspendActor,該方法會(huì)調(diào)用wait;如果有消息,這調(diào)用PartialFunction進(jìn)行處理。
使用react模型。
- def react(f: PartialFunction[Any, Unit]): Nothing = {
 - assert(Actor.self == this, "react on channel belonging to other actor")
 - this.synchronized {
 - if (shouldExit) exit() // links
 - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
 - if (null eq qel) {
 - waitingFor = f.isDefinedAt
 - continuation = f
 - isDetached = true
 - } else {
 - sessions = List(qel.session)
 - scheduleActor(f, qel.msg)
 - }
 - throw new SuspendActorException
 - }
 
如果當(dāng)前mailbox沒有可以處理的消息,設(shè)置waitingFor和continuation,這兩個(gè)變量會(huì)在接收到消息的時(shí)候使用;如果有消息,則調(diào)用scheduleActor,該方法會(huì)在線程池里選擇一個(gè)新的線程來處理,具體的處理方法也是由PartialFunction決定。不管是哪條路徑,react都會(huì)立即返回,或者說是立即拋出異常,結(jié)束該線程的執(zhí)行,這樣該線程就可以被其它Actor使用。
再來看看接收消息的處理代碼。
- def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
 - if (waitingFor(msg)) {
 - received = Some(msg)
 - if (isSuspended)
 - sessions = replyTo :: sessions
 - else
 - sessions = List(replyTo)
 - waitingFor = waitingForNone
 - if (!onTimeout.isEmpty) {
 - onTimeout.get.cancel()
 - onTimeout = None
 - }
 - if (isSuspended)
 - resumeActor()
 - else // assert continuation != null
 - scheduler.execute(new Reaction(this, continuation, msg))
 - } else {
 - mailbox.append(msg, replyTo)
 - }
 
如果當(dāng)前沒有在等待消息或者接收到的消息不能處理,就丟到mailbox里去;相反,則進(jìn)行消息的處理。這里對(duì)于receive模型和react模型就有了分支:如果isSuspended為true,表示是receive模型,并且線程在wait,就調(diào)用resumeActor,該方法會(huì)調(diào)用notify;否則就是react模型,同樣在線程池里選擇一個(gè)線程進(jìn)行處理。
這樣,相信大家對(duì)Scala Actor就有了一個(gè)基本的認(rèn)識(shí)。
【相關(guān)閱讀】















 
 
 

 
 
 
 