Agent-camel框架学习记录(4)


原作者链接🔗:https://fmhw1n4zpn.feishu.cn/docx/AF4XdOZpIo6TOaxzDK8cxInNnCe

检索增强生成 (RAG) 简介及关键组件

RAG 概述

检索增强生成(RAG)是一种增强生成式AI模型(特别是LLM)准确性和可靠性的技术。它通过从外部知识源中检索相关事实信息,并将这些信息提供给LLM,从而使模型能够生成更权威、有出处且减少“幻觉”的答案。

RAG 的核心优势

提供可信来源:为AI模型的输出提供“脚注”,用户可以查证,增强信任。
减少“幻觉”:弥补LLM在特定或最新信息方面的不足,降低模型“编造”答案的可能性。
实现简单高效:相对于重新训练模型,RAG实现成本更低、速度更快,且支持信息源的即时更新。
广泛应用性:可与各种数据存储库结合,应用于医疗、金融、客户服务、员工培训等几乎所有行业。

CAMEL 框架中的 RAG 组件

在 CAMEL 框架中,RAG 的实现主要依赖于数据加载和数据嵌入这两个核心环节,以构建智能问答系统和对话Agent等应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1. Loaders (数据加载器)
Loaders 是 CAMEL 框架中用于数据加载、预处理和结构化的模块,旨在将各种格式的数据转化为LLM可处理的形式。

**Base IO:
功能:处理与文件相关的基础输入/输出操作,能够读取PDF等多种文件格式,提取内容并表示为 File 对象。
示例:从本地PDF文件读取内容。

**Unstructured IO:
功能:专注于非结构化数据(如文件、URL内容)的解析、清洗、提取和分块。其核心在于高级 **ETL(提取、转换、加载)**能力,为RAG等应用准备数据。

**关键方法:
parse_file_or_url:从文件或URL加载并解析数据。
clean_text_data:执行多种文本清洗操作(如替换引号、清理破折号、去除多余空白)。
extract_data_from_text:从文本中提取特定类型信息(如邮件地址)。
chunk_elements:对解析后的内容进行分块,优化检索效率。
stage_elements:为不同平台或下游处理准备数据元素。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
# 设置示例脏文本
example_dirty_text = ("\x93Some dirty text ’ with extra spaces and – dashes.一些包含额外空格和破折号的脏文本’。")
# 设置清理选项
options = [
('replace_unicode_quotes', {}), # 替换Unicode引号
('clean_dashes', {}), # 清理破折号
('clean_non_ascii_chars', {}), # 清理非ASCII字符
('clean_extra_whitespace', {}), # 清理多余空白
]
cleaned_text = uio.clean_text_data(text=example_dirty_text,
clean_options=options)
print(cleaned_text)

目前支持的清理操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
- replace_unicode_quotes: 将Unicode引号替换为标准引号
- clean_dashes: 清理破折号,统一格式
- clean_non_ascii_chars: 清理非ASCII字符
- clean_extra_whitespace: 清理多余空白
- clean_bullets: 清理项目符号
- clean_ordered_bullets: 清理有序列表符号
- clean_postfix: 清理后缀
- clean_prefix: 清理前缀
- clean_trailing_punctuation: 清理尾部标点
- group_broken_paragraphs: 合并断开的段落
- remove_punctuation: 移除标点符号
- bytes_string_to_string: 将字节字符串转换为普通字符串
- translate_text: 翻译文本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2. Embeddings (嵌入)
概念:嵌入是将不同类型的数据(文本、图像、视频)转化为机器可理解的数值向量的过程,这些向量能够捕获数据的核心特征和语义含义。

**文本嵌入:
功能:将文本数据转换为数值向量,使机器能够基于语义相似性而非字面匹配来处理和比较文本。
意义:实现信息检索、问答系统、文本聚类与分类等高级语义处理任务。

常用模型技术:
OpenAIEmbedding / OpenAICompatibleEmbedding:基于大规模LLM生成嵌入,理解复杂语境。
SentenceTransformerEncoder:专为句子级别语义表示设计,适用于高效语义搜索。
MistralEmbedding:基于Mistral模型的嵌入方法。

**图像嵌入:
功能:将图像转化为数值向量,表示图像的视觉特征(形状、颜色、纹理等),使机器能够理解图像内容。
意义:应用于图像分类、相似性比较、图像检索(如以图搜图)。
实现:通常依赖CNN(如ResNet)或Vision Transformer模型。

CAMEL支持:VisionLanguageEmbedding 类(如基于CLIP),能同时处理图像和文本,生成多模态嵌入。

通过结合 Loaders 进行高效的数据准备和 Embeddings 进行语义理解,CAMEL 框架为构建强大且可靠的 RAG 系统提供了坚实的基础。

存储 (Storages) 与 检索 (Retrievers)组件

1. Storages (数据存储)

概念:CAMEL 框架中的 Storage 模块是一个统一的数据存储与管理层,提供标准接口处理多种数据类型,为 RAG 应用奠定基础。

  • 键值存储 (Key-Value Storage)
    • BaseKeyValueStorage:抽象基类,用于标准化键值数据的保存、加载和清除操作,通常通过Python字典接口交互。
    • InMemoryKeyValueStorage:具体实现,数据存储在内存中,适用于开发、测试和临时存储(数据易失)。
  • 向量存储 (Vector Store)
    • 目的:存储高维度向量数据(如嵌入表示),支持高效相似度计算和最近邻搜索,是 RAG 中检索相关信息的核心。
    • 特点:高性能、可扩展、灵活(支持多种相似度度量,如点积、余弦、欧氏距离)。
    • BaseVectorStorage:抽象基类,定义了添加、删除、查询向量和管理数据库状态等操作。
    • MilvusStorage:与云原生向量搜索引擎 Milvus 集成,优化大规模向量检索。
    • QdrantStorage:与 Qdrant 向量搜索引擎集成,提供高效的向量存储、管理和 ANN 搜索,适用于推荐系统、NLP嵌入查询等。
  • 图存储 (Graph Storage)
    • 目的:存储和管理图数据(节点、边),支持复杂的图查询和分析。
    • BaseGraphStorage:抽象基类,定义了获取客户端、模式管理、添加/删除三元组和执行图查询等操作。
    • NebulaGraph:与分布式图数据库 NebulaGraph 集成,适用于知识图谱、社交网络分析等大规模图数据场景。
    • Neo4jGraph:与行业领先的图数据库 Neo4j 集成,利用其关系建模和高效查询能力,适用于知识图谱、诈骗检测等。

2. Retrievers (信息检索器)

