偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

異步編程進(jìn)階:asyncio、threading 和多進(jìn)程在實戰(zhàn)中的選擇

開發(fā)
在Python并發(fā)編程中,沒有絕對的最優(yōu)方案,只有最適合當(dāng)前場景的方案。理解GIL的影響、掌握三種方案的特點和適用場景,選擇合適的方案進(jìn)行組合使用,這才是實戰(zhàn)中的最佳實踐。

Python中的并發(fā)編程一直是開發(fā)者的難題。asyncio提供了異步編程的原生支持,threading提供了多線程能力,multiprocessing提供了多進(jìn)程支持。這三種方案各有所長,在不同的場景中發(fā)揮不同的作用。

本文將基于真實的應(yīng)用場景,深入分析這三種并發(fā)方案的原理、性能和最佳實踐,幫助你在實戰(zhàn)中做出正確的選擇。

三種并發(fā)模型的本質(zhì)區(qū)別

Python的全局解釋器鎖(GIL)是理解三種并發(fā)模型的關(guān)鍵:

┌─────────────────────────────────────┐
│        Python解釋器                  │
│  ┌──────────────────────────────┐  │
│  │      GIL(全局鎖)              │  │
│  │  一次只允許一個線程執(zhí)行      │  │
│  └──────────────────────────────┘  │
└─────────────────────────────────────┘

asyncio:單線程,在I/O等待時切換任務(wù) ? 不受GIL影響
threading:多線程,但受GIL限制 ? CPU密集型無法并行
multiprocessing:多進(jìn)程,每個進(jìn)程獨立GIL ? CPU密集型可并行

方案一:asyncio - I/O密集型的最優(yōu)選擇

asyncio的工作原理:

import asyncio

asyncdeffetch_data(url):
    """模擬獲取數(shù)據(jù)"""
    print(f"開始獲取 {url}")
    await asyncio.sleep(2)  # 模擬I/O操作
    print(f"完成獲取 {url}")
    returnf"Data from {url}"

asyncdefmain():
    # 并發(fā)執(zhí)行多個異步任務(wù)
    tasks = [
        fetch_data("http://example.com/1"),
        fetch_data("http://example.com/2"),
        fetch_data("http://example.com/3"),
    ]
    results = await asyncio.gather(*tasks)
    return results

# 運(yùn)行
results = asyncio.run(main())
# 耗時:約2秒(并發(fā)),而不是6秒(順序)

asyncio的高級用法:

import asyncio
from typing import AsyncGenerator

# 異步生成器
asyncdefasync_generator():
    for i in range(5):
        await asyncio.sleep(1)
        yield i

# 異步上下文管理器
classAsyncResource:
    asyncdef__aenter__(self):
        print("獲取資源")
        await asyncio.sleep(1)
        return self
    
    asyncdef__aexit__(self, exc_type, exc_val, exc_tb):
        print("釋放資源")
        await asyncio.sleep(1)

# 使用
asyncdefuse_resource():
    asyncwith AsyncResource() as resource:
        print("使用資源")

asyncio.run(use_resource())

asyncio的性能特性:

import asyncio
import time
import aiohttp

asyncdefbenchmark_asyncio():
    """測試asyncio處理1000個并發(fā)請求的性能"""
    start = time.time()
    
    asyncdeffetch(session, url):
        try:
            asyncwith session.get(url, timeout=5) as response:
                returnawait response.text()
        except:
            returnNone
    
    asyncwith aiohttp.ClientSession() as session:
        tasks = [fetch(session, f"http://example.com/{i}") for i in range(1000)]
        results = await asyncio.gather(*tasks)
    
    end = time.time()
    print(f"asyncio耗時:{end - start:.2f}秒,成功請求:{len([r for r in results if r])}")

asyncio.run(benchmark_asyncio())

方案二:threading - 輕量級并發(fā)

threading的適用場景:

import threading
import time
from queue import Queue

defworker(queue, worker_id):
    """工作線程"""
    whileTrue:
        task = queue.get()
        if task isNone:
            break
        print(f"Worker {worker_id} 處理任務(wù) {task}")
        time.sleep(1)
        queue.task_done()

