偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

如何精確控制 asyncio 中并發(fā)運行的多個任務(wù)

開發(fā) 前端
之前我們了解了如何創(chuàng)建多個任務(wù)來并發(fā)運行程序,方式是通過 asyncio.create_task 將協(xié)程包裝成任務(wù)。

之前我們了解了如何創(chuàng)建多個任務(wù)來并發(fā)運行程序,方式是通過 asyncio.create_task 將協(xié)程包裝成任務(wù),如下所示:

import asyncio, time

async def main():
    task1 = asyncio.create_task(asyncio.sleep(3))
    task2 = asyncio.create_task(asyncio.sleep(3))
    task3 = asyncio.create_task(asyncio.sleep(3))

    await task1
    await task2
    await task3

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
總耗時: 3.003109625
"""

但這種代碼編寫方式只適用于簡單情況,如果在同時發(fā)出數(shù)百、數(shù)千甚至更多 Web 請求的情況下,這種編寫方式將變得冗長且混亂。所以 asyncio 提供了許多便利的函數(shù),支持我們一次性等待多個任務(wù)。

等待一組任務(wù)全部完成

一個被廣泛用于等待一組任務(wù)的方式是使用 asyncio.gather,這個函數(shù)接收一系列的可等待對象,允許我們在一行代碼中同時運行它們。如果傳入的 awaitable 對象是協(xié)程,gather 函數(shù)會自動將其包裝成任務(wù),以確保它們可以同時運行。這意味著不必像之前那樣,用 asyncio.create_task 單獨包裝,但即便如此,還是建議手動包裝一下。

asyncio.gather 同樣返回一個 awaitable 對象,在 await 表達式中使用它時,它將暫停,直到傳遞給它的所有 awaitable 對象都完成為止。一旦所有任務(wù)都完成,asyncio.gather 將返回這些任務(wù)的結(jié)果所組成的列表。

import asyncio
import time
from aiohttp import ClientSession

async def fetch_status(session: ClientSession, url: str):
    async with session.get(url) as resp:
        return resp.status

async def main():
    async with ClientSession() as session:
        # 注意:requests 里面是 100 個協(xié)程
        # 傳遞給 asyncio.gather 之后會自動被包裝成任務(wù)
        requests = [fetch_status(session, "http://www.baidu.com")
                    for _ in range(100)]
        # 并發(fā)運行 100 個任務(wù),并等待這些任務(wù)全部完成
        # 相比寫 for 循環(huán)再單獨 await,這種方式就簡便多了
        status_codes = await asyncio.gather(*requests)
        print(f"{len(status_codes)} 個任務(wù)已全部完成")

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
100 個任務(wù)已全部完成
總耗時: 0.552532458
"""

完成 100 個請求只需要 0.55 秒鐘,由于網(wǎng)絡(luò)問題,測試的結(jié)果可能不準確,但異步肯定比同步要快。

另外傳給 gather 的每個 awaitable 對象可能不是按照確定性順序完成的,例如將協(xié)程 a 和 b 按順序傳遞給 gather,但 b 可能會在 a 之前完成。不過 gather 的一個很好的特性是,不管 awaitable 對象何時完成,都保證結(jié)果會按照傳遞它們的順序返回。

import asyncio
import time

async def main():
    # asyncio.sleep 還可以接收一個 result 參數(shù),作為 await 表達式的值
    tasks = [asyncio.sleep(second, result=f"我睡了 {second} 秒")
             for second in (5, 3, 4)]
    print(await asyncio.gather(*tasks))

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒']
總耗時: 5.002968417
"""

然后 gather 還可以實現(xiàn)分組,什么意思呢?

import asyncio
import time

async def main():
    gather1 = asyncio.gather(
        *[asyncio.sleep(second, result=f"我睡了 {second} 秒")
          for second in (5, 3, 4)]
    )
    gather2 = asyncio.gather(
        *[asyncio.sleep(second, result=f"我睡了 {second} 秒")
          for second in (3, 3, 3)]
    )
    results = await asyncio.gather(
        gather1, gather2, asyncio.sleep(6, "我睡了 6 秒")
    )
    print(results)


start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
[['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒'], 
 ['我睡了 3 秒', '我睡了 3 秒', '我睡了 3 秒'], 
 '我睡了 6 秒']
總耗時: 6.002826208
"""

