How to Send 100,000 HTTP Requests as Fast as Possible
Suppose you have a file containing 100,000 URLs, and you need to send an HTTP request to each one and print the status code of the response. How would you write the code to get through all of them as fast as possible?
Python offers several approaches to concurrent programming: the standard-library threading module, concurrent.futures thread pools, asyncio coroutines, and asynchronous third-party libraries such as grequests. Any of them can satisfy the requirement above. Below, each one is implemented in turn; all of the code in this article runs as-is, so keep it as a reference for your future concurrent programming:
Queue + multithreading
Define a queue with a capacity of 400, then start 200 threads, each of which repeatedly takes a URL off the queue and requests it. The main thread reads the URLs from the file and puts them on the queue, then waits until every queued item has been picked up and processed. The code is as follows:
from threading import Thread
import sys
from queue import Queue

import requests

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        res = requests.get(ourl)
        return res.status_code, ourl
    except requests.RequestException:
        return "error", ourl

def doSomethingWithResult(status, url):
    print(status, url)

# Bounded queue: holds at most twice as many URLs as there are workers.
q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True  # daemon threads die with the main thread
    t.start()

try:
    for url in open("urllist.txt"):
        q.put(url.strip())
    q.join()  # block until every queued URL has been processed
except KeyboardInterrupt:
    sys.exit(1)
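A note on the design: bounding the queue at concurrent * 2 keeps the main thread from loading all 100,000 URLs into memory at once, since q.put() simply blocks whenever the workers fall behind.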
When you run it, the script prints a status code and URL for each request as it completes. Did you pick up a new trick?
Thread pool
If you want a thread pool, the higher-level concurrent.futures library is the recommended choice:
import concurrent.futures

import requests

out = []
CONNECTIONS = 100  # pool size
TIMEOUT = 5        # per-request timeout in seconds

urls = []
with open("urllist.txt") as reader:
    for url in reader:
        urls.append(url.strip())

def load_url(url, timeout):
    ans = requests.get(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)
            print(data)
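concurrent.futures.as_completed yields each future as soon as it finishes, regardless of submission order, so one slow URL never holds up the printing of the fast ones.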
Coroutines + aiohttp
Coroutines have also become a staple tool for concurrency:
import asyncio
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import sys
    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    with open("urllist.txt") as infile:
        urls = set(map(str.strip, infile))
    asyncio.run(make_requests(urls=urls))
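One thing to watch with aiohttp: the default TCPConnector allows at most 100 simultaneous connections, so gathering 100,000 tasks will not actually open 100,000 sockets at once. If you want to raise or lower that ceiling, pass an explicit connector. A minimal sketch, with limit=500 chosen arbitrarily:

import asyncio
from aiohttp import ClientSession, TCPConnector

async def main():
    # The aiohttp default is limit=100; 500 here is just an example value.
    connector = TCPConnector(limit=500)
    async with ClientSession(connector=connector) as session:
        async with session.get("http://httpbin.org/get") as resp:
            print(resp.status)

asyncio.run(main())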
grequests[1]
This is a third-party library, currently at about 3.8K stars on GitHub. It is simply Requests + Gevent[2], making asynchronous HTTP requests much easier. Under the hood, Gevent is still coroutines.
Install it first:
pip install grequests
Using it is about as simple as it gets:
import grequests

urls = []
with open("urllist.txt") as reader:
    for url in reader:
        urls.append(url.strip())

rs = (grequests.get(u) for u in urls)
for result in grequests.map(rs):
    if result is not None:  # failed requests come back as None
        print(result.status_code, result.url)
Note that grequests.map(rs) executes all the requests concurrently; each line of output is a status code followed by a URL.
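If full-throttle concurrency is too much for your machine or for the target server, grequests.map also accepts a size parameter that caps the gevent pool. A minimal sketch, with size=100 as an arbitrary example:

import grequests

urls = ["http://httpbin.org/get"] * 10  # stand-in for your real URL list
rs = (grequests.get(u) for u in urls)
# size=100 bounds how many requests are in flight at any moment
for result in grequests.map(rs, size=100):
    if result is not None:
        print(result.status_code, result.url)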
You can also plug in an exception handler:
>>> def exception_handler(request, exception):
...     print("Request failed")

>>> reqs = [
...     grequests.get('http://httpbin.org/delay/1', timeout=0.001),
...     grequests.get('http://fakedomain/'),
...     grequests.get('http://httpbin.org/status/500')]
>>> grequests.map(reqs, exception_handler=exception_handler)
Request failed
Request failed
[None, None, <Response [500]>]
Closing thoughts
Today I shared several ways to make concurrent HTTP requests. People sometimes claim that async (coroutines) outperforms multithreading, but it really depends on the scenario; no single approach fits every case. In one experiment I ran, also against a list of URLs, coroutines became noticeably slower once concurrency rose above 500.
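If you hit that same slowdown, one common mitigation is to cap the number of in-flight coroutines with asyncio.Semaphore instead of gathering everything unthrottled. A minimal sketch, assuming the same urllist.txt and an arbitrary cap of 500:

import asyncio
from aiohttp import ClientSession

async def fetch(url, session, sem):
    # At most `limit` of these bodies run concurrently; the rest wait here.
    async with sem:
        try:
            async with session.get(url) as resp:
                return url, resp.status
        except Exception:
            return url, "error"

async def main(limit=500):
    with open("urllist.txt") as f:
        urls = [line.strip() for line in f]
    sem = asyncio.Semaphore(limit)
    async with ClientSession() as session:
        results = await asyncio.gather(*(fetch(u, session, sem) for u in urls))
    for url, status in results:
        print(status, url)

asyncio.run(main())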