概念:Retrievers 模块充当“搜索引擎”,负责在海量数据中高效查找特定信息,支持向量检索和关键词检索。

  • 向量检索器 (Vector Retriever)

    • 原理:将数据(文本、图像)通过嵌入模型转化为高维向量,存储在向量存储中。查询时,将查询转换为向量,并在向量存储中寻找语义上最接近的匹配。
    • 优势:擅长语义搜索,理解自然语言的模糊关系,适用于推荐系统、语义查询和跨模态搜索。
    • VectorRetriever
      • 初始化:可选择传入嵌入模型(如 SentenceTransformerEncoder),否则默认使用 OpenAIEmbedding
      • process():处理内容(文件/URL),分块,并将嵌入存储到指定的向量存储(如 QdrantStorage)中。
      • query():根据查询字符串从存储中检索最匹配的信息。
  • 关键词检索器 (Keyword Retriever)

    • 原理:对文档进行预处理(如分词、建立索引),直接解析用户查询的关键词并匹配相应文档内容。
    • 优势:依赖关键词精确匹配,适合快速查找特定术语或短语。
    • 特点
      更细粒度的控制:你需要手动导入并初始化 QdrantStorage,明确指定 vector_dim、collection_name 和 path。

      分步操作:process() 方法用于内容的嵌入和存储,query() 方法用于查询。这两个步骤是分开的。

      适用场景:当你需要对向量存储的配置(如集合名称、存储路径)有更精确的控制,或者需要分步执行数据的处理和查询时,这种方式更合适。例如,你可能想先处理大量数据并存储,然后再在不同的时间点执行查询。
  • 自动化检索 (AutoRetriever)

    • 功能:简化检索流程,自动处理嵌入、数据存储和查询。

    • run_vector_retriever():接受多个内容输入路径和查询字符串,自动执行向量检索过程,并可返回详细元数据。适用于批量内容处理和查询。

    • 特点
      更高的抽象级别:AutoRetriever 封装了 VectorRetriever 和底层存储(如 QdrantStorage)的创建和配置细节。你只需指定 vector_storage_local_path 和 storage_type。

      一步到位:run_vector_retriever() 方法将内容的嵌入、存储和查询整合到一个函数调用中。这大大简化了代码。
      便捷性:更适合快速原型开发、演示或当你不需要对底层存储有过多自定义控制时。它提供了开箱即用的体验。
      简化的配置:不需要手动计算 vector_dim,AutoRetriever 会自动处理。它还会为你生成默认的集合名称。

  • 两种检索方式的区别与优劣

    • 如果你追求快速实现和代码简洁AutoRetriever 是更好的选择。如果你需要对向量存储的底层配置和操作流程有更多控制,那么使用 VectorRetriever 结合手动配置会更合适。

Storages 模块提供了 RAG 所需的底层数据基础设施,而 Retrievers 模块则在此基础上实现了高效的信息查找能力,两者协同工作,使得 RAG 能够精准地从外部知识中获取相关上下文,并将其反馈给LLM进行增强生成。

向量数据库及其在知识库搭建中的应用

1. 向量数据库 (Vector Database) 介绍

概念向量数据库是一种专门用于存储、管理和检索高维向量数据的数据库系统。它在现代AI和机器学习应用中至关重要,因为文本、图像、音频等数据常被转换为高维向量(即嵌入),以捕捉其语义或特征。

主要功能:

  • 向量存储:高效存储大规模高维向量数据,通常还可关联元数据(描述性信息)。
  • 相似度搜索:核心功能,通过高效的近似最近邻(ANN)搜索算法,快速找到与查询向量语义最相似的向量集合。
  • 扩展性:支持水平扩展,以应对不断增长的数据存储和计算需求。

常见应用:

  • 推荐系统:根据用户行为或偏好,检索并推荐相似的产品或内容。
  • 图像和视频搜索:通过内容(而非关键词)检索相似的图像或视频。
  • 自然语言处理(NLP):在嵌入空间中查找语义相似的文本或词语,支持语义搜索、问答系统等。

实现技术:

向量数据库通常采用以下算法和数据结构来实现高效的相似度搜索:

  • HNSW (Hierarchical Navigable Small World) 图:一种基于图的ANN搜索算法,以其高查询效率和准确性而闻名。
  • 局部敏感哈希 (LSH):通过哈希函数将相似向量映射到相同“桶”中,实现快速检索。
  • 产品量化 (PQ):通过将向量分解并量化来减少存储和计算成本。

常见向量数据库系统:

  • Milvus:开源向量数据库,支持亿级向量的高效存储和检索。
  • Pinecone:提供向量数据库即服务 (DBaaS),简化管理和查询。
  • Weaviate:开源向量搜索引擎,支持多种数据模式和扩展。

CAMEL 框架中的向量数据库集成:

CAMEL 框架通过 camel.storages.vectordb_storages 包提供了与多种向量数据库的交互能力:

  • base 模块:定义了向量存储的基础类和接口(如 BaseVectorStorageVectorDBQueryVectorDBQueryResult)。

  • milvus 模块:实现与 Milvus 数据库的交互(MilvusStorage)。

  • qdrant 模块:实现与 Qdrant 数据库的交互(QdrantStorage)。

  • weaviate 模块:实现与 Weaviate 数据库的交互(WeaviateStorage)。

    这种模块化设计使得 CAMEL 能够灵活适应不同的应用需求和向量数据库选择。


2. 知识库搭建流程中的数据处理

搭建高效的知识库是 RAG 应用成功的关键,其中数据预处理和嵌入模型选择至关重要。


Embedding 模型选择:

选择合适的嵌入模型是构建知识库的第一步。

  • Embedding API 服务 (e.g., OpenAI, Cohere, Mistral)
    • 优点:方便易用,无需本地部署和管理模型。
    • 适用场景:项目或工程中对数据隐私要求不高,且预算充足的情况。
  • 开源 Embedding 模型 (Open Embedding Model)
    • 优点:数据隐私性好,可本地部署,提供更多自定义和微调空间。
    • 适用场景:对数据隐私有严格要求,或需要针对特定任务优化模型性能的情况。
  • 选择标准:没有绝对的“最好”,只有“最适合”。建议构建应用场景专有评测集,通过量化评估来选择最适合自身数据的模型。

数据预处理:

数据预处理是保证 RAG 系统高质量输出的关键步骤,遵循“Garbage in, Garbage out”原则。

  • 目的:将原始、多样化的数据转化为LLM和嵌入模型能够高效处理的统一格式,并去除噪声。
  • 常用手段
    1. 数据读取:从各种来源(文件、数据库、API)获取原始数据。
    2. 数据格式转换:将不同格式(如PDF、TXT、DOC、PPT)的数据统一转换为便于模型处理的格式,Markdown 格式是常用的选择。
    3. 数据清洗
      • 去除噪声:移除不相关或干扰信息。
      • 纠正错误:修正拼写、语法错误等。
      • 信息筛选:提取核心信息,过滤冗余内容。

构建RAG应用:从基础到优化

RAG(Retrieval Augmented Generation,检索增强生成)是一种结合检索生成的技术路线,旨在增强大型语言模型(LLM)的能力,解决其在时效性和幻觉方面的问题。


1. Basic RAG

1.1 RAG 的核心思想

当LLM处理大量文本(例如数千篇文档、上亿个token)时,直接将所有文本作为上下文输入给LLM会迅速超出其上下文长度限制。RAG的核心思想是:

  1. 检索(Retrieval):从海量文本数据中智能地检索与用户查询最相关的少量信息片段。
  2. 增强生成(Augmented Generation):将检索到的相关信息作为上下文合并到用户Prompt中,然后交付给LLM进行回复生成。

1.2 CAMEL 框架中的检索器

CAMEL 框架通过 camel.retrievers 模块实现了检索功能,主要分为两大类:

  • VectorRetriever (向量检索器)
    • 工作流
      1. 分块:将大型文档分解为较小的文本块。
      2. 嵌入:使用嵌入模型(如 SentenceTransformerEncoder)将每个文本块转换为高维向量。
      3. 存储:将这些向量存储在向量数据库(如 QdrantStorage)中。
      4. 检索:当用户查询时,将查询转换为向量,在向量数据库中搜索相似度高(通常使用余弦相似度)的匹配向量,并返回最相关的信息片段。
    • process() 函数:CAMEL 封装了文件读取、切块、嵌入和存储的整个流程,简化了RAG应用的搭建。
  • BM25Retriever (关键词检索器)
    • 工作原理:从稀疏的关键词维度进行文本召回,通过关键词匹配算法筛选出最相关的文本片段。

