偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

FastAPI + RabbitMQ:構(gòu)建高性能異步任務(wù)系統(tǒng)

開(kāi)發(fā) 架構(gòu)
本文將帶你使用 FastAPI + RabbitMQ 構(gòu)建一個(gè)簡(jiǎn)單的 異步任務(wù)隊(duì)列,模擬一個(gè)耗時(shí)的任務(wù)(如發(fā)送郵件),由后臺(tái)獨(dú)立 worker 消費(fèi)執(zhí)行。

在現(xiàn)代微服務(wù)架構(gòu)中,任務(wù)解耦 和 異步處理 是系統(tǒng)擴(kuò)展能力的關(guān)鍵。本文將帶你使用 FastAPI + RabbitMQ 構(gòu)建一個(gè)簡(jiǎn)單的 異步任務(wù)隊(duì)列,模擬一個(gè)耗時(shí)的任務(wù)(如發(fā)送郵件),由后臺(tái)獨(dú)立 worker 消費(fèi)執(zhí)行。

技術(shù)棧

  • FastAPI(主服務(wù),負(fù)責(zé)接收請(qǐng)求)
  • RabbitMQ(消息隊(duì)列)
  • aio-pika(Python 異步 RabbitMQ 客戶端)

系統(tǒng)架構(gòu)簡(jiǎn)圖

用戶請(qǐng)求 --> FastAPI(推送任務(wù)) --> RabbitMQ
                    ↓
             Worker(消費(fèi)執(zhí)行)

安裝依賴

pip install fastapi uvicorn aio-pika pydantic

RabbitMQ 環(huán)境準(zhǔn)備

可以用 Docker 啟動(dòng) RabbitMQ 服務(wù):

docker run -d --hostname rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
  • 5672:RabbitMQ 消息端口
  • 15672:Web 管理后臺(tái)(默認(rèn)賬號(hào)密碼都是 guest)

FastAPI 應(yīng)用(producer)

# app/main.py
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio
import aio_pika

app = FastAPI()

class TaskRequest(BaseModel):
    user_email: str
    message: str

RABBITMQ_URL = "amqp://guest:guest@localhost/"

@app.on_event("startup")
async def startup():
    app.state.rabbit_connection = await aio_pika.connect_robust(RABBITMQ_URL)

@app.on_event("shutdown")
async def shutdown():
    await app.state.rabbit_connection.close()

@app.post("/send-task")
async def send_task(task: TaskRequest):
    channel = await app.state.rabbit_connection.channel()
    queue = await channel.declare_queue("task_queue", durable=True)

    # 構(gòu)造消息
    msg_body = task.json().encode()
    message = aio_pika.Message(body=msg_body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT)

    await channel.default_exchange.publish(message, routing_key="task_queue")

    return {"status": "success", "msg": "任務(wù)已入隊(duì)"}

消費(fèi)者 Worker(consumer)

# worker.py
import asyncio
import json
import aio_pika

RABBITMQ_URL = "amqp://guest:guest@localhost/"

async def main():
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue("task_queue", durable=True)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            data = json.loads(message.body)
            print(f"?? 收到任務(wù):發(fā)送郵件至 {data['user_email']},內(nèi)容:{data['message']}")
            await asyncio.sleep(2)  # 模擬耗時(shí)任務(wù)
            print("? 郵件發(fā)送完成")

    print("?? Worker 正在等待任務(wù)...")
    await queue.consume(on_message)

if __name__ == "__main__":
    asyncio.run(main())

啟動(dòng)服務(wù)

啟動(dòng) FastAPI:

uvicorn app.main:app --reload

啟動(dòng) worker:

python worker.py

測(cè)試

發(fā)送 POST 請(qǐng)求到 /send-task:

POST http://localhost:8000/send-task
{
  "user_email": "test@example.com",
  "message": "歡迎使用 FastAPI + RabbitMQ!"
}

終端會(huì)看到 worker 消費(fèi)消息并執(zhí)行任務(wù)的輸出。

總結(jié)

通過(guò) FastAPI + RabbitMQ,可以輕松實(shí)現(xiàn)異步任務(wù)分發(fā)系統(tǒng):

  • 主服務(wù)響應(yīng)快速,避免卡頓
  • 異步 worker 后臺(tái)處理,任務(wù)解耦
  • RabbitMQ 提供可靠、高可用的消息傳遞機(jī)制
責(zé)任編輯:趙寧寧 來(lái)源: Ssoul肥魚(yú)
相關(guān)推薦

2025-09-08 06:10:00

FastAPI開(kāi)發(fā)web

2025-09-08 11:00:00

2025-09-09 07:00:00

數(shù)據(jù)庫(kù)FastAPI開(kāi)發(fā)

2024-10-09 11:31:51

2025-01-13 12:23:51

2023-11-06 08:32:17

FastAPIPython

2022-12-09 08:40:56

高性能內(nèi)存隊(duì)列

2025-04-15 08:20:00

FastAPI異步函數(shù)

2011-10-21 14:20:59

高性能計(jì)算HPC虛擬化

2011-10-25 13:13:35

HPC高性能計(jì)算Platform

2023-03-13 07:40:44

高并發(fā)golang

2022-10-08 07:55:33

DemoMongoDB異步

2023-10-28 09:05:38

2011-12-15 13:28:57

2025-03-04 08:00:00

機(jī)器學(xué)習(xí)Rust開(kāi)發(fā)

2020-06-05 07:20:41

測(cè)試自動(dòng)化環(huán)境

2012-04-17 16:48:43

應(yīng)用優(yōu)化負(fù)載均衡Array APV

2009-06-03 14:24:12

ibmdwWebSphere

2009-10-29 09:11:50

Juniper高性能網(wǎng)絡(luò)

2011-02-23 09:49:40

ASP.NET
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)