asyncio.gather 里面可以通過繼續(xù)接收 asyncio.gather 返回的對象,從而實現(xiàn)分組功能,還是比較強大的。

如果 gather 里面啥都不傳的話,那么會返回一個空列表。

問題來了,在上面的例子中,我們假設(shè)所有請求都不會失敗或拋出異常,這是理想情況。但如果請求失敗了呢?我們來看一下,當 gather 里面的任務(wù)出現(xiàn)異常時會發(fā)生什么?

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運行"

async def raise_error():
    raise ValueError("出錯啦")

async def main():
    results = await asyncio.gather(normal_running(), raise_error())
    print(results)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
Traceback (most recent call last):
    ......
    raise ValueError("出錯啦")
ValueError: 出錯啦
"""

我們看到拋異常了,其實 gather 函數(shù)的原理就是等待一組任務(wù)運行完畢,當某個任務(wù)完成時,就調(diào)用它的 result 方法,拿到返回值。但我們之前介紹 Future 和 Task 的時候說過,如果出錯了,調(diào)用 result 方法會將異常拋出來。

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運行"

async def raise_error():
    raise ValueError("出錯啦")

async def main():
    try:
        await asyncio.gather(normal_running(), raise_error())
    except Exception:
        print("執(zhí)行時出現(xiàn)了異常")
    # 但是剩余的任務(wù)仍在執(zhí)行,拿到當前的所有正在執(zhí)行的任務(wù)
    all_tasks = asyncio.all_tasks()
    # task 相當于對協(xié)程做了一個封裝,那么通過 get_coro 方法也可以拿到對應(yīng)的協(xié)程
    print(f"當前剩余的任務(wù):", [task.get_coro().__name__ for task in all_tasks])
    # 繼續(xù)等待剩余的任務(wù)完成
    results = await asyncio.gather(
        *[task for task in all_tasks if task.get_coro().__name__ != "main"]
    )
    print(results)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
執(zhí)行時出現(xiàn)了異常
當前剩余的任務(wù): ['main', 'normal_running']
['正常運行']
"""

可以看到在 await asyncio.gather() 的時候,raise_error() 協(xié)程拋異常了,那么異常會向上傳播,在 main() 里面 await 處產(chǎn)生 ValueError。我們捕獲之后查看剩余未完成的任務(wù),顯然只剩下 normal_running() 和 main(),因為任務(wù)執(zhí)行出現(xiàn)異常也代表它完成了。

需要注意的是,一個任務(wù)出現(xiàn)了異常,并不影響剩余未完成的任務(wù),它們?nèi)栽诤笈_運行。我們舉個例子證明這一點:

import asyncio, time

async def normal_running():
    await asyncio.sleep(5)
    return "正常運行"

async def raise_error():
    await asyncio.sleep(3)
    raise ValueError("出錯啦")

async def main():
    try:
        await asyncio.gather(normal_running(), raise_error())
    except Exception:
        print("執(zhí)行時出現(xiàn)了異常")
    # raise_error() 會在 3 秒后拋異常,然后向上拋,被這里捕獲
    # 而 normal_running() 不會受到影響,它仍然在后臺運行
    # 顯然接下來它只需要再過 2 秒就能運行完畢
    time.sleep(2)  # 注意:此處會阻塞整個線程
    # asyncio.sleep 是不耗費 CPU 的,因此即使 time.sleep 將整個線程阻塞了,也不影響
    # 因為執(zhí)行 time.sleep 時,normal_running() 里面的 await asyncio.sleep(5) 已經(jīng)開始執(zhí)行了
    results = await asyncio.gather(*[task for task in asyncio.all_tasks()
                                     if task.get_coro().__name__ != "main"])
    print(results)

loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
執(zhí)行時出現(xiàn)了異常
['正常運行']
總耗時: 5.004949666
"""

這里耗時是 5 秒,說明一個任務(wù)拋異常不會影響其它任務(wù),因為 time.sleep(2) 執(zhí)行完畢之后,normal_running() 里面 asyncio.sleep(5) 也已經(jīng)執(zhí)行完畢了,說明異常捕獲之后,剩余的任務(wù)沒有受到影響。

并且這里我們使用了 time.sleep,在工作中千萬不要這么做,因為它會阻塞整個線程,導致主線程無法再做其他事情了。而這里之所以用 time.sleep,主要是想說明一個任務(wù)出錯,那么將異常捕獲之后,其它任務(wù)不會受到影響。

那么問題來了,如果發(fā)生異常,我不希望它將異常向上拋該怎么辦呢?可能有人覺得這還不簡單,直接來一個異常捕獲不就行了?這是一個解決辦法,但 asyncio.gather 提供了一個參數(shù),可以更優(yōu)雅的實現(xiàn)這一點。

import asyncio

async def normal_running():
    await asyncio.sleep(3)
    return "正常運行"

async def raise_error():
    raise ValueError("出錯啦")

async def main():
    results = await asyncio.gather(
        normal_running(), raise_error(),
        return_exceptions=True
    )
    print(results)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
['正常運行', ValueError('出錯啦')]
"""

之前在介紹任務(wù)的時候我們說了,不管正常執(zhí)行結(jié)束還是出錯,都代表任務(wù)已完成,會將結(jié)果和異常都收集起來,只不過其中肯定有一個為 None。然后根據(jù)不同的情況,選擇是否將異常拋出來。所以在 asyncio 里面,異常只是一個普通的屬性,會保存在任務(wù)對象里面。

對于 asyncio.gather 也是同理,它里面有一個 return_exceptions 參數(shù),默認為 False,當任務(wù)出現(xiàn)異常時,會拋給 await 所在的位置。如果該參數(shù)設(shè)置為 True,那么出現(xiàn)異常時,會直接把異常本身返回(此時任務(wù)也算是結(jié)束了)。

在 asyncio 里面,異常變成了一個可控的屬性。因為執(zhí)行是以任務(wù)為單位的,當出現(xiàn)異常時,也會作為任務(wù)的一個普通的屬性。我們可以選擇將它拋出來,也可以選擇隱式處理掉。

至于我們要判斷哪些任務(wù)是正常執(zhí)行,哪些任務(wù)是拋了異常,便可以通過返回值來判斷。如果 isinstance(res, Exception) 為 True,那么證明任務(wù)出現(xiàn)了異常,否則正常執(zhí)行。雖然這有點笨拙,但也能湊合用,因為 API 并不完美。

當然以上這些都不能算是缺點,gather 真正的缺點有兩個:

  • 如果我希望所有任務(wù)都執(zhí)行成功,要是有一個任務(wù)失敗,其它任務(wù)自動取消,該怎么實現(xiàn)呢?比如發(fā)送 Web 請求,如果一個請求失敗,其他所有請求也會失敗(要取消請求以釋放資源)。顯然要做到這一點不容易,因為協(xié)程被包裝在后臺的任務(wù)中;
  • 其次,必須等待所有任務(wù)執(zhí)行完成,才能處理結(jié)果,如果想要在結(jié)果完成后立即處理它們,這就存在問題。例如有一個請求需要 100 毫秒,而另一個請求需要 20 秒,那么在處理 100 毫秒完成的那個請求之前,我們將等待 20 秒。

而 asyncio 也提供了用于解決這兩個問題的 API。

在任務(wù)完成時立即處理

如果想在某個結(jié)果生成之后就對其進行處理,這是一個問題;如果有一些可以快速完成的等待對象,和一些可能需要很長時間完成的等待對象,這也可能是一個問題。因為 gather 需要等待所有對象執(zhí)行完畢,這就導致應(yīng)用程序可能變得無法響應(yīng)。

想象一個用戶發(fā)出 100 個請求,其中兩個很慢,但其余的都很快完成。如果一旦有請求完成,可以向用戶輸出一些信息,來提升用戶的使用體驗。

為處理這種情況,asyncio 公開了一個名為 as_completed 的 API 函數(shù),這個函數(shù)接收一個可等待對象(awaitable)組成的列表,并返回一個生成器。通過遍歷,等待它們中的每一個對象都完成,并且哪個先完成,哪個就先被迭代。這意味著將能在結(jié)果可用時立即就處理它們,但很明顯此時就沒有所謂的順序了,因為無法保證哪些請求先完成。

