
RAG 2.0: New Advances in Retrieval-Augmented Generation

I have been digging into RAG lately. It works like a library's smart curator: it not only finds the books you need, but also understands what you are really asking for, and when there is no exact match it offers the best available approximation...

Introduction

Retrieval-Augmented Generation (RAG) is undergoing a major upgrade. RAG 2.0 builds on the original architecture with smarter semantic understanding, dynamic knowledge base management, and multi-modal information processing. This article walks through RAG 2.0's core techniques, architecture design, and practical applications.
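
Before diving into the individual components, it helps to keep in mind the basic loop that every RAG system builds on: retrieve relevant passages, augment the prompt with them, then generate. A minimal sketch of that loop (the retriever and generator objects here are illustrative placeholders, not a specific library API):

# Minimal retrieve-then-generate loop (names are illustrative, not from a specific library)
def answer(query: str, retriever, generator, top_k: int = 3) -> str:
    # 1. Retrieve the passages most relevant to the query
    passages = retriever.search(query, top_k)

    # 2. Augment the prompt with the retrieved evidence
    context = "\n".join(p.content for p in passages)
    prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"

    # 3. Generate an answer grounded in that context
    return generator.generate(prompt)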

RAG 2.0 Core Architecture

Enhanced Retrieval Component

# RAG 2.0 retrieval enhancement implementation
import re
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModel
import torch
from typing import Any, List, Dict, Optional, Tuple
from dataclasses import dataclass
from enum import Enum


class SearchStrategy(Enum):
    SEMANTIC = "semantic"
    HYBRID = "hybrid"
    KEYWORD = "keyword"
    MULTI_MODAL = "multi_modal"


@dataclass
class DocumentChunk:
    """Document chunk data structure."""
    id: str
    content: str
    embedding: Optional[np.ndarray] = None
    metadata: Optional[Dict[str, Any]] = None
    score: float = 0.0


class SemanticSearchEngine:
    """Semantic search engine."""

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model_name = model_name
        self.encoder = SentenceTransformer(model_name)
        self.dimension = self.encoder.get_sentence_embedding_dimension()

        # FAISS index using inner-product similarity
        self.index = faiss.IndexFlatIP(self.dimension)
        self.doc_chunks = {}  # maps FAISS row id -> DocumentChunk

        # Text cleaner
        self.text_cleaner = TextCleaner()

    def encode_texts(self, texts: List[str]) -> np.ndarray:
        """Encode texts into vectors."""
        embeddings = self.encoder.encode(
            texts, convert_to_numpy=True, normalize_embeddings=True
        )
        return embeddings.astype('float32')

    def add_documents(self, chunks: List[DocumentChunk]):
        """Add document chunks to the index."""
        texts = [chunk.content for chunk in chunks]
        embeddings = self.encode_texts(texts)

        # Add to the FAISS index
        self.index.add(embeddings)

        # Store chunk metadata, keyed by its row id in the index
        for i, chunk in enumerate(chunks):
            chunk.embedding = embeddings[i]
            self.doc_chunks[len(self.doc_chunks)] = chunk

    def search(self, query: str, top_k: int = 10) -> List[DocumentChunk]:
        """Semantic search."""
        query_embedding = self.encode_texts([query])[0]

        # Efficient similarity search with FAISS
        scores, indices = self.index.search(np.array([query_embedding]), top_k)

        results = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)
            if idx != -1 and idx in self.doc_chunks:
                chunk = self.doc_chunks[idx]
                chunk.score = float(score)
                results.append(chunk)

        return results

    def batch_search(self, queries: List[str], top_k: int = 10) -> List[List[DocumentChunk]]:
        """Batch search."""
        query_embeddings = self.encode_texts(queries)
        scores, indices = self.index.search(query_embeddings, top_k)

        results_batch = []
        for query_scores, query_indices in zip(scores, indices):
            results = []
            for score, idx in zip(query_scores, query_indices):
                idx = int(idx)
                if idx != -1 and idx in self.doc_chunks:
                    chunk = self.doc_chunks[idx]
                    chunk.score = float(score)
                    results.append(chunk)
            results_batch.append(results)

        return results_batch


class HybridSearchEngine:
    """Hybrid search engine."""

    def __init__(self, semantic_engine: SemanticSearchEngine, alpha: float = 0.7):
        self.semantic_engine = semantic_engine
        self.alpha = alpha  # weight of the semantic score
        self.keyword_engine = KeywordSearchEngine()

    def search(self, query: str, top_k: int = 10) -> List[DocumentChunk]:
        """Hybrid search: combine semantic and keyword retrieval."""
        # Semantic results
        semantic_results = self.semantic_engine.search(query, top_k * 2)

        # Keyword results
        keyword_results = self.keyword_engine.search(query, top_k * 2)

        # Fuse the two result lists
        combined_results = self.combine_search_results(
            semantic_results, keyword_results, self.alpha
        )

        return combined_results[:top_k]

    def combine_search_results(
        self,
        semantic_results: List[DocumentChunk],
        keyword_results: List[DocumentChunk],
        alpha: float
    ) -> List[DocumentChunk]:
        """Fuse search results."""
        # Map document id -> score for each retriever
        semantic_scores = {chunk.id: chunk.score for chunk in semantic_results}
        keyword_scores = {chunk.id: chunk.score for chunk in keyword_results}

        # Keep the full semantic chunks so results carry their content
        id_to_chunk = {chunk.id: chunk for chunk in semantic_results}

        # Merge scores with a weighted average
        all_ids = set(semantic_scores.keys()) | set(keyword_scores.keys())
        combined_chunks = []

        max_sem = max(semantic_scores.values(), default=1.0) or 1.0
        max_kw = max(keyword_scores.values(), default=1.0) or 1.0

        for doc_id in all_ids:
            sem_score = semantic_scores.get(doc_id, 0.0)
            kw_score = keyword_scores.get(doc_id, 0.0)

            # Normalize each score, then combine with weight alpha
            combined_score = alpha * (sem_score / max_sem) + (1 - alpha) * (kw_score / max_kw)

            # Reuse the semantic chunk when available; otherwise this is a keyword-only hit
            chunk = id_to_chunk.get(doc_id)
            if chunk is None:
                chunk = DocumentChunk(id=doc_id, content="", score=combined_score)

            chunk.score = combined_score
            combined_chunks.append(chunk)

        # Sort by combined score
        combined_chunks.sort(key=lambda x: x.score, reverse=True)
        return combined_chunks


class KeywordSearchEngine:
    """Keyword search engine."""

    def __init__(self):
        self.inverted_index = {}
        self.doc_lengths = {}
        self.avg_doc_length = 0
        self.vocab = set()

    def add_documents(self, chunks: List[DocumentChunk]):
        """Add documents to the inverted index."""
        total_length = 0

        for chunk in chunks:
            doc_id = chunk.id
            tokens = self.tokenize(chunk.content)

            # Record document length
            self.doc_lengths[doc_id] = len(tokens)
            total_length += len(tokens)

            # Build the inverted index
            term_freq = {}
            for token in tokens:
                self.vocab.add(token)
                term_freq[token] = term_freq.get(token, 0) + 1

            for term, freq in term_freq.items():
                if term not in self.inverted_index:
                    self.inverted_index[term] = {}
                self.inverted_index[term][doc_id] = freq

        # Average document length
        if len(self.doc_lengths) > 0:
            self.avg_doc_length = total_length / len(self.doc_lengths)

    def search(self, query: str, top_k: int = 10) -> List[DocumentChunk]:
        """Keyword search using the BM25 scoring function."""
        query_tokens = self.tokenize(query)
        doc_scores = {}

        k1 = 1.5  # BM25 term-frequency saturation parameter
        b = 0.75  # BM25 length-normalization parameter
        avg_len = self.avg_doc_length or 1.0

        for term in query_tokens:
            if term in self.inverted_index:
                # Inverse document frequency
                n_qi = len(self.inverted_index[term])
                idf = np.log((len(self.doc_lengths) - n_qi + 0.5) / (n_qi + 0.5) + 1)

                for doc_id, tf in self.inverted_index[term].items():
                    # BM25 score for this term/document pair
                    doc_len = self.doc_lengths.get(doc_id, 0)
                    numerator = tf * (k1 + 1)
                    denominator = tf + k1 * (1 - b + b * doc_len / avg_len)
                    score = idf * (numerator / denominator)

                    doc_scores[doc_id] = doc_scores.get(doc_id, 0) + score

        # Convert to document chunks and sort by score
        results = []
        for doc_id, score in sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]:
            results.append(DocumentChunk(id=doc_id, content="", score=score))

        return results

    def tokenize(self, text: str) -> List[str]:
        """Simple tokenization: strip punctuation and lowercase."""
        return re.findall(r'\b\w+\b', text.lower())


class TextCleaner:
    """Text cleaner."""

    def __init__(self):
        self.stop_words = set([
            'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
            'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
            'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could',
            'should', 'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those'
        ])

    def clean(self, text: str) -> str:
        """Clean text."""
        # Drop special characters, keeping Chinese, English, digits, and basic punctuation
        text = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:]', ' ', text)

        # Collapse extra whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        return text
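
As a quick sanity check of the classes above, the sketch below indexes two made-up chunks and runs a hybrid query. Note that the keyword (BM25) index has to be populated separately from the FAISS index, since HybridSearchEngine does not forward documents to it automatically.

# Usage sketch (hypothetical data): index a few chunks and run a hybrid query
chunks = [
    DocumentChunk(id="doc1_0", content="RAG combines retrieval with text generation."),
    DocumentChunk(id="doc1_1", content="FAISS provides fast vector similarity search."),
]

semantic = SemanticSearchEngine()
semantic.add_documents(chunks)

hybrid = HybridSearchEngine(semantic, alpha=0.7)
hybrid.keyword_engine.add_documents(chunks)  # the BM25 index is populated separately

for hit in hybrid.search("How does vector search work?", top_k=2):
    print(hit.id, round(hit.score, 3))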

Upgraded Vectorization Techniques

# RAG 2.0 vectorization enhancements
class AdvancedVectorizer:
    """Advanced vectorizer."""

    def __init__(self, model_name: str = "sentence-transformers/all-mpnet-base-v2"):
        self.primary_encoder = SentenceTransformer(model_name)
        self.secondary_encoders = {}
        self.ensemble_weights = {}

        # Initialize multilingual support
        self.setup_multilingual_models()

    def setup_multilingual_models(self):
        """Set up multilingual models."""
        multilingual_models = [
            "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
            "sentence-transformers/distiluse-base-multilingual-cased-v2"
        ]

        for model_name in multilingual_models:
            try:
                encoder = SentenceTransformer(model_name)
                # Key secondary encoders by model name so entries do not overwrite each other
                self.secondary_encoders[model_name] = encoder
                self.ensemble_weights[model_name] = 0.3
            except Exception:
                continue

        # Weight of the primary model
        self.ensemble_weights['primary'] = 0.4

    def encode_with_ensemble(self, texts: List[str]) -> np.ndarray:
        """Encode with a model ensemble."""
        primary_embeddings = self.primary_encoder.encode(texts, normalize_embeddings=True)

        if len(self.secondary_encoders) == 0:
            return primary_embeddings.astype('float32')

        # Collect weighted embeddings from all encoders
        all_embeddings = [primary_embeddings * self.ensemble_weights['primary']]

        for name, encoder in self.secondary_encoders.items():
            if name in self.ensemble_weights:
                secondary_embeddings = encoder.encode(texts, normalize_embeddings=True)
                # Only ensemble encoders whose dimension matches the primary model
                if secondary_embeddings.shape[1] != primary_embeddings.shape[1]:
                    continue
                all_embeddings.append(secondary_embeddings * self.ensemble_weights[name])

        # Average fusion
        final_embeddings = np.mean(all_embeddings, axis=0)
        return final_embeddings.astype('float32')

    def detect_language(self, text: str) -> str:
        """Detect the language of a text."""
        import langdetect
        try:
            detected_lang = langdetect.detect(text)
            return detected_lang[:2]  # first two characters of the language code
        except Exception:
            return 'en'


class ContextualVectorizer:
    """Contextual vectorizer."""

    def __init__(self, model_name: str = "bert-base-uncased"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.max_length = 512

    def encode_with_context(self, texts: List[str], context: Optional[str] = None) -> np.ndarray:
        """Encode texts together with an optional context string."""
        all_embeddings = []

        for text in texts:
            # Combine text and context
            input_text = f"{context} [SEP] {text}" if context else text

            inputs = self.tokenizer(
                input_text,
                max_length=self.max_length,
                padding=True,
                truncation=True,
                return_tensors="pt"
            )

            with torch.no_grad():
                outputs = self.model(**inputs)
                # Use the [CLS] token embedding as the sentence representation
                embeddings = outputs.last_hidden_state[:, 0, :].numpy()
            all_embeddings.extend(embeddings)

        return np.vstack(all_embeddings).astype('float32')


class SparseDenseHybridEncoder:
    """Sparse-dense hybrid encoder."""

    def __init__(self):
        from sklearn.feature_extraction.text import TfidfVectorizer

        self.sparse_encoder = TfidfVectorizer(max_features=10000, ngram_range=(1, 2))
        self.dense_encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.fit_complete = False

    def fit(self, texts: List[str]):
        """Fit the sparse encoder."""
        self.sparse_encoder.fit(texts)
        self.fit_complete = True

    def encode(self, texts: List[str]) -> Tuple[np.ndarray, np.ndarray]:
        """Encode into sparse and dense vectors."""
        if not self.fit_complete:
            self.fit(texts)

        sparse_vectors = self.sparse_encoder.transform(texts).toarray().astype('float32')
        dense_vectors = self.dense_encoder.encode(texts, normalize_embeddings=True).astype('float32')

        return sparse_vectors, dense_vectors
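
To make the sparse/dense trade-off concrete, the short sketch below encodes two illustrative sentences with SparseDenseHybridEncoder and prints the resulting shapes; the exact dimensions depend on the fitted TF-IDF vocabulary and on the dense model chosen.

# Usage sketch (illustrative texts): compare the sparse and dense encodings
texts = [
    "Vector databases store high-dimensional embeddings.",
    "BM25 is a classic sparse retrieval scoring function.",
]

hybrid_encoder = SparseDenseHybridEncoder()
sparse_vecs, dense_vecs = hybrid_encoder.encode(texts)

# Sparse vectors are high-dimensional and mostly zero; dense vectors are compact
print("sparse:", sparse_vecs.shape)  # (2, size of the fitted vocabulary)
print("dense: ", dense_vecs.shape)   # e.g. (2, 384) for all-MiniLM-L6-v2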

Knowledge Base Management System

# RAG 2.0 knowledge base management
import json
import re
import sqlite3
import hashlib
from datetime import datetime


class KnowledgeBase:
    """Knowledge base backed by SQLite."""

    def __init__(self, db_path: str = "rag_knowledge.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize the database schema."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Documents table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS documents (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                title TEXT,
                source TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                metadata_json TEXT
            )
        ''')

        # Vectors table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS vectors (
                doc_id TEXT,
                chunk_index INTEGER,
                embedding BLOB,
                FOREIGN KEY (doc_id) REFERENCES documents (id)
            )
        ''')

        # Tags table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE,
                description TEXT
            )
        ''')

        # Document-tag association table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS document_tags (
                doc_id TEXT,
                tag_id INTEGER,
                FOREIGN KEY (doc_id) REFERENCES documents (id),
                FOREIGN KEY (tag_id) REFERENCES tags (id)
            )
        ''')

        conn.commit()
        conn.close()

    def add_document(self, content: str, title: str = "", source: str = "",
                     metadata: dict = None, tags: List[str] = None) -> str:
        """Add a document."""
        doc_id = hashlib.md5(content.encode()).hexdigest()

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            # Insert the document (metadata stored as JSON)
            cursor.execute('''
                INSERT OR REPLACE INTO documents
                (id, content, title, source, metadata_json)
                VALUES (?, ?, ?, ?, ?)
            ''', (doc_id, content, title, source, json.dumps(metadata or {})))

            # Handle tags
            if tags:
                for tag_name in tags:
                    # Get or create the tag
                    cursor.execute('INSERT OR IGNORE INTO tags (name) VALUES (?)', (tag_name,))
                    cursor.execute('SELECT id FROM tags WHERE name = ?', (tag_name,))
                    tag_id = cursor.fetchone()[0]

                    # Associate document and tag
                    cursor.execute('''
                        INSERT OR IGNORE INTO document_tags (doc_id, tag_id) VALUES (?, ?)
                    ''', (doc_id, tag_id))

            conn.commit()
            return doc_id
        finally:
            conn.close()

    def get_document(self, doc_id: str) -> Optional[Dict]:
        """Fetch a document by id."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            cursor.execute('''
                SELECT id, content, title, source, created_at, updated_at, metadata_json
                FROM documents WHERE id = ?
            ''', (doc_id,))

            row = cursor.fetchone()
            if row:
                return {
                    'id': row[0],
                    'content': row[1],
                    'title': row[2],
                    'source': row[3],
                    'created_at': row[4],
                    'updated_at': row[5],
                    'metadata': json.loads(row[6]) if row[6] else {}
                }
            return None
        finally:
            conn.close()

    def search_by_tags(self, tags: List[str], limit: int = 10) -> List[Dict]:
        """Search documents by tags."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            tag_placeholders = ','.join(['?' for _ in tags])
            cursor.execute(f'''
                SELECT DISTINCT d.*
                FROM documents d
                JOIN document_tags dt ON d.id = dt.doc_id
                JOIN tags t ON dt.tag_id = t.id
                WHERE t.name IN ({tag_placeholders})
                LIMIT ?
            ''', (*tags, limit))

            rows = cursor.fetchall()
            documents = []
            for row in rows:
                documents.append({
                    'id': row[0],
                    'content': row[1],
                    'title': row[2],
                    'source': row[3],
                    'created_at': row[4],
                    'updated_at': row[5],
                    'metadata': json.loads(row[6]) if row[6] else {}
                })

            return documents
        finally:
            conn.close()


class DocumentChunker:
    """Document chunker."""

    def __init__(self, max_chunk_size: int = 512, overlap: int = 50):
        self.max_chunk_size = max_chunk_size
        self.overlap = overlap

    def chunk_document(self, content: str, doc_id: str = None) -> List[DocumentChunk]:
        """Split a document into chunks."""
        # Split on sentence boundaries (Chinese and English punctuation)
        sentences = re.split(r'[.!?。!?]+', content)

        chunks = []
        current_chunk = ""
        chunk_index = 0

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            # Would adding this sentence exceed the maximum chunk size?
            if len(current_chunk) + len(sentence) > self.max_chunk_size:
                # Save the current chunk
                if current_chunk.strip():
                    chunks.append(DocumentChunk(
                        id=f"{doc_id}_{chunk_index}" if doc_id else f"chunk_{chunk_index}",
                        content=current_chunk.strip()
                    ))
                    chunk_index += 1

                # Start a new chunk, carrying over the overlap window
                if self.overlap > 0 and chunks:
                    prev_chunk = chunks[-1].content
                    overlap_text = prev_chunk[-self.overlap:]
                    current_chunk = overlap_text + " " + sentence
                else:
                    current_chunk = sentence
            else:
                current_chunk += " " + sentence

        # Append the final chunk
        if current_chunk.strip():
            chunks.append(DocumentChunk(
                id=f"{doc_id}_{chunk_index}" if doc_id else f"chunk_{chunk_index}",
                content=current_chunk.strip()
            ))

        return chunks


class KnowledgeBaseManager:
    """Knowledge base manager."""

    def __init__(self, knowledge_base: KnowledgeBase, vectorizer: AdvancedVectorizer):
        self.kb = knowledge_base
        self.vectorizer = vectorizer
        self.search_engine = SemanticSearchEngine()

    def index_document(self, content: str, title: str = "", source: str = "",
                       metadata: dict = None, tags: List[str] = None):
        """Index a document."""
        # Add to the knowledge base
        doc_id = self.kb.add_document(content, title, source, metadata, tags)

        # Chunk
        chunker = DocumentChunker()
        chunks = chunker.chunk_document(content, doc_id)

        # Encode vectors
        texts = [chunk.content for chunk in chunks]
        embeddings = self.vectorizer.encode_with_ensemble(texts)

        # Update the search index
        for i, chunk in enumerate(chunks):
            chunk.embedding = embeddings[i]

        self.search_engine.add_documents(chunks)

        return doc_id

    def update_document(self, doc_id: str, new_content: str):
        """Update a document."""
        # Fetch the old document
        old_doc = self.kb.get_document(doc_id)
        if not old_doc:
            raise ValueError(f"Document {doc_id} not found")

        # Replace with the new content
        self.kb.add_document(
            new_content,
            old_doc.get('title', ''),
            old_doc.get('source', ''),
            old_doc.get('metadata')
        )

        # Rebuild the index (simple approach: delete and re-add)
        self.rebuild_document_index(doc_id, new_content)

    def rebuild_document_index(self, doc_id: str, content: str):
        """Rebuild the index for a document."""
        # Removing the stale entries from the FAISS index is omitted here for brevity;
        # a production system would track row ids and delete them explicitly.
        self.index_document(content)
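
A brief usage sketch of the storage and chunking pieces, using a made-up document; the deliberately small max_chunk_size is only there to make the overlap behaviour visible.

# Usage sketch (hypothetical document): store, tag, and chunk a document
kb = KnowledgeBase(db_path="rag_knowledge.db")

doc_id = kb.add_document(
    content="RAG 2.0 adds hybrid retrieval, confidence scoring, and citation tracking.",
    title="RAG 2.0 overview",
    source="internal_wiki",
    metadata={"lang": "en"},
    tags=["rag", "architecture"],
)

print(kb.get_document(doc_id)["title"])
print([d["id"] for d in kb.search_by_tags(["rag"], limit=5)])

# Tiny chunk size chosen only so the overlap between chunks is visible
chunker = DocumentChunker(max_chunk_size=30, overlap=20)
for chunk in chunker.chunk_document("First sentence. Second sentence. Third sentence.", doc_id):
    print(chunk.id, "->", chunk.content)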

RAG 2.0 Generation Components

Intelligent Generator

# RAG 2.0 generation components
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import torch


class RAGGenerator:
    """RAG generator."""

    def __init__(self, model_name: str = "gpt2"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)

        # If the tokenizer has no pad token, fall back to the EOS token
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self.generator = pipeline(
            'text-generation',
            model=self.model,
            tokenizer=self.tokenizer,
            device=0 if torch.cuda.is_available() else -1
        )

    def generate_with_context(self, query: str, retrieved_docs: List[DocumentChunk],
                              max_length: int = 200, temperature: float = 0.7) -> str:
        """Generate an answer conditioned on the retrieved context."""
        # Build the prompt
        context_prompt = self.build_context_prompt(query, retrieved_docs)

        # Generate the answer (sampling enabled so temperature takes effect)
        generated = self.generator(
            context_prompt,
            max_length=max_length,
            num_return_sequences=1,
            do_sample=True,
            temperature=temperature,
            pad_token_id=self.tokenizer.eos_token_id
        )

        # Strip the prompt and keep only the generated continuation
        full_text = generated[0]['generated_text']
        answer = full_text[len(context_prompt):].strip()

        return answer

    def build_context_prompt(self, query: str, retrieved_docs: List[DocumentChunk]) -> str:
        """Build the context prompt."""
        prompt_parts = [
            "Answer the question based on the following documents:",
            "",
        ]

        # Add the retrieved documents (only the top 3 highest-scoring ones)
        for i, doc in enumerate(retrieved_docs[:3]):
            prompt_parts.append(f"Document {i+1}: {doc.content}")
            prompt_parts.append("")

        prompt_parts.append(f"Question: {query}")
        prompt_parts.append("Answer:")

        return "\n".join(prompt_parts)


class AdvancedRAGGenerator(RAGGenerator):
    """Advanced RAG generator."""

    def __init__(self, model_name: str = "facebook/opt-350m"):
        super().__init__(model_name)

        # Confidence scorer
        self.confidence_scorer = ConfidenceScorer()

        # Citation tracker
        self.citation_tracker = CitationTracker()

    def generate_with_confidence(self, query: str, retrieved_docs: List[DocumentChunk],
                                 max_length: int = 200, temperature: float = 0.7) -> Dict[str, Any]:
        """Generate an answer together with a confidence score."""
        context_prompt = self.build_context_prompt(query, retrieved_docs)

        generated = self.generator(
            context_prompt,
            max_length=max_length,
            num_return_sequences=1,
            do_sample=True,
            temperature=temperature,
            pad_token_id=self.tokenizer.eos_token_id
        )

        full_text = generated[0]['generated_text']
        answer = full_text[len(context_prompt):].strip()

        # Confidence score
        confidence = self.confidence_scorer.score_answer(answer, retrieved_docs, query)

        # Citations
        citations = self.citation_tracker.track_citations(answer, retrieved_docs)

        return {
            'answer': answer,
            'confidence': confidence,
            'citations': citations,
            'retrieved_docs_used': [doc.id for doc in retrieved_docs[:3]]
        }


class ConfidenceScorer:
    """Confidence scorer."""

    def score_answer(self, answer: str, retrieved_docs: List[DocumentChunk], query: str) -> float:
        """Compute a confidence score for the answer."""
        # A simple heuristic score:
        # 1. Overlap between the answer and the query
        query_terms = set(query.lower().split())
        answer_terms = set(answer.lower().split())

        relevance_score = len(query_terms.intersection(answer_terms)) / max(len(query_terms), 1)

        # 2. How well the retrieved documents support the answer
        doc_content = " ".join([doc.content for doc in retrieved_docs]).lower()
        answer_sentences = answer.split('.')

        support_score = 0
        for sentence in answer_sentences:
            sentence_lower = sentence.lower()
            # Check the first 5 words of each sentence against the documents
            if any(term in doc_content for term in sentence_lower.split()[:5]):
                support_score += 1

        support_score = support_score / max(len(answer_sentences), 1)

        # 3. Weighted combination
        confidence = relevance_score * 0.4 + support_score * 0.6
        return min(confidence, 1.0)  # clamp to [0, 1]


class CitationTracker:
    """Citation tracker."""

    def track_citations(self, answer: str, retrieved_docs: List[DocumentChunk]) -> List[Dict[str, Any]]:
        """Track which retrieved documents the answer draws from."""
        citations = []

        for doc in retrieved_docs:
            # Check whether the answer contains key sentences from the document
            doc_sentences = doc.content.split('.')
            answer_lower = answer.lower()

            matching_sentences = []
            for sent in doc_sentences:
                if len(sent.strip()) > 10:  # skip very short sentences
                    if sent.lower() in answer_lower:
                        matching_sentences.append(sent.strip())

            if matching_sentences:
                citations.append({
                    'document_id': doc.id,
                    'matched_content': matching_sentences,
                    'confidence': len(matching_sentences) / max(len(doc_sentences), 1)
                })

        return citations
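
The expensive part of the generator is the language model itself, but the confidence and citation helpers can be exercised on their own. A small sketch with synthetic chunks:

# Usage sketch (synthetic data): the scoring and citation helpers work standalone
docs = [
    DocumentChunk(id="kb_1", content="Password resets are handled on the account settings page."),
    DocumentChunk(id="kb_2", content="Refunds are processed within five business days."),
]
answer = "Password resets are handled on the account settings page."
query = "How do I reset my password?"

scorer = ConfidenceScorer()
tracker = CitationTracker()

print("confidence:", round(scorer.score_answer(answer, docs, query), 2))
print("citations: ", tracker.track_citations(answer, docs))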

RAG 2.0 Integrated System

# Complete RAG 2.0 system
import time


class RAG2System:
    """RAG 2.0 system."""

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        # Vectorizer
        self.vectorizer = AdvancedVectorizer(model_name)

        # Knowledge base
        self.knowledge_base = KnowledgeBase()
        self.kb_manager = KnowledgeBaseManager(self.knowledge_base, self.vectorizer)

        # Search engine: share the manager's semantic index so queries see indexed documents
        self.search_engine = HybridSearchEngine(
            self.kb_manager.search_engine,
            alpha=0.6
        )

        # Generator
        self.generator = AdvancedRAGGenerator("gpt2")

        # Performance monitoring
        self.performance_monitor = RAGPerformanceMonitor()

    def add_document(self, content: str, title: str = "", source: str = "",
                     metadata: dict = None, tags: List[str] = None) -> str:
        """Add a document to the system."""
        start_time = time.time()

        # Index the document (this updates the shared semantic index)
        doc_id = self.kb_manager.index_document(content, title, source, metadata, tags)

        # Keep the keyword (BM25) side of the hybrid search in sync as well
        chunks = DocumentChunker().chunk_document(content, doc_id)
        self.search_engine.keyword_engine.add_documents(chunks)

        # Record timing
        end_time = time.time()
        self.performance_monitor.record_operation('add_document', end_time - start_time)

        return doc_id

    def query(self, query_text: str, top_k: int = 5, max_length: int = 200) -> Dict[str, Any]:
        """Query the system."""
        start_time = time.time()

        # Retrieve relevant documents
        retrieved_docs = self.search_engine.search(query_text, top_k)

        if not retrieved_docs:
            return {
                'answer': "No relevant documents found",
                'confidence': 0.0,
                'citations': [],
                'query_time': time.time() - start_time
            }

        # Generate the answer
        generation_result = self.generator.generate_with_confidence(
            query_text, retrieved_docs, max_length
        )

        end_time = time.time()

        # Record timing
        self.performance_monitor.record_operation('query', end_time - start_time)
        self.performance_monitor.record_documents_retrieved(len(retrieved_docs))

        return {
            **generation_result,
            'query_time': end_time - start_time,
            'retrieved_count': len(retrieved_docs)
        }

    def batch_query(self, queries: List[str], top_k: int = 5) -> List[Dict[str, Any]]:
        """Batch query."""
        results = []

        for query in queries:
            results.append(self.query(query, top_k))

        return results

    def get_statistics(self) -> Dict[str, Any]:
        """Get system statistics."""
        return self.performance_monitor.get_statistics()


class RAGPerformanceMonitor:
    """RAG performance monitor."""

    def __init__(self):
        self.operation_times = {'query': [], 'add_document': []}
        self.documents_retrieved = []
        self.query_count = 0

    def record_operation(self, op_type: str, duration: float):
        """Record the duration of an operation."""
        if op_type in self.operation_times:
            self.operation_times[op_type].append(duration)

            # Keep only the most recent 1000 records
            if len(self.operation_times[op_type]) > 1000:
                self.operation_times[op_type] = self.operation_times[op_type][-1000:]

    def record_documents_retrieved(self, count: int):
        """Record how many documents a query retrieved."""
        self.documents_retrieved.append(count)
        self.query_count += 1

        # Keep only the most recent 1000 records
        if len(self.documents_retrieved) > 1000:
            self.documents_retrieved = self.documents_retrieved[-1000:]

    def get_statistics(self) -> Dict[str, Any]:
        """Get aggregated statistics."""
        stats = {}

        for op_type, times in self.operation_times.items():
            if times:
                stats[f'{op_type}_avg_time'] = sum(times) / len(times)
                stats[f'{op_type}_min_time'] = min(times)
                stats[f'{op_type}_max_time'] = max(times)

        if self.documents_retrieved:
            stats['avg_documents_retrieved'] = sum(self.documents_retrieved) / len(self.documents_retrieved)
            stats['total_queries'] = self.query_count

        return stats
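
Putting the pieces together, a minimal end-to-end run might look like the sketch below. The first call downloads the embedding and generation models, and the document content is purely illustrative.

# End-to-end usage sketch (downloads models on first run; content is illustrative)
rag = RAG2System()

rag.add_document(
    content="RAG 2.0 combines hybrid retrieval with confidence-scored generation.",
    title="RAG 2.0 notes",
    tags=["rag"],
)

result = rag.query("What does RAG 2.0 combine?", top_k=3)
print(result["answer"])
print("confidence:", result["confidence"], "| took", round(result["query_time"], 2), "s")
print(rag.get_statistics())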

Real-World Applications

Enterprise Knowledge Management System

# Enterprise knowledge management with RAG
class EnterpriseKnowledgeManager:
    """Enterprise knowledge manager."""

    def __init__(self, rag_system: RAG2System):
        self.rag = rag_system
        self.departments = set()
        self.document_types = set()

    def add_department_document(self, content: str, department: str,
                                doc_type: str, title: str = "") -> str:
        """Add a department document."""
        # Track the department and document type
        self.departments.add(department)
        self.document_types.add(doc_type)

        # Build tags
        tags = [department, doc_type, "enterprise", "internal"]

        # Add to the RAG system
        return self.rag.add_document(
            content=content,
            title=title,
            source=f"department:{department}",
            metadata={
                "department": department,
                "document_type": doc_type,
                "added_at": datetime.now().isoformat()
            },
            tags=tags
        )

    def search_by_department(self, query: str, department: str, top_k: int = 5) -> Dict[str, Any]:
        """Search within one department."""
        # Pre-filter by tag
        relevant_docs = self.rag.knowledge_base.search_by_tags([department], top_k * 2)

        # Build a context string for the RAG query
        if relevant_docs:
            content_snippets = [doc['content'][:500] for doc in relevant_docs]  # cap length
            context = " ".join(content_snippets)
        else:
            context = ""

        # Augment the query with the department context when available
        if context:
            enhanced_query = f"About the {department} department: {query}\nRelevant background: {context}"
        else:
            enhanced_query = query

        return self.rag.query(enhanced_query, top_k)

    def get_department_insights(self, department: str) -> Dict[str, Any]:
        """Summarize a department's documents."""
        # Document statistics for the department
        docs = self.rag.knowledge_base.search_by_tags([department], limit=100)

        insights = {
            'department': department,
            'total_documents': len(docs),
            'common_topics': [],
            'recent_activity': [],
            'most_referenced_docs': []
        }

        # Analyze document contents
        all_content = " ".join([doc['content'] for doc in docs])

        # Extract keywords (simple frequency count)
        from collections import Counter
        import re

        words = re.findall(r'\b\w+\b', all_content.lower())
        word_freq = Counter(words)

        # Most common terms
        common_words = word_freq.most_common(10)
        insights['common_topics'] = [word for word, freq in common_words if len(word) > 3]

        # Recent activity based on update timestamps
        recent_docs = sorted(docs, key=lambda x: x.get('updated_at', ''), reverse=True)[:5]
        insights['recent_activity'] = [doc['title'] for doc in recent_docs if doc.get('title')]

        return insights


# Customer service knowledge base application
class CustomerServiceKB:
    """Customer service knowledge base."""

    def __init__(self, rag_system: RAG2System):
        self.rag = rag_system
        self.ticket_categories = set()

    def add_kb_article(self, title: str, content: str, category: str,
                       tags: List[str] = None) -> str:
        """Add a knowledge base article."""
        self.ticket_categories.add(category)

        all_tags = [category, "kb_article", "customer_service"]
        if tags:
            all_tags.extend(tags)

        return self.rag.add_document(
            content=content,
            title=title,
            source="knowledge_base",
            metadata={
                "category": category,
                "type": "kb_article",
                "last_updated": datetime.now().isoformat()
            },
            tags=all_tags
        )

    def handle_customer_query(self, query: str, customer_context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Handle a customer query."""
        # Classify the query
        query_analysis = self.analyze_query_type(query)

        # Build an enhanced query
        if customer_context:
            enhanced_query = (
                f"Customer question: {query}\n"
                f"Customer info: {customer_context}\n"
                f"Relevant categories: {query_analysis['categories']}"
            )
        else:
            enhanced_query = f"Customer question: {query}. Relevant categories: {query_analysis['categories']}"

        # Query the knowledge base
        result = self.rag.query(enhanced_query, top_k=3)

        # Format the response
        response = {
            'original_query': query,
            'understood_intent': query_analysis['intent'],
            'recommended_solution': result['answer'],
            'confidence': result['confidence'],
            'relevant_articles': result.get('citations', []),
            'query_time': result.get('query_time', 0)
        }

        return response

    def analyze_query_type(self, query: str) -> Dict[str, Any]:
        """Classify the query with simple keyword matching."""
        query_lower = query.lower()

        # Category keyword lists
        billing_keywords = ['bill', 'payment', 'charge', 'refund', 'invoice', 'cost']
        technical_keywords = ['error', 'bug', 'not working', 'install', 'setup', 'config']
        account_keywords = ['account', 'login', 'password', 'profile', 'settings', 'user']
        feature_keywords = ['feature', 'how to', 'tutorial', 'guide', 'use', 'enable']

        categories = []
        intent = "general"

        if any(word in query_lower for word in billing_keywords):
            categories.append("billing")
            intent = "billing_issue"
        if any(word in query_lower for word in technical_keywords):
            categories.append("technical")
            intent = "technical_issue"
        if any(word in query_lower for word in account_keywords):
            categories.append("account")
            intent = "account_issue"
        if any(word in query_lower for word in feature_keywords):
            categories.append("feature")
            intent = "feature_question"

        if not categories:
            categories.append("general")

        return {
            'categories': categories,
            'intent': intent,
            'extracted_keywords': query_lower.split()[:10]  # first 10 tokens
        }
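
The ticket classifier is plain keyword matching, so it can be tried without loading any models. In the quick sketch below, None is passed for the RAG system because only analyze_query_type is exercised:

# Usage sketch: the query classifier runs standalone
# (a real RAG2System is only needed for the retrieval-backed methods)
cs_kb = CustomerServiceKB(rag_system=None)

analysis = cs_kb.analyze_query_type("I was charged twice, can I get a refund?")
print(analysis["intent"])      # billing_issue
print(analysis["categories"])  # ['billing']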

Performance Optimization and Scaling

Vector Index Optimization

# RAG 2.0 index optimization
import hashlib
import numpy as np
import faiss
from typing import Any, Dict, Optional, Tuple


class OptimizedVectorIndex:
    """Optimized vector index."""

    def __init__(self, dimension: int, index_type: str = "IVF", nlist: int = 100):
        self.dimension = dimension
        self.index_type = index_type

        # Build the requested index type
        if index_type == "IVF":
            # Inverted-file index; nlist is the number of coarse clusters
            quantizer = faiss.IndexFlatIP(dimension)  # inner-product quantizer
            self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
        elif index_type == "HNSW":
            # HNSW graph index for fast approximate nearest-neighbor search
            self.index = faiss.IndexHNSWFlat(dimension, 32)  # 32 neighbors per node
        elif index_type == "LSH":
            # Locality-sensitive hashing index
            self.index = faiss.IndexLSH(dimension, 128)  # 128-bit hashes
        else:
            # Fall back to exact search
            self.index = faiss.IndexFlatIP(dimension)

        # Enable GPU support when available
        self.use_gpu = hasattr(faiss, 'StandardGpuResources')
        if self.use_gpu:
            self.res = faiss.StandardGpuResources()
            self.gpu_index = faiss.index_cpu_to_gpu(self.res, 0, self.index)
            self.active_index = self.gpu_index
        else:
            self.active_index = self.index

    def add_vectors(self, vectors: np.ndarray, ids: Optional[np.ndarray] = None):
        """Add vectors to the index."""
        # IVF indexes must be trained before vectors can be added
        if self.index_type == "IVF" and not self.active_index.is_trained:
            self.active_index.train(vectors)

        if ids is not None:
            self.active_index.add_with_ids(vectors, ids)
        else:
            self.active_index.add(vectors)

    def search(self, query_vector: np.ndarray, k: int = 10) -> Tuple[np.ndarray, np.ndarray]:
        """Search for the nearest neighbors."""
        return self.active_index.search(np.array([query_vector]), k)


class CacheEnhancedRAG:
    """Cache-enhanced RAG system."""

    def __init__(self, rag_system: RAG2System, cache_size: int = 1000):
        self.rag_system = rag_system
        self.cache_size = cache_size
        self.query_cache = {}
        self.cache_order = []  # LRU eviction order
        self.stats = {'hits': 0, 'misses': 0}

    def query(self, query_text: str, top_k: int = 5, max_length: int = 200) -> Dict[str, Any]:
        """Query with caching."""
        # Check the cache first
        cache_key = self.hash_query(query_text)

        if cache_key in self.query_cache:
            self.stats['hits'] += 1
            result = self.query_cache[cache_key]

            # Refresh the LRU order
            self.cache_order.remove(cache_key)
            self.cache_order.append(cache_key)

            return result
        else:
            self.stats['misses'] += 1
            result = self.rag_system.query(query_text, top_k, max_length)

            # Add to the cache
            self.add_to_cache(cache_key, result)

            return result

    def hash_query(self, query: str) -> str:
        """Hash a query into a cache key."""
        return hashlib.md5(query.encode()).hexdigest()

    def add_to_cache(self, key: str, value: Dict[str, Any]):
        """Insert a result into the cache."""
        if len(self.query_cache) >= self.cache_size:
            # LRU eviction
            oldest_key = self.cache_order.pop(0)
            del self.query_cache[oldest_key]

        self.query_cache[key] = value
        self.cache_order.append(key)

    def get_cache_stats(self) -> Dict[str, Any]:
        """Cache statistics."""
        total = self.stats['hits'] + self.stats['misses']
        hit_rate = self.stats['hits'] / total if total > 0 else 0

        return {
            'hit_rate': hit_rate,
            'cache_size': len(self.query_cache),
            'total_queries': total,
            'cache_capacity': self.cache_size
        }
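
A quick sketch of the optimized index on random vectors; with index_type="IVF", the first add_vectors call trains the coarse quantizer before inserting.

# Usage sketch: random vectors, IVF index with a small number of clusters
dim = 128
index = OptimizedVectorIndex(dimension=dim, index_type="IVF", nlist=16)

vectors = np.random.rand(1000, dim).astype("float32")
index.add_vectors(vectors)  # trains the IVF quantizer on first add

scores, ids = index.search(vectors[0], k=5)
print(ids[0], scores[0])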

Summary

  • RAG 2.0 substantially improves semantic search accuracy
  • The hybrid search strategy combines the strengths of multiple retrieval methods
  • Knowledge base management enables finer-grained content organization
  • Context-aware generation raises answer quality
  • Multilingual support broadens the range of applications
  • Performance optimizations make large-scale deployment feasible
  • Confidence scoring improves the reliability of the output

RAG 2.0 is like a super-intelligent library: it holds a vast body of knowledge, understands precisely what the user actually needs, and integrates and presents the information in the most useful way.

Future Directions

  1. Multi-modal RAG: support for images, audio, and other media types
  2. Real-time knowledge updates: learning and refreshing the knowledge base on the fly
  3. Stronger reasoning chains: handling complex logical inference
  4. Privacy protection: federated learning and differential privacy
  5. Explainability: transparent explanations of how answers were produced

Further Reading

  1. RAG Papers
  2. Vector Database Technologies
  3. Semantic Search Techniques