RAG: 7 Retrieval-Augmented Generation Techniques Explained (with Implementation Code)
In today's digital era, natural language processing (NLP) and generative AI are advancing at an unprecedented pace. Among these developments, Retrieval-Augmented Generation (RAG) stands out as a star of the field. By combining information retrieval with the text-generation capabilities of language models, RAG builds intelligent systems that are more accurate, more current, and more reliable. In this article, we take a deep dive into advanced RAG techniques and look at how they are shaping the future of intelligent dialogue, customer service, and semantic search.
I. What Is RAG? Basic Architecture and Principles
RAG is a machine learning architecture built from three core components:
- Retrieval system: fetches relevant information from a knowledge base.
- Generation model: produces an answer based on the retrieved information.
- Fusion mechanism: combines external knowledge with generative capability.
This architecture lets RAG draw on both the generative power of language models and the richness and accuracy of external data when answering questions, helping it avoid the "hallucination" problem of traditional generative models, i.e., producing content that contradicts the facts.
Here is a basic implementation of RAG:
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from transformers import pipeline
class BasicRAG:
def __init__(self, documents, model_name="all-MiniLM-L6-v2"):
self.documents = documents
self.encoder = SentenceTransformer(model_name)
self.generator = pipeline("text-generation", model="microsoft/DialoGPT-medium")
self.build_index()
def build_index(self):
"""構(gòu)建 FAISS 索引以實現(xiàn)語義搜索"""
embeddings = self.encoder.encode(self.documents)
self.index = faiss.IndexFlatIP(embeddings.shape[1])
self.index.add(embeddings.astype('float32'))
def retrieve(self, query, k=3):
"""檢索相關(guān)文檔"""
query_embedding = self.encoder.encode([query])
scores, indices = self.index.search(query_embedding.astype('float32'), k)
return [self.documents[i] for i in indices[0]]
def generate(self, query, context):
"""基于上下文生成回答"""
prompt = f"Context: {context}\nQuestion: {query}\nAnswer:"
response = self.generator(prompt, max_length=200)
return response[0]['generated_text']
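To show how these pieces fit together, here is a minimal usage sketch; the document list and question are illustrative placeholders, and the first run downloads both models:
if __name__ == "__main__":
    docs = [
        "RAG combines retrieval with text generation.",
        "FAISS is a library for efficient similarity search.",
        "Sentence-transformers map text to dense vectors.",
    ]
    rag = BasicRAG(docs)
    question = "What does RAG combine?"
    context = "\n".join(rag.retrieve(question, k=2))  # top-2 documents as context
    print(rag.generate(question, context))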
II. Advanced RAG Techniques and Implementations
1. CRAG (Corrective Retrieval-Augmented Generation)
CRAG is an evolution of RAG that adds a corrective mechanism to improve retrieval quality, which is critical for enterprise Q&A systems and virtual assistants that demand high precision. When handling complex customer inquiries, for example, CRAG can automatically correct inaccurate answers and ensure the information it provides is both reliable and relevant.
Here is an implementation of CRAG:
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics.pairwise import cosine_similarity
class CRAG:
def __init__(self, documents, confidence_threshold=0.7):
self.documents = documents
self.threshold = confidence_threshold
self.tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
self.model = AutoModel.from_pretrained('bert-base-uncased')
self.build_embeddings()
def build_embeddings(self):
"""為文檔構(gòu)建嵌入向量"""
self.doc_embeddings = []
for doc in self.documents:
inputs = self.tokenizer(doc, return_tensors='pt', truncation=True, padding=True)
with torch.no_grad():
outputs = self.model(**inputs)
embedding = outputs.last_hidden_state.mean(dim=1)
self.doc_embeddings.append(embedding.numpy())
def corrective_retrieve(self, query, k=5):
"""基于置信度的糾錯檢索"""
query_inputs = self.tokenizer(query, return_tensors='pt', truncation=True, padding=True)
with torch.no_grad():
query_outputs = self.model(**query_inputs)
query_embedding = query_outputs.last_hidden_state.mean(dim=1)
similarities = []
for doc_emb in self.doc_embeddings:
sim = cosine_similarity(query_embedding.numpy(), doc_emb)[0][0]
similarities.append(sim)
top_indices = np.argsort(similarities)[-k:][::-1]
corrected_docs = []
for idx in top_indices:
if similarities[idx] > self.threshold:
corrected_docs.append({
'document': self.documents[idx],
'confidence': similarities[idx]
})
        if not corrected_docs:
return self.external_search(query)
return corrected_docs
def external_search(self, query):
"""當置信度低時進行外部搜索"""
return [{'document': f"External search result for: {query}", 'confidence': 0.5}]
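A quick illustration of the fallback behavior; the documents and query below are placeholders:
docs = [
    "Our refund policy allows returns within 30 days of purchase.",
    "Standard shipping takes 3 to 5 business days.",
]
crag = CRAG(docs, confidence_threshold=0.7)
for r in crag.corrective_retrieve("How long do refunds take?", k=2):
    print(f"{r['confidence']:.2f} -> {r['document']}")
# Queries with no sufficiently similar document fall through to external_search().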
2. CAG (Chain-of-Thought Augmented Generation)
CAG combines chain-of-thought reasoning with information retrieval to handle complex multi-step questions, which is essential for explainable AI and intelligent tutoring systems. A virtual tutor, for example, can use CAG to break a hard problem into steps and give students a clear path to the solution.
Here is an implementation of CAG:
class ChainOfThoughtRAG:
def __init__(self, knowledge_base):
self.kb = knowledge_base
self.reasoning_steps = []
def decompose_query(self, complex_query):
"""將復(fù)雜問題分解為多個子問題"""
decomposition_prompt = f"""
Decompose the following complex question into smaller steps:
Question: {complex_query}
Steps:
1.
2.
3.
"""
steps = self.llm_decompose(decomposition_prompt)
return steps
def chain_retrieve_and_reason(self, query):
"""執(zhí)行思維鏈檢索與推理"""
steps = self.decompose_query(query)
reasoning_chain = []
for i, step in enumerate(steps):
relevant_docs = self.retrieve_for_step(step)
step_reasoning = self.reason_step(step, relevant_docs, reasoning_chain)
reasoning_chain.append({
'step': i + 1,
'question': step,
'evidence': relevant_docs,
'reasoning': step_reasoning
})
final_answer = self.synthesize_chain(reasoning_chain, query)
return final_answer, reasoning_chain
def reason_step(self, step, evidence, previous_reasoning):
"""對當前步驟進行推理"""
context = "\n".join([doc['content'] for doc in evidence])
previous_context = "\n".join([r['reasoning'] for r in previous_reasoning])
reasoning_prompt = f"""
Previous context: {previous_context}
Evidence: {context}
Current question: {step}
Step-by-step reasoning:
"""
return self.generate_reasoning(reasoning_prompt)
def synthesize_chain(self, reasoning_chain, original_query):
"""合成最終答案"""
chain_summary = "\n".join([f"Step {r['step']}: {r['reasoning']}"for r in reasoning_chain])
synthesis_prompt = f"""
Original question: {original_query}
Reasoning chain:
{chain_summary}
Integrated final answer:
"""
return self.generate_final_answer(synthesis_prompt)
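The class above leaves its LLM and retrieval hooks (llm_decompose, retrieve_for_step, generate_reasoning, generate_final_answer) abstract. A minimal sketch of how they might be stubbed for local testing follows; these placeholder implementations are assumptions, not part of the original design, and a real system would route each hook to an actual LLM:
class SimpleCAG(ChainOfThoughtRAG):
    """Toy subclass that fills in the abstract hooks with placeholders."""
    def llm_decompose(self, prompt):
        # Naive stand-in for an LLM call: split the question on " and "
        question = prompt.split("Question:")[1].split("Steps:")[0].strip()
        return [part.strip() for part in question.split(" and ")]

    def retrieve_for_step(self, step):
        # Assumes the knowledge base is a list of {'content': ...} dicts
        words = step.lower().split()
        return [doc for doc in self.kb
                if any(w in doc['content'].lower() for w in words)][:3]

    def generate_reasoning(self, prompt):
        return f"(reasoning stub over: ...{prompt[-60:]})"

    def generate_final_answer(self, prompt):
        return f"(answer stub from: ...{prompt[-60:]})"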
3. Graph RAG: Intelligent Navigation over Knowledge Graphs
Graph RAG uses a knowledge graph to capture complex relationships between entities, enabling richer contextual retrieval and advanced semantic navigation. In drug discovery, for example, Graph RAG can help researchers quickly identify potential drug targets by navigating molecular networks intelligently.
Here is an implementation of Graph RAG:
import networkx as nx
from neo4j import GraphDatabase
import torch
from torch_geometric.nn import GCNConv
class GraphRAG:
def __init__(self, neo4j_uri, username, password):
self.driver = GraphDatabase.driver(neo4j_uri, auth=(username, password))
self.graph = nx.Graph()
self.entity_embeddings = {}
self.build_graph()
def build_graph(self):
"""構(gòu)建知識圖譜"""
with self.driver.session() as session:
nodes_query = "MATCH (n) RETURN n.id as id, n.type as type, n.properties as props"
edges_query = "MATCH (a)-[r]->(b) RETURN a.id as source, b.id as target, type(r) as relation"
nodes = session.run(nodes_query)
edges = session.run(edges_query)
for node in nodes:
self.graph.add_node(node['id'], type=node['type'], **node['props'])
for edge in edges:
self.graph.add_edge(edge['source'], edge['target'], relation=edge['relation'])
def graph_walk_retrieve(self, query_entities, max_hops=3):
"""基于圖遍歷的檢索"""
relevant_subgraph = nx.Graph()
visited = set()
queue = [(entity, 0) for entity in query_entities]
while queue:
current_entity, depth = queue.pop(0)
if depth > max_hops or current_entity in visited:
continue
visited.add(current_entity)
if current_entity in self.graph:
relevant_subgraph.add_node(current_entity, **self.graph.nodes[current_entity])
for neighbor in self.graph.neighbors(current_entity):
edge_data = self.graph.edges[current_entity, neighbor]
if self.is_relevant_relation(edge_data['relation']):
relevant_subgraph.add_edge(current_entity, neighbor, **edge_data)
queue.append((neighbor, depth + 1))
return relevant_subgraph
def gnn_enhanced_retrieval(self, subgraph, query_embedding):
"""使用 GNN 提升檢索效果"""
node_features = self.extract_node_features(subgraph)
edge_index = self.get_edge_index(subgraph)
gcn = GCNConv(node_features.size(1), 128)
enhanced_embeddings = gcn(node_features, edge_index)
similarities = torch.cosine_similarity(query_embedding.unsqueeze(0), enhanced_embeddings)
top_nodes = torch.topk(similarities, k=10)
return top_nodes
def multi_hop_reasoning(self, start_entities, target_concept):
"""多跳推理"""
paths = []
for start in start_entities:
try:
shortest_paths = nx.shortest_path(self.graph, start, target_concept)
path_knowledge = self.extract_path_knowledge(shortest_paths)
paths.append(path_knowledge)
except nx.NetworkXNoPath:
continue
return self.synthesize_multi_hop_answer(paths)
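Note that the GCNConv layer in gnn_enhanced_retrieval is freshly (randomly) initialized, so in practice it would need to be trained or loaded from a checkpoint. To exercise the traversal logic without a live Neo4j instance, one can build the graph by hand; the tiny graph and the always-true is_relevant_relation stub below are illustrative assumptions:
g = GraphRAG.__new__(GraphRAG)  # bypass __init__ and the Neo4j connection
g.graph = nx.Graph()
g.graph.add_node("aspirin", type="drug")
g.graph.add_node("cox1", type="protein")
g.graph.add_node("inflammation", type="condition")
g.graph.add_edge("aspirin", "cox1", relation="inhibits")
g.graph.add_edge("cox1", "inflammation", relation="mediates")
g.is_relevant_relation = lambda relation: True  # accept every edge for the demo
subgraph = g.graph_walk_retrieve(["aspirin"], max_hops=2)
print(list(subgraph.nodes()), list(subgraph.edges()))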
4. Agentic RAG: Autonomous Decision-Making and Dynamic Adaptation
Agentic RAG introduces autonomous agents that decide, based on context, when and how to retrieve information. In financial market analysis, for example, an Agentic RAG system can analyze data in real time and dynamically adjust its strategy as the market moves.
Here is an implementation of Agentic RAG:
from enum import Enum
import asyncio
from typing import List, Dict, Any
class AgentAction(Enum):
RETRIEVE = "retrieve"
GENERATE = "generate"
VERIFY = "verify"
SEARCH_EXTERNAL = "search_external"
DECOMPOSE = "decompose"
class RAGAgent:
def __init__(self, tools, memory_size=1000):
self.tools = tools
self.memory = []
self.memory_size = memory_size
self.state = "idle"
self.confidence_threshold = 0.8
    async def plan_and_execute(self, user_query):
        """Plan autonomously and execute the plan"""
query_complexity = self.analyze_query_complexity(user_query)
action_plan = self.create_action_plan(user_query, query_complexity)
results = []
for action in action_plan:
result = await self.execute_action(action, user_query, results)
results.append(result)
if self.should_replan(result):
new_plan = self.replan(user_query, results)
action_plan.extend(new_plan)
final_answer = self.synthesize_results(results, user_query)
self.update_memory(user_query, final_answer, results)
return final_answer
def create_action_plan(self, query, complexity):
"""根據(jù)復(fù)雜度創(chuàng)建行動計劃"""
if complexity == "simple":
return [{"action": AgentAction.RETRIEVE, "params": {"k": 3}},
{"action": AgentAction.GENERATE, "params": {"style": "direct"}}]
elif complexity == "complex":
return [{"action": AgentAction.DECOMPOSE, "params": {}},
{"action": AgentAction.RETRIEVE, "params": {"k": 5}},
{"action": AgentAction.VERIFY, "params": {"threshold": 0.7}},
{"action": AgentAction.GENERATE, "params": {"style": "detailed"}}]
else: # very_complex
return [{"action": AgentAction.DECOMPOSE, "params": {}},
{"action": AgentAction.RETRIEVE, "params": {"k": 10}},
{"action": AgentAction.SEARCH_EXTERNAL, "params": {}},
{"action": AgentAction.VERIFY, "params": {"threshold": 0.9}},
{"action": AgentAction.GENERATE, "params": {"style": "comprehensive"}}]
    async def execute_action(self, action_config, query, previous_results):
        """Execute a specific action"""
action = action_config["action"]
params = action_config["params"]
        if action == AgentAction.RETRIEVE:
            return await self.tools["retriever"].retrieve(query, **params)
        elif action == AgentAction.GENERATE:
            context = self.build_context(previous_results)
            return await self.tools["generator"].generate(query, context, **params)
        elif action == AgentAction.VERIFY:
            return await self.verify_information(previous_results, **params)
        elif action == AgentAction.SEARCH_EXTERNAL:
            return await self.tools["Web Search"].search(query, **params)
        elif action == AgentAction.DECOMPOSE:
            return await self.decompose_complex_query(query)
def should_replan(self, result):
"""根據(jù)結(jié)果決定是否重新規(guī)劃"""
if hasattr(result, 'confidence') and result.confidence < self.confidence_threshold:
returnTrue
if hasattr(result, 'error') and result.error:
returnTrue
returnFalse
def adaptive_learning(self, feedback):
"""基于反饋的自適應(yīng)學(xué)習"""
if feedback['success']:
self.confidence_threshold *= 0.95# 更加自信
else:
self.confidence_threshold *= 1.05# 更加謹慎
self.update_strategy_memory(feedback)
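The agent relies on several hooks that the snippet leaves undefined (analyze_query_complexity, replan, synthesize_results, and so on). As one illustration, a simple word-count heuristic could back the complexity check; this is an assumption for demonstration, not the original author's logic:
def analyze_query_complexity(self, user_query):
    """Toy heuristic: classify complexity by query length."""
    words = len(user_query.split())
    if words <= 8:
        return "simple"
    elif words <= 25:
        return "complex"
    return "very_complex"

RAGAgent.analyze_query_complexity = analyze_query_complexity  # patch in for the sketch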
5. Adaptive RAG: Personalization and Continual Learning
Adaptive RAG implements continual learning and dynamic personalization, adjusting to each user's preferences and the specific context. On a personalized learning platform, for example, Adaptive RAG can deliver tailored content based on a student's learning style and progress.
Here is an implementation of Adaptive RAG:
import numpy as np
from sklearn.cluster import KMeans
from collections import defaultdict
import pickle
class AdaptiveRAG:
def __init__(self, base_retriever, user_profile_path=None):
self.base_retriever = base_retriever
self.user_profiles = defaultdict(dict)
self.adaptation_history = []
self.context_clusters = None
self.load_user_profiles(user_profile_path)
def adapt_to_user(self, user_id, query, feedback_history):
"""根據(jù)用戶偏好調(diào)整系統(tǒng)"""
if user_id notin self.user_profiles:
self.user_profiles[user_id] = self.create_user_profile()
profile = self.user_profiles[user_id]
self.update_preferences(profile, feedback_history)
adapted_strategy = self.adapt_retrieval_strategy(profile, query)
return adapted_strategy
def create_user_profile(self):
"""創(chuàng)建用戶初始偏好配置"""
return {
'domain_preferences': {},
'complexity_preference': 'medium',
'response_style': 'balanced',
'topic_interests': [],
'feedback_patterns': [],
'success_metrics': {
'accuracy': 0.5,
'relevance': 0.5,
'completeness': 0.5
}
}
def update_preferences(self, profile, feedback_history):
"""根據(jù)反饋更新用戶偏好"""
for feedback in feedback_history[-10:]:
for metric, value in feedback['metrics'].items():
current = profile['success_metrics'][metric]
profile['success_metrics'][metric] = 0.9 * current + 0.1 * value
domain = feedback.get('domain')
if domain:
                if domain not in profile['domain_preferences']:
profile['domain_preferences'][domain] = 0.5
if feedback['rating'] > 3:
profile['domain_preferences'][domain] *= 1.1
else:
profile['domain_preferences'][domain] *= 0.9
def adapt_retrieval_strategy(self, profile, query):
"""根據(jù)用戶偏好調(diào)整檢索策略"""
query_domain = self.detect_domain(query)
k = self.calculate_adaptive_k(profile, query_domain)
similarity_threshold = self.calculate_threshold(profile)
preferred_sources = self.select_sources(profile, query_domain)
return {
'k': k,
'threshold': similarity_threshold,
'sources': preferred_sources,
'reranking_weights': self.get_reranking_weights(profile)
}
def contextual_adaptation(self, context_type, session_history):
"""基于會話上下文的動態(tài)調(diào)整"""
if self.context_clusters isNone:
self.build_context_clusters()
current_cluster = self.identify_context_cluster(context_type)
similar_sessions = self.find_similar_sessions(current_cluster)
adaptation_params = self.learn_from_similar_sessions(similar_sessions)
return adaptation_params
def meta_learning_update(self, task_performance):
"""元學(xué)習:根據(jù)任務(wù)表現(xiàn)優(yōu)化調(diào)整策略"""
performance_patterns = self.analyze_performance_patterns(task_performance)
for pattern in performance_patterns:
if pattern['success_rate'] > 0.8:
self.promote_strategy(pattern['strategy'])
elif pattern['success_rate'] < 0.4:
self.demote_strategy(pattern['strategy'])
self.save_adaptation_knowledge()
def real_time_adaptation(self, query, initial_results, user_interaction):
"""實時動態(tài)調(diào)整"""
interaction_signals = self.extract_interaction_signals(user_interaction)
if interaction_signals['needs_more_detail']:
enhanced_results = self.enhance_detail(initial_results)
return enhanced_results
elif interaction_signals['needs_simplification']:
simplified_results = self.simplify_results(initial_results)
return simplified_results
elif interaction_signals['needs_different_perspective']:
alternative_results = self.find_alternative_perspective(query)
return alternative_results
return initial_results
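Here too, several helpers (detect_domain, calculate_adaptive_k, calculate_threshold, and others) are left abstract. One plausible definition of calculate_adaptive_k, offered purely as a sketch, scales the number of retrieved documents with how much the user values completeness and how much weight they give the query's domain:
def calculate_adaptive_k(self, profile, query_domain):
    """Hypothetical: widen retrieval for completeness-oriented users."""
    base_k = 5
    completeness = profile['success_metrics']['completeness']
    domain_weight = profile['domain_preferences'].get(query_domain, 0.5)
    return max(3, int(base_k * (0.5 + completeness) * (0.5 + domain_weight)))

AdaptiveRAG.calculate_adaptive_k = calculate_adaptive_k  # patch in for the sketch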
6. Multi Modal RAG: Multimodal Information Integration
Multi Modal RAG processes and integrates information across text, images, audio, and video, enabling comprehensive cross-modal retrieval. In medical diagnosis, for example, Multi Modal RAG can analyze a patient's clinical notes, medical images, and physiological signals together, giving doctors more complete diagnostic support.
Here is an implementation of Multi Modal RAG:
import numpy as np
import torch
import torchvision.transforms as transforms
from transformers import CLIPModel, CLIPProcessor
import librosa
import cv2
from PIL import Image
class MultiModalRAG:
def __init__(self):
self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
self.text_index = None
self.image_index = None
self.audio_index = None
self.video_index = None
self.multimodal_embeddings = {}
def process_multimodal_document(self, document):
"""處理多模態(tài)文檔"""
processed_doc = {
'id': document['id'],
'embeddings': {},
'content': {}
}
        if 'text' in document:
text_embedding = self.encode_text(document['text'])
processed_doc['embeddings']['text'] = text_embedding
processed_doc['content']['text'] = document['text']
        if 'images' in document:
image_embeddings = []
for img_path in document['images']:
img_embedding = self.encode_image(img_path)
image_embeddings.append(img_embedding)
processed_doc['embeddings']['images'] = image_embeddings
processed_doc['content']['images'] = document['images']
        if 'audio' in document:
audio_embedding = self.encode_audio(document['audio'])
processed_doc['embeddings']['audio'] = audio_embedding
processed_doc['content']['audio'] = document['audio']
        if 'video' in document:
video_embedding = self.encode_video(document['video'])
processed_doc['embeddings']['video'] = video_embedding
processed_doc['content']['video'] = document['video']
return processed_doc
def encode_text(self, text):
"""使用 CLIP 編碼文本"""
inputs = self.clip_processor(text=text, return_tensors="pt")
with torch.no_grad():
text_features = self.clip_model.get_text_features(**inputs)
return text_features.numpy()
def encode_image(self, image_path):
"""使用 CLIP 編碼圖像"""
image = Image.open(image_path)
inputs = self.clip_processor(images=image, return_tensors="pt")
with torch.no_grad():
image_features = self.clip_model.get_image_features(**inputs)
return image_features.numpy()
def encode_audio(self, audio_path):
"""提取音頻特征"""
y, sr = librosa.load(audio_path)
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
chroma = librosa.feature.chroma_stft(y=y, sr=sr)
audio_features = np.concatenate([np.mean(mfccs, axis=1), np.mean(spectral_centroids, axis=1), np.mean(chroma, axis=1)])
return audio_features
def encode_video(self, video_path):
"""提取視頻的關(guān)鍵幀并編碼"""
cap = cv2.VideoCapture(video_path)
frame_embeddings = []
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
interval = max(1, frame_count // 10)
for i in range(0, frame_count, interval):
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if ret:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(frame_rgb)
frame_embedding = self.encode_image(pil_image)
frame_embeddings.append(frame_embedding)
cap.release()
if frame_embeddings:
return np.mean(frame_embeddings, axis=0)
else:
return np.zeros(512)
def cross_modal_retrieve(self, query, modalities=['text', 'image'], k=5):
"""跨模態(tài)檢索"""
query_embeddings = {}
        if 'text' in modalities and isinstance(query, str):
query_embeddings['text'] = self.encode_text(query)
        if 'image' in modalities and hasattr(query, 'image_path'):
query_embeddings['image'] = self.encode_image(query.image_path)
all_results = []
for modality, query_emb in query_embeddings.items():
modal_results = self.search_modality(modality, query_emb, k)
all_results.extend(modal_results)
fused_results = self.multimodal_fusion(all_results, query_embeddings)
return fused_results[:k]
def multimodal_fusion(self, results, query_embeddings):
"""多模態(tài)結(jié)果融合"""
scored_results = []
for result in results:
total_score = 0
modality_count = 0
for modality, query_emb in query_embeddings.items():
if modality in result['embeddings']:
similarity = self.calculate_similarity(query_emb, result['embeddings'][modality])
total_score += similarity
modality_count += 1
if modality_count > 0:
avg_score = total_score / modality_count
multimodal_bonus = 1 + (modality_count - 1) * 0.1
final_score = avg_score * multimodal_bonus
scored_results.append({
'document': result,
'score': final_score
})
scored_results.sort(key=lambda x: x['score'], reverse=True)
return [r['document'] for r in scored_results]
def generate_multimodal_response(self, query, retrieved_docs):
"""生成多模態(tài)回答"""
response = {
'text': '',
'images': [],
'audio': None,
'video': None
}
text_content = []
for doc in retrieved_docs:
            if 'text' in doc['content']:
text_content.append(doc['content']['text'])
if text_content:
response['text'] = self.generate_text_response(query, text_content)
relevant_images = []
for doc in retrieved_docs:
            if 'images' in doc['content']:
relevant_images.extend(doc['content']['images'])
response['images'] = relevant_images[:3]
return response
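multimodal_fusion calls a calculate_similarity helper that the snippet does not define. A minimal cosine-similarity version, assuming the embeddings are numpy arrays as produced by the encode_* methods above, might look like this:
def calculate_similarity(self, query_emb, doc_emb):
    """Hypothetical helper: cosine similarity between two embeddings."""
    q = np.asarray(query_emb).reshape(-1)
    d = np.asarray(doc_emb).reshape(-1)
    denom = np.linalg.norm(q) * np.linalg.norm(d)
    return float(np.dot(q, d) / denom) if denom else 0.0

MultiModalRAG.calculate_similarity = calculate_similarity  # attach for the sketch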
7. W-RAG (Web-Enhanced RAG): Real-Time Web Integration
W-RAG combines real-time web search with local retrieval, providing up-to-date information and broader knowledge coverage. In news reporting, for example, W-RAG can fetch the latest developments as they happen and feed them into an intelligent writing system.
Here is an implementation of W-RAG:
import requests
from bs4 import BeautifulSoup
import asyncio
import aiohttp
from datetime import datetime, timedelta
class WebEnhancedRAG:
def __init__(self, local_retriever, web_apis):
self.local_retriever = local_retriever
self.web_apis = web_apis
self.cache = {}
self.cache_ttl = timedelta(hours=1)
self.web_sources = {
'news': ['reuters.com', 'bbc.com', 'cnn.com'],
'academic': ['arxiv.org', 'scholar.google.com'],
'technical': ['stackoverflow.com', 'github.com'],
'general': ['wikipedia.org']
}
    async def hybrid_retrieve(self, query, local_weight=0.6, web_weight=0.4):
        """Hybrid retrieval: local + web"""
local_results = await self.local_retriever.retrieve(query)
web_needed = self.assess_web_necessity(query, local_results)
if web_needed:
web_results = await self.web_search_async(query)
combined_results = self.combine_local_web(local_results, web_results, local_weight, web_weight)
else:
combined_results = local_results
return combined_results
def assess_web_necessity(self, query, local_results):
"""評估是否需要網(wǎng)絡(luò)搜索"""
temporal_indicators = ['latest', 'recent', 'current', 'today', 'news']
has_temporal = any(indicator in query.lower() for indicator in temporal_indicators)
local_confidence = self.calculate_local_confidence(local_results)
dynamic_topics = ['stock', 'weather', 'news', 'covid', 'election']
is_dynamic = any(topic in query.lower() for topic in dynamic_topics)
        return has_temporal or local_confidence < 0.7 or is_dynamic
    async def web_search_async(self, query):
        """Asynchronous web search"""
cache_key = f"web_{hash(query)}"
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if datetime.now() - timestamp < self.cache_ttl:
return cached_result
search_type = self.classify_query_type(query)
tasks = []
        if 'google' in self.web_apis:
tasks.append(self.google_search(query))
        if 'bing' in self.web_apis:
tasks.append(self.bing_search(query))
if search_type == 'news':
tasks.append(self.news_search(query))
elif search_type == 'academic':
tasks.append(self.academic_search(query))
elif search_type == 'technical':
tasks.append(self.technical_search(query))
results = await asyncio.gather(*tasks, return_exceptions=True)
processed_results = self.process_web_results(results, query)
self.cache[cache_key] = (processed_results, datetime.now())
return processed_results
    async def google_search(self, query):
        """Search with the Google Custom Search API"""
api_key = self.web_apis['google']['api_key']
cx = self.web_apis['google']['cx']
url = "https://www.googleapis.com/customsearch/v1"
params = {
'key': api_key,
'cx': cx,
'q': query,
'num': 10
}
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
data = await response.json()
results = []
for item in data.get('items', []):
results.append({
'title': item['title'],
'url': item['link'],
'snippet': item['snippet'],
'source': 'google'
})
return results
    async def news_search(self, query):
        """Search news sources"""
news_results = []
        if 'newsapi' in self.web_apis:
api_key = self.web_apis['newsapi']['api_key']
url = "https://newsapi.org/v2/everything"
params = {
'apiKey': api_key,
'q': query,
'sortBy': 'publishedAt',
'pageSize': 10
}
            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params) as response:
data = await response.json()
for article in data.get('articles', []):
news_results.append({
'title': article['title'],
'url': article['url'],
'snippet': article['description'],
'published': article['publishedAt'],
'source': 'news'
})
return news_results
def real_time_fact_checking(self, claim, sources):
"""實時事實核查"""
fact_check_results = []
for source in sources:
source_info = self.extract_source_info(source)
credibility_score = self.assess_source_credibility(source_info)
consistency_score = self.check_claim_consistency(claim, source['content'])
fact_check_results.append({
'source': source,
'credibility': credibility_score,
'consistency': consistency_score,
'verdict': self.determine_verdict(credibility_score, consistency_score)
})
return self.aggregate_fact_check(fact_check_results)
def temporal_aware_retrieval(self, query, time_sensitivity='medium'):
"""時間感知檢索"""
time_windows = {
'high': timedelta(hours=1),
'medium': timedelta(days=1),
'low': timedelta(weeks=1)
}
cutoff_time = datetime.now() - time_windows[time_sensitivity]
recent_results = [result for result in self.all_results if result.get('timestamp', datetime.min) > cutoff_time]
if len(recent_results) < 3:
cutoff_time = datetime.now() - time_windows['low']
recent_results = [r for r in self.all_results if r.get('timestamp', datetime.min) > cutoff_time]
return recent_results
def web_content_extraction(self, url):
"""智能提取網(wǎng)頁內(nèi)容"""
try:
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
for element in soup(['script', 'style', 'nav', 'footer', 'aside']):
element.decompose()
main_content = self.extract_main_content(soup)
metadata = self.extract_metadata(soup)
return {
'content': main_content,
'metadata': metadata,
'url': url,
'extracted_at': datetime.now()
}
except Exception as e:
return {'error': str(e), 'url': url}
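A sketch of how this class might be wired up; the API keys, the my_local_retriever object, and the query are placeholders, which is why the final line is left commented out:
async def demo():
    web_apis = {'google': {'api_key': 'YOUR_KEY', 'cx': 'YOUR_CX'}}
    wrag = WebEnhancedRAG(local_retriever=my_local_retriever, web_apis=web_apis)
    results = await wrag.hybrid_retrieve("latest AI news")
    print(results)

# asyncio.run(demo())  # requires real credentials and a retriever with an async retrieve()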
III. Future Trends in RAG
The future of RAG is promising, with development concentrated in several directions:
- Advanced multimodal integration: seamless handling of every modality, so AI can understand text, images, audio, and video at once.
- Causal reasoning: understanding cause and effect rather than mere correlation, making answers more logical and persuasive.
- Deep personalization: adapting to individual needs in real time to give every user a fully customized experience.
- Computational efficiency: supporting large-scale deployment through algorithmic optimization and hardware acceleration, bringing RAG to far more scenarios.
- Full explainability: a completely transparent decision process, so users can see exactly how the AI reached its conclusions.
IV. Practical Applications of RAG
The advanced RAG techniques above are already in wide use across many fields. Typical scenarios include:
- Intelligent customer service: automatically correcting inaccurate answers to improve customer satisfaction.
- Virtual medical assistants: verifying critical information to support doctors during diagnosis.
- Personalized learning platforms: delivering customized teaching content matched to each student's learning style.
- Intelligent writing systems: pulling in the latest information in real time to support news reporting and content creation.
V. Conclusion
Advanced RAG techniques open new possibilities for intelligent systems. From corrective mechanisms to multimodal integration, from causal reasoning to deep personalization, these techniques not only raise system performance but also deliver smarter, more attentive service to users. As the technology matures, RAG will play an even larger role across more domains.
This article is reproduced from Halo咯咯. Author: 基咯咯.