1.3 Basic RAG 实践步骤

  1. 读取示例数据:下载并准备作为知识库的原始文档(例如PDF文件)。
  2. 实例化嵌入模型:选择并加载合适的嵌入模型(例如本地的 SentenceTransformerEncoder)。
  3. 向量嵌入和存储数据
    • 创建并初始化向量数据库实例(如 QdrantStorage)。
    • 实例化 VectorRetriever,并将其与嵌入模型和向量存储关联。
    • 调用 vr.process(content=...) 方法,将文档读取、分块、嵌入并存储到向量数据库中。
  4. 执行检索
    • 使用 vr.query(query=..., top_k=..., similarity_threshold=...) 方法,根据用户查询从向量数据库中检索最相关的信息片段。
    • top_k:指定返回最相关片段的数量。
    • similarity_threshold:设置相似度阈值,确保检索内容的相关性。
    • 对于不相关查询,检索器在达到相似度阈值时会返回“No suitable information retrieved”等提示。
  5. 结合LLM生成回复
    • 实例化一个LLM模型或Agent(例如 CAMEL 中的 ChatAgent)。
    • 将检索到的信息片段(通常是 results[0]["text"])作为上下文传递给Agent。
    • 通过 Agent 的 step() 方法,LLM结合检索到的上下文生成最终回复。
    • 可以将整个过程封装成一个函数,实现自动检索和生成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import os
import requests

os.makedirs('local_data', exist_ok=True)

url = "https://arxiv.org/pdf/2303.17760.pdf"
response = requests.get(url)
with open('local_data/camel_paper.pdf', 'wb') as file:
file.write(response.content)

from camel.embeddings import SentenceTransformerEncoder
from camel.retrievers import VectorRetriever

# 初始化一个 SentenceTransformerEncoder 模型,选用 'intfloat/e5-large-v2' 模型,用于将文本转化为向量嵌入。
embedding_model=SentenceTransformerEncoder(model_name='intfloat/e5-large-v2')


# 创建并初始化一个向量数据库 (以QdrantStorage为例)
from camel.storages.vectordb_storages import QdrantStorage

# 创建 QdrantStorage 实例,配置向量维度(由嵌入模型决定)、
# 集合名称("demo_collection")、存储路径("storage_customized_run")和集合别名("论文")。
vector_storage = QdrantStorage(
vector_dim=embedding_model.get_output_dim(),
collection="demo_collection",
path="storage_customized_run",
collection_name="论文"
)
# 初始化VectorRetriever实例并使用本地模型作为嵌入模型
vr = VectorRetriever(embedding_model= embedding_model,storage=vector_storage)
# 将文件读取、切块、嵌入并储存在向量数据库中,这大概需要1-2分钟
vr.process(
content="local_data/camel_paper.pdf"
)

# 设定一个查询语句
query = "CAMEL是什么"

# 执行查询并获取结果
results = vr.query(query=query, top_k=1)
print(results)

print(results[0]["text"])

retrieved_info_irrevelant = vr.query(
query="Compared with dumpling and rice, which should I take for dinner?",
top_k=1,
similarity_threshold=0.8
)

print(retrieved_info_irrevelant)


from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType

from dotenv import load_dotenv
import os

load_dotenv()
api_key = os.getenv('MODELSCOPE_SDK_TOKEN')


retrieved_info = vr.query(
query="what is roleplaying?",
top_k=1,
)

assistant_sys_msg = """
你是一个帮助回答问题的助手,
我会给你原始查询和检索到的上下文,
根据检索到的上下文回答原始查询,
如果你无法回答问题就说我不知道。
"""

model = ModelFactory.create(
model_platform=ModelPlatformType.OPENAI_COMPATIBLE_MODEL,
model_type="Qwen/Qwen2.5-72B-Instruct",
url='https://api-inference.modelscope.cn/v1/',
api_key=api_key
)

# 获取查询“什么是角色扮演”的检索结果的文本内容作为用户消息。
user_msg = retrieved_info[0]['text']
print(user_msg + '\n')

agent = ChatAgent(assistant_sys_msg,model=model)
# 使用step方法获得最终的检索增强生成的回复并打印
assistant_response = agent.step(user_msg)
print(assistant_response.msg.content)


2. Rewriting (查询改写/澄清)

在实际应用中,用户查询可能存在错别字、语义模糊或表达不准确等问题,这会严重影响RAG系统的检索和生成质量。Rewriting 模块旨在优化和提高原始查询的质量。

2.1 优化策略

  • 错字校正:修正用户输入中的拼写错误。
  • 句式调整:重构查询,使其表达更通顺严谨。
  • LLM 自我调整:利用LLM(通过提示词工程)对原始查询进行优化和重写。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""
RAG查询重写测试脚本

该脚本用于测试RAG(检索增强生成)系统中的查询重写功能。
主要功能是接收用户的原始查询,通过AI模型对查询进行优化和重写,
修正错别字并提高查询的准确性和表达质量。
"""

# 用户的原始查询(包含错别字和表达不清晰的问题)
# 注意:这里的查询包含了一些错别字,如"盖"应为"该","冲服"应为"冲突"
original_query = "我盖如何解决CAMEL中文档冲服的问题问题呢,几个版本的文档可能存在代码结构的冲突"

# 系统消息:定义AI助手的角色和任务
# 该消息告诉AI它是一个专门用于查询重写的助手
sys_msg = '你是RAG模块中的Rewriting助手,目的是理解用户的提问,并且重新组织和优化用户的提问表达,修正用户输入中可能存在的错别字的情况并重构提问来使得句子表达更加通顺严谨'

# 导入CAMEL框架相关模块
from camel.models import ModelFactory # 模型工厂,用于创建不同类型的模型
from camel.types import ModelPlatformType # 模型平台类型的枚举
from camel.agents import ChatAgent # 聊天代理类

# 导入环境变量和操作系统相关模块
from dotenv import load_dotenv # 用于加载.env文件中的环境变量
import os # 操作系统接口模块

# 加载环境变量文件(.env)
load_dotenv()

# 从环境变量中获取ModelScope的API密钥
# ModelScope是阿里云推出的机器学习模型平台
api_key = os.getenv('MODELSCOPE_SDK_TOKEN')

# 创建AI模型实例
model = ModelFactory.create(
model_platform=ModelPlatformType.OPENAI_COMPATIBLE_MODEL, # 使用OpenAI兼容的模型平台
model_type="Qwen/Qwen2.5-72B-Instruct", # 指定具体的模型类型
url='https://api-inference.modelscope.cn/v1/', # ModelScope API的推理端点
api_key=api_key # API密钥用于身份验证
)

# 创建聊天代理
# 将系统消息和模型绑定到代理上
agent = ChatAgent(system_message=sys_msg, model=model)

# 构造用户消息,要求AI对原始查询进行重写
# 这里使用f-string格式化字符串,将原始查询嵌入到提示中
usr_msg = f'用户的原始提问如下:{original_query},请优化重写并直接输出新的Query。新的Query: '

# 让AI代理处理用户消息并生成响应
response = agent.step(usr_msg)

# 输出AI重写后的查询结果
# response.msgs[0].content 包含了AI生成的重写后的查询
print(response.msgs[0].content)

2.2 其他高级 Rewriting 技巧

  • 子问题策略(Sub-Query):将主问题分解为更具体的子问题,帮助系统深入理解并提高检索准确性。
  • HyDE (Hypothetical Document Embeddings) 查询转换
    • 利用LLM为用户查询生成假设性文档(可能包含错误但与原始查询相关)。
    • 通过这些假设文档的嵌入向量,在RAG知识库中检索具有相似向量的真实文档,从而提高检索精度。

以下代码是基于RAGBasic上构建的HyDE模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
"""
增强版RAG系统:集成查询重写(Rewriting)和假设文档嵌入(HyDE)

