FastAPI + RabbitMQ:構(gòu)建高性能異步任務(wù)系統(tǒng)
在現(xiàn)代微服務(wù)架構(gòu)中,任務(wù)解耦 和 異步處理 是系統(tǒng)擴(kuò)展能力的關(guān)鍵。本文將帶你使用 FastAPI + RabbitMQ 構(gòu)建一個(gè)簡(jiǎn)單的 異步任務(wù)隊(duì)列,模擬一個(gè)耗時(shí)的任務(wù)(如發(fā)送郵件),由后臺(tái)獨(dú)立 worker 消費(fèi)執(zhí)行。

技術(shù)棧
- FastAPI(主服務(wù),負(fù)責(zé)接收請(qǐng)求)
 - RabbitMQ(消息隊(duì)列)
 - aio-pika(Python 異步 RabbitMQ 客戶端)
 
系統(tǒng)架構(gòu)簡(jiǎn)圖
用戶請(qǐng)求 --> FastAPI(推送任務(wù)) --> RabbitMQ
                    ↓
             Worker(消費(fèi)執(zhí)行)安裝依賴
pip install fastapi uvicorn aio-pika pydanticRabbitMQ 環(huán)境準(zhǔn)備
可以用 Docker 啟動(dòng) RabbitMQ 服務(wù):
docker run -d --hostname rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management- 5672:RabbitMQ 消息端口
 - 15672:Web 管理后臺(tái)(默認(rèn)賬號(hào)密碼都是 guest)
 
FastAPI 應(yīng)用(producer)
# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio
import aio_pika
app = FastAPI()
class TaskRequest(BaseModel):
    user_email: str
    message: str
RABBITMQ_URL = "amqp://guest:guest@localhost/"
@app.on_event("startup")
async def startup():
    app.state.rabbit_connection = await aio_pika.connect_robust(RABBITMQ_URL)
@app.on_event("shutdown")
async def shutdown():
    await app.state.rabbit_connection.close()
@app.post("/send-task")
async def send_task(task: TaskRequest):
    channel = await app.state.rabbit_connection.channel()
    queue = await channel.declare_queue("task_queue", durable=True)
    # 構(gòu)造消息
    msg_body = task.json().encode()
    message = aio_pika.Message(body=msg_body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT)
    await channel.default_exchange.publish(message, routing_key="task_queue")
    return {"status": "success", "msg": "任務(wù)已入隊(duì)"}消費(fèi)者 Worker(consumer)
# worker.py
import asyncio
import json
import aio_pika
RABBITMQ_URL = "amqp://guest:guest@localhost/"
async def main():
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue("task_queue", durable=True)
    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            data = json.loads(message.body)
            print(f"?? 收到任務(wù):發(fā)送郵件至 {data['user_email']},內(nèi)容:{data['message']}")
            await asyncio.sleep(2)  # 模擬耗時(shí)任務(wù)
            print("? 郵件發(fā)送完成")
    print("?? Worker 正在等待任務(wù)...")
    await queue.consume(on_message)
if __name__ == "__main__":
    asyncio.run(main())啟動(dòng)服務(wù)
啟動(dòng) FastAPI:
uvicorn app.main:app --reload啟動(dòng) worker:
python worker.py測(cè)試
發(fā)送 POST 請(qǐng)求到 /send-task:
POST http://localhost:8000/send-task
{
  "user_email": "test@example.com",
  "message": "歡迎使用 FastAPI + RabbitMQ!"
}終端會(huì)看到 worker 消費(fèi)消息并執(zhí)行任務(wù)的輸出。
總結(jié)
通過(guò) FastAPI + RabbitMQ,可以輕松實(shí)現(xiàn)異步任務(wù)分發(fā)系統(tǒng):
- 主服務(wù)響應(yīng)快速,避免卡頓
 - 異步 worker 后臺(tái)處理,任務(wù)解耦
 - RabbitMQ 提供可靠、高可用的消息傳遞機(jī)制
 















 
 
 











 
 
 
 