import asyncio
import time

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    # asyncio 提供的用于等待一組 awaitable 對象的 API 都很智能
    # 如果檢測到你傳遞的是協(xié)程,那么會自動包裝成任務(wù)
    # 不過還是建議手動包裝一下
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (3, 5, 2, 4, 6, 1)]
    for finished in asyncio.as_completed(tasks):
        print(await finished)

loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
end = time.perf_counter()
print("總耗時:", end - start)
"""
我睡了 1 秒
我睡了 2 秒
我睡了 3 秒
我睡了 4 秒
我睡了 5 秒
我睡了 6 秒
總耗時: 6.000872417
"""

和 gather 不同,gather 是等待一組任務(wù)全部完成之后才返回,并且會自動將結(jié)果取出來,結(jié)果值的順序和添加任務(wù)的順序是一致的。對于 as_completed 而言,它會返回一個生成器,我們遍歷它,哪個任務(wù)先完成則哪個就先被處理。

那么問題來了,如果出現(xiàn)異常了該怎么辦?很簡單,直接異常捕獲即可。

然后我們再來思考一個問題,任何基于 Web 的請求都存在花費很長時間的風險,服務(wù)器可能處于過重的資源負載下,或者網(wǎng)絡(luò)連接可能很差。

之前我們看到了通過 wait_for 函數(shù)可以為特定請求添加超時,但如果想為一組請求設(shè)置超時怎么辦?as_completed 函數(shù)通過提供一個可選的 timeout 參數(shù)來處理這種情況,它允許以秒為單位指定超時時間。如果花費的時間超過設(shè)定的時間,那么迭代器中的每個可等待對象都會在等待時拋出 TimeoutException。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (1, 5, 6)]
    for finished in asyncio.as_completed(tasks, timeout=3):
        try:
            print(await finished)
        except asyncio.TimeoutError:
            print("超時啦")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
超時啦
超時啦
"""

as_completed 非常適合用于盡快獲得結(jié)果,但它也有缺點。

第一個缺點是沒有任何方法可快速了解我們正在等待哪個協(xié)程或任務(wù),因為運行順序是完全不確定的。如果不關(guān)心順序,這可能沒問題,但如果需要以某種方式將結(jié)果與請求相關(guān)聯(lián),那么將面臨挑戰(zhàn)。

第二個缺點是超時,雖然會正確地拋出異常并繼續(xù)運行程序,但創(chuàng)建的所有任務(wù)仍將在后臺運行。如果想取消它們,很難確定哪些任務(wù)仍在運行,這是我們面臨的另一個挑戰(zhàn)。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in (1, 5, 6)]
    for finished in asyncio.as_completed(tasks, timeout=3):
        try:
            print(await finished)
        except asyncio.TimeoutError:
            print("超時啦")

    # tasks[1] 還需要 2 秒運行完畢,tasks[2] 還需要 3 秒運行完畢
    print(tasks[1].done(), tasks[2].done())

    await asyncio.sleep(2)
    # 此時只剩下 tasks[2],還需要 1 秒運行完畢
    print(tasks[1].done(), tasks[2].done())

    await asyncio.sleep(1)
    # tasks[2] 也運行完畢
    print(tasks[1].done(), tasks[2].done())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
超時啦
超時啦
False False
True False
True True
"""

根據(jù)輸出結(jié)果可以發(fā)現(xiàn),雖然因為抵達超時時間, await 會導致 TimeoutError,但未完成的任務(wù)不會受到影響,它們?nèi)匀辉诤笈_執(zhí)行。

但這對于我們來說,有時卻不是一件好事,因為我們希望如果抵達超時時間,那么未完成的任務(wù)就別在執(zhí)行了,這時候如何快速找到那些未完成的任務(wù)呢?為處理這種情況,asyncio 提供了另一個 API 函數(shù):wait。

使用 wait 進行細粒度控制