该脚本实现了一个增强的RAG系统,结合了以下技术:
1. 查询重写(Query Rewriting):修正用户查询中的错别字和表达问题
2. HyDE (Hypothetical Document Embeddings):生成假设文档来改善检索效果
3. 传统RAG检索和生成

HyDE方法的核心思想:
- 使用LLM为用户查询生成假设文档
- 这些假设文档虽然可能包含错误,但与知识库中的真实文档相关联
- 通过假设文档的向量表示来检索相似的真实文档,提高检索准确性
"""

import os
import requests
from typing import List, Dict, Any

# 导入CAMEL框架相关模块
from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.agents import ChatAgent
from camel.embeddings import SentenceTransformerEncoder
from camel.retrievers import VectorRetriever
from camel.storages.vectordb_storages import QdrantStorage

# 导入环境变量模块
from dotenv import load_dotenv

class EnhancedRAGSystem:
"""
增强版RAG系统类

集成了查询重写、HyDE和传统RAG功能
"""

def __init__(self, api_key: str):
"""
初始化增强版RAG系统

Args:
api_key: ModelScope API密钥
"""
self.api_key = api_key

# 初始化嵌入模型
self.embedding_model = SentenceTransformerEncoder(
model_name='intfloat/e5-large-v2'
)

# 初始化向量存储
self.vector_storage = QdrantStorage(
vector_dim=self.embedding_model.get_output_dim(),
collection="enhanced_rag_collection",
path="storage_enhanced_rag",
collection_name="增强RAG知识库"
)

# 初始化向量检索器
self.vector_retriever = VectorRetriever(
embedding_model=self.embedding_model,
storage=self.vector_storage
)

# 初始化LLM模型
self.model = ModelFactory.create(
model_platform=ModelPlatformType.OPENAI_COMPATIBLE_MODEL,
model_type="Qwen/Qwen2.5-72B-Instruct",
url='https://api-inference.modelscope.cn/v1/',
api_key=self.api_key
)

# 初始化各种代理
self._init_agents()

def _init_agents(self):
"""初始化不同功能的AI代理"""

# 查询重写代理
rewriting_sys_msg = """
你是RAG模块中的Rewriting助手,目的是理解用户的提问,并且重新组织和优化用户的提问表达,
修正用户输入中可能存在的错别字的情况并重构提问来使得句子表达更加通顺严谨。
请直接输出重写后的查询,不要添加额外的解释。
"""
self.rewriting_agent = ChatAgent(
system_message=rewriting_sys_msg,
model=self.model
)

# HyDE假设文档生成代理
hyde_sys_msg = """
你是一个专门生成假设文档的助手。根据用户的查询,生成一个相关的假设文档片段。
这个文档应该:
1. 直接回答用户的问题
2. 包含相关的技术细节和概念
3. 使用专业但清晰的语言
4. 长度适中(200-400字)

请直接输出假设文档内容,不要添加额外的解释或格式。
"""
self.hyde_agent = ChatAgent(
system_message=hyde_sys_msg,
model=self.model
)

# RAG回答生成代理
rag_sys_msg = """
你是一个帮助回答问题的助手。
我会给你原始查询和检索到的上下文信息。
请根据检索到的上下文回答原始查询。
如果上下文信息不足以回答问题,请说"根据提供的信息,我无法完全回答这个问题"。
请确保回答准确、完整且有条理。
"""
self.rag_agent = ChatAgent(
system_message=rag_sys_msg,
model=self.model
)

def setup_knowledge_base(self, pdf_url: str = None, pdf_path: str = None):
"""
设置知识库

Args:
pdf_url: PDF文件的URL
pdf_path: 本地PDF文件路径
"""
# 创建本地数据目录
os.makedirs('local_data', exist_ok=True)

if pdf_url:
# 下载PDF文件
print("正在下载PDF文件...")
response = requests.get(pdf_url)
pdf_path = 'local_data/knowledge_base.pdf'
with open(pdf_path, 'wb') as file:
file.write(response.content)
print(f"PDF文件已下载到: {pdf_path}")

if pdf_path:
# 处理PDF文件并建立向量数据库
print("正在处理PDF文件并建立向量数据库...")
self.vector_retriever.process(content=pdf_path)
print("知识库设置完成!")

def rewrite_query(self, original_query: str) -> str:
"""
重写用户查询

Args:
original_query: 原始用户查询

Returns:
重写后的查询
"""
rewrite_prompt = f"用户的原始提问如下:{original_query}"
response = self.rewriting_agent.step(rewrite_prompt)
rewritten_query = response.msgs[0].content.strip()

print(f"原始查询: {original_query}")
print(f"重写查询: {rewritten_query}")

return rewritten_query

def generate_hypothetical_document(self, query: str) -> str:
"""
生成假设文档 (HyDE方法)

Args:
query: 用户查询

Returns:
生成的假设文档
"""
hyde_prompt = f"请为以下查询生成一个相关的假设文档:{query}"
response = self.hyde_agent.step(hyde_prompt)
hypothetical_doc = response.msgs[0].content.strip()

print(f"生成的假设文档:\n{hypothetical_doc}")

return hypothetical_doc

def retrieve_with_hyde(self, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
"""
使用HyDE方法进行检索

Args:
query: 用户查询
top_k: 返回的文档数量

Returns:
检索到的文档列表
"""
# 生成假设文档
hypothetical_doc = self.generate_hypothetical_document(query)

# 使用假设文档进行检索
hyde_results = self.vector_retriever.query(
query=hypothetical_doc,
top_k=top_k
)

# 同时使用原始查询进行检索作为对比
original_results = self.vector_retriever.query(
query=query,
top_k=top_k
)

print(f"\n=== HyDE检索结果 ===")
for i, result in enumerate(hyde_results):
print(f"文档 {i+1} (相似度: {result.get('similarity', 'N/A')}):")
print(f"{result['text'][:200]}...\n")

print(f"=== 原始查询检索结果 ===")
for i, result in enumerate(original_results):
print(f"文档 {i+1} (相似度: {result.get('similarity', 'N/A')}):")
print(f"{result['text'][:200]}...\n")

return hyde_results, original_results

def generate_answer(self, query: str, retrieved_docs: List[Dict[str, Any]]) -> str:
"""
基于检索到的文档生成答案

Args:
query: 用户查询
retrieved_docs: 检索到的文档列表

Returns:
生成的答案
"""
# 构建上下文
context = "\n\n".join([doc['text'] for doc in retrieved_docs])

# 构建提示
prompt = f"""
原始查询: {query}

检索到的上下文信息:
{context}

