偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

一日一技:使用 Asyncio 如何限制協(xié)程的并發(fā)數(shù)

開發(fā) 前端
如果大家要限制協(xié)程的并發(fā)數(shù),那么最簡單的辦法就是使用asyncio.Semaphore。但需要注意的是,只能在啟動(dòng)協(xié)程之前初始化它,然后傳給協(xié)程。要確保所有并發(fā)協(xié)程拿到的是同一個(gè)Semaphore對(duì)象。

[[414567]]

在昨天的直播中,有同學(xué)問道,如果使用 asyncio + httpx 實(shí)現(xiàn)并發(fā)請(qǐng)求,怎么限制請(qǐng)求的頻率呢?怎么限制最多只能有 x 個(gè)請(qǐng)求同時(shí)發(fā)出呢?我們今天給出兩種方案。

提出問題

假設(shè)如果我們同時(shí)發(fā)起12個(gè)請(qǐng)求,每個(gè)請(qǐng)求的時(shí)間不同,那么總共的請(qǐng)求時(shí)間大概跟最長耗時(shí)的請(qǐng)求差不多。我們先來寫一個(gè)用于測(cè)試的例子:

  1. import asyncio 
  2. import httpx 
  3. import time 
  4.  
  5.  
  6. async def req(delay): 
  7.     print(f'請(qǐng)求一個(gè)延遲為{delay}秒的接口'
  8.     async with httpx.AsyncClient(timeout=20) as client: 
  9.         resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}'
  10.         result = resp.json() 
  11.         print(result) 
  12.  
  13.  
  14. async def main(): 
  15.     start = time.time() 
  16.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8] 
  17.     task_list = [] 
  18.     for delay in delay_list: 
  19.         task = asyncio.create_task(req(delay)) 
  20.         task_list.append(task) 
  21.     await asyncio.gather(*task_list) 
  22.     end = time.time() 
  23.     print(f'一共耗時(shí):{end - start}'
  24.  
  25. asyncio.run(main()) 

這段代碼,使用 for 循環(huán)創(chuàng)建了12個(gè)協(xié)程任務(wù),這些任務(wù)幾乎同時(shí)運(yùn)行,于是,請(qǐng)求完成所有的接口,總共耗時(shí)如下圖所示:

現(xiàn)在的問題是,由于網(wǎng)站有反爬蟲機(jī)制,最多只能同時(shí)發(fā)起3個(gè)請(qǐng)求。那么我們?cè)趺创_保同一時(shí)間最多只有3個(gè)協(xié)程在請(qǐng)求網(wǎng)絡(luò)呢?

限制協(xié)程任務(wù)數(shù)