gather 和 as_completed 的缺點之一是,當我們看到異常時,沒有簡單的方法可以取消已經(jīng)在運行的任務(wù)。這在很多情況下可能沒問題,但是想象一個場景:同時發(fā)送大批量 Web 請求(參數(shù)格式是相同的),如果某個請求的參數(shù)格式錯誤(說明所有請求的參數(shù)格式都錯了),那么剩余的請求還有必要執(zhí)行嗎?顯然是沒有必要的,而且還會消耗更多資源。另外 as_completed 的另一個缺點是,由于迭代順序是不確定的,因此很難準確跟蹤已完成的任務(wù)。

于是 asyncio 提供了 wait 函數(shù),注意它和 wait_for 的區(qū)別,wait_for 針對的是單個任務(wù),而 wait 則針對一組任務(wù)(不限數(shù)量)。

注:wait 函數(shù)接收的是一組 awaitable 對象,但未來的版本改為僅接收任務(wù)對象。因此對于 gather、as_completed、wait 等函數(shù),雖然它們會自動包裝成任務(wù),但我們更建議先手動包裝成任務(wù),然后再傳過去。

并且 wait 和 as_completed 接收的都是任務(wù)列表,而 gather 則要求將列表打散,以多個位置參數(shù)的方式傳遞,因此這些 API 的參數(shù)格式不要搞混了。

然后是 wait 函數(shù)的返回值,它會返回兩個集合:一個由已完成的任務(wù)(執(zhí)行結(jié)束或出現(xiàn)異常)組成的集合,另一個由未完成的任務(wù)組成的集合。而 wait 函數(shù)的參數(shù),它除了可以接收一個任務(wù)列表之外,還可以接收一個 timeout(超時時間)和一個 return_when(用于控制返回條件)。光說很容易亂,我們來實際演示一下。

等待所有任務(wù)完成

如果未指定 retun_when,則此選項使用默認值,并且它的行為與 asyncio.gather 最接近,但也存在一些差異。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in (3, 2, 4)]
    # 和 gather 一樣,默認會等待所有任務(wù)都完成
    done, pending = await asyncio.wait(tasks)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

    for done_task in done:
        print(await done_task)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 3
未完成的任務(wù)數(shù): 0
我睡了 2 秒
我睡了 4 秒
我睡了 3 秒
"""

await asynio.wait 時,會返回兩個集合,分別保存已完成的任務(wù)和仍然運行的任務(wù)。并且由于返回的是集合,所以是無序的。默認情況下,asyncio.wait 會等到所有任務(wù)都完成后才返回,所以待處理集合的長度為 0。

然后還是要說一下異常,如果某個任務(wù)執(zhí)行時出現(xiàn)異常了該怎么辦呢?

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 5
未完成的任務(wù)數(shù): 0
Task exception was never retrieved
future: <Task finished ... coro=<delay() done, defined at .../main.py:3> 
         exception=ValueError('我出錯了(second is 3)')>
    ......
    raise ValueError("我出錯了(second is 3)")
ValueError: 我出錯了(second is 3)
"""

對于 asyncio.gather 而言,如果某個任務(wù)出現(xiàn)異常,那么異常會向上拋給 await 所在的位置。如果不希望它拋,那么可以將 gather 里面的 return_exceptions 參數(shù)指定為 True,這樣當出現(xiàn)異常時,會將異常返回。

而 asyncio.wait 也是如此,如果任務(wù)出現(xiàn)異常了,那么會直接視為已完成,異常同樣不會向上拋。但是從程序開發(fā)的角度來講,返回值可以不要,但異常不能不處理。

所以當任務(wù)執(zhí)行出錯時,雖然異常不會向上拋,但 asyncio 會將它打印出來,于是就有了:Task exception was never retrieved。意思就是該任務(wù)出現(xiàn)異常了,但你沒有處理它。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]
    # done 里面保存的都是已完成的任務(wù)
    done, pending = await asyncio.wait(tasks)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

    # 所以我們直接遍歷 done 即可
    for done_task in done:
        # 這里不能使用 await done_task,因為當任務(wù)完成時,它就等價于 done_task.result()
        # 而任務(wù)出現(xiàn)異常時,調(diào)用 result() 是會將異常拋出來的,所以我們需要先檢測異常是否為空
        exc = done_task.exception()
        if exc:
            print(exc)
        else:
            print(done_task.result())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 5