请根据上述上下文信息回答原始查询。
"""

response = self.rag_agent.step(prompt)
answer = response.msgs[0].content.strip()

return answer

def enhanced_query(self, original_query: str, use_rewriting: bool = True,
use_hyde: bool = True, top_k: int = 3) -> Dict[str, Any]:
"""
执行增强版RAG查询

Args:
original_query: 原始用户查询
use_rewriting: 是否使用查询重写
use_hyde: 是否使用HyDE方法
top_k: 检索文档数量

Returns:
包含各步骤结果的字典
"""
print("=" * 60)
print("开始执行增强版RAG查询")
print("=" * 60)

results = {
'original_query': original_query,
'rewritten_query': None,
'hypothetical_document': None,
'retrieved_docs': None,
'final_answer': None
}

# 步骤1: 查询重写(可选)
if use_rewriting:
print("\n步骤1: 查询重写")
print("-" * 30)
rewritten_query = self.rewrite_query(original_query)
results['rewritten_query'] = rewritten_query
query_to_use = rewritten_query
else:
query_to_use = original_query

# 步骤2: 文档检索
print(f"\n步骤2: 文档检索")
print("-" * 30)

if use_hyde:
print("使用HyDE方法进行检索...")
hyde_results, original_results = self.retrieve_with_hyde(query_to_use, top_k)
results['retrieved_docs'] = hyde_results
results['original_retrieval_docs'] = original_results
retrieved_docs = hyde_results
else:
print("使用传统方法进行检索...")
retrieved_docs = self.vector_retriever.query(query=query_to_use, top_k=top_k)
results['retrieved_docs'] = retrieved_docs

# 步骤3: 答案生成
print(f"\n步骤3: 答案生成")
print("-" * 30)
final_answer = self.generate_answer(query_to_use, retrieved_docs)
results['final_answer'] = final_answer

print(f"\n最终答案:\n{final_answer}")

return results


def main():
"""主函数:演示增强版RAG系统的使用"""

# 加载环境变量
load_dotenv()
api_key = os.getenv('MODELSCOPE_SDK_TOKEN')

if not api_key:
print("错误:请设置MODELSCOPE_SDK_TOKEN环境变量")
return

# 初始化增强版RAG系统
print("初始化增强版RAG系统...")
rag_system = EnhancedRAGSystem(api_key)

# 设置知识库(使用CAMEL论文)
pdf_url = "https://arxiv.org/pdf/2303.17760.pdf"
rag_system.setup_knowledge_base(pdf_url=pdf_url)

# 测试查询(包含错别字)
test_queries = [
"我盖如何解决CAMEL中文档冲服的问题问题呢,几个版本的文档可能存在代码结构的冲突",
"CAMEL是什么东东?它有什么特点吗",
"角色扮演在CAMEL中是怎么实现的"
]

for query in test_queries:
print("\n" + "=" * 80)
print(f"测试查询: {query}")
print("=" * 80)

# 执行增强版RAG查询
results = rag_system.enhanced_query(
original_query=query,
use_rewriting=True,
use_hyde=True,
top_k=2
)

print("\n" + "=" * 80)


if __name__ == "__main__":
main()


3. Rerank (重排)

Rerank 模块用于对初步检索结果进行重新排序,以提高检索精确性和相关性,尤其在多路召回(结合向量相似度、关键词匹配、规则匹配等多种检索方式)场景中尤为重要。多路召回可能导致召回片段数量多且相关度参差不齐。

3.1 重排步骤

  1. 初步检索:获取多路召回的初始文档片段。
  2. 特征计算:评估每个文档片段与查询的相关性得分。
  3. 重新排序:根据特征得分对文档片段进行排序。
  4. 选择最佳结果:从重排后的结果中选取前 TOP-K 个作为最终最相关的片段,传递给LLM进行生成。

3.2 评估指标

  • 命中率 (Hit Rate):在前 k 个检索文档中找到正确答案的查询比例,衡量系统在几次尝试中找到正确答案的频率。
  • 平均倒数排名 (MRR - Mean Reciprocal Rank):评估系统准确性,根据排名最高的相关文档的倒数排名计算平均值(第一个相关文档排名为1,倒数排名为1;第二个为1/2,以此类推)。MRR和Hit Rate越高,表示Rerank效果越好。

3.3 RRF (Reciprocal Rank Fusion) 算法实践

  • 原理:一种融合多个不同相关性指标结果集的方法,仅依赖排名计算,不要求相关性分数相互关联。
  • 实现:通过遍历来自不同检索器(例如 VectorRetrieverBM25Retriever)的结果,计算每个文档的融合分数,然后按分数降序排序并返回前 k 个结果。

实现RRF算法的系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
"""
增强版RAG系统:集成查询重写(Rewriting)和假设文档嵌入(HyDE)

该脚本实现了一个增强的RAG系统,结合了以下技术:
1. 查询重写(Query Rewriting):修正用户查询中的错别字和表达问题
2. HyDE (Hypothetical Document Embeddings):生成假设文档来改善检索效果
3. 传统RAG检索和生成

HyDE方法的核心思想:
- 使用LLM为用户查询生成假设文档
- 这些假设文档虽然可能包含错误,但与知识库中的真实文档相关联
- 通过假设文档的向量表示来检索相似的真实文档,提高检索准确性
"""

import os
import requests
from typing import List, Dict, Any

# 导入CAMEL框架相关模块
from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.agents import ChatAgent
from camel.embeddings import SentenceTransformerEncoder
from camel.retrievers import VectorRetriever, BM25Retriever
from camel.storages.vectordb_storages import QdrantStorage

# 导入环境变量模块
from dotenv import load_dotenv

class EnhancedRAGSystem:
"""
增强版RAG系统类

集成了查询重写、HyDE、ReRank(RRF算法)和传统RAG功能
"""

def __init__(self, api_key: str):
"""
初始化增强版RAG系统

Args:
api_key: ModelScope API密钥
"""
self.api_key = api_key

# 初始化嵌入模型
self.embedding_model = SentenceTransformerEncoder(
model_name='intfloat/e5-large-v2'
)

# 初始化向量存储
self.vector_storage = QdrantStorage(
vector_dim=self.embedding_model.get_output_dim(),
collection="enhanced_rag_collection",
path="storage_enhanced_rag",
collection_name="增强RAG知识库"
)

# 初始化向量检索器
self.vector_retriever = VectorRetriever(
embedding_model=self.embedding_model,
storage=self.vector_storage
)

# 初始化BM25检索器(用于ReRank)
self.bm25_retriever = BM25Retriever()

# 初始化LLM模型
self.model = ModelFactory.create(
model_platform=ModelPlatformType.OPENAI_COMPATIBLE_MODEL,
model_type="Qwen/Qwen2.5-72B-Instruct",
url='https://api-inference.modelscope.cn/v1/',
api_key=self.api_key
)

# 初始化各种代理
self._init_agents()

def rrf(self, vector_results: List[Dict], text_results: List[Dict], k: int = 10, m: int = 60) -> List[tuple]:
"""
使用RRF (Reciprocal Rank Fusion) 算法对两组检索结果进行重排序

RRF算法通过结合不同检索方法的排名来提高检索效果。
公式:RRF_score = Σ(1/(rank + m)),其中m是超参数

Args:
vector_results: 向量召回的结果列表,每个元素是包含'text'的字典
text_results: 文本召回的结果列表,每个元素是包含'text'的字典
k: 排序后返回前k个结果
m: RRF算法的超参数,用于平滑排名分数