defmain_threading():
    queue = Queue()
    num_workers = 4
    
    # 創(chuàng)建并啟動工作線程
    threads = []
    for i in range(num_workers):
        t = threading.Thread(target=worker, args=(queue, i))
        t.start()
        threads.append(t)
    
    # 添加任務(wù)
    for i in range(10):
        queue.put(i)
    
    # 等待所有任務(wù)完成
    queue.join()
    
    # 停止工作線程
    for _ in range(num_workers):
        queue.put(None)
    for t in threads:
        t.join()

main_threading()

threading的局限性:

import threading
import time

defcpu_intensive():
    """CPU密集型計算"""
    total = 0
    for i in range(100000000):
        total += i
    return total

# 單線程執(zhí)行
start = time.time()
cpu_intensive()
cpu_intensive()
print(f"單線程耗時:{time.time() - start:.2f}秒")

# 多線程執(zhí)行(受GIL影響,實際更慢)
start = time.time()
t1 = threading.Thread(target=cpu_intensive)
t2 = threading.Thread(target=cpu_intensive)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多線程耗時:{time.time() - start:.2f}秒")
# 結(jié)果:多線程因GIL競爭反而更慢!

threading的正確用途:

import threading
import time
import requests
from concurrent.futures import ThreadPoolExecutor

deffetch_url(url):
    """獲取URL內(nèi)容"""
    try:
        response = requests.get(url, timeout=5)
        return len(response.content)
    except:
        return0

defbenchmark_threading():
    urls = ["http://example.com"] * 100
    
    # 使用線程池
    with ThreadPoolExecutor(max_workers=10) as executor:
        sizes = list(executor.map(fetch_url, urls))
    
    print(f"成功獲取 {len([s for s in sizes if s > 0])} 個URL")

benchmark_threading()

方案三:multiprocessing - CPU密集型的利器

multiprocessing基礎(chǔ):

import multiprocessing
import time