未完成的任務(wù)數(shù): 0
我睡了 5 秒
我睡了 2 秒
我出錯了(second is 3)
我睡了 4 秒
我睡了 1 秒
"""

這里調(diào)用 result 和 exception 有一個前提,就是任務(wù)必須處于已完成狀態(tài),否則會拋異常:InvalidStateError: Result is not ready.。但對于我們當前是沒有問題的,因為 done 里面的都是已完成的任務(wù)。

這里能再次看到和 gather 的區(qū)別,gather 會幫你把返回值都取出來,放在一個列表中,并且順序就是任務(wù)添加的順序。而 wait 返回的是集合,集合里面是任務(wù),我們需要手動拿到返回值。

某個完成出現(xiàn)異常時取消其它任務(wù)

從目前來講,wait 的作用和 gather 沒有太大的區(qū)別,都是等到任務(wù)全部結(jié)束再解除等待(出現(xiàn)異常也視作任務(wù)完成,并且其它任務(wù)不受影響)。那如果我希望當有任務(wù)出現(xiàn)異常時,立即取消其它任務(wù)該怎么做呢?顯然這就依賴 wait 函數(shù)里面的 return_when,它有三個可選值:

  • asyncio.ALL_COMPLETED:等待所有任務(wù)完成后返回;
  • asyncio.FIRST_COMPLETED:有一個任務(wù)完成就返回;
  • asyncio.FIRST_EXCEPTION:當有任務(wù)出現(xiàn)異常時返回;

顯然為完成這個需求,我們應(yīng)該將 return_when 指定為 FIRST_EXCEPTION。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds), name=f"睡了 {seconds} 秒的任務(wù)")
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

    print("都有哪些任務(wù)完成了?")
    for t in done:
        print("    " + t.get_name())

    print("還有哪些任務(wù)沒完成?")
    for t in pending:
        print("    " + t.get_name())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 3
未完成的任務(wù)數(shù): 2
都有哪些任務(wù)完成了?
    睡了 2 秒的任務(wù)
    睡了 3 秒的任務(wù)
    睡了 1 秒的任務(wù)
還有哪些任務(wù)沒完成?
    睡了 4 秒的任務(wù)
    睡了 5 秒的任務(wù)
"""

當 delay(3) 失敗時,顯然 delay(1)、delay(2) 已完成,而 delay(4) 和 delay(5) 未完成。此時集合 done 里面的就是已完成的任務(wù),pending 里面則是未完成的任務(wù)。

當 wait 返回時,未完成的任務(wù)仍在后臺繼續(xù)運行,如果我們希望將剩余未完成的任務(wù)取消掉,那么直接遍歷 pending 集合即可。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    print(f"我睡了 {seconds} 秒")

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")
    # 此時未完成的任務(wù)仍然在后臺運行,這時候我們可以將它們?nèi)∠?    for t in pending:
        t.cancel()
    # 阻塞 3 秒
    await asyncio.sleep(3)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
我睡了 2 秒
已完成的任務(wù)數(shù): 3
未完成的任務(wù)數(shù): 2
"""

在 await asyncio.sleep(3) 的時候,剩余兩個任務(wù)并沒有輸出,所以任務(wù)確實被取消了。注:出現(xiàn)異常的任務(wù)會被掛在已完成集合里面,如果沒有任務(wù)在執(zhí)行時出現(xiàn)異常,那么效果等價于 ALL_COMPLETED。

當任務(wù)完成時處理結(jié)果

ALL_COMPLETED 和 FIRST_EXCEPTION 都有一個缺點,在任務(wù)成功且不拋出異常的情況下,必須等待所有任務(wù)完成。對于之前的用例,這可能是可以接受的,但如果想要在某個協(xié)程成功完成后立即處理結(jié)果,那么現(xiàn)在的情況將不能滿足我們的需求。

雖然這個場景可使用 as_completed 實現(xiàn),但 as_completed 的問題是沒有簡單的方法可以查看哪些任務(wù)還在運行,哪些任務(wù)已經(jīng)完成。因為遍歷的時候,我們無法得知哪個任務(wù)先完成,所以 as_completed 無法完成我們的需求。

好在 wait 函數(shù)的 return_when 參數(shù)可以接收 FIRST_COMPLETED 選項,表示只要有一個任務(wù)完成就立即返回,而返回的可以是執(zhí)行出錯的任務(wù),也可以是成功運行的任務(wù)(任務(wù)失敗也表示已完成)。然后,我們可以取消其他正在運行的任務(wù),或者讓某些任務(wù)繼續(xù)運行,具體取決于用例。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    print(f"我睡了 {seconds} 秒")

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
已完成的任務(wù)數(shù): 1
未完成的任務(wù)數(shù): 4
"""