Returns:
重排序后的结果列表,每个元素是(文档内容, 融合分数)的元组
"""
doc_scores = {}

# 处理向量检索结果
# 为每个文档分配基于排名的分数:1/(rank + m)
for rank, result in enumerate(vector_results):
text = result['text']
doc_scores[text] = doc_scores.get(text, 0) + 1 / (rank + m)

# 处理文本检索结果
# 累加来自不同检索方法的分数
for rank, result in enumerate(text_results):
text = result['text']
doc_scores[text] = doc_scores.get(text, 0) + 1 / (rank + m)

# 按融合分数降序排序并返回前k个结果
sorted_results = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)[:k]

print(f"RRF融合完成,共处理 {len(doc_scores)} 个唯一文档,返回前 {len(sorted_results)} 个结果")

return sorted_results

def _init_agents(self):
"""初始化不同功能的AI代理"""

# 查询重写代理
rewriting_sys_msg = """
你是RAG模块中的Rewriting助手,目的是理解用户的提问,并且重新组织和优化用户的提问表达,
修正用户输入中可能存在的错别字的情况并重构提问来使得句子表达更加通顺严谨。
请直接输出重写后的查询,不要添加额外的解释。
"""
self.rewriting_agent = ChatAgent(
system_message=rewriting_sys_msg,
model=self.model
)

# HyDE假设文档生成代理
hyde_sys_msg = """
你是一个专门生成假设文档的助手。根据用户的查询,生成一个相关的假设文档片段。
这个文档应该:
1. 直接回答用户的问题
2. 包含相关的技术细节和概念
3. 使用专业但清晰的语言
4. 长度适中(200-400字)

请直接输出假设文档内容,不要添加额外的解释或格式。
"""
self.hyde_agent = ChatAgent(
system_message=hyde_sys_msg,
model=self.model
)

# RAG回答生成代理
rag_sys_msg = """
你是一个帮助回答问题的助手。
我会给你原始查询和检索到的上下文信息。
请根据检索到的上下文回答原始查询。
如果上下文信息不足以回答问题,请说"根据提供的信息,我无法完全回答这个问题"。
请确保回答准确、完整且有条理。
"""
self.rag_agent = ChatAgent(
system_message=rag_sys_msg,
model=self.model
)

def setup_knowledge_base(self, pdf_url: str = None, pdf_path: str = None):
"""
设置知识库,同时初始化向量检索器和BM25检索器

Args:
pdf_url: PDF文件的URL
pdf_path: 本地PDF文件路径
"""
# 创建本地数据目录
os.makedirs('local_data', exist_ok=True)

if pdf_url:
# 下载PDF文件
print("正在下载PDF文件...")
response = requests.get(pdf_url)
pdf_path = 'local_data/knowledge_base.pdf'
with open(pdf_path, 'wb') as file:
file.write(response.content)
print(f"PDF文件已下载到: {pdf_path}")

if pdf_path:
# 处理PDF文件并建立向量数据库
print("正在处理PDF文件并建立向量数据库...")
self.vector_retriever.process(content=pdf_path)

# 同时为BM25检索器处理文档
print("正在为BM25检索器处理文档...")
self.bm25_retriever.process(content_input_path=pdf_path)

print("知识库设置完成!(向量检索器 + BM25检索器)")

def rewrite_query(self, original_query: str) -> str:
"""
重写用户查询

Args:
original_query: 原始用户查询

Returns:
重写后的查询
"""
rewrite_prompt = f"用户的原始提问如下:{original_query}"
response = self.rewriting_agent.step(rewrite_prompt)
rewritten_query = response.msgs[0].content.strip()

print(f"原始查询: {original_query}")
print(f"重写查询: {rewritten_query}")

return rewritten_query

def generate_hypothetical_document(self, query: str) -> str:
"""
生成假设文档 (HyDE方法)

Args:
query: 用户查询

Returns:
生成的假设文档
"""
hyde_prompt = f"请为以下查询生成一个相关的假设文档:{query}"
response = self.hyde_agent.step(hyde_prompt)
hypothetical_doc = response.msgs[0].content.strip()

print(f"生成的假设文档:\n{hypothetical_doc}")

return hypothetical_doc

def retrieve_with_rrf(self, query: str, top_k: int = 10, rrf_k: int = 5, m: int = 60) -> List[Dict[str, Any]]:
"""
使用RRF算法结合向量检索和BM25检索的结果

Args:
query: 用户查询
top_k: 每个检索器返回的文档数量
rrf_k: RRF融合后返回的文档数量
m: RRF算法的超参数

Returns:
RRF融合后的检索结果列表
"""
print(f"使用RRF算法进行混合检索...")

# 向量检索
print("执行向量检索...")
vector_results = self.vector_retriever.query(query=query, top_k=top_k)

# BM25检索
print("执行BM25检索...")
bm25_results = self.bm25_retriever.query(query=query, top_k=top_k)

# 使用RRF算法融合结果
print("使用RRF算法融合检索结果...")
rrf_results = self.rrf(vector_results, bm25_results, k=rrf_k, m=m)

# 转换RRF结果格式以保持与其他检索方法的一致性
formatted_results = []
for text, score in rrf_results:
formatted_results.append({
'text': text,
'rrf_score': score,
'source': 'RRF_fusion'
})

print(f"RRF检索完成,返回 {len(formatted_results)} 个文档")
for i, result in enumerate(formatted_results):
print(f"文档 {i+1} (RRF分数: {result['rrf_score']:.4f}):")
print(f"{result['text'][:200]}...\n")

return formatted_results

def retrieve_with_hyde_and_rrf(self, query: str, top_k: int = 10, rrf_k: int = 5, m: int = 60) -> List[Dict[str, Any]]:
"""
结合HyDE和RRF的高级检索方法

Args:
query: 用户查询
top_k: 每个检索器返回的文档数量
rrf_k: RRF融合后返回的文档数量
m: RRF算法的超参数

Returns:
HyDE+RRF融合后的检索结果列表
"""
print(f"使用HyDE+RRF组合方法进行检索...")

# 生成假设文档
hypothetical_doc = self.generate_hypothetical_document(query)

# 使用假设文档进行向量检索
print("使用假设文档进行向量检索...")
hyde_vector_results = self.vector_retriever.query(query=hypothetical_doc, top_k=top_k)

# 使用原始查询进行BM25检索
print("使用原始查询进行BM25检索...")
bm25_results = self.bm25_retriever.query(query=query, top_k=top_k)

# 使用RRF算法融合HyDE向量检索和BM25检索的结果
print("使用RRF算法融合HyDE和BM25检索结果...")
rrf_results = self.rrf(hyde_vector_results, bm25_results, k=rrf_k, m=m)

# 转换结果格式
formatted_results = []
for text, score in rrf_results:
formatted_results.append({
'text': text,
'rrf_score': score,
'source': 'HyDE_RRF_fusion'
})

print(f"HyDE+RRF检索完成,返回 {len(formatted_results)} 个文档")
for i, result in enumerate(formatted_results):
print(f"文档 {i+1} (RRF分数: {result['rrf_score']:.4f}):")
print(f"{result['text'][:200]}...\n")

return formatted_results

def retrieve_with_hyde(self, query: str, top_k: int = 3) -> List[Dict[str, Any]]:
"""
使用HyDE方法进行检索

