詳解F#異步及并行模式中的并行CPU及I/O計算
51CTO將持續(xù)關注函數(shù)式編程語言F#的發(fā)展,今天將為大家講述的是F#異步及并行模式中的并行CPU及I/O計算。
最后還是忍不住翻譯文章了。這系列的文章談論的是F#中常見的異步及并行模式,作者為F#語言的主要設計者Don Syme。異步相關的編程是F#語言中最重要的優(yōu)勢之一(我甚至在考慮“之一”兩個字能否去掉)。F#是一門非常有特色的語言,是一門能夠開闊眼界,改變您編程思路的語言,它經(jīng)過了幾年設計以及多個預覽之后終于要正式露面了——此刻不上,更待何時。
介紹
F#是一門并行(parallel)及響應式(reactive)語言。這個說法意味著一個F#程序可以存在多個進行中的運算(如使用.NET線程進行F#計算),或是多個等待中的回應(如等待事件或消息的回調(diào)函數(shù)及代理對象)。
F#的異步表達式是簡化異步及響應式程序編寫的方式之一。在這篇及今后的文章中,我會探討一些使用F#進行異步編程的基本方式──大致說來,它們都是F#異步編程時使用的模式。這里我假設您已經(jīng)掌握了async的基本使用方式,如入門指南中的內(nèi)容。
我們從兩個簡單的設計模式開始:CPU異步并行(Parallel CPU Asyncs)和I/O異步并行(Paralle I/O Asyncs)。
本系列的第2部分描述了如何從異步計算或后臺計算單元中獲得結果。
第3部分則描述了F#中輕量級的,響應式的,各自獨立的代理對象。
模式1:CPU異步并行
首先來了解第一個模式:CPU異步并行,這意味著并行地開展一系列的CPU密集型計算。下面的代碼計算的是斐波那契數(shù)列,它會將這些計算進行并行地調(diào)配:
- let rec fib x = if x <= 2 then 1 else fib(x-1) + fib(x-2)
 - let fibs =
 - Async.Parallel [ for i in 0..40 -> async { return fib(i) } ]
 - |> Async.RunSynchronously
 
結果是:
- val fibs : int array =
 - [|1; 1; 2; 3; 5; 8; 13; 21; 34; 55; 89; 144; 233; 377; 610; 987; 1597; 2584;
 - 4181; 6765; 10946; 17711; 28657; 46368; 75025; 121393; 196418; 317811;
 - 514229; 832040; 1346269; 2178309; 3524578; 5702887; 9227465; 14930352;
 - 24157817; 39088169; 63245986; 102334155|]
 
上面的代碼展示了并行CPU異步計算模式的要素:
1.“async { … }”用于指定一系列的CPU任務。
2.這些任務使用Async.Parallel進行fork-join式的組合。
在這里,我們使用Async.RunSynchronously方法來執(zhí)行組合后的任務,這會啟動一個異步任務,并同步地等待其最后結果。您可以使用這個模式來完成各種CPU并行(例如對矩陣乘法進行劃分和并行計算)或是批量處理任務。
模式2:I/O異步并行
現(xiàn)在我們已經(jīng)展示了在F#中進行CPU密集型并行編程的方式。F#異步編程的重點之一,便是可以用相同的方式進行CPU和I/O密集型的計算。這便是我們的第二種模式:I/O異步并行,即同時開展多個I/O操作(也被稱為overlapped I/O)。例如下面的代碼便并行地請求多個Web頁面,并響應每個請求的回復,再返回收集到的結果。
- open System
 - open System.Net
 - open Microsoft.FSharp.Control.WebExtensions
 - let http url =
 - async { let req = WebRequest.Create(Uri url)
 - use! resp = req.AsyncGetResponse()
 - use stream = resp.GetResponseStream()
 - use reader = new StreamReader(stream)
 - let contents = reader.ReadToEnd()
 - return contents }
 - let sites = ["http://www.bing.com";
 - "http://www.google.com";
 - "http://www.yahoo.com";
 - "http://www.search.com"]
 - let htmlOfSites =
 - Async.Parallel [for site in sites -> http site ]
 - |> Async.RunSynchronously
 
