100行代碼搞定多智能體?這個(gè)極簡(jiǎn)AI框架PocketFlow有點(diǎn)東西 原創(chuàng)
現(xiàn)在各種工作流框架太多了,看不過(guò)來(lái),也沒(méi)有什么精力去學(xué)習(xí)。最近無(wú)意中刷到一個(gè)微型框架:PocketFlow,這個(gè)框架非常小巧,看了下只有 100 行左右的代碼,很容易看懂。我非常喜歡,寫個(gè)教程介紹一下。
對(duì)比其他框架
抽象 | 應(yīng)用特定包裝器 | 供應(yīng)商特定包裝器 | 代碼行數(shù) | 大小 | |
LangChain | Agent, Chain | 很多 (例如,QA, 摘要) | 很多 (例如,OpenAI, Pinecone等) | 405K | +166MB |
CrewAI | Agent, Chain | 很多 (例如,F(xiàn)ileReadTool, SerperDevTool) | 很多 (例如,OpenAI, Anthropic, Pinecone等) | 18K | +173MB |
SmolAgent | Agent | 一些 (例如,CodeAgent, VisitWebTool) | 一些 (例如,DuckDuckGo, Hugging Face等) | 8K | +198MB |
LangGraph | Agent, Graph | 一些 (例如,語(yǔ)義搜索) | 一些 (例如,PostgresStore, SqliteSaver等) | 37K | +51MB |
AutoGen | Agent | 一些 (例如,Tool Agent, Chat Agent) | 很多 [可選] (例如,OpenAI, Pinecone等) | 7K (僅核心) | +26MB (僅核心) |
PocketFlow | Graph | 無(wú) | 無(wú) | 100 | +56KB |
通過(guò)對(duì)比可以看到,PocketFlow 沒(méi)有很多融合的功能,只抽象出Graph,就能完成常見(jiàn)的RAG和Agent相關(guān)的功能了。
應(yīng)用場(chǎng)景
PocketFlow 設(shè)計(jì)了多種應(yīng)用場(chǎng)景的示例,從基礎(chǔ)的聊天機(jī)器人到復(fù)雜的多智能體系統(tǒng)。以下是一些基礎(chǔ)示例:
- 聊天:基礎(chǔ)聊天機(jī)器人,帶有對(duì)話歷史
- 結(jié)構(gòu)化輸出:通過(guò)提示從簡(jiǎn)歷中提取結(jié)構(gòu)化數(shù)據(jù)
- 工作流:寫作工作流,包括大綱、內(nèi)容編寫和樣式應(yīng)用
- 智能體:可以搜索網(wǎng)絡(luò)并回答問(wèn)題的研究智能體
- RAG:簡(jiǎn)單的檢索增強(qiáng)生成過(guò)程
- 批處理:將 Markdown 內(nèi)容翻譯成多種語(yǔ)言的批處理器
更復(fù)雜的應(yīng)用包括:多智能體通信、監(jiān)督流程、并行執(zhí)行、思維鏈推理、短期和長(zhǎng)期記憶聊天機(jī)器人等。
多智能體實(shí)戰(zhàn)
現(xiàn)在我們用PocketFlow實(shí)現(xiàn)一個(gè)AI版"你畫我猜"——Taboo游戲
為了直觀地展示PocketFlow的威力,我們來(lái)看一個(gè)用它實(shí)現(xiàn)的多智能體協(xié)作案例:Taboo(禁忌語(yǔ))游戲。
游戲規(guī)則:
* 提示者 (Hinter): 知道一個(gè)目標(biāo)詞和幾個(gè)"禁忌詞"。它的任務(wù)是給出提示,引導(dǎo)猜詞者猜出目標(biāo)詞,但提示中不能包含任何禁忌詞。
* 猜詞者 (Guesser): 根據(jù)提示者的提示,猜出目標(biāo)詞。
在這個(gè)案例中,兩個(gè)LLM將分別扮演提示者和猜詞者,它們需要通過(guò)不斷的異步通信來(lái)協(xié)作完成游戲。
PocketFlow使用??AsyncNode?
??來(lái)定義異步任務(wù)。我們的兩個(gè)智能體??AsyncHinter?
??和??AsyncGuesser?
?都繼承自它。
提示者 ??AsyncHinter?
?
class AsyncHinter(AsyncNode):
asyncdef prep_async(self, shared):
# 1. 從隊(duì)列中等待猜詞者的消息
guess = await shared["hinter_queue"].get()
if guess == "GAME_OVER":
returnNone
# 2. 準(zhǔn)備LLM的輸入
return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", [])
asyncdef exec_async(self, inputs):
# ... (調(diào)用LLM生成提示)
target, forbidden, past_guesses = inputs
prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}"
if past_guesses:
prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific."
hint = call_llm(prompt)
print(f"\nHinter: Here's your hint - {hint}")
return hint
asyncdef post_async(self, shared, prep_res, exec_res):
if exec_res isNone:
return"end"
# 3. 將生成的提示放入猜詞者的隊(duì)列
await shared["guesser_queue"].put(exec_res)
return"continue"# 返回Action,驅(qū)動(dòng)流程繼續(xù)
猜詞者 ??AsyncGuesser?
?
class AsyncGuesser(AsyncNode):
asyncdef prep_async(self, shared):
# 1. 從隊(duì)列中等待提示者的提示
hint = await shared["guesser_queue"].get()
return hint, shared.get("past_guesses", [])
asyncdef exec_async(self, inputs):
# ... (調(diào)用LLM生成猜測(cè))
hint, past_guesses = inputs
prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess."
guess = call_llm(prompt)
print(f"Guesser: I guess it's - {guess}")
return guess
asyncdef post_async(self, shared, prep_res, exec_res):
# 2. 檢查答案
if exec_res.lower() == shared["target_word"].lower():
print("Game Over - Correct guess!")
await shared["hinter_queue"].put("GAME_OVER")
return"end"
# 3. 如果猜錯(cuò),將錯(cuò)誤答案發(fā)回給提示者,以便其給出更好的提示
shared.setdefault("past_guesses", []).append(exec_res)
await shared["hinter_queue"].put(exec_res)
return"continue"
兩個(gè)智能體通過(guò)共享存儲(chǔ)??shared?
??中的兩個(gè)??asyncio.Queue?
??(??hinter_queue?
??和??guesser_queue?
?)進(jìn)行異步通信,一個(gè)用于接收信息,一個(gè)用于發(fā)送信息,實(shí)現(xiàn)了完美的解耦。
async def main():
# ... (初始化shared, 包括target_word, forbidden_words, 和兩個(gè)queue)
# 創(chuàng)建節(jié)點(diǎn)和流
hinter = AsyncHinter()
guesser = AsyncGuesser()
hinter_flow = AsyncFlow(start=hinter)
guesser_flow = AsyncFlow(start=guesser)
# 定義循環(huán):當(dāng)post返回"continue"時(shí),節(jié)點(diǎn)會(huì)再次執(zhí)行自己
hinter - "continue" >> hinter
guesser - "continue" >> guesser
# 使用asyncio.gather并發(fā)運(yùn)行兩個(gè)智能體流
await asyncio.gather(
hinter_flow.run_async(shared),
guesser_flow.run_async(shared)
)
每個(gè)智能體都被包裝在一個(gè)獨(dú)立的??AsyncFlow?
??中,并通過(guò)??"continue"?
?這個(gè)Action實(shí)現(xiàn)自我循環(huán),不斷地接收、處理、發(fā)送消息。
核心理念
PocketFlow 把 LLM 工作流抽象為:
+-----------+
Shared | |
Store <--| Node |<-- Params(僅 Batch 用)
+-----------+
|
Action
v
+-----------+
| Node |
+-----------+
- Node:執(zhí)行 prep → exec → post 三段式。
- Action:post() 返回字符串,決定流向哪一個(gè) successor。
- Flow:負(fù)責(zé)“根據(jù) Action 走圖”的調(diào)度器。
- Shared Store:跨 Node 的全局?jǐn)?shù)據(jù)約定。
核心源碼剖析 (pocketflow/__init__.py)
PocketFlow的強(qiáng)大之處在于其簡(jiǎn)約的核心抽象。讓我們深入其僅有100行的源碼,逐一拆解其精妙設(shè)計(jì)。
BaseNode
??BaseNode?
?是所有節(jié)點(diǎn)的基石,它定義了節(jié)點(diǎn)最核心的兩個(gè)屬性和兩個(gè)方法:
* ??self.successors?
??: 一個(gè)字典,形態(tài)為??{'action_name': next_node}?
??。這是PocketFlow流程控制的脈搏。??post?
??方法返回的??action?
?字符串就是在這個(gè)字典里查找下一個(gè)要執(zhí)行的節(jié)點(diǎn)。
* ??self.params?
?: 另一個(gè)字典,用于接收外部傳入的、節(jié)點(diǎn)級(jí)別的參數(shù),在批處理場(chǎng)景(Batch)中尤其重要。
* ??next(self, node, actinotallow="default")?
??: 這個(gè)方法負(fù)責(zé)填充??successors?
??字典。??node_a.next(node_b, "some_action")?
??就相當(dāng)于??node_a.successors["some_action"] = node_b?
?。
* ??_run(self, shared)?
??: 這是節(jié)點(diǎn)的生命周期方法,它嚴(yán)格按照??prep -> _exec -> post?
??的順序執(zhí)行,并將??prep?
??的結(jié)果傳遞給??_exec?
??,再將兩者的結(jié)果傳遞給??post?
?。
class BaseNode:
def __init__(self): self.params,self.successors={},{}
def set_params(self,params): self.params=params
def next(self,node,actinotallow="default"):
if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
self.successors[action]=node; return node
def prep(self,shared): pass
def exec(self,prep_res): pass
def post(self,shared,prep_res,exec_res): pass
def _exec(self,prep_res): return self.exec(prep_res)
def _run(self,shared): p=self.prep(shared); e=self._exec(p); return self.post(shared,p,e)
def run(self,shared):
if self.successors: warnings.warn("Node won't run successors. Use Flow.")
return self._run(shared)
DSL: >> 和 - 語(yǔ)法糖
PocketFlow中最令人驚艷的莫過(guò)于其定義流程的方式,如 ??node_a - "action" >> node_b?
?。這其實(shí)是巧妙地利用了Python的魔法方法實(shí)現(xiàn)的:
* ??__sub__(self, action)?
??: 當(dāng)我們寫??node_a - "action"?
??時(shí),Python會(huì)調(diào)用??node_a?
??的??__sub__?
??方法。這個(gè)方法并不做減法,而是返回一個(gè)臨時(shí)的??_ConditionalTransition?
??對(duì)象,這個(gè)對(duì)象保存了??node_a?
??和??"action"?
?。
* ??__rshift__(self, other)?
??: 當(dāng)我們寫??... >> node_b?
??時(shí),Python會(huì)調(diào)用??__rshift__?
?方法。
* 如果直接是??node_a >> node_b?
??,??node_a?
??的??__rshift__?
??被調(diào)用,它等價(jià)于??node_a.next(node_b, "default")?
?。
* 如果是??_ConditionalTransition(...) >> node_b?
??,那么臨時(shí)對(duì)象的??__rshift__?
??被調(diào)用,它執(zhí)行的是??source_node.next(node_b, "saved_action")?
?。
class BaseNode:
......
def __rshift__(self,other): return self.next(other)
def __sub__(self,action):
if isinstance(action,str): return _ConditionalTransition(self,action)
raise TypeError("Action must be a string")
class _ConditionalTransition:
def __init__(self,src,action): self.src,self.actinotallow=src,action
def __rshift__(self,tgt): return self.src.next(tgt,self.action)
通過(guò)這短短幾行代碼,PocketFlow就創(chuàng)造出了一種極具表現(xiàn)力的領(lǐng)域特定語(yǔ)言(DSL),讓流程定義變得像寫詩(shī)一樣自然。
Node
??Node?
??類在??BaseNode?
?的基礎(chǔ)上,增加了至關(guān)重要的容錯(cuò)機(jī)制。
class Node(BaseNode):
def __init__(self,max_retries=1,wait=0): super().__init__(); self.max_retries,self.wait=max_retries,wait
def exec_fallback(self,prep_res,exc): raise exc
def _exec(self,prep_res):
for self.cur_retry in range(self.max_retries):
try: return self.exec(prep_res)
except Exception as e:
if self.cur_retry==self.max_retries-1: return self.exec_fallback(prep_res,e)
if self.wait>0: time.sleep(self.wait)
這里的??_exec?
??方法覆蓋了??BaseNode?
??的版本。它不再是簡(jiǎn)單地調(diào)用??self.exec?
??,而是用一個(gè)??for?
??循環(huán)包裹了??try...except?
?塊。
* 循環(huán): ??max_retries?
?參數(shù)決定了循環(huán)次數(shù)。
* ??try?
??: 嘗試執(zhí)行開(kāi)發(fā)者定義的??self.exec(prep_res)?
?。如果成功,直接返回結(jié)果。
* ??except?
?: 如果捕獲到任何異常,它會(huì)檢查是否是最后一次重試。
* 如果是,則調(diào)用??exec_fallback?
?方法,讓開(kāi)發(fā)者有機(jī)會(huì)進(jìn)行優(yōu)雅的失敗處理(默認(rèn)是直接拋出異常)。
* 如果不是,則根據(jù)??wait?
?參數(shù)等待一段時(shí)間后,進(jìn)入下一次循環(huán)重試。
Flow
??Flow?
??是整個(gè)工作流的驅(qū)動(dòng)引擎,其核心是??_orch?
?(orchestrate,編排)方法。
class Flow(BaseNode):
def __init__(self,start=None): super().__init__(); self.start_node=start
# ...
def get_next_node(self,curr,action):
nxt=curr.successors.get(action or"default")
ifnot nxt and curr.successors: warnings.warn(f"Flow ends: '{action}' not found in {list(curr.successors)}")
return nxt
def _orch(self,shared,params=None):
curr,p,last_action =copy.copy(self.start_node),(params or {**self.params}),None
while curr:
curr.set_params(p)
last_actinotallow=curr._run(shared)
curr=copy.copy(self.get_next_node(curr,last_action))
return last_action
??_orch?
?的邏輯非常清晰:
- 初始化:?
?curr?
??指針指向??start_node?
?。 - ?
?while?
?循環(huán): 只要??curr?
??不為??None?
?,循環(huán)就繼續(xù)。 - 執(zhí)行: 調(diào)用?
?curr._run(shared)?
??來(lái)執(zhí)行當(dāng)前節(jié)點(diǎn)的??prep->exec->post?
??生命周期,并將其??post?
??方法的返回值存為??last_action?
?。 - 尋路: 調(diào)用?
?get_next_node(curr, last_action)?
??,在當(dāng)前節(jié)點(diǎn)的??successors?
??字典中尋找??last_action?
?對(duì)應(yīng)的新節(jié)點(diǎn)。 - 前進(jìn): 將?
?curr?
??指針更新為找到的新節(jié)點(diǎn)。如果找不到,??get_next_node?
??返回??None?
?,循環(huán)將在下一次檢查時(shí)終止。 - ?
?copy.copy()?
?的使用確保了每個(gè)節(jié)點(diǎn)的實(shí)例在flow的單次運(yùn)行中是獨(dú)立的,避免了狀態(tài)污染。
在我看來(lái),PocketFlow的源碼展現(xiàn)了"少即是多"的原則。它沒(méi)有試圖成為一個(gè)包羅萬(wàn)象的巨型框架,而是專注于提供一套最核心、最靈活的構(gòu)建塊,將其他的一切(如工具調(diào)用、API封裝)都交由開(kāi)發(fā)者在節(jié)點(diǎn)內(nèi)部自由實(shí)現(xiàn)。
總結(jié)
說(shuō)實(shí)話,PocketFlow目前的易用性還有待提升,不如許多框架那樣開(kāi)箱即用。但正是這種精簡(jiǎn)設(shè)計(jì)賦予了它更大的靈活性,開(kāi)發(fā)者可以根據(jù)自己的需求進(jìn)行DIY(詳細(xì)實(shí)現(xiàn)方案可參考官方Cookbook:https://github.com/The-Pocket/PocketFlow/tree/main/cookbook)
本文轉(zhuǎn)載自??AI 博物院?? 作者:longyunfeigu