Args:
query: 用户查询
top_k: 返回的文档数量

Returns:
检索到的文档列表
"""
# 生成假设文档
hypothetical_doc = self.generate_hypothetical_document(query)

# 使用假设文档进行检索
hyde_results = self.vector_retriever.query(
query=hypothetical_doc,
top_k=top_k
)

# 同时使用原始查询进行检索作为对比
original_results = self.vector_retriever.query(
query=query,
top_k=top_k
)

print(f"\n=== HyDE检索结果 ===")
for i, result in enumerate(hyde_results):
print(f"文档 {i+1} (相似度: {result.get('similarity', 'N/A')}):")
print(f"{result['text'][:200]}...\n")

print(f"=== 原始查询检索结果 ===")
for i, result in enumerate(original_results):
print(f"文档 {i+1} (相似度: {result.get('similarity', 'N/A')}):")
print(f"{result['text'][:200]}...\n")

return hyde_results, original_results

def generate_answer(self, query: str, retrieved_docs: List[Dict[str, Any]]) -> str:
"""
基于检索到的文档生成答案

Args:
query: 用户查询
retrieved_docs: 检索到的文档列表

Returns:
生成的答案
"""
# 构建上下文
context = "\n\n".join([doc['text'] for doc in retrieved_docs])

# 构建提示
prompt = f"""
原始查询: {query}

检索到的上下文信息:
{context}

请根据上述上下文信息回答原始查询。
"""

response = self.rag_agent.step(prompt)
answer = response.msgs[0].content.strip()

return answer

def enhanced_query(self, original_query: str, use_rewriting: bool = True,
use_hyde: bool = True, use_rrf: bool = True, top_k: int = 10, rrf_k: int = 5) -> Dict[str, Any]:
"""
执行增强版RAG查询

Args:
original_query: 原始用户查询
use_rewriting: 是否使用查询重写
use_hyde: 是否使用HyDE方法
use_rrf: 是否使用RRF重排序算法
top_k: 每个检索器返回的文档数量
rrf_k: RRF融合后返回的文档数量