上面的代碼示例展示了I/O異步并行模式的基礎:
1.“async { … }”用于編寫任務,其中包含了一些異步I/O。
2.這些任務使用Async.Parallel進行fork-join式的組合。
在這里,我們使用Async.RunSynchronously方法來執(zhí)行組合后的任務,這會啟動一個異步任務,并同步地等待其最后結果。
使用let!(或與它類似的資源釋放指令use!)是進行異步操作的基礎方法。例如:
- let! resp = req.AsyncGetResponse()
 
上面這行代碼會“響應”一個HTTP GET操作所得到的回復,即async { … }在AsyncGetResponse操作完成之后的部分。然而,在等待響應的過程中并不會阻塞任何.NET或操作系統(tǒng)的線程:只有活動的CPU密集型運算會使用下層的.NET或操作系統(tǒng)線程。與此不同,等待中的響應操作(例如回調(diào)函數(shù),事件處理程序和代理對象)資源占用非常少,幾乎只相當于一個注冊好的對象而已。因此,您可以同時擁有數(shù)千個甚至數(shù)百萬個等待中的響應操作。例如,一個典型的GUI應用程序會注冊一些事件處理程序,而一個典型Web爬蟲會為每個發(fā)出的請求注冊一個回調(diào)函數(shù)。
在上面的代碼中,我們使用了“use!”而不是“let!”,這表示W(wǎng)eb請求相關的資源會在變量超出字面的作用域之后得到釋放。
I/O并行的美妙之處在于其伸縮性。在多核的環(huán)境下,如果您可以充分利用計算資源,則通常會獲得2倍、4倍甚至8倍的性能提高。而在I/O并行編程中,您可以同時進行成百上千的I/O操作(不過實際的并行效果還要取決于您的操作系統(tǒng)和網(wǎng)絡連接狀況),這意味著10倍、100倍、1000倍甚至更多的性能增強──而這一切在一臺單核的機器上也可以實現(xiàn)。例如,這里有一個使用F#異步功能的示例,而最終它們可以在一個IronPython應用程序中使用。
許多現(xiàn)代應用程序都是I/O密集型應用,因此這些設計模式在實踐中都有很重要的意義。
始于GUI線程,終于GUI線程
這兩個設計模式有個重要的變化,這便是使用Async.StartWithContinuations來代替Async.RunSynchronously方法。在一個并行操作開啟之后,您可以指定三個函數(shù),分別在它成功、失敗或取消時調(diào)用。
對于諸如“我想要獲得一個異步操作的結果,但我不能使用RunSynchronously方法”之類的問題,您便應該考慮:
1.使用let!(或use!)把這個異步操作作為更大的異步任務的一部分,
2.使用Async.StartWithContinuations方法執(zhí)行異步操作
在那些需要在GUI線程上發(fā)起異步操作的場景中,Async.StartWithContinuations方法尤其有用。因為,您不會因此阻塞住GUI線程,而且可以在異步操作完成后直接進行GUI的更新。例如,在F# JAOO Tutorial的BingTranslator示例中便使用了這個做法──您可以在本文結尾瀏覽它的完整代碼,不過這里最值得關注的部分則是在點擊“Translate”按鈕之后發(fā)生的事情:
- button.Click.Add(fun args ->
 - let text = textBox.Text
 - translated.Text <- "Translating..."
 - let task =
 - async { let! languages = httpLines languageUri
 - let! fromLang = detectLanguage text
 - let! results = Async.Parallel [for lang in languages -> translateText (text, fromLang, lang)]
 - return (fromLang,results) }
 - Async.StartWithContinuations(
 - task,
 - (fun (fromLang,results) ->
 - for (toLang, translatedText) in results do
 - translated.Text <- translated.Text + sprintf "\r\n%s --> %s: \"%s\"" fromLang toLang translatedText),
 - (fun exn -> MessageBox.Show(sprintf "An error occurred: %A" exn) |> ignore),
 - (fun cxn -> MessageBox.Show(sprintf "A cancellation error ocurred: %A" cxn) |> ignore)))
 