defcpu_intensive(n):
    """CPU密集型計算"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

defmain_multiprocessing():
    # 創(chuàng)建進(jìn)程池
    with multiprocessing.Pool(processes=4) as pool:
        tasks = [100000000] * 4
        start = time.time()
        results = pool.map(cpu_intensive, tasks)
        end = time.time()
    
    print(f"多進(jìn)程耗時:{end - start:.2f}秒")
    print(f"結(jié)果:{results}")

if __name__ == '__main__':
    main_multiprocessing()

multiprocessing的進(jìn)程間通信:

import multiprocessing
from multiprocessing import Queue, Pipe

defworker_queue(queue):
    """通過隊列通信"""
    queue.put("Message from worker")

defmain():
    # 方法1:使用隊列
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker_queue, args=(queue,))
    p.start()
    message = queue.get()
    print(f"收到消息:{message}")
    p.join()
    
    # 方法2:使用管道
    parent_conn, child_conn = multiprocessing.Pipe()
    
    defworker_pipe(conn):
        conn.send("Hello from pipe")
        conn.close()
    
    p = multiprocessing.Process(target=worker_pipe, args=(child_conn,))
    p.start()
    message = parent_conn.recv()
    print(f"收到消息:{message}")
    p.join()

if __name__ == '__main__':
    main()

性能對比與最佳實踐

綜合性能測試:

import asyncio
import threading
import multiprocessing
import time
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

defbenchmark_all():
    """綜合性能對比"""
    
    # 測試1:I/O密集型(網(wǎng)絡(luò)請求)
    defio_task():
        try:
            requests.get("http://httpbin.org/delay/1", timeout=5)
            return1
        except:
            return0
    
    # asyncio版本
    asyncdefio_asyncio():
        import aiohttp
        asyncwith aiohttp.ClientSession() as session:
            tasks = []
            for _ in range(10):
                tasks.append(io_asyncio_task(session))
            returnawait asyncio.gather(*tasks)
    
    asyncdefio_asyncio_task(session):
        try:
            asyncwith session.get("http://httpbin.org/delay/1", timeout=5) as r:
                return1
        except:
            return0
    
    # threading版本
    defio_threading():
        with ThreadPoolExecutor(max_workers=10) as executor:
            return list(executor.map(lambda _: io_task(), range(10)))
    
    # 測試CPU密集型
    defcpu_task():
        return sum(i ** 2for i in range(10000000))
    
    # threading版本(CPU密集型)
    defcpu_threading():
        with ThreadPoolExecutor(max_workers=4) as executor:
            return list(executor.map(lambda _: cpu_task(), range(4)))
    
    # multiprocessing版本
    defcpu_multiprocessing():
        with ProcessPoolExecutor(max_workers=4) as executor:
            return list(executor.map(lambda _: cpu_task(), range(4)))
    
    print("=== 性能對比 ===")
    
    # I/O測試
    print("\n1. I/O密集型(10個網(wǎng)絡(luò)請求)")
    start = time.time()
    io_threading()
    print(f"threading耗時:{time.time() - start:.2f}秒")
    
    # CPU測試
    print("\n2. CPU密集型(4個大計算)")
    start = time.time()
    cpu_threading()
    print(f"threading耗時:{time.time() - start:.2f}秒")
    
    start = time.time()
    cpu_multiprocessing()
    print(f"multiprocessing耗時:{time.time() - start:.2f}秒")

benchmark_all()

選擇決策樹

┌─ 是I/O密集型嗎?
│  ├─ Yes ──> 是否需要實時響應(yīng)?
│  │          ├─ Yes ──> 使用 asyncio(推薦)
│  │          └─ No ──> 可用threading或asyncio
│  │
│  └─ No ──> 是CPU密集型嗎?
│             ├─ Yes ──> 使用 multiprocessing
│             └─ No ──> 數(shù)據(jù)處理量小?
│                       ├─ Yes ──> 單線程即可
│                       └─ No ──> 使用threading

實戰(zhàn)案例:混合方案

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

asyncdefhybrid_approach():
    """混合使用異步和多進(jìn)程"""
    
    defcpu_intensive(n):
        # CPU密集計算
        return sum(i ** 2for i in range(n))
    
    # 先用asyncio并發(fā)發(fā)起任務(wù)
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = []
        for i in range(10):
            # 在線程池中運(yùn)行CPU密集操作
            task = loop.run_in_executor(executor, cpu_intensive, 10000000)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
    
    return results

asyncio.run(hybrid_approach())

結(jié)尾

在Python并發(fā)編程中,沒有絕對的最優(yōu)方案,只有最適合當(dāng)前場景的方案。asyncio適合I/O密集型的高并發(fā)場景,threading提供了輕量級的并發(fā)支持,multiprocessing則是CPU密集型計算的終極武器。理解GIL的影響、掌握三種方案的特點和適用場景,選擇合適的方案進(jìn)行組合使用,這才是實戰(zhàn)中的最佳實踐。隨著FastAPI等現(xiàn)代框架的普及,asyncio已經(jīng)成為主流選擇,但在處理混合型應(yīng)用時,仍然需要靈活地運(yùn)用三種方案。

責(zé)任編輯:趙寧寧 來源: Python數(shù)智工坊
相關(guān)推薦

2017-08-02 15:00:12

PythonAsyncio異步編程

2017-05-05 08:44:24

PythonAsyncio異步編程

2024-03-29 06:44:55

Python多進(jìn)程模塊工具

2021-06-11 06:54:35

PythonThreadingMultiproces

2023-12-11 18:18:24

Python編程線程

2023-08-30 08:43:42

asyncioaiohttp

2010-10-15 08:57:15

PHP多進(jìn)程

2025-06-03 08:27:58

Python異步IO編程

2019-02-26 11:15:25

進(jìn)程多線程多進(jìn)程

2023-05-10 07:47:08

Python并發(fā)編程

2024-12-27 08:11:44

Python編程模式IO

2021-04-20 12:39:52

Node.js多線程多進(jìn)程

2021-03-23 07:56:54

JS基礎(chǔ)同步異步編程EventLoop底層

2021-08-04 23:30:28

Node.js開發(fā)線程

2011-02-22 09:09:21

.NETAsync CTP異步

2011-02-22 08:49:16

.NET同步異步

2023-11-28 13:52:00

Python多進(jìn)程多線程

2021-02-25 11:19:37

谷歌Android開發(fā)者

2024-08-26 08:39:26

PHP孤兒進(jìn)程僵尸進(jìn)程

2023-11-01 11:20:57

點贊
收藏

51CTO技術(shù)棧公眾號