當 return_when 參數(shù)為 FIRST_COMPLETED 時,那么只要有一個任務(wù)完成就會立即返回,然后我們處理完成的任務(wù)即可。至于剩余的任務(wù),它們?nèi)栽诤笈_運行,我們可以繼續(xù)對其使用 wait 函數(shù)。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    if seconds == 3:
        raise ValueError("我出錯了(second is 3)")
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    while True:
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            exc = t.exception()
            print(exc) if exc else print(t.result())

        if pending:  # 還有未完成的任務(wù),那么繼續(xù)使用 wait
            tasks = pending
        else:
            break

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
我睡了 1 秒
我睡了 2 秒
我出錯了(second is 3)
我睡了 4 秒
我睡了 5 秒
"""

整個行為和 as_completed 是一致的,但這種做法有一個好處,就是我們每一步都可以準確地知曉哪些任務(wù)已經(jīng)完成,哪些任務(wù)仍然運行,并且也可以做到精確取消指定任務(wù)。

處理超時

除了允許對如何等待協(xié)程完成進行更細粒度的控制外,wait 還允許設(shè)置超時,以指定我們希望等待完成的時間。要啟用此功能,可將 timeout 參數(shù)設(shè)置為所需的最大秒數(shù),如果超過了這個超時時間,wait 將立即返回 done 和 pending 任務(wù)集。

不過與目前所看到的 wait_for 和 as_completed 相比,超時在 wait 中的行為方式存在一些差異。

1)協(xié)程不會被取消。

當使用 wait_for 時,如果任務(wù)超時,則引發(fā) TimeouError,并且任務(wù)也會自動取消。但使用 wait 的情況并非如此,它的行為更接近我們在 as_completed 中看到的情況。如果想因為超時而取消協(xié)程,必須顯式地遍歷任務(wù)并取消,否則它們?nèi)栽诤笈_運行。

2)不會引發(fā)超時錯誤。

如果發(fā)生超時,則 wait 返回所有已完成的任務(wù),以及在發(fā)生超時的時候仍處于運行狀態(tài)的所有任務(wù)。

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, timeout=3.1)
    print(f"已完成的任務(wù)數(shù): {len(done)}")
    print(f"未完成的任務(wù)數(shù): {len(pending)}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
已完成的任務(wù)數(shù): 3
未完成的任務(wù)數(shù): 2
"""

wait 調(diào)用將在 3 秒后返回 done 和 pending 集合,在 done 集合中,會有三個已完成的任務(wù)。而耗時 4 秒和 5 秒的任務(wù),由于仍在運行,因此它們將出現(xiàn)在 pending 集合中。我們可以繼續(xù)等待它們完成并提取返回值,也可以將它們?nèi)∠簟?/p>

需要注意:和之前一樣,pending 集合中的任務(wù)不會被取消,并且繼續(xù)運行,盡管會超時。對于要終止待處理任務(wù)的情況,我們需要顯式地遍歷 pending 集合并在每個任務(wù)上調(diào)用 cancel。

為什么要先將協(xié)程包裝成任務(wù)

我們說協(xié)程在傳給 wait 的時候會自動包裝成任務(wù),那為什么我們還要手動包裝呢?

import asyncio

async def delay(seconds):
    await asyncio.sleep(seconds)
    return f"我睡了 {seconds} 秒"

async def main():
    tasks = [asyncio.create_task(delay(seconds))
             for seconds in range(1, 6)]
    done, pending = await asyncio.wait(tasks, timeout=3.1)
    print(all(map(lambda t: t in tasks, done)))
    print(all(map(lambda t: t in tasks, pending)))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