高亮的部分,尤其是在async塊里的部分,展示了使用Async.Parallel將一種語言并行地翻譯成多種語言的做法。這個異步組合操作由Async.StartWithContinuations發(fā)起,它會在遇到第一個I/O操作時立即返回(譯注:存疑,為什么是在遇上I/O操作才返回?),并指定了三個函數(shù),分別在異步操作的成功,失敗或取消時調(diào)用。以下是任務完成后的截圖(不過在此不保證翻譯的準確性……):

Async.StartWithContinuations有一個重要的特性:如果異步操作由GUI線程發(fā)起(例如一個SynchronizationContext.Current不為null的線程),那么操作完成后的回調(diào)函數(shù)也是在GUI線程中調(diào)用的。這使GUI更新操作變的十分安全。F#異步類庫允許您組合多個I/O任務,并在GUI線程中直接使用,而無需您親自從后臺線程中更新GUI元素。在以后的文章中我們會進行更詳細地解釋。
關于Async.Parallel工作方式:
在執(zhí)行時,由Async.Parallel組合而成的異步操作會通過一個等待計算的隊列來逐步發(fā)起。與大部分進行異步處理的類庫一樣,它在內(nèi)部使用的是QueueUserWorkItem方法。當然,我們也有辦法使用分離的隊列,在以后的文章中我們會進行一些討論。
Async.Parallel方法并沒有什么神奇之處,您也完全可以使用Microsoft.FSharp.Control.Async類庫中的其他原語來定義您自己的異步組合方式──例如Async.StartChild方法。我們會在以后的文章中討論這個話題。
更多示例
在F# JAOO Tutorial包含多個使用這些模式的示例代碼:
BingTranslator.fsx與BingTranslatorShort.fsx:使用F#調(diào)用REST API,它們與其他基于Web的HTTP服務的調(diào)用方式十分類似。文末包含了示例的完整代碼。
AsyncImages.fsx:并行磁盤I/O及圖像處理。
PeriodicTable.fsx:調(diào)用一個Web服務,并行地獲取原子質(zhì)量。
本文模式的限制
上文介紹的兩個并行模式有一些限制。很明顯,使用Async.Parallel生成的異步操作在執(zhí)行時十分“安靜”──比方說,它們無法返回進度或部分的結果。為此,我們需要構建一個更為“豐富”的對象,它會在部分操作完成之后觸發(fā)一些事件。在以后的文章中我們會來關注這樣的設計模式。
此外,Async.Parallel只能處理固定數(shù)量的任務。在以后的文章中,我們會遇到很多一邊處理一邊生成任務的情況。換個方式來看,即Async.Parallel無法處理即時獲得的消息──例如,除了取消任務之外,一個代理對象的工作進度是可以得到控制的。
總結
CPU異步并行與I/O異步并行,是F#異步編程中最為簡單的兩種設計模式,而簡單的事物往往也是非常重要而強大的。請注意,兩種模式的不同之處,僅僅在于I/O并行使用了包含了I/O請求的async塊,以及一些額外的CPU任務,如創(chuàng)建請求對象及后續(xù)處理。
在今后的文章里,我們會關注F#中其他一些并行及響應式編程方面的設計方式,包括:
- 從GUI線程中發(fā)起異步操作
 - 定義輕量級異步代理對象
 - 使用async定義后臺工作程序
 - 使用async構建.NET任務
 - 使用async調(diào)用.NET的APM模式
 - 取消異步操作
 