第一個(gè)方案跟以前限制多線程的線程數(shù)的方案相同。我們創(chuàng)建一個(gè)列表,確保列表里面最多只有3個(gè)任務(wù),然后持續(xù)循環(huán)檢查,發(fā)現(xiàn)有任務(wù)完成了,就移除這個(gè)完成的任務(wù),并加入一個(gè)新的任務(wù),直到待爬的列表為空,這個(gè)任務(wù)列表也為空。代碼如下:

  1. import asyncio 
  2. import httpx 
  3. import time 
  4.  
  5.  
  6. async def req(delay): 
  7.     print(f'請(qǐng)求一個(gè)延遲為{delay}秒的接口'
  8.     async with httpx.AsyncClient(timeout=20) as client: 
  9.         resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}'
  10.         result = resp.json() 
  11.         print(result) 
  12.  
  13.  
  14. async def main(): 
  15.     start = time.time() 
  16.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8] 
  17.     task_list = [] 
  18.     while True
  19.         if not delay_list and not task_list: 
  20.             break 
  21.         while len(task_list) < 3: 
  22.             if delay_list: 
  23.                 delay = delay_list.pop() 
  24.                 task = asyncio.create_task(req(delay)) 
  25.                 task_list.append(task) 
  26.             else
  27.                 break 
  28.         task_list = [task for task in task_list if not task.done()] 
  29.         await asyncio.sleep(1) 
  30.     end = time.time() 
  31.     print(f'一共耗時(shí):{end - start}'
  32.  
  33. asyncio.run(main()) 

運(yùn)行效果如下圖所示:

總共耗時(shí)大概28秒左右。比串行需要的58秒快了一半,但比全部同時(shí)并發(fā)多了一倍。

使用 Semaphore

asyncio 實(shí)際上自帶了一個(gè)限制協(xié)程數(shù)量的類,叫做Semaphore。我們只需要初始化它,傳入最大允許的協(xié)程數(shù)量,然后就可以通過上下文管理器來使用。我們看一下代碼:

  1. import asyncio 
  2. import httpx 
  3. import time 
  4.  
  5.  
  6. async def req(delay, sem): 
  7.     print(f'請(qǐng)求一個(gè)延遲為{delay}秒的接口'
  8.     async with sem: 
  9.         async with httpx.AsyncClient(timeout=20) as client: 
  10.             resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}'
  11.             result = resp.json() 
  12.             print(result) 
  13.  
  14.  
  15. async def main(): 
  16.     start = time.time() 
  17.     delay_list = [3, 6, 1, 8, 2, 4, 5, 2, 7, 3, 9, 8] 
  18.     task_list = [] 
  19.     sem = asyncio.Semaphore(3) 
  20.     for delay in delay_list: 
  21.         task = asyncio.create_task(req(delay, sem)) 
  22.         task_list.append(task) 
  23.     await asyncio.gather(*task_list) 
  24.  
  25.     end = time.time() 
  26.     print(f'一共耗時(shí):{end - start}'
  27.  
  28. asyncio.run(main()) 

運(yùn)行效果如下圖所示:

耗時(shí)為22秒,比第一個(gè)方案更快。

我們來看看Semaphore的用法,它的格式為:

  1. sem = asyncio.Semaphore(同時(shí)運(yùn)行的協(xié)程數(shù)量) 
  2.  
  3. async def func(sem): 
  4.     async with sem: 
  5.         這里是并發(fā)執(zhí)行的代碼 
  6.  
  7. task_list = [] 
  8. for _ in range(總共需要執(zhí)行的任務(wù)數(shù)): 
  9.     task = asyncio.create_task(func(sem)) 
  10.     task_list.append(task) 
  11. await asyncio.gather(*task_list) 

當(dāng)我們要限制一個(gè)協(xié)程的并發(fā)數(shù)的時(shí)候,可以在調(diào)用協(xié)程之前,先初始化一個(gè)Semaphore對(duì)象。然后把這個(gè)對(duì)象傳到需要限制并發(fā)的協(xié)程里面,在協(xié)程里面,使用異步上下文管理器包住你的正式代碼:

  1. async with sem: 
  2.     正式代碼 

這樣一來,如果并發(fā)數(shù)沒有達(dá)到限制,那么async with sem會(huì)瞬間執(zhí)行完成,進(jìn)入里面的正式代碼中。如果并發(fā)數(shù)已經(jīng)達(dá)到了限制,那么其他的協(xié)程會(huì)阻塞在async with sem這個(gè)地方,直到正在運(yùn)行的某個(gè)協(xié)程完成了,退出了,才會(huì)放行一個(gè)新的協(xié)程去替換掉這個(gè)已經(jīng)完成的協(xié)程。

這個(gè)寫法其實(shí)跟多線程的加鎖很像。只不過鎖是確保同一個(gè)時(shí)間只有一個(gè)線程在運(yùn)行,而Semaphore可以人為指定能有多少個(gè)協(xié)程同時(shí)運(yùn)行。

如何限制1分鐘內(nèi)能夠運(yùn)行的協(xié)程數(shù)

可能同學(xué)看了上面的例子以后,只知道如何限制同時(shí)運(yùn)行的協(xié)程數(shù)。但是怎么限制在一段時(shí)間里同時(shí)運(yùn)行的協(xié)程數(shù)呢?

其實(shí)非常簡單,在并發(fā)的協(xié)程里面加個(gè) asyncio.sleep 就可以了。例如上面的例子,我想限制每分鐘只能有3個(gè)協(xié)程,那么可以把代碼改為:

  1. async def req(delay, sem): 
  2.     print(f'請(qǐng)求一個(gè)延遲為{delay}秒的接口'
  3.     async with sem: 
  4.         async with httpx.AsyncClient(timeout=20) as client: 
  5.             resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}'
  6.             result = resp.json() 
  7.             print(result) 
  8.     await asyncio.sleep(60) 

總結(jié)

如果大家要限制協(xié)程的并發(fā)數(shù),那么最簡單的辦法就是使用asyncio.Semaphore。但需要注意的是,只能在啟動(dòng)協(xié)程之前初始化它,然后傳給協(xié)程。要確保所有并發(fā)協(xié)程拿到的是同一個(gè)Semaphore對(duì)象。

當(dāng)然,你的程序里面,可能有多個(gè)不同的部分,有些部分限制并發(fā)數(shù)為 a,有些部分限制并發(fā)數(shù)為 b。那么你可以初始化多個(gè)Semaphore對(duì)象,分別傳給不同的協(xié)程。

本文轉(zhuǎn)載自微信公眾號(hào)「未聞Code」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系未聞Code公眾號(hào)。

 

責(zé)任編輯:武曉燕 來源: 未聞Code
相關(guān)推薦

2021-10-15 21:08:31

PandasExcel對(duì)象

2022-06-28 09:31:44

LinuxmacOS系統(tǒng)

2022-03-12 20:38:14

網(wǎng)頁Python測(cè)試

2025-05-28 03:15:00

Scrapy數(shù)據(jù)sleep

2024-08-27 22:08:13

2021-04-27 22:15:02

Selenium瀏覽器爬蟲

2023-10-28 12:14:35

爬蟲JavaScriptObject

2024-07-30 08:16:18

Python代碼工具

2024-07-30 08:11:16

2024-11-11 00:38:13

Mypy靜態(tài)類型

2021-05-08 19:33:51

移除字符零寬

2022-03-07 09:14:04

Selenium鼠標(biāo)元素

2021-02-14 22:22:18

格式圖片 HTTP

2024-11-13 09:18:09

2021-04-05 14:47:55

Python多線程事件監(jiān)控

2020-12-11 06:30:00

工具分組DataFrame

2023-10-29 09:16:49

代碼安全命令

2021-05-13 09:01:51

Cloud Flare瀏覽器網(wǎng)站

2021-11-12 05:00:43

裝飾器代碼功能

2024-10-16 21:47:15

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)