True
True
"""

如果 wait 函數(shù)接收的就是任務(wù),那么 wait 函數(shù)就不會再包裝了,所以 done 和 pending 里面的任務(wù)和 tasks 里面的任務(wù)是相同的?;谶@個條件,我們后續(xù)可以做一些比較之類的。

比如有很多 Web 請求任務(wù),但如果當未完成的任務(wù)是 task1、task2、task3,那么就取消掉,于是可以這么做。

for t in pending:
    if t in (task1, task2, task3):
        t.cancel()

如果返回的 done 和 pending 里的任務(wù),是在 wait 函數(shù)中自動創(chuàng)建的,那么我們就無法進行任何比較來查看 pending 集合中的特定任務(wù)。

小結(jié)

1)asyncio.gather 函數(shù)允許同時運行多個任務(wù),并等待它們完成。一旦傳遞給它的所有任務(wù)全部完成,這個函數(shù)就會返回。由于 gather 會拿到里面每個任務(wù)的返回值,所以它要求每個任務(wù)都是成功的,如果有任務(wù)執(zhí)行出錯(沒有返回值),那么獲取返回值的時候就會將異常拋出來,然后向上傳遞給 await asyncio.gather。

為此,可以將 return_exceptions 設(shè)置為 True,這將返回成功完成的可等待對象的結(jié)果,以及產(chǎn)生的異常(異常會作為一個普通的屬性返回,和返回值是等價的)。

2)可使用 as_completed 函數(shù)在可等待對象列表完成時處理它們的結(jié)果,它會返回一個可以循環(huán)遍歷的生成器。一旦某個協(xié)程或任務(wù)完成,就能訪問結(jié)果并處理它。

3)如果希望同時運行多個任務(wù),并且還希望能了解哪些任務(wù)已經(jīng)完成,哪些任務(wù)在運行,則可以使用 wait。這個函數(shù)還允許在返回結(jié)果時進行更多控制,返回時,我們會得到一組已經(jīng)完成的任務(wù)和一組仍在運行的任務(wù)。

然后可以取消任何想要取消的任務(wù),或執(zhí)行其他任何需要執(zhí)行的任務(wù)。并且 wait 里面的任務(wù)出現(xiàn)異常,也不會影響其它任務(wù),異常會作為任務(wù)的一個屬性,只是在我們沒有處理的時候會給出警告。至于具體的處理方式,我們直接通過 exception 方法判斷是否發(fā)生了異常即可,沒有異常返回 result(),有異常返回 exception()。

責任編輯:華軒 來源: 古明地覺的編程教室
相關(guān)推薦

2023-04-26 11:59:06

Swift異步編程

2022-04-26 08:41:38

Swift并發(fā)系統(tǒng)iOS

2020-02-21 08:00:00

Pythonasyncio編程語言

2021-04-07 06:00:18

JavaScript 前端并發(fā)控制

2009-02-09 10:06:03

并發(fā)控制Web應(yīng)用悲觀鎖

2017-11-06 17:16:55

Linux設(shè)備驅(qū)動并發(fā)控制

2017-08-02 15:00:12

PythonAsyncio異步編程

2017-05-05 08:44:24

PythonAsyncio異步編程

2024-11-27 13:25:24

Rust線程池線程

2009-07-03 12:59:40

Servlet配置

2011-08-30 10:20:41

Silverlight

2021-01-12 10:22:45

JavaScript并發(fā)控制前端

2024-04-30 10:29:46

前端開發(fā)h5開發(fā)函數(shù)

2025-03-21 09:01:34

Swift任務(wù)取消機制協(xié)作式取消

2021-05-12 22:07:43

并發(fā)編排任務(wù)

2021-08-01 15:26:59

協(xié)程Asyncio并發(fā)數(shù)

2023-07-14 15:10:17

PythonAsyncIO庫

2024-03-04 00:02:00

Redis存儲令牌

2024-01-18 08:37:33

socketasyncio線程

2021-05-13 21:58:00

高并發(fā)應(yīng)用Asyncio
點贊
收藏

51CTO技術(shù)棧公眾號