Returns:
包含各步骤结果的字典
"""
print("=" * 60)
print("开始执行增强版RAG查询")
print("=" * 60)

results = {
'original_query': original_query,
'rewritten_query': None,
'hypothetical_document': None,
'retrieved_docs': None,
'final_answer': None
}

# 步骤1: 查询重写(可选)
if use_rewriting:
print("\n步骤1: 查询重写")
print("-" * 30)
rewritten_query = self.rewrite_query(original_query)
results['rewritten_query'] = rewritten_query
query_to_use = rewritten_query
else:
query_to_use = original_query

# 步骤2: 文档检索
print(f"\n步骤2: 文档检索")
print("-" * 30)

if use_rrf and use_hyde:
print("使用HyDE+RRF组合方法进行检索...")
retrieved_docs = self.retrieve_with_hyde_and_rrf(query_to_use, top_k=top_k, rrf_k=rrf_k)
results['retrieved_docs'] = retrieved_docs
results['retrieval_method'] = 'HyDE+RRF'
elif use_rrf:
print("使用RRF方法进行检索...")
retrieved_docs = self.retrieve_with_rrf(query_to_use, top_k=top_k, rrf_k=rrf_k)
results['retrieved_docs'] = retrieved_docs
results['retrieval_method'] = 'RRF'
elif use_hyde:
print("使用HyDE方法进行检索...")
hyde_results, original_results = self.retrieve_with_hyde(query_to_use, top_k)
results['retrieved_docs'] = hyde_results
results['original_retrieval_docs'] = original_results
results['retrieval_method'] = 'HyDE'
retrieved_docs = hyde_results
else:
print("使用传统向量检索方法...")
retrieved_docs = self.vector_retriever.query(query=query_to_use, top_k=top_k)
results['retrieved_docs'] = retrieved_docs
results['retrieval_method'] = 'Traditional'

# 步骤3: 答案生成
print(f"\n步骤3: 答案生成")
print("-" * 30)
final_answer = self.generate_answer(query_to_use, retrieved_docs)
results['final_answer'] = final_answer

print(f"\n最终答案:\n{final_answer}")

return results


def main():
"""主函数:演示增强版RAG系统的使用"""

# 加载环境变量
load_dotenv()
api_key = os.getenv('MODELSCOPE_SDK_TOKEN')

if not api_key:
print("错误:请设置MODELSCOPE_SDK_TOKEN环境变量")
return

# 初始化增强版RAG系统
print("初始化增强版RAG系统...")
rag_system = EnhancedRAGSystem(api_key)

# 设置知识库(使用CAMEL论文)
pdf_url = "https://arxiv.org/pdf/2303.17760.pdf"
rag_system.setup_knowledge_base(pdf_url=pdf_url)

# 测试查询(包含错别字)
test_queries = [
"我盖如何解决CAMEL中文档冲服的问题问题呢,几个版本的文档可能存在代码结构的冲突",
"CAMEL是什么东东?它有什么特点吗",
"角色扮演在CAMEL中是怎么实现的"
]

# 测试不同的检索方法组合
test_methods = [
{"name": "传统RAG", "rewriting": False, "hyde": False, "rrf": False},
{"name": "查询重写+传统RAG", "rewriting": True, "hyde": False, "rrf": False},
{"name": "HyDE增强RAG", "rewriting": False, "hyde": True, "rrf": False},
{"name": "RRF重排序RAG", "rewriting": False, "hyde": False, "rrf": True},
{"name": "完整增强RAG (Rewriting+HyDE+RRF)", "rewriting": True, "hyde": True, "rrf": True}
]

for i, query in enumerate(test_queries):
print("\n" + "=" * 100)
print(f"测试查询 {i+1}: {query}")
print("=" * 100)

# 只对第一个查询测试所有方法,其他查询使用完整增强方法
methods_to_test = test_methods if i == 0 else [test_methods[-1]]

for method in methods_to_test:
print(f"\n{'='*20} {method['name']} {'='*20}")

# 执行增强版RAG查询
results = rag_system.enhanced_query(
original_query=query,
use_rewriting=method['rewriting'],
use_hyde=method['hyde'],
use_rrf=method['rrf'],
top_k=10,
rrf_k=3
)

print(f"检索方法: {results.get('retrieval_method', 'Unknown')}")
print("-" * 50)

print("\n" + "=" * 100)


if __name__ == "__main__":
main()


通过结合 Basic RAG,以及RewritingRerank等优化模块,我们可以构建出更强大、更准确的RAG应用,从而显著提升LLM在特定知识领域的表现。

以下是对你提供的内容的系统性中文整理总结,涵盖了向量检索模块的评估与优化流程,包括 TF-IDF 与嵌入模型对比、评估指标定义、检索流程实现、优化方向等内容,适合撰写文档或教学材料使用。


向量检索模块评估与优化

一、背景说明

在构建基于向量检索(Vector Retrieval, VR)的智能问答系统时,评估检索质量成为关键一步。本实践以 CAMEL AI 框架为基础,通过标准问题及预期答案的方式,对检索器返回结果进行自动化评估,并尝试不同方法进行优化。


二、测试样例定义

我们通过一组测试查询(test_queries)及预期标准答案(expected_answers)来建立基本评估集:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
test_queries = [
{
"query": "什么是CAMEL AI?",
"expected_answers": ["CAMEL AI 是一个开源的、社区驱动的AI框架。"]
},
{
"query": "如何开始使用CAMEL AI?",
"expected_answers": ["首先安装框架:`pip install camel-ai`,然后引入必要的模块。"]
},
{
"query": "CAMEL AI 的主要特点是什么?",
"expected_answers": ["模块化设计、易用性和扩展性。"]
}
]

三、评估指标设计

系统定义了以下核心评估指标:

指标名 含义
Precision(精确率) 正确返回结果 / 实际返回结果
Recall(召回率) 正确返回结果 / 所有应返回的正确结果
F1 分数 精确率和召回率的调和平均
平均相似度 所有预期-实际配对之间的相似度均值

其中,相似度计算方法有两种实现方式:


四、相似度计算方法

方法1:TF-IDF + 余弦相似度(初始方法)

1
2
3
4
5
def compute_similarity(expected, retrieved):
vectorizer = TfidfVectorizer()
tfidf = vectorizer.fit_transform([expected, retrieved])
similarity_matrix = cosine_similarity(tfidf, tfidf)
return similarity_matrix[0, 1]

方法2:Embedding + 余弦相似度(改进方案)

1
2
3
def compute_similarity(expected, retrieved):
embeddings = embedding_model.embed_list([expected, retrieved])
return cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]

优点:Embedding 模型具备更强的语义理解能力,能处理“车辆≈汽车”这类同义关系。


五、整体评估流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def evaluate_retrieval(query, expected_answers, threshold=0.5, top_k=1):
results = vr.query(query=query, top_k=top_k)
retrieved_texts = [result["text"] for result in results]

precision = calculate_precision(retrieved_texts, expected_answers, threshold)
recall = calculate_recall(retrieved_texts, expected_answers, threshold)
f1 = calculate_f1(precision, recall)

similarities = [
compute_similarity(expected, retrieved)
for expected, retrieved in zip(expected_answers, retrieved_texts)
]
avg_similarity = np.mean(similarities) if similarities else 0

return {
"precision": precision,
"recall": recall,
"f1": f1,
"avg_similarity": avg_similarity,
"retrieved_texts": retrieved_texts
}

六、优化路径探索

1. 使用 Embedding 替代 TF-IDF

  • 提升语义对齐效果
  • 平均相似度显著提高(如从 0.36 提升至 0.91

2. 调整文档切片粒度(chunking)

1
2
3
4
5
vr.process(
content="example_document.md",
max_characters=100, # 更小的切片单元
should_chunk=True,
)
  • 优化检索粒度,避免一次性返回长段内容
  • 实现更加针对性的文本匹配与评分

3. 模型与参数优化方向

  • 嵌入模型:尝试不同的 encoder(如 BGE、MiniLM)
  • 相似度度量:替换余弦相似度为欧几里得/点积等
  • 检索策略:TF-IDF + Embedding 混合策略

七、生成模块评估建议(BLEU / ROUGE)

1
2
3
4
5
6
7
8
9
10
11
12
13
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu
from nltk.tokenize import word_tokenize

reference = "RAG combines retrieval and generation for QA."
generated = "RAG integrates retrieval and generation for question answering."

scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
scores = scorer.score(reference, generated)

reference_tokens = word_tokenize(reference)
generated_tokens = word_tokenize(generated)
bleu_score = sentence_bleu([reference_tokens], generated_tokens)

输出示例:

1
2
3
ROUGE-1: 0.86
ROUGE-L: 0.82
BLEU Score: 0.71

八、结论

本评估流程展示了向量检索系统从初步搭建 → 精度评估 → 嵌入优化 → 文档切片的迭代式优化路径,并为后续生成式问答系统提供了评估参考。

后续可探索方向包括:

  • 检索器多阶段混合策略(关键词初筛 + 嵌入复检)
  • 基于用户反馈的自动优化(在线学习)
  • 微调专用 Embedding 模型以适配垂类语义

当然可以!下面是针对你这个 Graph RAG 实战项目的详细任务分解(spec → task → controller)设计,适用于你当前使用的 CAMEL 框架 + Qwen2.5 模型 + Firecrawl 网页抓取 + Neo4j 图数据库 + 混合 RAG 推理的场景:


Graph RAG 实战系统任务分解


Spec(规范说明)

构建一个以知识图谱驱动的 Graph RAG 系统,实现从网页抓取 → 实体关系抽取 → 知识图谱构建 → 混合检索 → 回答生成的闭环流程,核心目标为提升复杂问题回答的准确性与可解释性

  • 用户输入自然语言问题
  • 系统自动进行相关网页抓取与处理
  • 抽取结构化实体关系,写入 Neo4j 图谱
  • 使用图谱结构 + 向量语义检索综合选取回答依据
  • 调用 Qwen2.5 模型生成最终响应

Task(模块拆解)

  1. 网页抓取模块(web_crawler)
    • 使用 Firecrawl API 抓取用户问题相关的网页
    • 输出结构化网页内容(标题、正文、链接等)
  2. 信息抽取模块(entity_relation_extractor)
    • 对抓取的网页内容进行实体识别与关系抽取(NER + RE)
    • 输出三元组(subject, relation, object)
  3. 图谱构建模块(graph_builder)
    • 将三元组写入 Neo4j 数据库(防止重复写入)
    • 实现节点合并、关系合并策略
  4. 混合检索模块(hybrid_retriever)
    • 图谱检索:基于 Cypher 查询获取相关实体路径
    • 向量检索:基于 CAMEL 的向量模块进行语义召回
    • 融合策略:Rank/Fuse/Rerank 等融合两类召回
  5. 问答生成模块(graph_rag_qa)
    • 将图谱结果 + 文档结果合并后作为上下文
    • 使用 Qwen2.5 模型生成回答
    • 支持来源追踪(source attribution)
  6. LLM Agent 调度模块(camel_controller)
    • 封装整个流程为 CAMEL Agent 的工具调用链
    • 根据用户输入智能调度抓取 → 构图 → QA 全链路

Controller(模块控制逻辑)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# controller.py 示例(简化伪码)

class GraphRAGController:
def __init__(self):
self.web_crawler = FirecrawlClient()
self.extractor = EntityRelationExtractor()
self.graph_db = Neo4jClient()
self.vector_search = VectorRetriever()
self.qa_model = QwenRAG()

def process_user_query(self, user_input):
# Step 1: 网页抓取
docs = self.web_crawler.search_and_parse(user_input)

# Step 2: 实体关系抽取
triples = self.extractor.extract_triples(docs)

# Step 3: 构建知识图谱
self.graph_db.add_triples(triples)

# Step 4: 混合检索
graph_context = self.graph_db.query_relevant(user_input)
vector_context = self.vector_search.query(user_input)
context = self.merge_context(graph_context, vector_context)

# Step 5: RAG 问答生成
answer = self.qa_model.generate_answer(user_input, context)

return answer

def merge_context(self, graph_ctx, vector_ctx):
# 可定制融合策略,例如先rank再合并
return graph_ctx + vector_ctx[:2]

其它

  • 引入 RAG-FusionColBERT-RAG 提高融合性能
  • 使用 LangGraph/Flowise 等构建 DAG 状态机调度
  • 多跳推理能力强化:通过图谱路径进行 reasoning chain 构建
  • 可视化图谱展示:Neo4j Bloom / D3.js 前端图谱可视化

其它实践部分都在同章节的练习部分,请跳转Agent-camel框架练习(4)查看更为详细的实践步骤。