BingTranslator代碼示例
以下是BingTranslator的示例代碼,在運行時您需要申請一個Live API 1.1 AppID。請注意,這個示例需要根據(jù)Bing API 2.0進行適當調(diào)整,至少在2.0中已經(jīng)不包含這里的語言檢測API了──不過這些代碼仍然是不錯的示例:
- open System
 - open System.Net
 - open System.IO
 - open System.Drawing
 - open System.Windows.Forms
 - open System.Text
 - /// A standard helper to read all the lines of a HTTP request. The actual read of the lines is
 - /// synchronous once the HTTP response has been received.
 - let httpLines (uri:string) =
 - async { let request = WebRequest.Create uri
 - use! response = request.AsyncGetResponse()
 - use stream = response.GetResponseStream()
 - use reader = new StreamReader(stream)
 - let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]
 - return lines }
 - type System.Net.WebRequest with
 - /// An extension member to write content into an WebRequest.
 - /// The write of the content is synchronous.
 - member req.WriteContent (content:string) =
 - let bytes = Encoding.UTF8.GetBytes content
 - req.ContentLength <- int64 bytes.Length
 - use stream = req.GetRequestStream()
 - stream.Write(bytes,0,bytes.Length)
 - /// An extension member to read the content from a response to a WebRequest.
 - /// The read of the content is synchronous once the response has been received.
 - member req.AsyncReadResponse () =
 - async { use! response = req.AsyncGetResponse()
 - use responseStream = response.GetResponseStream()
 - use reader = new StreamReader(responseStream)
 - return reader.ReadToEnd() }
 - #load @"C:\fsharp\staging\docs\presentations\2009-10-04-jaoo-tutorial\BingAppId.fs"
 - //let myAppId = "please set your Bing AppId here"
 - /// The URIs for the REST service we are using
 - let detectUri = "http://api.microsofttranslator.com/V1/Http.svc/Detect?appId=" + myAppId
 - let translateUri = "http://api.microsofttranslator.com/V1/Http.svc/Translate?appId=" + myAppId + "&"
 - let languageUri = "http://api.microsofttranslator.com/V1/Http.svc/GetLanguages?appId=" + myAppId
 - let languageNameUri = "http://api.microsofttranslator.com/V1/Http.svc/GetLanguageNames?appId=" + myAppId
 - /// Create the user interface elements
 - let form = new Form (Visible=true, TopMost=true, Height=500, Width=600)
 - let textBox = new TextBox (Width=450, Text="Enter some text", Font=new Font("Consolas", 14.0F))
 - let button = new Button (Text="Translate", Left = 460)
 - let translated = new TextBox (Width = 590, Height = 400, Top = 50, ScrollBars = ScrollBars.Both, Multiline = true, Font=new Font("Consolas", 14.0F))
 - form.Controls.Add textBox
 - form.Controls.Add button
 - form.Controls.Add translated
 - /// An async method to call the language detection API
 - let detectLanguage text =
 - async { let request = WebRequest.Create (detectUri, Method="Post", ContentType="text/plain")
 - do request.WriteContent text
 - return! request.AsyncReadResponse() }
 - /// An async method to call the text translation API
 - let translateText (text, fromLang, toLang) =
 - async { let uri = sprintf "%sfrom=%s&to=%s" translateUri fromLang toLang
 - let request = WebRequest.Create (uri, Method="Post", ContentType="text/plain")
 - request.WriteContent text
 - let! translatedText = request.AsyncReadResponse()
 - return (toLang, translatedText) }
 - button.Click.Add(fun args ->
 - let text = textBox.Text
 - translated.Text <- "Translating..."
 - let task =
 - async { /// Get the supported languages
 - let! languages = httpLines languageUri
 - /// Detect the language of the input text. This could be done in parallel with the previous step.
 - let! fromLang = detectLanguage text
 - /// Translate into each language, in parallel
 - let! results = Async.Parallel [for lang in languages -> translateText (text, fromLang, lang)]
 - /// Return the results
 - return (fromLang,results) }
 - /// Start the task. When it completes, show the results.
 - Async.StartWithContinuations(
 - task,
 - (fun (fromLang,results) ->
 - for (toLang, translatedText) in results do
 - translated.Text <- translated.Text + sprintf "\r\n%s --> %s: \"%s\"" fromLang toLang translatedText),
 - (fun exn -> MessageBox.Show(sprintf "An error occurred: %A" exn) |> ignore),
 - (fun cxn -> MessageBox.Show(sprintf "A cancellation error ocurred: %A" cxn) |> ignore)))
 















 
 
 

 
 
 
 