如何在Python 的線程中運(yùn)行協(xié)程
在一篇文章理解Python異步編程的基本原理這篇文章中,我們講到,如果在異步代碼里面又包含了一段非常耗時(shí)的同步代碼,異步代碼就會(huì)被卡住。
那么有沒(méi)有辦法讓同步代碼與異步代碼看起來(lái)也是同時(shí)運(yùn)行的呢?方法就是使用事件循環(huán)的.run_in_executor()方法。
我們來(lái)看一下 Python 官方文檔[1]中的說(shuō)法:
那么怎么使用呢?還是以非常耗時(shí)的遞歸方式計(jì)算斐波那契數(shù)列的這個(gè)函數(shù)為例:
- def sync_calc_fib(n):
 - if n in [1, 2]:
 - return1
 - return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
 - async def calc_fib(n):
 - result = sync_calc_fib(n)
 - print(f'第 {n} 項(xiàng)計(jì)算完成,結(jié)果是:{result}')
 - return result
 
我們現(xiàn)在需要用 aiohttp 訪問(wèn)一個(gè)延遲5秒的網(wǎng)頁(yè),同時(shí)計(jì)算斐波那契數(shù)列第36項(xiàng)。
首先我們看看單獨(dú)計(jì)算第36項(xiàng)需要5秒鐘:
我們?cè)賮?lái)看看如果直接把這計(jì)算斐波那契數(shù)列和請(qǐng)求網(wǎng)站的兩個(gè)異步任務(wù)放在一起“并行”,實(shí)際時(shí)間是兩個(gè)任務(wù)的時(shí)間疊加:
具體原因我在上一篇文章里面已經(jīng)做了說(shuō)明。
現(xiàn)在,我想讓兩個(gè)任務(wù)“同時(shí)運(yùn)行”,于是就可以這樣修改代碼:
- import aiohttp
 - import asyncio
 - import time
 - from concurrent.futures import ThreadPoolExecutor
 - async def request(sleep_time):
 - async with aiohttp.ClientSession() as client:
 - resp = await client.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
 - resp_json = await resp.json()
 - print(resp_json)
 - def sync_calc_fib(n):
 - if n in [1, 2]:
 - return 1
 - return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
 - def calc_fib(n):
 - result = sync_calc_fib(n)
 - print(f'第 {n} 項(xiàng)計(jì)算完成,結(jié)果是:{result}')
 - return result
 - async def main():
 - start = time.perf_counter()
 - loop = asyncio.get_event_loop()
 - with ThreadPoolExecutor(max_workers=4) as executor:
 - tasks_list = [
 - loop.run_in_executor(executor, calc_fib, 36),
 - asyncio.create_task(request(5))
 - ]
 - await asyncio.gather(*tasks_list)
 - end = time.perf_counter()
 - print(f'總計(jì)耗時(shí):{end - start}')
 - asyncio.run(main())
 
運(yùn)行效果如下圖所示:
在5秒鐘的時(shí)間,就把計(jì)算斐波那契數(shù)列和請(qǐng)求5秒延遲的網(wǎng)站都做完了。
實(shí)現(xiàn)這樣的轉(zhuǎn)變,關(guān)鍵的代碼就是:loop.run_in_executor(executor, calc_fib, 36)
其中的 loop就是主線程的事件循環(huán)(event loop),它是用來(lái)調(diào)度同一個(gè)線程里面的多個(gè)協(xié)程。
executor是我們使用ThreadPoolExecutor(max_workers=4)創(chuàng)建的一個(gè)有4個(gè)線程的線程池,calc_fib是一個(gè)耗時(shí)的同步函數(shù),36是傳入calc_fib的參數(shù)。loop.run_in_executor(executor, calc_fib, 36)的意思是說(shuō):
- 把calc_fib函數(shù)放到線程池里面去運(yùn)行
 - 給線程池增加一個(gè)回調(diào)函數(shù),這個(gè)回調(diào)函數(shù)會(huì)在運(yùn)行結(jié)束后的下一次事件循環(huán)把結(jié)果保存下來(lái)。
 
請(qǐng)注意上圖中紅色箭頭對(duì)應(yīng)的calc_fib這是一個(gè)同步函數(shù),請(qǐng)與上一篇文章中的異步函數(shù)區(qū)分開(kāi)。run_in_executor的第二個(gè)參數(shù)需要是一個(gè)同步函數(shù)的函數(shù)名。
在上面的例子中,我們創(chuàng)建的是有4個(gè)線程的線程池。所以這個(gè)線程池最多允許4個(gè)阻塞式的同步函數(shù)“并行”。



















 
 
 











 
 
 
 