偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

超實用 Demo:使用 FastAPI、Celery、RabbitMQ 和 MongoDB 實現(xiàn)一個異步任務工作流

數(shù)據(jù)庫 其他數(shù)據(jù)庫
今天分享一份代碼,使用 Celery、RabbitMQ 和 MongoDB 實現(xiàn)一個異步任務工作流,你可以修改 task.py 來實現(xiàn)你自己的異步任務。

異步任務,是 Web 開發(fā)中經常遇到的問題,比如說用戶提交了一個請求,雖然這個請求對應的任務非常耗時,但是不能讓用戶等在這里,通常需要立即返回結果,告訴用戶任務已提交。任務可以在后續(xù)慢慢完成,完成后再給用戶發(fā)一個完成的通知。

今天分享一份代碼,使用 Celery、RabbitMQ 和 MongoDB 實現(xiàn)一個異步任務工作流,你可以修改 task.py 來實現(xiàn)你自己的異步任務。

架構圖如下:

圖片

其中 Celery 來執(zhí)行異步任務,RabbitMQ 作為消息隊列,MongoDB 存儲任務執(zhí)行結果,F(xiàn)astAPI 提供 Web 接口。

以上所有模塊均可使用 Docker 一鍵部署。

下面為 Demo 使用方法:

1、確保本機已安裝 Docker、Git

2、下載源代碼:

git clone https://github.com/aarunjith/async-demo.git

3、部署并啟動:

cd async-demo
docker compose up --build

4、啟動一個異步任務:

$ curl -X POST http://localhost:8080/process

任務會發(fā)送到消息隊列,同時會立即返回一個任務 id:

? curl -X POST http://localhost:8080/process
{"status":"PENDING","id":"a129c666-7b5b-45f7-ba54-9d7b96a1fe58","error":""}%

5、查詢任務狀態(tài):

curl -X POST http://localhost:8080/check_progress/<task_id>

任務完成后的返回結果如下:

 ? curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58
{"status":"SUCEESS","data":"\"hello\""}%

代碼目錄結構如下:

圖片

其中 app.py 如下:

from fastapi import FastAPI
from celery.result import AsyncResult
from tasks import start_processing
from loguru import logger
from pymongo import MongoClient
import uvicorn

# Lets create a connection to our backend where celery stores the results
client = MongoClient("mongodb://mongodb:27017")

# Default database and collection names that Celery create
db = client['task_results']
coll = db["celery_taskmeta"]

app = FastAPI()


@app.post('/process')
async def process_text_file():
'''
Process endpoint to trigger the start of a process
'''
try:
result = start_processing.delay()
logger.info(f'Started processing the task with id {result.id}')
return {
"status": result.state,
'id': result.id,
'error': ''
}
except Exception as e:
logger.info(f'Task Execution failed: {e}')
return {
"status": "FAILURE",
'id': None,
'error': e
}


@app.post('/check_progress/{task_id}')
async def check_async_progress(task_id: str):
'''
Endpoint to check the task progress and fetch the results if the task is
complete.
'''
try:
result = AsyncResult(task_id)
if result.ready():
data = coll.find({'_id': task_id})[0]
return {'status': 'SUCEESS', 'data': data['result']}
else:
return {"status": result.state, "error": ''}
except Exception as e:
data = coll.find({'_id': task_id})[0]
if data:
return {'status': 'SUCEESS', 'data': data['result']}
return {'status': 'Task ID invalid', 'error': e}

if __name__ == "__main__":
uvicorn.run("app:app", host='0.0.0.0', port='8080')

如果要實現(xiàn)自己的任務隊列,就修改 task.py 來添加自己的異步任務,可以整合到自己的項目中。

責任編輯:武曉燕 來源: Python七號
相關推薦

2025-06-03 08:15:00

微服務架構異步任務隊列

2023-06-05 08:14:17

RabbitMQ兔子MQ開源

2009-06-11 14:48:48

jbpm工作流引擎jbpm例子

2021-12-08 07:06:54

命令 Fastapi Celery

2021-10-14 11:34:05

技術工作流引擎

2021-03-05 07:47:07

工作流引擎節(jié)點

2009-03-03 09:13:36

工作流BPM業(yè)務流程

2023-12-07 18:02:38

RabbitMQ異步通信

2025-05-14 03:20:00

AgenticAIMCP

2023-03-31 13:01:31

PythonCelery驗證

2013-09-29 17:13:59

PowerShell工作流

2022-10-26 08:00:43

Activiti工作流BPM

2025-04-15 08:20:00

FastAPI異步函數(shù)

2009-09-22 12:15:06

ibmdwLotus

2025-08-28 18:08:18

多模型設置異步

2010-11-26 10:59:28

SharePoint

2009-06-11 14:43:34

jbpm工作流引擎jBPM搭建

2013-04-23 10:28:08

IBeamMDAAWF

2024-04-25 08:00:00

DevOps架構軟件開發(fā)

2015-03-23 11:17:55

docker高效開發(fā)工作流
點贊
收藏

51CTO技術棧公眾號