FastAPI 背景任務(wù)全解析:異步執(zhí)行長(zhǎng)任務(wù)的正確姿勢(shì)!
你是否遇到這種場(chǎng)景:
- 注冊(cè)接口響應(yīng)太慢,只因要發(fā)送郵件?
- 每次寫(xiě)日志都阻塞了響應(yīng)返回?
- 想跑個(gè)后臺(tái)任務(wù),卻不知從哪開(kāi)始?
這篇文章,帶你徹底掌握 FastAPI 的后臺(tái)任務(wù)機(jī)制(Background Tasks):它不是 async,不是 Celery,但輕巧高效,正是你項(xiàng)目中最實(shí)用的異步利器之一!
什么是 Background Tasks?
Background Tasks 是 FastAPI 提供的一種輕量異步任務(wù)執(zhí)行方式。
它的核心思路:
- 在 HTTP 請(qǐng)求響應(yīng)返回之后,自動(dòng)執(zhí)行一段你注冊(cè)好的函數(shù)
- 適用于不需要立即返回結(jié)果、也不強(qiáng)依賴(lài)成功率的任務(wù)
例如:
- 發(fā)郵件(用戶(hù)不需要等發(fā)完郵件才得到響應(yīng))
- 寫(xiě)日志(用戶(hù)不需要日志寫(xiě)完才看到結(jié)果)
- 異步清理操作、緩存刷新、埋點(diǎn)統(tǒng)計(jì)等
如何使用 BackgroundTasks?
基本用法:
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("log.txt", "a") as f:
f.write(f"{message}\n")
@app.post("/log")
async def log_operation(background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, "用戶(hù)訪問(wèn)了 /log 接口")
return {"message": "操作已記錄(異步執(zhí)行)"}
參數(shù)說(shuō)明:
參數(shù) | 說(shuō)明 |
BackgroundTasks | FastAPI 自動(dòng)注入的后臺(tái)任務(wù)對(duì)象 |
add_task(func, *args, **kwargs) | 注冊(cè)后臺(tái)執(zhí)行的函數(shù)和參數(shù) |
你的函數(shù)(非 async) | 必須是普通函數(shù)(同步),不能是 async 函數(shù)! |
為什么函數(shù)不能是 async 的?
因?yàn)?FastAPI 的 BackgroundTasks 是基于 Starlette 的同步回調(diào)機(jī)制,它在響應(yīng)后通過(guò)事件循環(huán)注冊(cè)任務(wù),并同步執(zhí)行你傳入的函數(shù)。
如果你寫(xiě)成 async def,它不會(huì)被 await,也不會(huì)進(jìn)入事件循環(huán),最終無(wú)法運(yùn)行。
如果我一定要運(yùn)行 async 函數(shù)怎么辦?
可以使用 Python 的 asyncio.create_task() 方法間接執(zhí)行:
import asyncio
asyncdefasync_job():
print("異步任務(wù)開(kāi)始")
await asyncio.sleep(2)
print("異步任務(wù)結(jié)束")
defwrapper_async_job():
asyncio.create_task(async_job()) # 間接運(yùn)行 async 函數(shù)
@app.get("/run")
asyncdefrun(background_tasks: BackgroundTasks):
background_tasks.add_task(wrapper_async_job)
return {"msg": "異步任務(wù)已觸發(fā)"}
這樣就能兼顧 async def 與 BackgroundTasks!
真實(shí)案例:注冊(cè)后異步發(fā)送郵件
文件結(jié)構(gòu)如下:
project/
├── main.py
└── tasks/
└── email.py
tasks/email.py:
import time
def send_welcome_email(email: str):
time.sleep(3) # 模擬耗時(shí)任務(wù)
print(f"?? 歡迎郵件已發(fā)送到 {email}")
main.py:
from fastapi import FastAPI, BackgroundTasks
from tasks.email import send_welcome_email
app = FastAPI()
@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
# 假設(shè)已完成注冊(cè)流程
background_tasks.add_task(send_welcome_email, email)
return {"msg": "注冊(cè)成功,歡迎郵件將在后臺(tái)發(fā)送"}
用戶(hù)幾乎瞬間得到響應(yīng),而郵件將在后臺(tái)異步執(zhí)行,提升體驗(yàn)。
支持多個(gè)后臺(tái)任務(wù)?
當(dāng)然可以!只需多次調(diào)用 add_task():
background_tasks.add_task(write_log, "用戶(hù)注冊(cè)")
background_tasks.add_task(send_welcome_email, email)
background_tasks.add_task(refresh_cache, user_id)
它們會(huì)按順序在響應(yīng)后依次執(zhí)行。
與 Depends 依賴(lài)注入結(jié)合使用
你可以在依賴(lài)函數(shù)里注入 BackgroundTasks,實(shí)現(xiàn)邏輯更解耦:
def audit(background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, "訪問(wèn)審計(jì)")
return
@app.get("/data")
async def get_data(dep=Depends(audit)):
return {"msg": "數(shù)據(jù)已獲取"}
更高級(jí)的場(chǎng)景中,你可以把審計(jì)、埋點(diǎn)、通知等邏輯都封裝進(jìn) Depends,主業(yè)務(wù)邏輯保持干凈清晰。
BackgroundTasks 適合什么場(chǎng)景?
適合:
場(chǎng)景 | 示例 |
郵件通知 | 注冊(cè)成功、忘記密碼等 |
日志記錄 | 用戶(hù)行為埋點(diǎn)、接口訪問(wèn)日志 |
文件清理 | 上傳后刪除舊文件 |
緩存更新 | 數(shù)據(jù)變動(dòng)后刷新緩存 |
不適合:
- 需要 重試、隊(duì)列控制、任務(wù)狀態(tài)追蹤 的任務(wù)
- 耗時(shí)非常長(zhǎng)、可能失敗的操作
對(duì)此建議使用 Celery、Dramatiq、APScheduler 等專(zhuān)業(yè)方案。
多任務(wù)并發(fā)支持嗎?
支持,但有限制:
- 每個(gè)請(qǐng)求的任務(wù)是串行執(zhí)行的,不并發(fā)
- 如果任務(wù)內(nèi)部使用了 asyncio.create_task(),可以實(shí)現(xiàn)并發(fā)邏輯
- 要實(shí)現(xiàn)更強(qiáng)大的并發(fā)處理,建議交給專(zhuān)門(mén)的任務(wù)系統(tǒng)處理(如 Celery)
與 Celery 的對(duì)比
特性 | BackgroundTasks | Celery |
部署復(fù)雜度 | 簡(jiǎn)單,無(wú)需額外組件 | 依賴(lài) Redis/RabbitMQ |
是否異步 | ? | ? |
重試機(jī)制 | ? 無(wú) | ? 自動(dòng)重試 |
任務(wù)隊(duì)列 | ? 無(wú) | ? 內(nèi)置隊(duì)列系統(tǒng) |
狀態(tài)追蹤 | ? 無(wú) | ? 支持狀態(tài)與監(jiān)控 |
適用場(chǎng)景 | 簡(jiǎn)單后臺(tái)邏輯 | 復(fù)雜異步任務(wù) |
總結(jié)
FastAPI 的 BackgroundTasks 非常適合:
- 小而輕的異步任務(wù)(如發(fā)郵件、寫(xiě)日志)
- 不要求任務(wù)可追蹤、重試的場(chǎng)景
- 快速開(kāi)發(fā)原型、簡(jiǎn)潔易用的后端服務(wù)