Build production-ready RAG systems using vector databases, embedding optimization, and advanced retrieval patterns for intelligent applications.
Retrieval-Augmented Generation (RAG) systems have transformed how we build AI applications that need access to specific knowledge. After building RAG systems that process millions of documents, I've learned that success requires more than connecting an LLM to a vector database. Here's a comprehensive guide to building production-grade RAG systems, from chunking and indexing through hybrid retrieval, reranking, generation, evaluation, and deployment.
RAG Architecture Fundamentals
Complete RAG System Design
# rag_system.py
from typing import List, Dict, Optional, Any
import numpy as np
from dataclasses import dataclass
import asyncio
from enum import Enum
class VectorDBProvider(Enum):
PINECONE = "pinecone"
WEAVIATE = "weaviate"
QDRANT = "qdrant"
MILVUS = "milvus"
CHROMA = "chroma"
FAISS = "faiss"
@dataclass
class Document:
id: str
content: str
metadata: Dict[str, Any]
embedding: Optional[np.ndarray] = None
chunks: Optional[List['DocumentChunk']] = None
@dataclass
class DocumentChunk:
id: str
document_id: str
content: str
embedding: Optional[np.ndarray]  # populated after embedding generation
metadata: Dict[str, Any]
position: int
class RAGSystem:
def __init__(self,
vector_db: VectorDBProvider,
embedding_model: str = "text-embedding-ada-002",
llm_model: str = "gpt-4",
chunk_size: int = 512,
chunk_overlap: int = 50):
self.vector_db = self._initialize_vector_db(vector_db)
self.embedding_model = self._initialize_embedding_model(embedding_model)
self.llm_model = llm_model
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Initialize components
self.document_processor = DocumentProcessor(chunk_size, chunk_overlap)
self.retriever = HybridRetriever(self.vector_db)
self.reranker = Reranker()
self.generator = ResponseGenerator(llm_model)
async def index_documents(self, documents: List[Document]) -> Dict:
"""Index documents into vector database"""
indexed_count = 0
chunk_count = 0
errors = []
for doc in documents:
try:
# Process document into chunks
chunks = self.document_processor.process(doc)
# Generate embeddings
for chunk in chunks:
chunk.embedding = await self._generate_embedding(chunk.content)
# Store in vector database
await self.vector_db.upsert(chunks)
indexed_count += 1
chunk_count += len(chunks)
except Exception as e:
errors.append({
'document_id': doc.id,
'error': str(e)
})
return {
'indexed_documents': indexed_count,
'total_chunks': chunk_count,
'errors': errors
}
async def query(self,
query: str,
top_k: int = 10,
filters: Optional[Dict] = None,
rerank: bool = True) -> Dict:
"""Query RAG system"""
# Generate query embedding
query_embedding = await self._generate_embedding(query)
# Retrieve relevant chunks
retrieved_chunks = await self.retriever.retrieve(
query_embedding,
query, # For hybrid search
top_k=top_k * 2 if rerank else top_k,
filters=filters
)
# Rerank if enabled
if rerank:
retrieved_chunks = await self.reranker.rerank(
query,
retrieved_chunks,
top_k=top_k
)
# Generate response
response = await self.generator.generate(
query,
retrieved_chunks,
self.llm_model
)
return {
'response': response['text'],
'sources': [chunk.metadata for chunk in retrieved_chunks],
'confidence': response.get('confidence', 0.0)
}
async def _generate_embedding(self, text: str) -> np.ndarray:
"""Generate text embedding"""
# In production, batch these requests
embedding = await self.embedding_model.encode(text)
return np.array(embedding)
class DocumentProcessor:
def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def process(self, document: Document) -> List[DocumentChunk]:
"""Process document into chunks"""
chunks = []
text = document.content
# Smart chunking with sentence boundaries
sentences = self._split_into_sentences(text)
current_chunk = []
current_size = 0
position = 0
for sentence in sentences:
sentence_size = len(sentence.split())
if current_size + sentence_size > self.chunk_size and current_chunk:
# Create chunk
chunk_text = ' '.join(current_chunk)
chunk = DocumentChunk(
id=f"{document.id}_chunk_{position}",
document_id=document.id,
content=chunk_text,
embedding=None, # Will be generated later
metadata={
**document.metadata,
'chunk_position': position,
'chunk_size': len(chunk_text)
},
position=position
)
chunks.append(chunk)
# Overlap handling: carry over trailing sentences until the word budget
# (chunk_overlap) is reached, so adjacent chunks share context
overlap_sentences = []
overlap_words = 0
for sent in reversed(current_chunk):
    if overlap_words + len(sent.split()) > self.chunk_overlap:
        break
    overlap_sentences.insert(0, sent)
    overlap_words += len(sent.split())
current_chunk = overlap_sentences
current_size = overlap_words
position += 1
current_chunk.append(sentence)
current_size += sentence_size
# Add remaining chunk
if current_chunk:
chunk_text = ' '.join(current_chunk)
chunk = DocumentChunk(
id=f"{document.id}_chunk_{position}",
document_id=document.id,
content=chunk_text,
embedding=None,
metadata={
**document.metadata,
'chunk_position': position,
'chunk_size': len(chunk_text)
},
position=position
)
chunks.append(chunk)
return chunks
def _split_into_sentences(self, text: str) -> List[str]:
"""Split text into sentences"""
# Simple implementation - use NLTK or spaCy in production
import re
sentences = re.split(r'(?<=[.!?])\s+', text)
return [s.strip() for s in sentences if s.strip()]
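Putting the pieces together, a minimal end-to-end usage sketch looks like the following. It assumes the `_initialize_vector_db` and `_initialize_embedding_model` helpers are wired to real clients (for example an OpenAI key in the environment); the document ID, content, and metadata are purely illustrative.
# usage_example.py - illustrative sketch; assumes the classes above live in rag_system.py
import asyncio

from rag_system import RAGSystem, Document, VectorDBProvider

async def main():
    rag = RAGSystem(
        vector_db=VectorDBProvider.FAISS,
        chunk_size=512,
        chunk_overlap=50,
    )

    docs = [
        Document(
            id="handbook-001",  # illustrative ID
            content="Employees accrue 20 vacation days per year...",
            metadata={"source": "employee_handbook.pdf", "page": 12},
        )
    ]

    report = await rag.index_documents(docs)
    print(report)  # {'indexed_documents': 1, 'total_chunks': ..., 'errors': []}

    answer = await rag.query("How many vacation days do employees get?", top_k=5)
    print(answer["response"])
    print(answer["sources"])

asyncio.run(main())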
Vector Database Implementation
# vector_database.py
import numpy as np
from typing import List, Dict, Optional, Tuple
import faiss
import pickle
from abc import ABC, abstractmethod

from rag_system import DocumentChunk  # chunk dataclass defined in the RAG system module
class VectorDatabase(ABC):
@abstractmethod
async def upsert(self, chunks: List[DocumentChunk]):
pass
@abstractmethod
async def search(self,
                 embedding: np.ndarray,
                 top_k: int,
                 filters: Optional[Dict] = None) -> List[Tuple[DocumentChunk, float]]:
    pass
@abstractmethod
async def delete(self, ids: List[str]):
pass
class FAISSVectorDB(VectorDatabase):
def __init__(self, dimension: int = 1536, index_type: str = "IVF"):
self.dimension = dimension
self.index_type = index_type
# Initialize FAISS index
if index_type == "IVF":
quantizer = faiss.IndexFlatL2(dimension)
self.index = faiss.IndexIVFFlat(quantizer, dimension, 100)
elif index_type == "HNSW":
self.index = faiss.IndexHNSWFlat(dimension, 32)
else:
self.index = faiss.IndexFlatL2(dimension)
# Metadata storage
self.metadata = {}
self.id_to_index = {}
self.index_to_id = {}
self.current_index = 0
async def upsert(self, chunks: List[DocumentChunk]):
"""Insert or update chunks"""
embeddings = []
for chunk in chunks:
# Store metadata
self.metadata[chunk.id] = chunk
# Map IDs to indices
if chunk.id not in self.id_to_index:
self.id_to_index[chunk.id] = self.current_index
self.index_to_id[self.current_index] = chunk.id
self.current_index += 1
embeddings.append(chunk.embedding)
# Add to FAISS index
embeddings_array = np.array(embeddings).astype('float32')
if self.index_type == "IVF" and not self.index.is_trained:
self.index.train(embeddings_array)
self.index.add(embeddings_array)
async def search(self,
embedding: np.ndarray,
top_k: int = 10,
filters: Optional[Dict] = None) -> List[Tuple[DocumentChunk, float]]:
"""Search for similar vectors"""
# Search in FAISS
query_embedding = embedding.reshape(1, -1).astype('float32')
distances, indices = self.index.search(query_embedding, top_k)
results = []
for idx, distance in zip(indices[0], distances[0]):
if idx == -1:
continue
chunk_id = self.index_to_id.get(idx)
if chunk_id and chunk_id in self.metadata:
chunk = self.metadata[chunk_id]
# Apply filters
if filters:
if not self._match_filters(chunk, filters):
continue
results.append((chunk, float(distance)))
return results
def _match_filters(self, chunk: DocumentChunk, filters: Dict) -> bool:
"""Check if chunk matches filters"""
for key, value in filters.items():
if key not in chunk.metadata:
return False
if isinstance(value, list):
if chunk.metadata[key] not in value:
return False
else:
if chunk.metadata[key] != value:
return False
return True
async def delete(self, ids: List[str]):
"""Delete chunks by ID"""
for chunk_id in ids:
if chunk_id in self.metadata:
del self.metadata[chunk_id]
if chunk_id in self.id_to_index:
index = self.id_to_index[chunk_id]
del self.id_to_index[chunk_id]
del self.index_to_id[index]
def save(self, path: str):
"""Save index to disk"""
faiss.write_index(self.index, f"{path}.index")
with open(f"{path}.metadata", 'wb') as f:
pickle.dump({
'metadata': self.metadata,
'id_to_index': self.id_to_index,
'index_to_id': self.index_to_id,
'current_index': self.current_index
}, f)
def load(self, path: str):
"""Load index from disk"""
self.index = faiss.read_index(f"{path}.index")
with open(f"{path}.metadata", 'rb') as f:
data = pickle.load(f)
self.metadata = data['metadata']
self.id_to_index = data['id_to_index']
self.index_to_id = data['index_to_id']
self.current_index = data['current_index']
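A quick standalone sanity check of the FAISS wrapper helps catch dimension and persistence issues early. The sketch below uses random vectors in place of real embeddings and a flat index so no IVF training is needed; the paths, IDs, and contents are illustrative.
# faiss_smoke_test.py - illustrative sketch with random vectors instead of real embeddings
import asyncio
import os

import numpy as np

from rag_system import DocumentChunk
from vector_database import FAISSVectorDB

async def main():
    db = FAISSVectorDB(dimension=1536, index_type="Flat")  # exact search, no training step
    rng = np.random.default_rng(0)

    chunks = [
        DocumentChunk(
            id=f"doc1_chunk_{i}",
            document_id="doc1",
            content=f"chunk {i}",
            embedding=rng.random(1536, dtype=np.float32),
            metadata={"source": "doc1"},
            position=i,
        )
        for i in range(100)
    ]
    await db.upsert(chunks)

    # The query vector's own chunk should come back first with distance ~0
    for chunk, distance in await db.search(chunks[0].embedding, top_k=3):
        print(chunk.id, round(distance, 4))

    os.makedirs("./data", exist_ok=True)
    db.save("./data/index")  # writes ./data/index.index plus ./data/index.metadata

asyncio.run(main())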
Advanced Retrieval Techniques
Hybrid Search Implementation
# hybrid_retrieval.py
from typing import List, Tuple, Optional, Dict
import numpy as np
from rank_bm25 import BM25Okapi

from rag_system import DocumentChunk
from vector_database import VectorDatabase
class HybridRetriever:
def __init__(self,
vector_db: VectorDatabase,
alpha: float = 0.7): # Weight for vector search
self.vector_db = vector_db
self.alpha = alpha
self.bm25_index = None
self.documents = []
def build_bm25_index(self, documents: List[str]):
"""Build BM25 index for keyword search"""
# Tokenize documents
tokenized_docs = [doc.lower().split() for doc in documents]
# Build BM25 index
self.bm25_index = BM25Okapi(tokenized_docs)
self.documents = documents
async def retrieve(self,
query_embedding: np.ndarray,
query_text: str,
top_k: int = 10,
filters: Optional[Dict] = None) -> List[DocumentChunk]:
"""Hybrid retrieval combining vector and keyword search"""
# Vector search
vector_results = await self.vector_db.search(
query_embedding,
top_k=top_k * 2,
filters=filters
)
# BM25 keyword search
if self.bm25_index:
query_tokens = query_text.lower().split()
bm25_scores = self.bm25_index.get_scores(query_tokens)
# Get top BM25 results
top_indices = np.argsort(bm25_scores)[-top_k * 2:][::-1]
bm25_results = [(self.documents[i], bm25_scores[i])
for i in top_indices]
else:
bm25_results = []
# Combine and rerank
combined_results = self._combine_results(
vector_results,
bm25_results,
self.alpha
)
# Sort by combined score
combined_results.sort(key=lambda x: x[1], reverse=True)
return [chunk for chunk, _ in combined_results[:top_k]]
def _combine_results(self,
vector_results: List[Tuple[DocumentChunk, float]],
bm25_results: List[Tuple[str, float]],
alpha: float) -> List[Tuple[DocumentChunk, float]]:
"""Combine vector and BM25 results"""
combined = {}
# FAISS L2 search returns distances (lower is better), so convert each
# distance to a similarity in (0, 1] before applying the vector weight
if vector_results:
    for chunk, distance in vector_results:
        similarity = 1.0 / (1.0 + distance)
        combined[chunk.id] = (chunk, alpha * similarity)
# Normalize and add BM25 scores
if bm25_results:
max_bm25_score = max(score for _, score in bm25_results)
for doc_text, score in bm25_results:
normalized_score = score / max_bm25_score if max_bm25_score > 0 else 0
# Find corresponding chunk
for chunk_id, (chunk, vec_score) in combined.items():
if doc_text in chunk.content:
combined[chunk_id] = (
chunk,
vec_score + (1 - alpha) * normalized_score
)
return list(combined.values())
class Reranker:
def __init__(self, model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
from sentence_transformers import CrossEncoder
self.model = CrossEncoder(model)
async def rerank(self,
query: str,
chunks: List[DocumentChunk],
top_k: int = 5) -> List[DocumentChunk]:
"""Rerank chunks using cross-encoder"""
if not chunks:
return []
# Prepare pairs
pairs = [(query, chunk.content) for chunk in chunks]
# Get scores
scores = self.model.predict(pairs)
# Sort by score
chunk_scores = list(zip(chunks, scores))
chunk_scores.sort(key=lambda x: x[1], reverse=True)
return [chunk for chunk, _ in chunk_scores[:top_k]]
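Wiring the hybrid retriever and reranker together is then only a few lines. In this sketch, vector_db, all_chunks, query_embedding, and query_text are assumed to come from the indexing and embedding steps shown earlier.
# hybrid_usage.py - sketch; assumes chunks are already indexed and embedded
retriever = HybridRetriever(vector_db, alpha=0.7)
retriever.build_bm25_index([chunk.content for chunk in all_chunks])  # keyword index over the same corpus

reranker = Reranker()  # downloads the cross-encoder model on first use

candidates = await retriever.retrieve(query_embedding, query_text, top_k=20)
top_chunks = await reranker.rerank(query_text, candidates, top_k=5)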
Contextual Compression
# contextual_compression.py
from typing import List

from rag_system import DocumentChunk

class ContextualCompressor:
def __init__(self, llm_model: str = "gpt-3.5-turbo"):
self.llm_model = llm_model
async def compress_documents(self,
query: str,
documents: List[DocumentChunk],
max_tokens: int = 2000) -> str:
"""Compress documents to relevant information"""
compressed_chunks = []
for doc in documents:
# Extract relevant sentences
relevant = await self._extract_relevant(query, doc.content)
if relevant:
compressed_chunks.append(relevant)
# Combine compressed chunks
combined = self._combine_chunks(compressed_chunks, max_tokens)
return combined
async def _extract_relevant(self, query: str, content: str) -> str:
"""Extract query-relevant information from content"""
prompt = f"""
Query: {query}
Document: {content}
Extract only the information from the document that is directly relevant to answering the query.
If nothing is relevant, return "NOT_RELEVANT".
"""
# Call LLM for extraction
response = await self._call_llm(prompt)
if response != "NOT_RELEVANT":
return response
return ""
def _combine_chunks(self, chunks: List[str], max_tokens: int) -> str:
"""Combine chunks within token limit"""
combined = []
current_tokens = 0
for chunk in chunks:
chunk_tokens = len(chunk.split())
if current_tokens + chunk_tokens <= max_tokens:
combined.append(chunk)
current_tokens += chunk_tokens
else:
break
return "\n\n".join(combined)
Query Enhancement
Query Expansion and Optimization
# query_enhancement.py
from typing import Dict, List

class QueryEnhancer:
def __init__(self, llm_model: str = "gpt-3.5-turbo"):
self.llm_model = llm_model
self.query_cache = {}
async def enhance_query(self, query: str) -> Dict:
"""Enhance query for better retrieval"""
# Check cache
if query in self.query_cache:
return self.query_cache[query]
enhanced = {
'original': query,
'expanded': await self._expand_query(query),
'decomposed': await self._decompose_query(query),
'hypothetical_answer': await self._generate_hypothetical_answer(query),
'keywords': self._extract_keywords(query)
}
self.query_cache[query] = enhanced
return enhanced
async def _expand_query(self, query: str) -> str:
"""Expand query with synonyms and related terms"""
prompt = f"""
Expand the following query with related terms and synonyms that would help find relevant information:
Query: {query}
Expanded query (include original + related terms):
"""
return await self._call_llm(prompt)
async def _decompose_query(self, query: str) -> List[str]:
"""Decompose complex query into sub-queries"""
prompt = f"""
Break down this complex query into simpler sub-queries:
Query: {query}
Sub-queries (one per line):
"""
response = await self._call_llm(prompt)
return [q.strip() for q in response.split('\n') if q.strip()]
async def _generate_hypothetical_answer(self, query: str) -> str:
"""Generate hypothetical answer for HyDE technique"""
prompt = f"""
Generate a hypothetical but realistic answer to this question:
Question: {query}
Hypothetical answer:
"""
return await self._call_llm(prompt)
def _extract_keywords(self, query: str) -> List[str]:
"""Extract key terms from query"""
# Simple implementation - use NLP library in production
import re
# Remove common words
stopwords = {'the', 'is', 'at', 'which', 'on', 'a', 'an', 'and', 'or', 'but'}
words = re.findall(r'\w+', query.lower())
keywords = [w for w in words if w not in stopwords and len(w) > 2]
return keywords
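One way to put the enhanced query to work is multi-query retrieval: retrieve with each variant and merge the results before reranking. A sketch, assuming an enhancer, the retriever from earlier, and an embed() coroutine for generating query embeddings:
# multi_query_retrieval.py - sketch; `enhancer`, `retriever`, and `embed()` are assumed to exist
enhanced = await enhancer.enhance_query(user_query)

# Query with the original text, the expanded text, and the HyDE-style hypothetical answer,
# then deduplicate chunks by ID before passing them to the reranker.
seen, merged = set(), []
for variant in (enhanced['original'], enhanced['expanded'], enhanced['hypothetical_answer']):
    chunks = await retriever.retrieve(await embed(variant), variant, top_k=10)
    for chunk in chunks:
        if chunk.id not in seen:
            seen.add(chunk.id)
            merged.append(chunk)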
Response Generation
Advanced Response Generation
# response_generation.py
from typing import Dict, List

from rag_system import DocumentChunk

class ResponseGenerator:
def __init__(self, llm_model: str = "gpt-4"):
self.llm_model = llm_model
self.citation_formatter = CitationFormatter()
async def generate(self,
query: str,
context_chunks: List[DocumentChunk],
model: str = None) -> Dict:
"""Generate response with citations"""
# Format context
context = self._format_context(context_chunks)
# Generate response
prompt = self._build_prompt(query, context)
response = await self._call_llm(prompt, model or self.llm_model)
# Post-process response
processed = self._post_process(response, context_chunks)
# Add citations
with_citations = self.citation_formatter.add_citations(
processed,
context_chunks
)
# Calculate confidence
confidence = self._calculate_confidence(response, context_chunks)
return {
'text': with_citations,
'confidence': confidence,
'chunks_used': len(context_chunks)
}
def _format_context(self, chunks: List[DocumentChunk]) -> str:
"""Format context chunks for prompt"""
formatted = []
for i, chunk in enumerate(chunks):
formatted.append(f"[{i+1}] {chunk.content}")
return "\n\n".join(formatted)
def _build_prompt(self, query: str, context: str) -> str:
"""Build generation prompt"""
return f"""Answer the following question based on the provided context.
If the answer cannot be found in the context, say so.
Include reference numbers [1], [2], etc. for your sources.
Context:
{context}
Question: {query}
Answer:"""
def _calculate_confidence(self,
response: str,
chunks: List[DocumentChunk]) -> float:
"""Calculate response confidence score"""
confidence = 1.0
# Reduce confidence if response indicates uncertainty
uncertainty_phrases = [
"i'm not sure",
"it's unclear",
"cannot be determined",
"no information",
"not found in context"
]
response_lower = response.lower()
for phrase in uncertainty_phrases:
if phrase in response_lower:
confidence *= 0.5
# Increase confidence based on number of citations
import re
citations = re.findall(r'\[\d+\]', response)
confidence = min(1.0, confidence + len(citations) * 0.1)
return confidence
class CitationFormatter:
def add_citations(self,
text: str,
chunks: List[DocumentChunk]) -> str:
"""Add proper citations to response"""
# Map citation numbers to sources
citation_map = {}
for i, chunk in enumerate(chunks):
citation_num = i + 1
source = chunk.metadata.get('source', 'Unknown')
page = chunk.metadata.get('page', '')
citation_map[citation_num] = {
'source': source,
'page': page
}
# Add citation list at end
if citation_map:
citations_text = "\n\nSources:\n"
for num, info in citation_map.items():
citations_text += f"[{num}] {info['source']}"
if info['page']:
citations_text += f", page {info['page']}"
citations_text += "\n"
text += citations_text
return text
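The _post_process hook called inside generate is also not shown above. A minimal stand-in that trims whitespace and strips citation markers pointing past the provided context might look like this (an assumption about intent, not the original implementation):
# response_postprocess.py - minimal sketch of the post-processing step assumed above
import re
from typing import List

def post_process(response: str, context_chunks: List["DocumentChunk"]) -> str:
    text = response.strip()
    max_valid = len(context_chunks)
    # Drop citation markers like "[7]" when only 5 context chunks were supplied
    return re.sub(
        r"\[(\d+)\]",
        lambda m: m.group(0) if int(m.group(1)) <= max_valid else "",
        text,
    )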
Evaluation and Optimization
RAG Evaluation Framework
# rag_evaluation.py
from typing import Dict, List

import numpy as np

from rag_system import RAGSystem

class RAGEvaluator:
def __init__(self):
self.metrics = {
'retrieval': RetrievalMetrics(),
'generation': GenerationMetrics(),
'end_to_end': EndToEndMetrics()
}
async def evaluate(self,
rag_system: RAGSystem,
test_dataset: List[Dict]) -> Dict:
"""Evaluate RAG system performance"""
results = {
'retrieval_metrics': {},
'generation_metrics': {},
'end_to_end_metrics': {},
'detailed_results': []
}
for test_case in test_dataset:
query = test_case['query']
expected_answer = test_case.get('answer')
relevant_docs = test_case.get('relevant_docs', [])
# Get RAG response
response = await rag_system.query(query)
# Evaluate retrieval
if relevant_docs:
retrieval_score = self.metrics['retrieval'].evaluate(
response['sources'],
relevant_docs
)
else:
retrieval_score = None
# Evaluate generation
if expected_answer:
generation_score = await self.metrics['generation'].evaluate(
response['response'],
expected_answer,
query
)
else:
generation_score = None
# Store results
results['detailed_results'].append({
'query': query,
'response': response['response'],
'retrieval_score': retrieval_score,
'generation_score': generation_score
})
# Aggregate metrics
results['retrieval_metrics'] = self._aggregate_scores(
    [r['retrieval_score'] for r in results['detailed_results']
     if r['retrieval_score'] is not None]
)
results['generation_metrics'] = self._aggregate_scores(
    [r['generation_score'] for r in results['detailed_results']
     if r['generation_score'] is not None]
)
return results
def _aggregate_scores(self, scores: List[float]) -> Dict:
"""Aggregate evaluation scores"""
if not scores:
return {}
return {
'mean': np.mean(scores),
'std': np.std(scores),
'min': np.min(scores),
'max': np.max(scores),
'median': np.median(scores)
}
class RetrievalMetrics:
def evaluate(self, retrieved: List[Dict], relevant: List[str]) -> float:
"""Calculate retrieval metrics"""
retrieved_ids = [doc.get('id') for doc in retrieved]
# Precision
relevant_retrieved = len(set(retrieved_ids) & set(relevant))
precision = relevant_retrieved / len(retrieved_ids) if retrieved_ids else 0
# Recall
recall = relevant_retrieved / len(relevant) if relevant else 0
# F1 Score
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
return f1
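The GenerationMetrics class referenced by the evaluator is not defined above. A simple token-overlap F1 against the reference answer works as a stand-in (a sketch; in practice you might use embedding similarity or an LLM-as-judge score instead):
# generation_metrics.py - sketch; token-overlap F1 as a stand-in generation metric
class GenerationMetrics:
    async def evaluate(self, generated: str, expected: str, query: str) -> float:
        gen_tokens = set(generated.lower().split())
        exp_tokens = set(expected.lower().split())
        if not gen_tokens or not exp_tokens:
            return 0.0
        overlap = len(gen_tokens & exp_tokens)
        precision = overlap / len(gen_tokens)
        recall = overlap / len(exp_tokens)
        if precision + recall == 0:
            return 0.0
        return 2 * precision * recall / (precision + recall)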
Production Deployment
RAG API Service
# rag_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional, Dict
import asyncio

from rag_system import RAGSystem, Document, VectorDBProvider
app = FastAPI()
class IndexRequest(BaseModel):
documents: List[Dict]
metadata: Optional[Dict] = {}
class QueryRequest(BaseModel):
query: str
top_k: Optional[int] = 5
filters: Optional[Dict] = None
include_sources: Optional[bool] = True
class RAGService:
def __init__(self):
self.rag_system = None
self.index_lock = asyncio.Lock()
self.query_semaphore = asyncio.Semaphore(10) # Limit concurrent queries
async def initialize(self):
"""Initialize RAG system"""
self.rag_system = RAGSystem(
vector_db=VectorDBProvider.FAISS,
embedding_model="text-embedding-ada-002",
llm_model="gpt-4"
)
# Load existing index if available
try:
self.rag_system.vector_db.load("./data/index")
except Exception:
    pass  # no saved index yet; start with an empty one
rag_service = RAGService()
@app.on_event("startup")
async def startup():
await rag_service.initialize()
@app.post("/index")
async def index_documents(request: IndexRequest):
"""Index new documents"""
async with rag_service.index_lock:
try:
# Convert to Document objects
documents = [
Document(
id=doc['id'],
content=doc['content'],
metadata=doc.get('metadata', {})
)
for doc in request.documents
]
# Index documents
result = await rag_service.rag_system.index_documents(documents)
# Save index
rag_service.rag_system.vector_db.save("./data/index")
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/query")
async def query(request: QueryRequest):
"""Query RAG system"""
async with rag_service.query_semaphore:
try:
result = await rag_service.rag_system.query(
query=request.query,
top_k=request.top_k,
filters=request.filters
)
if not request.include_sources:
del result['sources']
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
"""Health check endpoint"""
return {
"status": "healthy",
"vector_db": rag_service.rag_system is not None
}
Best Practices Checklist
Before shipping, run through the practices covered above:
- Chunk on sentence boundaries and tune chunk size and overlap for your corpus
- Combine vector and keyword (BM25) retrieval, then rerank with a cross-encoder
- Enhance queries with expansion, decomposition, and HyDE before retrieval
- Compress retrieved context to fit the generation token budget
- Return citations with every answer and track a confidence signal
- Evaluate retrieval and generation separately against a labeled test set
- Batch embedding requests, cache repeated queries, and cap concurrent LLM calls
- Persist and version the vector index, and monitor latency, cost, and answer quality in production
Conclusion
Building production RAG systems requires careful orchestration of retrieval, generation, and optimization components. Success comes from understanding the interplay between chunking strategies, retrieval methods, and response generation. Start with a solid foundation, measure everything, and continuously iterate based on real user queries. Remember, the best RAG system is one that consistently delivers accurate, relevant, and well-sourced responses.