Integrating Multiple MCP Servers with LangGraph
Modern AI applications often require complex orchestration across different language models and specialized servers, each handling a specific task within a larger workflow. This distributed approach, however, introduces a critical challenge: maintaining context continuity.
When a conversation or task moves between models or servers, contextual information is easily lost. To the user, the AI system appears to "forget" earlier parts of the conversation, producing a fragmented experience and diminished utility.
1. The Core of MCP
The Model Context Protocol (MCP) provides a standardized framework for consistently preserving and efficiently transferring rich contextual information between the components of a distributed AI system. Its design centers on three elements:
- It defines a unified context-container format that encapsulates conversation history, user preferences, and task-related metadata. This structured container keeps the context readable and easy to operate on, accurately reflecting the state and intent of an interaction.
- MCP provides a serialization standard that lets context move efficiently and reliably between systems, services, and agents. The standard supports conversion across multiple data formats while balancing performance and compatibility, covering scenarios from local calls to cross-network communication.
- MCP also includes a full set of context-management operations: creating, updating, merging, and trimming contexts. These operations let a system adjust context content dynamically, preserving essential information while avoiding redundancy, which improves processing efficiency and resource utilization. (A minimal sketch of trimming follows below.)
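To make the trimming operation concrete, here is a minimal sketch; the function name prune_context and the max_messages parameter are illustrative, not part of the protocol:
# Minimal sketch of a context-trimming operation (names are illustrative).
def prune_context(context: dict, max_messages: int = 20) -> dict:
    """Keep only the most recent messages, recording how many were dropped."""
    messages = context.get("messages", [])
    if len(messages) > max_messages:
        dropped = len(messages) - max_messages
        context["messages"] = messages[-max_messages:]
        # Note the pruning in metadata so downstream components know
        # the history is partial
        context.setdefault("metadata", {})["pruned_messages"] = dropped
    return context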
 
The heart of the MCP architecture is the context container:
import uuid
from datetime import datetime

class ContextContainer:
    """Container for model context information."""
    def __init__(self, conversation_id=None):
        self.conversation_id = conversation_id or str(uuid.uuid4())
        self.messages = []
        self.metadata = {}
        self.created_at = datetime.now()
        self.updated_at = datetime.now()
    def add_message(self, role, content, metadata=None):
        """Add a message to the context."""
        message = {
            "id": str(uuid.uuid4()),
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata or {}
        }
        self.messages.append(message)
        self.updated_at = datetime.now()
    def to_dict(self):
        """Serialize the container to a dictionary."""
        return {
            "conversation_id": self.conversation_id,
            "messages": self.messages,
            "metadata": self.metadata,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat()
        }
    @classmethod
    def from_dict(cls, data):
        """Create a container from a dictionary."""
        container = cls(data.get("conversation_id"))
        container.messages = data.get("messages", [])
        container.metadata = data.get("metadata", {})
        container.created_at = datetime.fromisoformat(data.get("created_at"))
        container.updated_at = datetime.fromisoformat(data.get("updated_at"))
        return container2. LangGraph 集成
2. LangGraph Integration
LangGraph has become a go-to framework for building complex AI workflows, particularly those requiring multi-step reasoning, state management, and multi-agent collaboration. By integrating MCP with LangGraph, developers can build more sophisticated, widely distributed multi-service systems while keeping context flowing seamlessly and consistently between components.
Here is one way to implement MCP in a LangGraph environment:
from typing import Any, Dict, List, TypedDict

import requests
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph import END, StateGraph
# MCP Context Manager
class MCPContextManager:
    def __init__(self, mcp_server_url):
        self.mcp_server_url = mcp_server_url
    def create_context(self, initial_data=None):
        """Create a new context container."""
        response = requests.post(
            f"{self.mcp_server_url}/contexts",
            json=initial_data or {}
        )
        return response.json()
    def get_context(self, context_id):
        """Retrieve a context container."""
        response = requests.get(
            f"{self.mcp_server_url}/contexts/{context_id}"
        )
        return response.json()
    def update_context(self, context_id, data):
        """Update a context container."""
        response = requests.patch(
            f"{self.mcp_server_url}/contexts/{context_id}",
            json=data
        )
        return response.json()
# MCP-aware LLM Node
class MCPLLMNode:
    def __init__(self, llm: BaseChatModel, context_manager: MCPContextManager):
        self.llm = llm
        self.context_manager = context_manager
    def __call__(self, state: Dict[str, Any]) -> Dict[str, Any]:
        # Extract the context ID from state
        context_id = state.get("context_id")
        if not context_id:
            # Create new context if none exists
            context = self.context_manager.create_context()
            context_id = context["conversation_id"]
            state["context_id"] = context_id
        else:
            # Retrieve existing context
            context = self.context_manager.get_context(context_id)
        # Convert MCP messages to LangChain messages
        messages = []
        for msg in context.get("messages", []):
            if msg["role"] == "user":
                messages.append(HumanMessage(content=msg["content"]))
            elif msg["role"] == "assistant":
                messages.append(AIMessage(content=msg["content"]))
        # Add the current message if present
        if "current_input" in state:
            messages.append(HumanMessage(content=state["current_input"]))
            # Update MCP context with the user message
            self.context_manager.update_context(
                context_id,
                {"add_message": {
                    "role": "user",
                    "content": state["current_input"]
                }}
            )
        # Generate a response via the standard chat-model interface
        response = self.llm.invoke(messages)
        # Update MCP context with the assistant response
        self.context_manager.update_context(
            context_id,
            {"add_message": {
                "role": "assistant",
                "content": response.content
            }}
        )
        # Update state
        state["current_output"] = response.content
        return state
# Building a multi-server LangGraph with MCP
def build_mcp_graph(llm_servers: List[Dict], mcp_server_url: str):
    """
    Build a LangGraph using multiple LLM servers with MCP integration.
    llm_servers: List of server configurations with name, url, and routing_criteria
    mcp_server_url: URL of the MCP server
    """
    context_manager = MCPContextManager(mcp_server_url)
    # Create LLM nodes for each server
    llm_nodes = {}
    for server in llm_servers:
        # RemoteLLM is assumed to be a chat-model wrapper around each
        # server's HTTP endpoint (its implementation is not shown here)
        llm = RemoteLLM(server["url"])
        llm_nodes[server["name"]] = MCPLLMNode(llm, context_manager)
    # Router function to determine which LLM to use
    def router(state: Dict[str, Any]) -> str:
        input_text = state.get("current_input", "")
        for server in llm_servers:
            criteria = server.get("routing_criteria", "")
            if criteria in input_text:
                return server["name"]
        # Default to the first server
        return llm_servers[0]["name"]
    # Define the state schema and build the graph
    class GraphState(TypedDict, total=False):
        context_id: str
        current_input: str
        current_output: str
    workflow = StateGraph(GraphState)
    # The router node is a passthrough; the routing decision itself is
    # made by the conditional edges below
    workflow.add_node("router", lambda state: state)
    for name, node in llm_nodes.items():
        workflow.add_node(name, node)
    # Route from the router to the LLM node chosen by router()
    workflow.add_conditional_edges(
        "router", router, {name: name for name in llm_nodes}
    )
    for name in llm_nodes:
        workflow.add_edge(name, END)
    # Set the entry point
    workflow.set_entry_point("router")
    return workflow.compile()
This code implements a multi-server AI workflow system based on LangGraph and MCP (Model Context Protocol). It shows how MCP can be integrated with LangGraph to coordinate tasks across multiple LLM services while maintaining a single, consistent conversation context.
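Before wiring the graph into an API, it can be exercised directly. The invocation below is hypothetical; the URLs are placeholders and assume the corresponding LLM and MCP servers are reachable:
# Hypothetical direct invocation of the compiled graph.
graph = build_mcp_graph(
    [{"name": "general", "url": "http://localhost:8000/generate", "routing_criteria": ""}],
    mcp_server_url="http://localhost:9000",
)
result = graph.invoke({"current_input": "Hello!"})
print(result["current_output"], result["context_id"])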
3. Creating a Multi-Server Deployment
Now let's look at how this architecture might be deployed in practice:
from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
app = FastAPI()
# Configuration for our LLM servers
llm_servers = [
    {
        "name": "general_llm",
        "url": "http://general-llm-server:8000/generate",
        "routing_criteria": ""  # Default server
    },
    {
        "name": "code_llm",
        "url": "http://code-llm-server:8001/generate",
        "routing_criteria": "code"
    },
    {
        "name": "creative_llm",
        "url": "http://creative-llm-server:8002/generate",
        "routing_criteria": "write" 
    }
]
# MCP server URL
mcp_server_url = "http://mcp-server:9000"
# Build our graph (build_mcp_graph is the function defined in the previous section)
graph = build_mcp_graph(llm_servers, mcp_server_url)
class ChatRequest(BaseModel):
    message: str
    context_id: Optional[str] = None
class ChatResponse(BaseModel):
    response: str
    context_id: str
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    # Initialize state
    state = {
        "current_input": request.message
    }
    # Add context ID if provided
    if request.context_id:
        state["context_id"] = request.context_id
    # Process through the graph
    result = graph.invoke(state)
    return ChatResponse(
        response=result["current_output"],
        context_id=result["context_id"]
    )
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
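Assuming the gateway is running locally on port 8080, a client round trip might look like the following sketch; the returned context_id is passed back so the next turn shares the same conversation context:
# Hypothetical client-side usage of the /chat endpoint above.
import requests

resp = requests.post(
    "http://localhost:8080/chat",
    json={"message": "Write a short poem about servers."},
)
first = resp.json()
# Reuse the returned context_id so the next turn continues the conversation
resp = requests.post(
    "http://localhost:8080/chat",
    json={"message": "Now make it rhyme.", "context_id": first["context_id"]},
)
print(resp.json()["response"])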
4. Implementing the MCP Server
A key component of this architecture is a dedicated MCP server that manages the context containers:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
from typing import Dict, List, Optional, Any
import uuid
from datetime import datetime
import json
import redis
app = FastAPI()
# Use Redis for persistence
redis_client = redis.Redis(host="redis", port=6379, db=0)
class Message(BaseModel):
    role: str
    content: str
    metadata: Optional[Dict[str, Any]] = {}
class CreateContextRequest(BaseModel):
    metadata: Optional[Dict[str, Any]] = {}
    messages: Optional[List[Dict[str, Any]]] = []
class UpdateContextRequest(BaseModel):
    metadata: Optional[Dict[str, Any]] = None
    add_message: Optional[Dict[str, Any]] = None
    replace_messages: Optional[List[Dict[str, Any]]] = None
@app.post("/contexts")
async def create_context(request: CreateContextRequest):
    context_id = str(uuid.uuid4())
    now = datetime.now().isoformat()
    context = {
        "conversation_id": context_id,
        "messages": request.messages,
        "metadata": request.metadata,
        "created_at": now,
        "updated_at": now
    }
    redis_client.set(f"context:{context_id}", json.dumps(context))
    return context
@app.get("/contexts/{context_id}")
async def get_context(context_id: str):
    context_data = redis_client.get(f"context:{context_id}")
    if not context_data:
        raise HTTPException(status_code=404, detail="Context not found")
    return json.loads(context_data)
@app.patch("/contexts/{context_id}")
async def update_context(context_id: str, request: UpdateContextRequest):
    context_data = redis_client.get(f"context:{context_id}")
    if not context_data:
        raise HTTPException(status_code=404, detail="Context not found")
    context = json.loads(context_data)
    context["updated_at"] = datetime.now().isoformat()
    # Update metadata if provided
    if request.metadata is not None:
        context["metadata"].update(request.metadata)
    # Add a message if provided
    if request.add_message is not None:
        message = {
            "id": str(uuid.uuid4()),
            "role": request.add_message["role"],
            "content": request.add_message["content"],
            "timestamp": datetime.now().isoformat(),
            "metadata": request.add_message.get("metadata", {})
        }
        context["messages"].append(message)
    # Replace messages if provided
    if request.replace_messages is not None:
        context["messages"] = request.replace_messages
    redis_client.set(f"context:{context_id}", json.dumps(context))
    return context
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=9000)
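The PATCH endpoint accepts partial update operations. As a quick illustration (assuming the MCP server is running locally on port 9000), appending a message via the add_message field might look like this:
# Hypothetical direct calls against the MCP server defined above.
import requests

ctx = requests.post("http://localhost:9000/contexts", json={}).json()
requests.patch(
    f"http://localhost:9000/contexts/{ctx['conversation_id']}",
    json={"add_message": {"role": "user", "content": "Hello, MCP"}},
)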
5. Docker Compose Deployment
To tie everything together, Docker Compose can be used to deploy the multi-server system:
version: '3'
services:
  # Main API Gateway
  api_gateway:
    build: ./api_gateway
    ports:
      - "8080:8080"
    environment:
      - MCP_SERVER_URL=http://mcp_server:9000
    depends_on:
      - mcp_server
      - general_llm
      - code_llm
      - creative_llm
  # MCP Server
  mcp_server:
    build: ./mcp_server
    ports:
      - "9000:9000"
    depends_on:
      - redis
  # Redis for MCP persistence
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
  # LLM Servers
  general_llm:
    build: ./llm_servers/general
    ports:
      - "8000:8000"
  code_llm:
    build: ./llm_servers/code
    ports:
      - "8001:8000"
  creative_llm:
    build: ./llm_servers/creative
    ports:
      - "8002:8000"6. 性能關(guān)注
6. Performance Considerations
When deploying the Model Context Protocol (MCP) in production, performance optimization is key to keeping the system efficient and stable. Several considerations deserve particular attention:
Context size management. As a conversation continues, the context container can grow without bound, increasing storage overhead and transfer latency. Effective pruning strategies mitigate this, for example keeping only the most recent turns (as in the trimming sketch earlier) or extracting key information through semantic analysis, so the context stays compact and relevant.
Serialization efficiency. In high-throughput systems, frequent serialization and deserialization of contexts can become a bottleneck. More efficient formats such as Protocol Buffers, MessagePack, or FlatBuffers can replace plain JSON, reducing payload size and speeding up encoding and decoding.
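For instance, swapping JSON for MessagePack at the persistence layer is a small, local change. A minimal sketch, assuming the msgpack package is installed, compares payload sizes:
# Minimal sketch: comparing JSON and MessagePack encodings of a context.
import json
import msgpack  # assumes the 'msgpack' package is installed

context = {
    "conversation_id": "abc",
    "messages": [{"role": "user", "content": "hi"}] * 50,
}
json_bytes = json.dumps(context).encode("utf-8")
packed = msgpack.packb(context)
print(len(json_bytes), len(packed))  # MessagePack is typically smaller
restored = msgpack.unpackb(packed)   # decodes back to the same structure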
Caching strategy. To reduce round trips to the MCP backend and the latency of repeated lookups, caching can be introduced at several levels (client, gateway, or server). Actively used context fragments can be cached and only synchronized or refreshed when necessary, significantly improving response times and reducing load.
Designing these optimizations carefully keeps MCP stable and scalable in large, highly concurrent AI workloads. The cached context manager below illustrates the caching strategy:
import time

class CachedMCPContextManager(MCPContextManager):
    def __init__(self, mcp_server_url, cache_ttl=300):
        super().__init__(mcp_server_url)
        self.cache = {}
        self.cache_ttl = cache_ttl
        self.last_access = {}
    def get_context(self, context_id):
        current_time = time.time()
        # Check cache
        if context_id in self.cache:
            self.last_access[context_id] = current_time
            return self.cache[context_id]
        # Retrieve from server
        context = super().get_context(context_id)
        # Update cache
        self.cache[context_id] = context
        self.last_access[context_id] = current_time
        # Prune cache if needed
        self._prune_cache()
        return context
    def _prune_cache(self):
        current_time = time.time()
        expired_keys = [
            k for k, v in self.last_access.items() 
            if current_time - v > self.cache_ttl
        ]
        for key in expired_keys:
            if key in self.cache:
                del self.cache[key]
            if key in self.last_access:
                del self.last_access[key]
7. Security Considerations
Security is a critical concern when implementing the Model Context Protocol (MCP). Context containers may hold sensitive information such as conversation history, personal preferences, or business data, so comprehensive safeguards are needed to protect system reliability and user privacy.
For data protection, context data should be encrypted both at rest and in transit. Strict access controls should ensure that only authenticated and authorized entities can read or modify context content. A clear retention policy should define how long context is stored and ensure it is cleaned up or archived once a task completes, reducing the risk of data leakage.
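A retention policy can be enforced at the storage layer itself. The sketch below uses a Redis TTL so stored contexts expire automatically; the 30-day window is purely illustrative:
# Minimal sketch: enforce a retention window with a Redis TTL.
import json
import redis

redis_client = redis.Redis(host="redis", port=6379, db=0)
RETENTION_SECONDS = 30 * 24 * 3600  # illustrative 30-day retention window

def save_context_with_retention(context: dict) -> None:
    """Persist a context that Redis will expire automatically."""
    key = f"context:{context['conversation_id']}"
    redis_client.set(key, json.dumps(context), ex=RETENTION_SECONDS)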
Authentication and authorization are central to system security. Every request to read or update a context must pass a trusted authentication flow, for example API keys, OAuth tokens, or role-based access control (RBAC). This prevents unauthorized systems or users from tampering with context content, protecting the data integrity and operational legitimacy of the MCP system.
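On the server side, a FastAPI dependency can enforce a bearer token before any context route runs. This is a minimal sketch; the API_KEYS set is illustrative and would come from a secret store in practice:
# Minimal sketch: server-side bearer-token check as a FastAPI dependency.
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

security = HTTPBearer()
API_KEYS = {"example-key"}  # illustrative; load from a secret store in practice

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> None:
    """Reject requests whose bearer token is not recognized."""
    if credentials.credentials not in API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")

app = FastAPI()

@app.get("/contexts/{context_id}", dependencies=[Depends(verify_token)])
async def get_context(context_id: str):
    ...  # look up and return the context as in the MCP server above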
Input validation and sanitization guard against potential attacks. Any data entering a context should be strictly validated so that maliciously crafted content cannot trigger injection attacks or other vulnerabilities. Format checks, escaping, or sandboxing before data is written to a context stop these problems at the source.
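As a simple illustration of sanitizing input, a validation step can run before anything is written into a context. The limits and rules here are illustrative, not prescribed by MCP:
# Minimal sketch of input validation before writing to a context.
ALLOWED_ROLES = {"user", "assistant", "system"}
MAX_CONTENT_LENGTH = 32_768  # illustrative size limit

def validate_message(role: str, content: str) -> None:
    """Reject malformed or oversized messages before they enter the context."""
    if role not in ALLOWED_ROLES:
        raise ValueError(f"unsupported role: {role!r}")
    if not isinstance(content, str) or not content.strip():
        raise ValueError("message content must be a non-empty string")
    if len(content) > MAX_CONTENT_LENGTH:
        raise ValueError("message content exceeds the maximum allowed length")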
Layered defenses across data protection, access control, and input filtering make MCP deployments markedly safer in demanding production environments. The sketch below adds bearer-token authentication to the client-side context manager:
class SecureMCPContextManager(MCPContextManager):
    def __init__(self, mcp_server_url, api_key):
        super().__init__(mcp_server_url)
        self.api_key = api_key
    def _get_headers(self):
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
    def create_context(self, initial_data=None):
        """Create a new context container with authentication."""
        response = requests.post(
            f"{self.mcp_server_url}/contexts",
            headers=self._get_headers(),
            json=initial_data or {}
        )
        return response.json()
    # Similarly update other methods to use authentication
Summary
MCP represents a significant advance in the architecture of distributed AI systems. By providing a standardized way to manage and transfer context between components, it makes it practical to build sophisticated multi-server systems that preserve context continuity.
As AI systems grow ever more complex, protocols like MCP will only become more important for delivering seamless user experiences. By applying the patterns and code described in this article, developers can build robust distributed AI systems that maintain context across servers and models.
MCP's real strength lies in its simplicity and flexibility. By decoupling context management from individual AI components, it enables more modular, maintainable, and scalable architectures. As the AI ecosystem continues to evolve, we can expect more standardized protocols like it to emerge.