協(xié)程到底有什么用?六種I/O模式告訴你!
大家好,我是小風(fēng)哥,今天來聊一聊協(xié)程的作用。
假設(shè)磁盤上有10個(gè)文件,你需要讀取的內(nèi)存,那么你該怎么用代碼實(shí)現(xiàn)呢?
在接著往下看之前,先自己想一想這個(gè)問題,看看自己能想出幾種方法,各自有什么樣的優(yōu)缺點(diǎn)。想清楚了嗎(還在看嗎),想清楚了我們繼續(xù)往下看。
最簡單的方法——串行
這可能是大多數(shù)同學(xué)都能想到的最簡單方法,那就是一個(gè)一個(gè)的讀取,讀完一個(gè)接著讀下一個(gè)。用代碼表示是這樣的:
for file in files:
result = file.read()
process(result)
是不是非常簡單,我們假設(shè)每個(gè)文件讀取需要1分鐘,那么10個(gè)文件總共需要10分鐘才能讀取完成。這種方法有什么問題呢?實(shí)際上這種方法只有一個(gè)問題,那就是慢。除此之外,其它都是優(yōu)點(diǎn):
- 代碼簡單,容易理解
- 可維護(hù)性好,這代碼交給誰都能維護(hù)的了(論程序員的核心競爭力在哪里)
那么慢的問題該怎么解決呢?有的同學(xué)可能已經(jīng)想到了,為啥要一個(gè)一個(gè)讀取呢?并行讀取不就可以加快速度了嗎。
稍好的方法,并行
那么,該怎么并行讀取文件呢?顯然,地球人都知道,線程就是用來并行的。我們可以同時(shí)開啟10個(gè)線程,每個(gè)線程中讀取一個(gè)文件。用代碼實(shí)現(xiàn)就是這樣的:
def read_and_process(file):
result = file.read()
process(result)
def main():
files = [fileA,fileB,fileC......]
for file in files:
create_thread(read_and_process,
file).run()
# 等待這些線程執(zhí)行完成
怎么樣,是不是也非常簡單。那么這種方法有什么問題嗎?在開啟10個(gè)線程這種問題規(guī)模下沒有問題?,F(xiàn)在我們把問題難度加大,假設(shè)有10000個(gè)文件,需要處理該怎么辦呢?有的同學(xué)可能想10個(gè)文件和10000個(gè)文件有什么區(qū)別嗎,直接創(chuàng)建10000個(gè)線程去讀不可以嗎?實(shí)際上這里的問題其實(shí)是說創(chuàng)建多個(gè)線程有沒有什么問題。我們知道,雖然線程號(hào)稱“輕量級進(jìn)程”,雖然是輕量級但當(dāng)數(shù)量足夠可觀時(shí)依然會(huì)有性能問題。這里的問題主要有這樣幾個(gè)方面:
- 創(chuàng)建線程需要消耗系統(tǒng)資源,像內(nèi)存等(想一想為什么?)
- 調(diào)度開銷,尤其是當(dāng)線程數(shù)量較多且都比較繁忙時(shí)(同樣想一想為什么?)
- 創(chuàng)建多個(gè)線程不一定能加快I/O(如果此時(shí)設(shè)備處理能力已經(jīng)飽和)
既然線程有這樣那樣的問題,那么還有沒有更好的方法?
答案是肯定的,并行編程不一定只能依賴線程這種技術(shù)。
這里的答案就是基于事件驅(qū)動(dòng)編程技術(shù)。
事件驅(qū)動(dòng) + 異步
沒錯(cuò),即使在單個(gè)線程中,使用事件驅(qū)動(dòng)+異步也可以實(shí)現(xiàn)IO并行處理,Node.js就是非常典型的例子。為什么單線程也可以做到并行呢?這是基于這樣兩個(gè)事實(shí):
- 相對于CPU的處理速度來說,IO是非常慢的
- IO不怎么需要計(jì)算資源
因此,當(dāng)我們發(fā)起IO操作后為什么要一直等著IO執(zhí)行完成呢?在IO執(zhí)行完之前的這段時(shí)間處理其它IO難道不香嗎?
這就是為什么單線程也可以并行處理多個(gè)IO的本質(zhì)所在。
回到我們的例子,該怎樣用事件驅(qū)動(dòng)+異步來改造上述程序呢?實(shí)際上非常簡單。首先我們需要?jiǎng)?chuàng)建一個(gè)event loop,這個(gè)非常簡單:
event_loop = EventLoop()
然后,我們需要往event loop中加入原材料,也就是需要監(jiān)控的event,就像這樣:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 文件異步讀取
event_loop.add(file)
注意當(dāng)執(zhí)行file.asyn_read這行代碼時(shí)會(huì)立即返回,不會(huì)阻塞線程,當(dāng)這行代碼返回時(shí)可能文件還沒有真正開始讀取,這就是所謂的異步。
file.asyn_read這行代碼的真正目的僅僅是發(fā)起IO,而不是等待IO執(zhí)行完成。此后我們將該IO放到event loop中進(jìn)行監(jiān)控,也就是event_loop.add(file)這行代碼的作用。
一切準(zhǔn)備就緒,接下來就可以等待event的到來了:
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
我們可以看到,event_loop會(huì)一直等待直到有文件讀取完成(event_loop.wait_one_IO_ready()),這時(shí)我們就能得到讀完的文件了,接下來處理即可。全部代碼如下所示:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 文件異步讀取
event_loop.add(file)
def main():
files = [fileA,fileB,fileC ...]
event_loop = EventLoop()
for file in files:
add_to_event_loop(event_loop, file)
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
多線程 VS 單線程 + event loop
接下來我們看下程序執(zhí)行的效果。
在多線程情況下,假設(shè)有10個(gè)文件,每個(gè)文件讀取需要1秒,那么很簡單,并行讀取10個(gè)文件需要1秒。
那么對于單線程+event loop呢?我們再次看下event loop + 異步版本的代碼:
def add_to_event_loop(event_loop, file):
file.asyn_read() # 文件異步讀取
event_loop.add(file)
def main():
files = [fileA,fileB,fileC......]
event_loop = EventLoop()
for file in files:
add_to_event_loop(event_loop, file)
while event_loop:
file = event_loop.wait_one_IO_ready()
process(file.result)
對于add_to_event_loop,由于文件異步讀取,因此該函數(shù)可以瞬間執(zhí)行完成,真正耗時(shí)的函數(shù)其實(shí)就是event loop的等待函數(shù),也就是這樣:
file = event_loop.wait_one_IO_ready()
我們知道,一個(gè)文件的讀取耗時(shí)是1秒,因此該函數(shù)在1s后才能返回,但是,但是,接下來是重點(diǎn)。但是雖然該函數(shù)wait_one_IO_ready會(huì)等待1s,不要忘了,我們利用這兩行代碼同時(shí)發(fā)起了10個(gè)IO操作請求。
for file in files: add_to_event_loop(event_loop, file)
因此在event_loop.wait_one_IO_ready等待的1s期間,剩下的9個(gè)IO也完成了,也就是說event_loop.wait_one_IO_ready函數(shù)只是在第一次循環(huán)時(shí)會(huì)等待1s,但是此后的9次循環(huán)會(huì)直接返回,原因就在于剩下的9個(gè)IO也完成了。
因此整個(gè)程序的執(zhí)行耗時(shí)也是1秒。是不是很神奇,我們只用一個(gè)線程就達(dá)到了10個(gè)線程的效果。
這就是event loop + 異步的威力所在。
一個(gè)好聽的名字:Reactors模式
本質(zhì)上,我們上述給出的event loop簡單代碼片段做的事情本質(zhì)上和生物一樣:
給出刺激,做出反應(yīng)。
我們這里的給出event,然后處理event。這本質(zhì)上就是所謂的Reactors模式。
現(xiàn)在你應(yīng)該明白所謂的Reactors模式是怎么一回事了吧。
所謂的一些看上去復(fù)雜的異步框架其核心不過就是這里給出的代碼片段,只是這些框架可以支持更加復(fù)雜的多階段任務(wù)處理以及各種類型的IO。而我們這里給出的代碼片段只能處理文件讀取這一類IO。
把回調(diào)也加進(jìn)來
如果我們需要處理各種類型的IO上述代碼片段會(huì)有什么問題嗎?
問題就在于上述代碼片段就不會(huì)這么簡單了,針對不同類型會(huì)有不同的處理方法,因此上述process方法需要判斷IO類型然后有針對性的處理,這會(huì)使得代碼越來越復(fù)雜,越來越難以維護(hù)。
幸好我們也有應(yīng)對策略,這就是回調(diào)。
我們可以把IO完成后的處理任務(wù)封裝到回調(diào)函數(shù)中,然后和IO一并注冊到event loop。就像這樣:
def IO_type_1(event_loop, io):
io.start()
def callback(result):
process_IO_type_1(result)
event_loop.add((io, callback))
這樣,event_loop在檢測到有IO完成后就可以把該IO和關(guān)聯(lián)的callback處理函數(shù)一并檢索出來,直接調(diào)用callback函數(shù)就可以了。
while event_loop:
io, callback = event_loop.wait_one_IO_ready()
callback(io.result)
看到了吧,這樣event_loop內(nèi)部就極其簡潔了,even_loop根本就不關(guān)心該怎么處理該IO結(jié)果,這是注冊的callback該關(guān)心的事情,event_loop需要做的僅僅就是拿到event以及相應(yīng)的處理函數(shù)callback,然后調(diào)用該callback函數(shù)就可以了。
現(xiàn)在我們可以同單線程來并發(fā)編程了,也使用callback對IO處理進(jìn)行了抽象,使得代碼更加容易維護(hù),想想看還有沒有什么問題?
回調(diào)函數(shù)的問題
雖然回調(diào)函數(shù)使得event loop內(nèi)部更加簡潔,但依然有其它問題,讓我們來仔細(xì)看看回調(diào)函數(shù):
def start_IO_type_1(event_loop, io):
io.start()
def callback(result):
process_IO_type_1(result)
event_loop.add((io, callback))
從上述代碼中你能看到什么問題嗎?在上述代碼中,一次IO處理過程被分為了兩部分:
- 發(fā)起IO
- IO處理
其中第2部分放到了回調(diào)函數(shù)中,這樣的異步處理天然不容易理解,這和我們熟悉的發(fā)起IO,等待IO完成、處理IO結(jié)果的同步模塊有很大差別。
這里的給的例子很簡單,所以你可能不以為意,但是當(dāng)處理的任務(wù)非常復(fù)雜時(shí),可能會(huì)出現(xiàn)回調(diào)函數(shù)中嵌套回調(diào)函數(shù),也就是回調(diào)地獄,這樣的代碼維護(hù)起來會(huì)讓你懷疑為什么要稱為一名苦逼的碼農(nóng)。
問題出在哪里
讓我們再來仔細(xì)的看看問題出在了哪里?
同步編程模式下很簡單,但是同步模式下發(fā)起IO,線程會(huì)被阻塞,這樣我們就不得不創(chuàng)建多個(gè)線程,但是創(chuàng)建過多線程又會(huì)有性能問題。
這樣為了發(fā)起IO后不阻塞當(dāng)前線程我們就不得不采用異步編程+event loop。在這種模式下,異步發(fā)起IO不會(huì)阻塞調(diào)用線程,我們可以使用單線程加異步編程的方法來實(shí)現(xiàn)多線程效果,但是在這種模式下處理一個(gè)IO的流程又不得不被拆分成兩部分,這樣的代碼違反程序員直覺,因此難以維護(hù)。
那么很自然的,有沒有一種方法既能有同步編程的簡單理解又會(huì)有異步編程的非阻塞呢?
Finally!終于到了協(xié)程
利用協(xié)程我可以以同步的形式來異步編程。
這是什么意思呢?
我們之所以采用異步編程是為了發(fā)起IO后不阻塞當(dāng)前線程,而是用協(xié)程,程序員可以自行決定在什么時(shí)刻掛起當(dāng)前協(xié)程,這樣也不會(huì)阻塞當(dāng)前線程。
而協(xié)程最棒的一點(diǎn)就在于掛起后可以暫存執(zhí)行狀態(tài),恢復(fù)運(yùn)行后可以在掛起點(diǎn)繼續(xù)運(yùn)行,這樣我們就不再需要像回調(diào)那樣將一個(gè)IO的處理流程拆分成兩部分了。因此我們可以在發(fā)起異步IO,這樣不會(huì)阻塞當(dāng)前線程,同時(shí)在發(fā)起異步IO后掛起當(dāng)前協(xié)程,當(dāng)IO完成后恢復(fù)該協(xié)程的運(yùn)行,這樣我們就可以實(shí)現(xiàn)同步的方式來異步編程了。接下來我們就用協(xié)程來改造一下回調(diào)版本的IO處理方式:
def start_IO_type_1(io):
io.start() # IO異步請求
yield # 暫停當(dāng)前協(xié)程
process_IO_type_1(result) # 處理返回結(jié)果
此后我們要把該協(xié)程放到event loop中監(jiān)控起來:
def add_to_event_loop(io, event_loop):
coroutine = start_IO_type_1(io)
next(coroutine)
event_loop.add(coroutine)
最后,當(dāng)IO完成后event loop檢索出相應(yīng)的協(xié)程并恢復(fù)其運(yùn)行:
while event_loop:
coroutine = event_loop.wait_one_IO_ready()
next(coroutine)
現(xiàn)在你應(yīng)該看出來了吧,上述代碼中沒有回調(diào),也沒有把處理IO的流程拆成兩部分,整體的代碼都是以同步的方式來編寫,最棒的是依然能達(dá)到異步的效果。
實(shí)際上你會(huì)看到,采用協(xié)程后我們依然需要基于事件編程的event loop,因?yàn)楸举|(zhì)上協(xié)程并沒有改變IO的異步處理本質(zhì),只要IO是異步處理的那么我們就必須依賴event loop來監(jiān)控IO何時(shí)完成,只不過我們采用協(xié)程消除了對回調(diào)的依賴,整體編程方式上還是采用程序員最熟悉也最容易理解的同步方式。
總結(jié)
看上去簡簡單單的IO實(shí)際上一點(diǎn)都不簡單吧。為了高效進(jìn)行IO操作,我們采用的技術(shù)是這樣演進(jìn)的:
- 單線程串行 + 阻塞式IO(同步)
- 多線程并行 + 阻塞式IO(并行)
- 單線程 + 非阻塞式IO(異步) + event loop
- 單線程 + 非阻塞式IO(異步) + event loop + 回調(diào)
- Reactor模式(更好的單線程 + 非阻塞式IO+ event loop + 回調(diào))
- 單線程 + 非阻塞式IO(異步) + event loop + 協(xié)程
最終我們采用協(xié)程技術(shù)獲取到了異步編程的高效以及同步編程的簡單理解,這也是當(dāng)今高性能服務(wù)器常用的一種技術(shù)組合。