Building a Complex RAG System with LangChain, LangGraph, and RAGAS
Introduction
Want to build a production-ready RAG (Retrieval-Augmented Generation) system? It's no small feat — it takes careful design and repeated iteration. You start by cleaning up your data, then experiment with different chunking strategies, both logical and traditional, to find what fits your use case. Next, you anonymize the data to reduce hallucination, and use subgraphs to focus retrieval on the most relevant information while filtering out the noise. On top of the retrieval layer sits an LLM-driven plan-and-execute system that works like an agent, learning as it goes and deciding what to do next. Finally, once the system generates an answer, you evaluate it against a suite of metrics.
This blog post walks you through building a complete RAG system from scratch with LangChain, LangGraph, and RAGAS (an evaluation framework), simulating real-world challenges and showing the practical problems and solutions developers run into when building RAG bots. All the code is available in the GitHub repository: https://github.com/FareedKhan-dev/complex-RAG-guide
Table of Contents
? Understanding the RAG Pipeline
? Environment Setup
? Splitting the Data (Traditional / Logical)
? Data Cleaning
? Restructuring the Data
? Vectorizing the Data
? Creating a Context Retriever
? Filtering Irrelevant Information
? Query Rewriting
? Chain-of-Thought (CoT) Reasoning
? Relevance and Fact Checking
? Testing the RAG Pipeline
? Visualizing the RAG Pipeline with LangGraph
? The Subgraph Approach and Distillation Verification
? Creating Retrieval and Distillation Subgraphs
? Creating a Hallucination-Reduction Subgraph
? Creating and Testing the Plan Executor
? Replanning Logic
? Creating the Task Handler
? Anonymizing / De-anonymizing the Input Question
? Compiling and Visualizing the RAG Pipeline
? Testing the Final Pipeline
? Evaluating with RAGAS
? Conclusion
Understanding the RAG Pipeline
Before writing any code, let's sketch a blueprint of the RAG pipeline so we can break down each part step by step.
First, anonymize_question is called to replace specific names (e.g. "Harry Potter", "Voldemort") with placeholders (Person X, Villain Y), so the LLM's pretrained knowledge doesn't bias the plan.
Next, the planner drafts a high-level strategy. For example, the question "How did X defeat Y?" might be planned as:
1. Identify X and Y
2. Find their final confrontation
3. Analyze X's actions
4. Draft the answer
Then de_anonymize_plan swaps the placeholders back to the original names, making the plan concrete. The updated plan goes to break_down_plan, which splits each high-level step into specific tasks.
task_handler then picks the right tool for each task, for example:
- chosen_tool_is_retrieve_quotes: find specific dialogue or quotations
- chosen_tool_is_retrieve_chunks: fetch general information and context
- chosen_tool_is_retrieve_summaries: summarize entire chapters
- chosen_tool_is_answer: answer directly once there is enough context
After a retrieval tool runs (retrieve_book_quotes, retrieve_chunks, or retrieve_summaries), the new information is sent to replan, which decides whether to update the plan based on progress, the goal, and the new input.
This loop (task_handler -> tool -> replan) repeats until the system decides the question can be answered directly (can_be_answered_already). Then get_final_answer synthesizes all the evidence into a final response.
Finally, eval_using_RAGAS checks the answer for accuracy and faithfulness to its sources. If it passes, the flow ends at __end__, returning a verified, well-reasoned answer.
Environment Setup
LangChain and LangGraph together make up a sizeable framework, so we'll import modules only as we need them rather than loading everything up front — it keeps the walkthrough easier to follow.
The first step is to set up the environment variables that hold API keys and other secrets:
import os
from dotenv import load_dotenv

# Load secrets from a local .env file (assumed; adjust to however you store keys)
load_dotenv()

# Set the OpenAI API key (for OpenAI LLMs)
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
# Set the Together API key (for Together AI models)
os.environ["TOGETHER_API_KEY"] = os.getenv('TOGETHER_API_KEY')
# Get the Groq API key (for Groq LLMs)
groq_api_key = os.getenv('GROQ_API_KEY')
# Keep a handle on the Together key, passed explicitly to Together models below
together_api_key = os.getenv('TOGETHER_API_KEY')
Two AI model providers are used here: Together AI serves open-source models at low cost with good value, while Groq can generate structured output. If your prompt templates are good enough at coaxing structured results from the LLM, you could skip Groq entirely and rely on Together AI or local Hugging Face models — the LangChain ecosystem is flexible enough for either.
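Two handles used throughout the rest of this post — deepseek_v3 (the summarization LLM) and m2_bert_80M_32K (the embedding model) — are initialized off-screen. A minimal sketch, assuming the langchain-together integration and the Together-hosted model names below:
from langchain_together import ChatTogether, TogetherEmbeddings

# Summarization LLM used later by load_summarize_chain
# (model name assumed; any large-context chat model works)
deepseek_v3 = ChatTogether(
    model="deepseek-ai/DeepSeek-V3",
    temperature=0,
    api_key=together_api_key,
)

# 32k-context M2-BERT retrieval embeddings used for vectorization;
# reads TOGETHER_API_KEY from the environment
m2_bert_80M_32K = TogetherEmbeddings(
    model="togethercomputer/m2-bert-80M-32k-retrieval",
)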
Splitting the Data (Traditional / Logical)
To get started, we need a dataset. RAG pipelines usually process large volumes of raw text — PDF, CSV, or TXT — and that data typically needs heavy cleaning, often with a different approach per file.
We'll use the Harry Potter series as our dataset because it resembles a realistic scenario, complete with all kinds of messy string formatting. You can download the book here. Once downloaded, we can start splitting the document.
Define the PDF path:
book_path = "Harry Potter - Book 1 - The Sorcerers Stone.pdf"
Before any preprocessing or cleaning, the most important step is to split the document both logically and traditionally.
For Harry Potter, splitting by chapter is the most natural logical approach. First, load the PDF as one continuous string:
import re
import PyPDF2
from langchain.docstore.document import Document
with open(book_path, 'rb') as pdf_file:
    pdf_reader = PyPDF2.PdfReader(pdf_file)
    full_text = " ".join([page.extract_text() for page in pdf_reader.pages])
Then split on chapter headings with a regular expression:
chapter_sections = re.split(r'(CHAPTER\s[A-Z]+(?:\s[A-Z]+)*)', full_text)
Create a Document object for each chapter:
chapters = []
for i in range(1, len(chapter_sections), 2):
    chapter_text = chapter_sections[i] + chapter_sections[i + 1]
    doc = Document(page_content=chapter_text, metadata={"chapter": i // 2 + 1})
    chapters.append(doc)
print(f"Total chapters extracted: {len(chapters)}")
Output:
Total chapters extracted: 17
Besides chapters, quotes are another important breakpoint, since they often capture key information. (For financial documents, tables or statements might be the critical breakpoints instead.) Let's split on quotes too:
from tqdm import tqdm

# min_length must be defined before it is interpolated into the pattern
min_length = 50
quote_pattern_longer_than_min_length = re.compile(rf'"(.{{{min_length},}}?)"', re.DOTALL)

book_quotes_list = []
for doc in tqdm(chapters, desc="Extracting quotes"):
    content = doc.page_content
    found_quotes = quote_pattern_longer_than_min_length.findall(content)
    for quote in found_quotes:
        quote_doc = Document(page_content=quote)
        book_quotes_list.append(quote_doc)

print(f"Total quotes extracted: {len(book_quotes_list)}")
print(f"Sample quote: {book_quotes_list[5].page_content[:500]}...")
Output:
Total quotes extracted: 1337
Sample quote: Most mysterious. And now, over to JimMcGuffin ...
Finally, apply traditional chunking:
from langchain.text_splitter import RecursiveCharacterTextSplitter

chunk_size = 1000
chunk_overlap = 200
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len
)
# Split the chapter documents into overlapping chunks
document_splits = text_splitter.split_documents(chapters)
print(f"Number of document chunks: {len(document_splits)}")
Output:
Number of document chunks: 612
We now have the data split three ways — by chapter, by quote, and by traditional chunking. Next, cleaning.
Data Cleaning
Looking at the first chapter's content, there are stray spaces between letters (tab characters, \t) that we need to strip with a regular expression:
print(f"第一個章節(jié)內(nèi)容: {chapters[0].page_content[:500]}...")
Output:
First chapter content: CHAPTER ONE
THE BOY WHO LIVED
M
r. and M r s. D u r s l e y , o f n u m b e r ...
Remove the tab characters:
tab_pattern = re.compile(r'\t')
for doc in chapters:
    doc.page_content = tab_pattern.sub(' ', doc.page_content)
print(f"Cleaned first chapter content: {chapters[0].page_content[:500]}...")
Output:
Cleaned first chapter content: CHAPTER ONE
THE BOY WHO LIVED
M
r. and Mrs. Dursley, of number f ...
There are still newlines and extra spaces to deal with:
multiple_newlines_pattern = re.compile(r'\n\s*\n')
word_split_newline_pattern = re.compile(r'(\w)\n(\w)')
multiple_spaces_pattern = re.compile(r' +')
for doc in chapters:
    page_content = multiple_newlines_pattern.sub('\n', doc.page_content)
    page_content = word_split_newline_pattern.sub(r'\1\2', page_content)
    page_content = page_content.replace('\n', ' ')
    page_content = multiple_spaces_pattern.sub(' ', page_content)
    doc.page_content = page_content
print(f"Final cleaned chapter content: {chapters[15].page_content[:500]}...")
Output:
Final cleaned chapter content:
THE BOY WHO LIVED
Mr. and Mrs. Dursley, of number f ...
Apply the same cleaning to the traditionally chunked data:
for doc in document_splits:
    doc.page_content = tab_pattern.sub(' ', doc.page_content)
    doc.page_content = multiple_newlines_pattern.sub('\n', doc.page_content)
    doc.page_content = word_split_newline_pattern.sub(r'\1\2', doc.page_content)
    doc.page_content = multiple_spaces_pattern.sub(' ', doc.page_content)
Analyze the data:
chapter_word_counts = [len(doc.page_content.split()) for doc in chapters]
max_words = max(chapter_word_counts)
min_words = min(chapter_word_counts)
average_words = sum(chapter_word_counts) / len(chapter_word_counts)
print(f"Max words in a chapter: {max_words}")
print(f"Min words in a chapter: {min_words}")
print(f"Average words per chapter: {average_words:.2f}")
Output:
Max words in a chapter: 6343
Min words in a chapter: 2915
Average words per chapter: 4402.18
Every chapter fits within the LLM context window, so we're fine for now.
Restructuring the Data
The quote data is already compact, but the chapters are large and full of dialogue that isn't needed. We can have an LLM summarize each chapter, keeping only the key information:
from langchain.prompts import PromptTemplate
from langchain.chains.summarize import load_summarize_chain

template = """Write an extensive summary of the following:
{text}
SUMMARY:"""
summarization_prompt = PromptTemplate(
    template=template,
    input_variables=["text"]
)
chain = load_summarize_chain(deepseek_v3, chain_type="stuff", prompt=summarization_prompt)

chapter_summaries = []
for chapter in chapters:
    summary = chain.invoke([chapter])
    cleaned_text = re.sub(r'\n\n', '\n', summary["output_text"])
    doc_summary = Document(page_content=cleaned_text, metadata=chapter.metadata)
    chapter_summaries.append(doc_summary)
The stuff chain type works here because the largest chapter (~6K words) fits inside DeepSeek V3's context window. If your data exceeds the limit, use the map_reduce or refine chain types instead, as sketched below.
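If a document did exceed the window, map_reduce is a drop-in alternative; a minimal sketch (reusing the same prompt for both steps, which you would normally tune separately):
# Sketch: map_reduce summarization for documents larger than the context window.
# Each split is summarized independently ("map"), then the partial summaries
# are merged into a single summary ("reduce").
map_reduce_chain = load_summarize_chain(
    deepseek_v3,
    chain_type="map_reduce",
    map_prompt=summarization_prompt,
    combine_prompt=summarization_prompt,
)
# oversized_splits = text_splitter.split_documents([oversized_chapter])
# summary = map_reduce_chain.invoke(oversized_splits)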
Vectorizing the Data
Vectorize the data with the M2-BERT model (32k context window) and store it in FAISS:
from langchain.vectorstores import FAISS
book_splits_vectorstore = FAISS.from_documents(document_splits, m2_bert_80M_32K)
chapter_summaries_vectorstore = FAISS.from_documents(chapter_summaries, m2_bert_80M_32K)
quotes_vectorstore = FAISS.from_documents(book_quotes_list, m2_bert_80M_32K)
quotes_vectorstore.save_local("quotes_vectorstore")
You can load the local vector store back later:
quotes_vectorstore = FAISS.load_local("quotes_vectorstore", m2_bert_80M_32K, allow_dangerous_deserialization=True)
Creating a Context Retriever
Create a retriever for each dataset (chapter summaries, quotes, traditional chunks):
book_chunks_retriever = book_splits_vectorstore.as_retriever(search_kwargs={"k": 1})
chapter_summaries_retriever = chapter_summaries_vectorstore.as_retriever(search_kwargs={"k": 1})
book_quotes_retriever = quotes_vectorstore.as_retriever(search_kwargs={"k": 10})
def retrieve_context_per_question(state):
    question = state["question"]
    # Retrieve traditional chunks
    docs = book_chunks_retriever.get_relevant_documents(question)
    context = " ".join(doc.page_content for doc in docs)
    # Retrieve chapter summaries, tagged with their chapter numbers
    docs_summaries = chapter_summaries_retriever.get_relevant_documents(state["question"])
    context_summaries = " ".join(
        f"{doc.page_content} (Chapter {doc.metadata['chapter']})" for doc in docs_summaries
    )
    # Retrieve book quotes
    docs_book_quotes = book_quotes_retriever.get_relevant_documents(state["question"])
    book_quotes = " ".join(doc.page_content for doc in docs_book_quotes)
    all_contexts = context + context_summaries + book_quotes
    all_contexts = all_contexts.replace('"', '\\"').replace("'", "\\'")
    return {"context": all_contexts, "question": question}
Filtering Irrelevant Information
Use an LLM to filter out irrelevant content:
keep_only_relevant_content_prompt_template = """
You receive a query: {query} and retrieved documents: {retrieved_documents} from a vector store.
You need to filter out all the non-relevant information that does not supply important information regarding the {query}.
Your goal is to filter out the non-relevant information only.
You can remove parts of sentences that are not relevant to the query or remove whole sentences that are not relevant to the query.
DO NOT ADD ANY NEW INFORMATION THAT IS NOT IN THE RETRIEVED DOCUMENTS.
Output the filtered relevant content.
"""
from pydantic import BaseModel, Field

class KeepRelevantContent(BaseModel):
    relevant_content: str = Field(description="The relevant content from the retrieved documents that is relevant to the query.")
keep_only_relevant_content_prompt = PromptTemplate(
template=keep_only_relevant_content_prompt_template,
input_variables=["query", "retrieved_documents"],
)
keep_only_relevant_content_llm = ChatTogether(
temperature=0,
model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
api_key=together_api_key,
max_tokens=2000
)
keep_only_relevant_content_chain = (
keep_only_relevant_content_prompt
| keep_only_relevant_content_llm.with_structured_output(KeepRelevantContent)
)
def keep_only_relevant_content(state):
    question = state["question"]
    context = state["context"]
    input_data = {"query": question, "retrieved_documents": context}
    print("Keeping only the relevant content...")
    output = keep_only_relevant_content_chain.invoke(input_data)
    relevant_content = output.relevant_content
    relevant_content = "".join(relevant_content)
    relevant_content = relevant_content.replace('"', '\\"').replace("'", "\\'")
    return {"relevant_context": relevant_content, "context": context, "question": question}
Query Rewriting
User queries can be ambiguous, so we rewrite them with an LLM:
from langchain_core.output_parsers import JsonOutputParser
from langchain_groq import ChatGroq

class RewriteQuestion(BaseModel):
    rewritten_question: str = Field(description="The improved question, optimized for vectorstore retrieval")
    explanation: str = Field(description="The reasoning behind the rewrite")

rewrite_question_string_parser = JsonOutputParser(pydantic_object=RewriteQuestion)
rewrite_llm = ChatGroq(
    temperature=0,
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
    max_tokens=4000
)
rewrite_prompt_template = """You are a question re-writer that converts an input question to a better version optimized for vectorstore retrieval.
Analyze the input question {question} and try to reason about the underlying semantic intent / meaning.
{format_instructions}
"""
rewrite_prompt = PromptTemplate(
template=rewrite_prompt_template,
input_variables=["question"],
partial_variables={"format_instructions": rewrite_question_string_parser.get_format_instructions()},
)
question_rewriter = rewrite_prompt | rewrite_llm | rewrite_question_string_parser
def rewrite_question(state):
    question = state["question"]
    print("Rewriting the question...")
    result = question_rewriter.invoke({"question": question})
    new_question = result["rewritten_question"]
    return {"question": new_question}
Chain-of-Thought (CoT) Reasoning
Use Chain-of-Thought (CoT) reasoning to improve answer quality:
class QuestionAnswerFromContext(BaseModel):
    answer_based_on_content: str = Field(description="The answer to the question, grounded in the given context")
question_answer_from_context_llm = ChatTogether(
temperature=0,
model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
api_key=together_api_key,
max_tokens=2000
)
question_answer_cot_prompt_template = """
Chain-of-Thought Reasoning Examples
Example 1
Context: Mary is taller than Jane. Jane is shorter than Tom. Tom is the same height as David.
Question: Who is the tallest person?
Reasoning:
Mary > Jane
Jane < Tom → Tom > Jane
Tom = David
So: Mary > Tom = David > Jane
Final Answer: Mary
...
Context: {context}
Question: {question}
"""
question_answer_from_context_cot_prompt = PromptTemplate(
template=question_answer_cot_prompt_template,
input_variables=["context", "question"],
)
question_answer_from_context_cot_chain = (
question_answer_from_context_cot_prompt
| question_answer_from_context_llm.with_structured_output(QuestionAnswerFromContext)
)
def answer_question_from_context(state):
    question = state["question"]
    context = state["aggregated_context"] if "aggregated_context" in state else state["context"]
    input_data = {"question": question, "context": context}
    print("Answering the question from the retrieved context...")
    output = question_answer_from_context_cot_chain.invoke(input_data)
    answer = output.answer_based_on_content
    print(f'Answer (before checking hallucinations): {answer}')
    return {"answer": answer, "context": context, "question": question}
Relevance and Fact Checking
Next, check document relevance and factual grounding:
class Relevance(BaseModel):
    is_relevant: bool = Field(description="Whether the document is relevant to the query")
    explanation: str = Field(description="Reasoning behind the relevance judgment")

is_relevant_json_parser = JsonOutputParser(pydantic_object=Relevance)
is_relevant_llm = ChatGroq(
    temperature=0,
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
    max_tokens=2000
)

# The prompt template itself is elided in the original post; this is a
# minimal reconstruction of what it needs to contain.
is_relevant_content_prompt_template = """You receive a query: {query} and a context: {context} retrieved from a vector store.
Determine whether the context supplies information relevant to answering the query.
{format_instructions}
"""

is_relevant_content_prompt = PromptTemplate(
    template=is_relevant_content_prompt_template,
    input_variables=["query", "context"],
    partial_variables={"format_instructions": is_relevant_json_parser.get_format_instructions()},
)
is_relevant_content_chain = is_relevant_content_prompt | is_relevant_llm | is_relevant_json_parser
def is_relevant_content(state):
    question = state["question"]
    context = state["context"]
    input_data = {"query": question, "context": context}
    print("Determining document relevance...")
    output = is_relevant_content_chain.invoke(input_data)
    if output["is_relevant"]:
        print("The document is relevant.")
        return "relevant"
    else:
        print("The document is not relevant.")
        return "not relevant"
Fact checking:
class is_grounded_on_facts(BaseModel):
    grounded_on_facts: bool = Field(description="Whether the answer is grounded in the provided facts")
is_grounded_on_facts_llm = ChatTogether(
temperature=0,
model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
api_key=together_api_key,
max_tokens=2000
)
is_grounded_on_facts_prompt_template = """You are a fact-checker that determines if the given answer {answer} is grounded in the given context {context}...
"""
is_grounded_on_facts_prompt = PromptTemplate(
template=is_grounded_on_facts_prompt_template,
input_variables=["context", "answer"],
)
is_grounded_on_facts_chain = (
is_grounded_on_facts_prompt
| is_grounded_on_facts_llm.with_structured_output(is_grounded_on_facts)
)
def grade_generation_v_documents_and_question(state):
    context = state["context"]
    answer = state["answer"]
    question = state["question"]
    grounded = is_grounded_on_facts_chain.invoke({"context": context, "answer": answer}).grounded_on_facts
    if not grounded:
        print("The answer is a hallucination.")
        return "hallucination"
    print("The answer is grounded in facts.")
    can_be_answered = can_be_answered_chain.invoke({"question": question, "context": context})["can_be_answered"]
    if can_be_answered:
        print("The question can be fully answered.")
        return "useful"
    else:
        print("The question cannot be fully answered.")
        return "not_useful"
Testing the RAG Pipeline
Test with a simple question:
init_state = {"question": "who is fluffy?"}
context_state = retrieve_context_per_question(init_state)
relevant_content_state = keep_only_relevant_content(context_state)
is_relevant_content_state = is_relevant_content(relevant_content_state)
answer_state = answer_question_from_context(relevant_content_state)
final_answer = grade_generation_v_documents_and_question(answer_state)
print(answer_state["answer"])
Output:
Retrieving relevant chunks...
Retrieving relevant chapter summaries...
Keeping only the relevant content...
Determining document relevance...
The document is relevant.
Answering the question from the retrieved context...
Answer (before checking hallucinations): Fluffy is a three-headed dog.
Checking if the answer is grounded in facts...
The answer is grounded in facts.
Determining if the question is fully answered...
The question can be fully answered.
Fluffy is a three-headed dog.
Fluffy is indeed the three-headed dog in Harry Potter, and the pipeline identified it correctly — everything is working as expected.
Visualizing the RAG Pipeline with LangGraph
Visualize the pipeline with LangGraph:
from typing import TypedDict
from langgraph.graph import END, StateGraph
from langchain_core.runnables.graph import MermaidDrawMethod
from IPython.display import display, Image
class QualitativeRetrievalAnswerGraphState(TypedDict):
    question: str
    context: str
    answer: str

wf = StateGraph(QualitativeRetrievalAnswerGraphState)
for n, f in [("retrieve", retrieve_context_per_question),
             ("filter", keep_only_relevant_content),
             ("rewrite", rewrite_question),
             ("answer", answer_question_from_context)]:
    wf.add_node(n, f)
wf.set_entry_point("retrieve")
wf.add_edge("retrieve", "filter")
wf.add_conditional_edges("filter", is_relevant_content, {
"relevant": "answer",
"not relevant": "rewrite"
})
wf.add_edge("rewrite", "retrieve")
wf.add_conditional_edges("answer", grade_generation_v_documents_and_question, {
"hallucination": "answer",
"not_useful": "rewrite",
"useful": END
})
display(Image(wf.compile().get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod.API)))
The resulting graph clearly shows the flow from context retrieval through filtering, query rewriting, answer generation, and fact checking.
The Subgraph Approach and Distillation Verification
Complex tasks call for subgraphs that split the work into pieces — retrieval, distillation, and verification:
is_distilled_content_grounded_on_content_prompt_template = """
You receive some distilled content: {distilled_content} and the original context: {original_context}.
You need to determine if the distilled content is grounded on the original context.
...
"""
class IsDistilledContentGroundedOnContent(BaseModel):
    grounded: bool
    explanation: str
is_distilled_content_grounded_on_content_json_parser = JsonOutputParser(
pydantic_object=IsDistilledContentGroundedOnContent
)
is_distilled_content_grounded_on_content_prompt = PromptTemplate(
template=is_distilled_content_grounded_on_content_prompt_template,
input_variables=["distilled_content", "original_context"],
partial_variables={"format_instructions": is_distilled_content_grounded_on_content_json_parser.get_format_instructions()},
)
is_distilled_content_grounded_on_content_llm = ChatGroq(
temperature=0,
model_name="llama3-70b-8192",
groq_api_key=groq_api_key,
max_tokens=4000
)
is_distilled_content_grounded_on_content_chain = (
is_distilled_content_grounded_on_content_prompt
| is_distilled_content_grounded_on_content_llm
| is_distilled_content_grounded_on_content_json_parser
)
def is_distilled_content_grounded_on_content(state):
    print("Determining if the distilled content is grounded on the original context...")
    distilled_content = state["relevant_context"]
    original_context = state["context"]
    input_data = {"distilled_content": distilled_content, "original_context": original_context}
    output = is_distilled_content_grounded_on_content_chain.invoke(input_data)
    grounded = output["grounded"]
    if grounded:
        print("The distilled content is grounded on the original context.")
        return "grounded on the original context"
    else:
        print("The distilled content is not grounded on the original context.")
        return "not grounded on the original context"
Creating Retrieval and Distillation Subgraphs
Create a separate retrieval function for chapter summaries, quotes, and traditional chunks:
def retrieve_chunks_context_per_question(state):
    print("Retrieving relevant chunks...")
    question = state["question"]
    docs = book_chunks_retriever.get_relevant_documents(question)
    context = " ".join(doc.page_content for doc in docs)
    context = context.replace('"', '\\"').replace("'", "\\'")
    return {"context": context, "question": question}

def retrieve_summaries_context_per_question(state):
    print("Retrieving relevant chapter summaries...")
    question = state["question"]
    docs_summaries = chapter_summaries_retriever.get_relevant_documents(state["question"])
    context_summaries = " ".join(
        f"{doc.page_content} (Chapter {doc.metadata['chapter']})" for doc in docs_summaries
    )
    context_summaries = context_summaries.replace('"', '\\"').replace("'", "\\'")
    return {"context": context_summaries, "question": question}

def retrieve_book_quotes_context_per_question(state):
    print("Retrieving relevant book quotes...")
    question = state["question"]
    docs_book_quotes = book_quotes_retriever.get_relevant_documents(state["question"])
    book_quotes = " ".join(doc.page_content for doc in docs_book_quotes)
    book_quotes_context = book_quotes.replace('"', '\\"').replace("'", "\\'")
    return {"context": book_quotes_context, "question": question}
class QualitativeRetrievalGraphState(TypedDict):
    question: str
    context: str
    relevant_context: str

def build_retrieval_workflow(node_name, retrieve_fn):
    graph = StateGraph(QualitativeRetrievalGraphState)
    graph.add_node(node_name, retrieve_fn)
    graph.add_node("keep_only_relevant_content", keep_only_relevant_content)
    graph.set_entry_point(node_name)
    graph.add_edge(node_name, "keep_only_relevant_content")
    graph.add_conditional_edges(
        "keep_only_relevant_content",
        is_distilled_content_grounded_on_content,
        {
            "grounded on the original context": END,
            "not grounded on the original context": "keep_only_relevant_content",
        },
    )
    app = graph.compile()
    display(Image(app.get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod.API)))
    return graph
build_retrieval_workflow("retrieve_chunks_context_per_question", retrieve_chunks_context_per_question)
build_retrieval_workflow("retrieve_summaries_context_per_question", retrieve_summaries_context_per_question)
build_retrieval_workflow("retrieve_book_quotes_context_per_question", retrieve_book_quotes_context_per_question)
Creating a Hallucination-Reduction Subgraph
The hallucination-reduction subgraph:
def is_answer_grounded_on_context(state):
    print("Checking if the answer is grounded in facts...")
    context = state["context"]
    answer = state["answer"]
    result = is_grounded_on_facts_chain.invoke({"context": context, "answer": answer})
    grounded_on_facts = result.grounded_on_facts
    if not grounded_on_facts:
        print("The answer is a hallucination.")
        return "hallucination"
    else:
        print("The answer is grounded in facts.")
        return "grounded on context"

class QualitativeAnswerGraphState(TypedDict):
    question: str
    context: str
    answer: str

wf = StateGraph(QualitativeAnswerGraphState)
wf.add_node("answer", answer_question_from_context)
wf.set_entry_point("answer")
wf.add_conditional_edges("answer", is_answer_grounded_on_context, {
    "hallucination": "answer",
    "grounded on context": END
})
qualitative_answer_workflow_app = wf.compile()  # compiled handle, used in the test below
display(Image(qualitative_answer_workflow_app.get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod.API)))
Test the hallucination subgraph:
question = "who is harry?"
context = "Harry Potter is a cat."
init_state = {"question": question, "context": context}
for output in qualitative_answer_workflow_app.stream(init_state):
    for _, value in output.items():
        pass  # keep only the final node output

print("--------------------")
print(f'answer: {value["answer"]}')
Output:
Answering the question from the retrieved context...
Answer (before checking hallucinations): Harry Potter is a cat.
Checking if the answer is grounded in facts...
The answer is grounded in facts.
--------------------
answer: Harry Potter is a cat.
Even with a deliberately false context, the system answers from the context rather than inventing facts — it doesn't make things up on its own.
Creating and Testing the Plan Executor
Define the plan executor:
from typing import List

class PlanExecute(TypedDict):
    curr_state: str
    question: str
    anonymized_question: str
    query_to_retrieve_or_answer: str
    plan: List[str]
    past_steps: List[str]
    mapping: dict
    curr_context: str
    aggregated_context: str
    tool: str
    response: str

class Plan(BaseModel):
    steps: List[str] = Field(description="Steps to follow, in order")
planner_prompt = """For the given query {question}, come up with a simple step by step plan of how to figure out the answer. ..."""
planner_prompt = PromptTemplate(
template=planner_prompt,
input_variables=["question"],
)
planner_llm = ChatTogether(
temperature=0,
model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
api_key=together_api_key,
max_tokens=2000
)
planner = planner_prompt | planner_llm.with_structured_output(Plan)
break_down_plan_prompt_template = """You receive a plan {plan} which contains a series of steps to follow in order to answer a query. ..."""
break_down_plan_prompt = PromptTemplate(
template=break_down_plan_prompt_template,
input_variables=["plan"],
)
break_down_plan_llm = ChatTogether(
temperature=0,
model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
api_key=together_api_key,
max_tokens=2000
)
break_down_plan_chain = break_down_plan_prompt | break_down_plan_llm.with_structured_output(Plan)
Test the plan executor:
question = {"question": "how did the main character beat the villain?"}
my_plan = planner.invoke(question)
print(my_plan)
refined_plan = break_down_plan_chain.invoke({"plan": my_plan.steps})
print(refined_plan)
Output:
steps = [
    'Identify the hero and the villain from the vector store.',
    'Find the climax or final confrontation from the vector store.',
    "Analyze the hero's actions during this confrontation from the vector store.",
    'Determine the key action/strategy that defeated the villain from the vector store.',
    'Summarize how the hero defeated the villain using the retrieved context.'
]
Replanning Logic
Updating the plan:
replanner_prompt_template = """
For the given objective, come up with a simple step by step plan of how to figure out the answer. ...
"""
class ActPossibleResults(BaseModel):
    plan: Plan = Field(description="Plan to follow in the future")
    explanation: str = Field(description="Explanation of the action")
act_possible_results_parser = JsonOutputParser(pydantic_object=ActPossibleResults)
replanner_prompt = PromptTemplate(
template=replanner_prompt_template,
input_variables=["question", "plan", "past_steps", "aggregated_context"],
partial_variables={"format_instructions": act_possible_results_parser.get_format_instructions()},
)
replanner_llm = ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000)
replanner = replanner_prompt | replanner_llm | act_possible_results_parser
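A quick look at what the replanner consumes and returns (a sketch; it reuses my_plan from the planner test above, and the nested output shape depends on the LLM following the parser's format instructions):
# Sketch: revise the plan given progress and newly gathered context.
replan_inputs = {
    "question": "how did the main character beat the villain?",
    "plan": my_plan.steps,
    "past_steps": ["Identify the hero and the villain from the vector store."],
    "aggregated_context": "The villain acts through Professor Quirrell.",
}
replan_output = replanner.invoke(replan_inputs)
print(replan_output["plan"]["steps"])  # the revised step list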
Creating the Task Handler
The task handler decides which subgraph to use:
tasks_handler_prompt_template = """
You are a task handler that receives a task: {curr_task} and must decide which tool to use to execute the task. ...
"""
class TaskHandlerOutput(BaseModel):
    query: str = Field(description="The query to retrieve with, or the question to answer from context")
    curr_context: str = Field(description="The context to answer the query from")
    tool: str = Field(description="The tool to use: retrieve_chunks, retrieve_summaries, retrieve_quotes, or answer_from_context")
task_handler_prompt = PromptTemplate(
template=tasks_handler_prompt_template,
input_variables=["curr_task", "aggregated_context", "last_tool", "past_steps", "question"],
)
task_handler_llm = ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000)
task_handler_chain = task_handler_prompt | task_handler_llm.with_structured_output(TaskHandlerOutput)
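A quick sanity check of the chain (a sketch; the inputs mirror the fields the prompt expects, reusing refined_plan from the planner test):
# Sketch: route the first refined plan step to a tool.
example_inputs = {
    "curr_task": refined_plan.steps[0],
    "aggregated_context": "",
    "last_tool": None,
    "past_steps": [],
    "question": "how did the main character beat the villain?",
}
handler_output = task_handler_chain.invoke(example_inputs)
print(handler_output.tool, "->", handler_output.query)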
Anonymizing / De-anonymizing the Input Question
Anonymize the question to avoid LLM bias:
class AnonymizeQuestion(BaseModel):
    anonymized_question: str
    mapping: dict
    explanation: str
anonymize_question_chain = (
PromptTemplate(
input_variables=["question"],
partial_variables={"format_instructions": JsonOutputParser(pydantic_object=AnonymizeQuestion).get_format_instructions()},
template="""You anonymize questions by replacing named entities with variables. ...""",
)
| ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000)
| JsonOutputParser(pydantic_object=AnonymizeQuestion)
)
class DeAnonymizePlan(BaseModel):
    plan: List
de_anonymize_plan_chain = (
PromptTemplate(
input_variables=["plan", "mapping"],
template="Replace variables in: {plan}, using: {mapping}. Output updated list as JSON."
)
| ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000).with_structured_output(DeAnonymizePlan)
)
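A quick example of the two chains working together (a sketch):
# Sketch: anonymize a question, then restore names inside a toy plan.
anon = anonymize_question_chain.invoke({"question": "How did Harry defeat Quirrell?"})
print(anon["anonymized_question"])  # e.g. "How did X defeat Y?"
print(anon["mapping"])              # e.g. {"X": "Harry", "Y": "Quirrell"}

restored = de_anonymize_plan_chain.invoke(
    {"plan": ["Identify X and Y", "Find their final confrontation"], "mapping": anon["mapping"]}
)
print(restored.plan)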
Compiling and Visualizing the RAG Pipeline
First we need the task-handler node, which consumes one plan step at a time and decides which tool to run (the driver that executes the whole compiled graph is sketched after the flow summary below):
def run_task_handler_chain(state: PlanExecute):
    # Task-handler node: pop the next task, pick a tool, and record the query.
    state["curr_state"] = "task_handler"
    curr_task = state["plan"].pop(0)
    inputs = {
        "curr_task": curr_task,
        "aggregated_context": state.get("aggregated_context", ""),
        "last_tool": state.get("tool"),
        "past_steps": state.get("past_steps", []),
        "question": state["question"]
    }
    output = task_handler_chain.invoke(inputs)
    state.setdefault("past_steps", []).append(curr_task)
    state["query_to_retrieve_or_answer"] = output.query
    state["tool"] = output.tool if output.tool != "answer_from_context" else "answer"
    if output.tool == "answer_from_context":
        state["curr_context"] = output.curr_context
    return state
The overall flow (one way to wire it up is sketched after this list):
- 1. Anonymize the question
- 2. The planner creates a high-level strategy
- 3. De-anonymize the plan
- 4. Break the plan into small tasks
- 5. The task handler picks a tool
- 6. Retrieve or answer
- 7. Replan based on the new information
- 8. Generate the final answer
- 9. End
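The post doesn't reproduce the full wiring, so here is a condensed sketch of how these steps might be assembled, together with the execute_plan_and_print_steps driver used in the tests below. The node functions, the simplified routing, and the recursion limit are assumptions; the real graph also routes conditionally through the retrieval subgraphs, replan, and get_final_answer.
# Condensed sketch (assumed wiring, not the author's exact code).
def anonymize_queries(state: PlanExecute):
    # Step 1: replace named entities with placeholders.
    anon = anonymize_question_chain.invoke({"question": state["question"]})
    return {"anonymized_question": anon["anonymized_question"], "mapping": anon["mapping"]}

def plan_step(state: PlanExecute):
    # Step 2: draft a high-level plan on the anonymized question.
    return {"plan": planner.invoke({"question": state["anonymized_question"]}).steps}

def deanonymize_queries(state: PlanExecute):
    # Step 3: restore the original names inside the plan.
    result = de_anonymize_plan_chain.invoke({"plan": state["plan"], "mapping": state["mapping"]})
    return {"plan": result.plan, "past_steps": []}

def break_down_plan_step(state: PlanExecute):
    # Step 4: split high-level steps into concrete tasks.
    return {"plan": break_down_plan_chain.invoke({"plan": state["plan"]}).steps}

agent_workflow = StateGraph(PlanExecute)
agent_workflow.add_node("anonymize_question", anonymize_queries)
agent_workflow.add_node("planner", plan_step)
agent_workflow.add_node("de_anonymize_plan", deanonymize_queries)
agent_workflow.add_node("break_down_plan", break_down_plan_step)
agent_workflow.add_node("task_handler", run_task_handler_chain)
agent_workflow.set_entry_point("anonymize_question")
agent_workflow.add_edge("anonymize_question", "planner")
agent_workflow.add_edge("planner", "de_anonymize_plan")
agent_workflow.add_edge("de_anonymize_plan", "break_down_plan")
agent_workflow.add_edge("break_down_plan", "task_handler")
# Placeholder: in the full graph, task_handler routes conditionally to the
# retrieval/answer subgraphs, then to replan and get_final_answer (steps 6-9).
agent_workflow.add_edge("task_handler", END)
plan_and_execute_app = agent_workflow.compile()

def execute_plan_and_print_steps(inputs):
    # Stream the compiled graph, printing each node as it runs.
    state = dict(inputs)
    for output in plan_and_execute_app.stream(inputs, {"recursion_limit": 45}):
        for node_name, value in output.items():
            print(f"Current node: {node_name}")
            state.update(value or {})
    final_answer = state.get("response", "The answer was not found in the data.")
    print(f"Final answer: {final_answer}")
    return final_answer, state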
Testing the Final Pipeline
Test a question that cannot be answered from the data:
input = {"question": "what did professor lupin teach?"}
final_answer, final_state = execute_plan_and_print_steps(input)
Output:
...
Final answer: The answer was not found in the data.
Test a complex question:
input = {"question": "what subject does the professor who helped the villain teach?"}
final_answer, final_state = execute_plan_and_print_steps(input)
Output:
...
Final answer: The professor who helped the villain is Professor Quirrell, who teaches Defense Against the Dark Arts.
Test a reasoning question:
input = {"question": "how did harry beat quirrell?"}
final_answer, final_state = execute_plan_and_print_steps(input)
Output:
...
Final answer: Harry defeated Quirrell because his mother's protective magic caused Quirrell to burn on contact with Harry.
Evaluating with RAGAS
Evaluate the pipeline with RAGAS:
questions = [
    "What is the name of the three-headed dog guarding the Sorcerer's Stone?",
    "Who gave Harry Potter his first broomstick?",
    "Which house did the Sorting Hat initially consider for Harry?"
]
ground_truth_answers = [
    "Fluffy",
    "Professor McGonagall",
    "Slytherin"
]
from datasets import Dataset

data_samples = {
    'question': questions,
    'answer': generated_answers,
    'contexts': retrieved_documents,
    'ground_truth': ground_truth_answers
}
data_samples['contexts'] = [[context] if isinstance(context, str) else context for context in data_samples['contexts']]
dataset = Dataset.from_dict(data_samples)
from ragas import evaluate
from ragas.metrics import (
    answer_correctness,
    faithfulness,
    answer_relevancy,
    context_recall,
    answer_similarity
)

metrics = [
    answer_correctness,
    faithfulness,
    answer_relevancy,
    context_recall,
    answer_similarity
]
llm = ChatTogether(temperature=0, model_name="meta-llama/Llama-3.3-70B-Instruct-Turbo-Free", api_key=together_api_key, max_tokens=2000)
score = evaluate(dataset, metrics=metrics, llm=llm)
results_df = score.to_pandas()
On this small test set the pipeline performs well, with several metrics scoring around 0.9.
Conclusion
We started from scratch: cleaning and splitting the data, then building retrievers, filters, a query rewriter, and a CoT pipeline. To handle complex queries we introduced the subgraph approach, building retrieval and distillation subgraphs plus a hallucination-reduction component, designed a planner and task handler, and finally evaluated the system with RAGAS. Hopefully you learned something new!
This article was reposted from AI大模型觀察站.
