Master building production-ready RAG applications with vector databases, advanced optimization techniques, and real-world deployment strategies.
Retrieval-Augmented Generation (RAG) represents a breakthrough in AI applications, combining the power of Large Language Models with external knowledge retrieval. This approach allows AI systems to access and reason over vast amounts of up-to-date information while maintaining the conversational abilities of modern LLMs. In this comprehensive guide, we'll build production-ready RAG systems from the ground up.
Understanding RAG Architecture
RAG systems work by retrieving relevant information from external sources and providing that context to language models for generating informed responses. The core insight is that while LLMs are powerful reasoning engines, they have knowledge cutoffs and can't access real-time information. RAG bridges this gap by combining retrieval systems with generation capabilities.
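In code terms, that loop is just two steps: retrieve context for the query, then generate an answer conditioned on it. The sketch below is purely illustrative and uses the abstract interfaces defined in the next section:
# rag_flow_sketch.py (illustrative only)
async def answer(query: str, retriever: "Retriever", generator: "Generator") -> "RAGResponse":
    # 1. Retrieve the documents most relevant to the query
    context = await retriever.retrieve(query, top_k=5)
    # 2. Generate an answer grounded in that retrieved context
    return await generator.generate(query, context)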
Core RAG Components
# rag_architecture.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime
import asyncio
import numpy as np
@dataclass
class Document:
id: str
content: str
metadata: Dict[str, Any]
embedding: Optional[np.ndarray] = None
    timestamp: datetime = field(default_factory=datetime.now)  # evaluated per instance, not once at class definition
@dataclass
class RetrievalResult:
document: Document
score: float
relevance_explanation: Optional[str] = None
@dataclass
class RAGResponse:
answer: str
sources: List[RetrievalResult]
confidence: float
reasoning: Optional[str] = None
class DocumentStore(ABC):
"""Abstract interface for document storage"""
@abstractmethod
async def add_document(self, document: Document) -> str:
"""Add a document to the store"""
pass
@abstractmethod
async def get_document(self, doc_id: str) -> Optional[Document]:
"""Retrieve a document by ID"""
pass
@abstractmethod
async def search_documents(self,
query_embedding: np.ndarray,
top_k: int = 10,
filters: Dict[str, Any] = None) -> List[RetrievalResult]:
"""Search documents by embedding similarity"""
pass
@abstractmethod
async def update_document(self, doc_id: str, document: Document) -> bool:
"""Update an existing document"""
pass
@abstractmethod
async def delete_document(self, doc_id: str) -> bool:
"""Delete a document"""
pass
class EmbeddingModel(ABC):
"""Abstract interface for embedding generation"""
@abstractmethod
async def embed_text(self, text: str) -> np.ndarray:
"""Generate embedding for text"""
pass
@abstractmethod
async def embed_batch(self, texts: List[str]) -> List[np.ndarray]:
"""Generate embeddings for multiple texts"""
pass
@property
@abstractmethod
def embedding_dimension(self) -> int:
"""Get embedding dimension"""
pass
class Retriever(ABC):
"""Abstract interface for information retrieval"""
@abstractmethod
async def retrieve(self,
query: str,
top_k: int = 10,
filters: Dict[str, Any] = None) -> List[RetrievalResult]:
"""Retrieve relevant documents for a query"""
pass
class Generator(ABC):
"""Abstract interface for response generation"""
@abstractmethod
async def generate(self,
query: str,
context: List[RetrievalResult]) -> RAGResponse:
"""Generate response given query and retrieved context"""
pass
Vector Database Implementation
# vector_database.py
import faiss
import numpy as np
import sqlite3
import json
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional

from rag_architecture import Document, DocumentStore, RetrievalResult
class FAISSDocumentStore(DocumentStore):
"""FAISS-based vector database for document storage and retrieval"""
def __init__(self,
dimension: int,
index_type: str = "IVF",
db_path: str = "rag_database.db",
index_path: str = "faiss_index.bin"):
self.dimension = dimension
self.db_path = db_path
self.index_path = index_path
# Initialize FAISS index
        if index_type == "IVF":
            # Inverted-file index for large datasets.
            # Note: IVF indexes must be trained on a representative sample of
            # vectors (index.train) before documents can be added.
            quantizer = faiss.IndexFlatIP(dimension)  # Inner product on normalized vectors = cosine similarity
            self.index = faiss.IndexIVFFlat(quantizer, dimension, 100)  # 100 clusters
elif index_type == "HNSW":
# Hierarchical Navigable Small World graph
self.index = faiss.IndexHNSWFlat(dimension, 32)
self.index.hnsw.efConstruction = 40
else:
# Simple flat index for smaller datasets
self.index = faiss.IndexFlatIP(dimension)
# SQLite for metadata storage
self.executor = ThreadPoolExecutor(max_workers=4)
self._init_database()
self._load_index()
def _init_database(self):
"""Initialize SQLite database for metadata"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
metadata TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
faiss_id INTEGER
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp ON documents(timestamp)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_faiss_id ON documents(faiss_id)
""")
def _load_index(self):
"""Load existing FAISS index if available"""
index_file = Path(self.index_path)
if index_file.exists():
try:
self.index = faiss.read_index(str(index_file))
print(f"Loaded existing index with {self.index.ntotal} vectors")
except Exception as e:
print(f"Failed to load index: {e}")
# Keep the newly initialized index
def _save_index(self):
"""Save FAISS index to disk"""
try:
faiss.write_index(self.index, self.index_path)
except Exception as e:
print(f"Failed to save index: {e}")
async def add_document(self, document: Document) -> str:
"""Add document to the store"""
        def _add_to_db():
            faiss_id = None
            # Add to the FAISS index first so the metadata row can reference its position
            if document.embedding is not None:
                # Normalize for cosine similarity (inner-product index)
                embedding = document.embedding.astype(np.float32)
                embedding = embedding / np.linalg.norm(embedding)
                faiss_id = self.index.ntotal
                self.index.add(embedding.reshape(1, -1))
            # Store content and metadata in SQLite
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("""
                    INSERT OR REPLACE INTO documents
                    (id, content, metadata, faiss_id)
                    VALUES (?, ?, ?, ?)
                """, (
                    document.id,
                    document.content,
                    json.dumps(document.metadata),
                    faiss_id
                ))
            # Save index periodically
            if self.index.ntotal > 0 and self.index.ntotal % 1000 == 0:
                self._save_index()
            return document.id
return await asyncio.get_event_loop().run_in_executor(
self.executor, _add_to_db
)
async def get_document(self, doc_id: str) -> Optional[Document]:
"""Retrieve document by ID"""
def _get_from_db():
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT content, metadata, timestamp, faiss_id
FROM documents WHERE id = ?
""", (doc_id,))
row = cursor.fetchone()
if row:
content, metadata_json, timestamp, faiss_id = row
metadata = json.loads(metadata_json)
return Document(
id=doc_id,
content=content,
metadata=metadata,
timestamp=datetime.fromisoformat(timestamp)
)
return None
return await asyncio.get_event_loop().run_in_executor(
self.executor, _get_from_db
)
async def search_documents(self,
query_embedding: np.ndarray,
top_k: int = 10,
filters: Dict[str, Any] = None) -> List[RetrievalResult]:
"""Search documents by embedding similarity"""
def _search():
if self.index.ntotal == 0:
return []
# Normalize query embedding for cosine similarity
query_emb = query_embedding.astype(np.float32)
query_emb = query_emb / np.linalg.norm(query_emb)
# Search FAISS index
scores, indices = self.index.search(query_emb.reshape(1, -1), top_k)
results = []
with sqlite3.connect(self.db_path) as conn:
for score, idx in zip(scores[0], indices[0]):
if idx == -1: # FAISS returns -1 for invalid indices
continue
# Get document from SQLite
cursor = conn.execute("""
SELECT id, content, metadata
FROM documents WHERE faiss_id = ?
""", (int(idx),))
row = cursor.fetchone()
if row:
doc_id, content, metadata_json = row
metadata = json.loads(metadata_json)
# Apply filters if specified
if filters and not self._matches_filters(metadata, filters):
continue
document = Document(
id=doc_id,
content=content,
metadata=metadata
)
results.append(RetrievalResult(
document=document,
score=float(score)
))
return results
return await asyncio.get_event_loop().run_in_executor(
self.executor, _search
)
def _matches_filters(self, metadata: Dict[str, Any], filters: Dict[str, Any]) -> bool:
"""Check if document metadata matches filters"""
for key, value in filters.items():
if key not in metadata:
return False
if isinstance(value, list):
if metadata[key] not in value:
return False
else:
if metadata[key] != value:
return False
return True
async def update_document(self, doc_id: str, document: Document) -> bool:
"""Update existing document"""
existing = await self.get_document(doc_id)
if not existing:
return False
# For simplicity, we'll delete and re-add
# In production, you'd want more sophisticated update logic
await self.delete_document(doc_id)
await self.add_document(document)
return True
    async def delete_document(self, doc_id: str) -> bool:
        """Delete a document's metadata row.

        Note: the corresponding FAISS vector is not removed; search hits whose
        faiss_id no longer maps to a row are skipped at query time.
        """
        def _delete_from_db():
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute("DELETE FROM documents WHERE id = ?", (doc_id,))
                return cursor.rowcount > 0
return await asyncio.get_event_loop().run_in_executor(
self.executor, _delete_from_db
)
def __del__(self):
"""Cleanup: save index and close executor"""
if hasattr(self, 'index'):
self._save_index()
if hasattr(self, 'executor'):
self.executor.shutdown(wait=True)
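One caveat with the IVF configuration above: faiss.IndexIVFFlat must be trained on a representative sample of vectors before any can be added, or index.add will fail. A minimal sketch of handling this during ingestion follows; the train_ivf_index helper and the sample size are illustrative assumptions rather than part of the store itself.
# ivf_training_sketch.py (illustrative)
import numpy as np
import faiss

def train_ivf_index(index: faiss.Index, sample_embeddings: np.ndarray) -> None:
    """Train an IVF index on normalized sample vectors before ingestion."""
    if isinstance(index, faiss.IndexIVFFlat) and not index.is_trained:
        # Normalize the sample the same way documents are normalized at add time
        sample = sample_embeddings.astype(np.float32)
        sample = sample / np.linalg.norm(sample, axis=1, keepdims=True)
        index.train(sample)

# Usage: gather a few thousand representative embeddings, train, then ingest as usual
# train_ivf_index(store.index, sample_embeddings)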
Advanced Embedding Models
# embedding_models.py
from sentence_transformers import SentenceTransformer
import openai
from typing import List
import numpy as np
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

from rag_architecture import EmbeddingModel
class SentenceTransformerEmbedding(EmbeddingModel):
"""Local sentence transformer model for embeddings"""
def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
self.model = SentenceTransformer(model_name)
self._dimension = self.model.get_sentence_embedding_dimension()
async def embed_text(self, text: str) -> np.ndarray:
"""Generate embedding for single text"""
# Run in thread pool to avoid blocking
loop = asyncio.get_event_loop()
embedding = await loop.run_in_executor(
None, self.model.encode, text
)
return embedding.astype(np.float32)
async def embed_batch(self, texts: List[str]) -> List[np.ndarray]:
"""Generate embeddings for multiple texts"""
loop = asyncio.get_event_loop()
embeddings = await loop.run_in_executor(
None, self.model.encode, texts
)
return [emb.astype(np.float32) for emb in embeddings]
@property
def embedding_dimension(self) -> int:
return self._dimension
class OpenAIEmbedding(EmbeddingModel):
"""OpenAI embedding model with async support"""
def __init__(self,
model: str = "text-embedding-ada-002",
api_key: str = None):
self.model = model
self.client = openai.AsyncOpenAI(api_key=api_key)
        # text-embedding-ada-002 and text-embedding-3-small return 1536-dim vectors;
        # text-embedding-3-large returns 3072
        self._dimension = 3072 if "3-large" in model else 1536
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def embed_text(self, text: str) -> np.ndarray:
"""Generate embedding for single text"""
response = await self.client.embeddings.create(
model=self.model,
input=text
)
return np.array(response.data[0].embedding, dtype=np.float32)
async def embed_batch(self, texts: List[str]) -> List[np.ndarray]:
"""Generate embeddings for multiple texts"""
# OpenAI supports batch processing
response = await self.client.embeddings.create(
model=self.model,
input=texts
)
return [
np.array(item.embedding, dtype=np.float32)
for item in response.data
]
@property
def embedding_dimension(self) -> int:
return self._dimension
class HybridEmbedding(EmbeddingModel):
"""Hybrid embedding combining multiple models"""
def __init__(self,
semantic_model: EmbeddingModel,
keyword_model: EmbeddingModel,
semantic_weight: float = 0.7):
self.semantic_model = semantic_model
self.keyword_model = keyword_model
self.semantic_weight = semantic_weight
self.keyword_weight = 1.0 - semantic_weight
# Dimension is sum of both models
self._dimension = (semantic_model.embedding_dimension +
keyword_model.embedding_dimension)
async def embed_text(self, text: str) -> np.ndarray:
"""Generate hybrid embedding"""
# Get embeddings from both models concurrently
semantic_task = self.semantic_model.embed_text(text)
keyword_task = self.keyword_model.embed_text(text)
semantic_emb, keyword_emb = await asyncio.gather(
semantic_task, keyword_task
)
# Weight and concatenate
weighted_semantic = semantic_emb * self.semantic_weight
weighted_keyword = keyword_emb * self.keyword_weight
return np.concatenate([weighted_semantic, weighted_keyword])
async def embed_batch(self, texts: List[str]) -> List[np.ndarray]:
"""Generate hybrid embeddings for multiple texts"""
semantic_task = self.semantic_model.embed_batch(texts)
keyword_task = self.keyword_model.embed_batch(texts)
semantic_embs, keyword_embs = await asyncio.gather(
semantic_task, keyword_task
)
result = []
for sem_emb, key_emb in zip(semantic_embs, keyword_embs):
weighted_semantic = sem_emb * self.semantic_weight
weighted_keyword = key_emb * self.keyword_weight
hybrid_emb = np.concatenate([weighted_semantic, weighted_keyword])
result.append(hybrid_emb)
return result
@property
def embedding_dimension(self) -> int:
return self._dimension
Advanced Retrieval Strategies
# advanced_retrieval.py
from typing import List, Dict, Any, Optional, Set
import re
import asyncio
from dataclasses import dataclass
from datetime import datetime
from collections import defaultdict

from rag_architecture import Document, DocumentStore, EmbeddingModel, Retriever, RetrievalResult
@dataclass
class RetrievalStrategy:
name: str
weight: float
enabled: bool = True
class AdvancedRetriever(Retriever):
"""Advanced retriever with multiple strategies"""
def __init__(self,
document_store: DocumentStore,
embedding_model: EmbeddingModel,
strategies: List[RetrievalStrategy] = None):
self.document_store = document_store
self.embedding_model = embedding_model
if strategies is None:
strategies = [
RetrievalStrategy("semantic", 0.6),
RetrievalStrategy("keyword", 0.2),
RetrievalStrategy("hybrid", 0.2)
]
self.strategies = {s.name: s for s in strategies}
# Query expansion patterns
self.expansion_patterns = {
"synonyms": {
"artificial intelligence": ["AI", "machine learning", "deep learning"],
"natural language processing": ["NLP", "text processing", "language understanding"],
"database": ["DB", "data store", "repository"],
# Add more domain-specific synonyms
}
}
async def retrieve(self,
query: str,
top_k: int = 10,
filters: Dict[str, Any] = None) -> List[RetrievalResult]:
"""Advanced retrieval with multiple strategies"""
# Step 1: Query preprocessing and expansion
expanded_queries = await self._expand_query(query)
# Step 2: Multi-strategy retrieval
all_results = []
if self.strategies["semantic"].enabled:
semantic_results = await self._semantic_retrieval(
expanded_queries, top_k * 2, filters
)
all_results.extend(semantic_results)
if self.strategies["keyword"].enabled:
keyword_results = await self._keyword_retrieval(
expanded_queries, top_k * 2, filters
)
all_results.extend(keyword_results)
if self.strategies["hybrid"].enabled:
hybrid_results = await self._hybrid_retrieval(
expanded_queries, top_k * 2, filters
)
all_results.extend(hybrid_results)
# Step 3: Result fusion and re-ranking
fused_results = await self._fuse_results(all_results, query)
# Step 4: Final ranking and filtering
final_results = await self._final_ranking(fused_results, query)
return final_results[:top_k]
async def _expand_query(self, query: str) -> List[str]:
"""Expand query with synonyms and related terms"""
expanded = [query] # Original query
query_lower = query.lower()
# Add synonym expansions
for term, synonyms in self.expansion_patterns["synonyms"].items():
if term in query_lower:
for synonym in synonyms:
expanded_query = query_lower.replace(term, synonym)
expanded.append(expanded_query)
# Add question variations
if not query.endswith('?'):
expanded.append(f"What is {query}?")
expanded.append(f"How does {query} work?")
        # Remove duplicates while preserving order, so queries[0] stays the original query
        return list(dict.fromkeys(expanded))
async def _semantic_retrieval(self,
queries: List[str],
top_k: int,
filters: Dict[str, Any]) -> List[RetrievalResult]:
"""Semantic similarity retrieval"""
# Use the main query for embedding
main_query = queries[0]
query_embedding = await self.embedding_model.embed_text(main_query)
results = await self.document_store.search_documents(
query_embedding, top_k, filters
)
# Weight results by strategy
strategy_weight = self.strategies["semantic"].weight
for result in results:
result.score *= strategy_weight
result.relevance_explanation = f"Semantic similarity: {result.score:.3f}"
return results
async def _keyword_retrieval(self,
queries: List[str],
top_k: int,
filters: Dict[str, Any]) -> List[RetrievalResult]:
"""Keyword-based retrieval using TF-IDF-like scoring"""
# Extract keywords from queries
all_keywords = set()
for query in queries:
keywords = self._extract_keywords(query)
all_keywords.update(keywords)
        # Simplified placeholder: a production system would query a dedicated
        # keyword index (e.g. BM25 via Elasticsearch or OpenSearch) and weight
        # the resulting scores by self.strategies["keyword"].weight.
        results: List[RetrievalResult] = []
        return results
async def _hybrid_retrieval(self,
queries: List[str],
top_k: int,
filters: Dict[str, Any]) -> List[RetrievalResult]:
"""Hybrid retrieval combining semantic and keyword approaches"""
# Get semantic results
semantic_results = await self._semantic_retrieval(queries, top_k // 2, filters)
# Get keyword results
keyword_results = await self._keyword_retrieval(queries, top_k // 2, filters)
# Combine and re-weight
strategy_weight = self.strategies["hybrid"].weight
all_results = semantic_results + keyword_results
for result in all_results:
result.score *= strategy_weight
return all_results
async def _fuse_results(self,
all_results: List[RetrievalResult],
original_query: str) -> List[RetrievalResult]:
"""Fuse results from different strategies"""
# Group results by document ID
doc_results = defaultdict(list)
for result in all_results:
doc_results[result.document.id].append(result)
# Combine scores for same documents
fused_results = []
for doc_id, results in doc_results.items():
# Use maximum score (could also use average or weighted sum)
best_result = max(results, key=lambda r: r.score)
# Combine scores from all strategies
combined_score = sum(r.score for r in results)
best_result.score = combined_score
# Combine explanations
explanations = [r.relevance_explanation for r in results if r.relevance_explanation]
best_result.relevance_explanation = "; ".join(explanations)
fused_results.append(best_result)
return fused_results
async def _final_ranking(self,
results: List[RetrievalResult],
query: str) -> List[RetrievalResult]:
"""Final ranking with additional factors"""
# Add recency boost
for result in results:
doc = result.document
if hasattr(doc, 'timestamp') and doc.timestamp:
# Boost more recent documents
days_old = (datetime.now() - doc.timestamp).days
recency_boost = 1.0 / (1.0 + days_old * 0.01) # Decay factor
result.score *= recency_boost
# Add document quality score
for result in results:
doc = result.document
quality_score = self._calculate_quality_score(doc)
result.score *= quality_score
# Sort by final score
results.sort(key=lambda r: r.score, reverse=True)
return results
def _extract_keywords(self, text: str) -> Set[str]:
"""Extract keywords from text"""
# Simple keyword extraction (in production, use proper NLP)
words = re.findall(r'\b\w+\b', text.lower())
# Filter out stop words
stop_words = {
'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have',
'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should'
}
keywords = {word for word in words if len(word) > 2 and word not in stop_words}
return keywords
def _calculate_quality_score(self, document: Document) -> float:
"""Calculate document quality score"""
score = 1.0
# Length factor (prefer moderate length documents)
content_length = len(document.content)
if content_length < 100:
score *= 0.8 # Too short
elif content_length > 10000:
score *= 0.9 # Very long
else:
score *= 1.0 # Good length
# Metadata factors
if 'author' in document.metadata:
score *= 1.1 # Has author information
if 'source_quality' in document.metadata:
source_quality = document.metadata['source_quality']
score *= source_quality
return min(score, 2.0) # Cap the boost
class ContextualRetriever(AdvancedRetriever):
"""Retriever that maintains conversation context"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.conversation_history = []
self.context_window = 5 # Last 5 interactions
async def retrieve_with_context(self,
query: str,
conversation_context: List[str] = None,
top_k: int = 10,
filters: Dict[str, Any] = None) -> List[RetrievalResult]:
"""Retrieve with conversation context"""
# Build contextual query
if conversation_context:
context_summary = self._summarize_context(conversation_context)
contextual_query = f"Context: {context_summary}\n\nCurrent question: {query}"
else:
contextual_query = query
# Use the contextual query for retrieval
results = await self.retrieve(contextual_query, top_k, filters)
# Update conversation history
self.conversation_history.append(query)
if len(self.conversation_history) > self.context_window:
self.conversation_history.pop(0)
return results
def _summarize_context(self, context: List[str]) -> str:
"""Summarize conversation context"""
if not context:
return ""
# Simple summarization (in production, use LLM)
recent_context = context[-3:] # Last 3 messages
return " ".join(recent_context)
RAG Generation with Citations
# rag_generator.py
from typing import List, Dict, Any, Optional
import re
import json
from openai import AsyncOpenAI

from rag_architecture import Generator, RAGResponse, RetrievalResult
class RAGGenerator(Generator):
"""RAG generator with citation support"""
def __init__(self,
model: str = "gpt-4",
api_key: str = None,
temperature: float = 0.7,
max_tokens: int = 1000):
self.client = AsyncOpenAI(api_key=api_key)
self.model = model
self.temperature = temperature
self.max_tokens = max_tokens
# System prompt for RAG generation
self.system_prompt = """
You are an AI assistant that answers questions based on provided context documents.
Instructions:
1. Answer the question using ONLY the information provided in the context documents
2. If the context doesn't contain enough information, say so clearly
3. Always cite your sources using [Source X] format where X is the source number
4. Be precise and accurate in your responses
5. If the sources contain conflicting information, mention this
6. Provide reasoning for your conclusions when appropriate
Format your response as JSON with these fields:
- "answer": Your main response with citations
- "confidence": A score from 0.0 to 1.0 indicating your confidence
- "reasoning": Brief explanation of your reasoning process
- "sources_used": List of source numbers you referenced
"""
async def generate(self,
query: str,
context: List[RetrievalResult]) -> RAGResponse:
"""Generate response with citations"""
# Prepare context with source numbers
context_text = self._format_context(context)
# Build the prompt
user_prompt = f"""
Context Documents:
{context_text}
Question: {query}
Please provide a comprehensive answer based on the context documents above.
"""
messages = [
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": user_prompt}
]
try:
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.temperature,
max_tokens=self.max_tokens
)
response_content = response.choices[0].message.content
# Parse JSON response
try:
parsed_response = json.loads(response_content)
answer = parsed_response.get("answer", response_content)
confidence = parsed_response.get("confidence", 0.5)
reasoning = parsed_response.get("reasoning", "")
sources_used = parsed_response.get("sources_used", [])
except json.JSONDecodeError:
# Fallback if JSON parsing fails
answer = response_content
confidence = 0.7
reasoning = "Generated from provided context"
sources_used = list(range(1, len(context) + 1))
            # Map cited source numbers back to the corresponding retrieval results
            used_sources = []
            for source_num in sources_used:
                if isinstance(source_num, int) and 1 <= source_num <= len(context):
                    used_sources.append(context[source_num - 1])
return RAGResponse(
answer=answer,
sources=used_sources,
confidence=confidence,
reasoning=reasoning
)
except Exception as e:
# Error handling
return RAGResponse(
answer=f"I encountered an error while generating the response: {str(e)}",
sources=[],
confidence=0.0,
reasoning="Error in generation process"
)
def _format_context(self, context: List[RetrievalResult]) -> str:
"""Format context documents with source numbers"""
formatted_context = []
for i, result in enumerate(context, 1):
doc = result.document
# Include metadata if available
metadata_info = ""
if doc.metadata:
relevant_metadata = []
for key in ['title', 'author', 'date', 'source']:
if key in doc.metadata:
relevant_metadata.append(f"{key}: {doc.metadata[key]}")
if relevant_metadata:
metadata_info = f" ({', '.join(relevant_metadata)})"
formatted_context.append(
f"[Source {i}]{metadata_info}: {doc.content}"
)
return "\n\n".join(formatted_context)
class AdaptiveRAGGenerator(RAGGenerator):
"""RAG generator that adapts based on query type and context quality"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Different prompts for different query types
self.prompt_templates = {
"factual": """
You are answering a factual question. Focus on accuracy and provide specific details.
Always cite your sources and indicate if information is uncertain.
""",
"analytical": """
You are providing analysis based on the given information.
Synthesize information from multiple sources and provide insights.
Show your reasoning process and acknowledge any limitations.
""",
"comparative": """
You are comparing different concepts or options.
Present balanced viewpoints and highlight key differences/similarities.
Use multiple sources to provide comprehensive comparison.
""",
"procedural": """
You are explaining a process or providing instructions.
Be clear and step-by-step in your explanation.
Ensure all steps are supported by the provided sources.
"""
}
async def generate(self,
query: str,
context: List[RetrievalResult]) -> RAGResponse:
"""Generate adaptive response based on query type"""
# Classify query type
query_type = self._classify_query(query)
# Assess context quality
context_quality = self._assess_context_quality(context, query)
# Adapt generation strategy
if context_quality < 0.5:
# Low quality context - be more conservative
temperature = 0.3
system_prompt = self._get_conservative_prompt()
else:
# High quality context - can be more creative
temperature = self.temperature
system_prompt = self.prompt_templates.get(query_type, self.system_prompt)
# Generate with adapted parameters
original_temp = self.temperature
self.temperature = temperature
# Temporarily modify system prompt
original_prompt = self.system_prompt
self.system_prompt = system_prompt
try:
response = await super().generate(query, context)
            # Add adaptive metadata (reasoning may be empty on the base response)
            adaptive_note = f"[Query type: {query_type}, Context quality: {context_quality:.2f}]"
            response.reasoning = f"{response.reasoning} {adaptive_note}".strip() if response.reasoning else adaptive_note
return response
finally:
# Restore original settings
self.temperature = original_temp
self.system_prompt = original_prompt
def _classify_query(self, query: str) -> str:
"""Classify query type for adaptive generation"""
query_lower = query.lower()
# Factual question indicators
factual_indicators = ['what is', 'who is', 'when did', 'where is', 'how many']
if any(indicator in query_lower for indicator in factual_indicators):
return "factual"
# Analytical question indicators
analytical_indicators = ['why', 'how does', 'explain', 'analyze', 'evaluate']
if any(indicator in query_lower for indicator in analytical_indicators):
return "analytical"
# Comparative question indicators
comparative_indicators = ['compare', 'versus', 'vs', 'difference between', 'better']
if any(indicator in query_lower for indicator in comparative_indicators):
return "comparative"
# Procedural question indicators
procedural_indicators = ['how to', 'steps', 'process', 'procedure', 'guide']
if any(indicator in query_lower for indicator in procedural_indicators):
return "procedural"
return "factual" # Default
def _assess_context_quality(self, context: List[RetrievalResult], query: str) -> float:
"""Assess quality of retrieved context for the query"""
if not context:
return 0.0
quality_score = 0.0
# Factor 1: Relevance scores
avg_relevance = sum(result.score for result in context) / len(context)
quality_score += avg_relevance * 0.4
# Factor 2: Content coverage
query_keywords = set(re.findall(r'\b\w+\b', query.lower()))
total_coverage = 0
for result in context:
content_words = set(re.findall(r'\b\w+\b', result.document.content.lower()))
coverage = len(query_keywords.intersection(content_words)) / len(query_keywords)
total_coverage += coverage
avg_coverage = total_coverage / len(context)
quality_score += avg_coverage * 0.3
# Factor 3: Source diversity
sources = set()
for result in context:
source = result.document.metadata.get('source', 'unknown')
sources.add(source)
diversity_score = min(len(sources) / len(context), 1.0)
quality_score += diversity_score * 0.2
# Factor 4: Content length (moderate length preferred)
avg_length = sum(len(result.document.content) for result in context) / len(context)
length_score = 1.0 if 200 <= avg_length <= 2000 else 0.5
quality_score += length_score * 0.1
return min(quality_score, 1.0)
def _get_conservative_prompt(self) -> str:
"""Get conservative prompt for low-quality context"""
return """
You are answering based on limited context information. Be conservative in your response.
Instructions:
1. Only state information you can directly verify from the context
2. Clearly indicate when information is insufficient or uncertain
3. Avoid making inferences beyond what's explicitly stated
4. Always cite your sources
5. If context is inadequate, clearly state this limitation
Prioritize accuracy over completeness.
"""
Complete RAG Application
# rag_application.py
import asyncio
import logging
import re
from typing import List, Dict, Any, Optional

from rag_architecture import Document, DocumentStore, EmbeddingModel, Retriever, Generator
from vector_database import FAISSDocumentStore
from embedding_models import SentenceTransformerEmbedding
from advanced_retrieval import AdvancedRetriever
from rag_generator import RAGGenerator
class DocumentProcessor:
"""Process and chunk documents for RAG ingestion"""
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
async def process_text(self,
text: str,
metadata: Dict[str, Any] = None) -> List[Document]:
"""Process text into chunks"""
if metadata is None:
metadata = {}
# Simple text chunking (in production, use semantic chunking)
chunks = self._chunk_text(text)
documents = []
for i, chunk in enumerate(chunks):
doc = Document(
id=f"{metadata.get('source_id', 'doc')}_{i}",
content=chunk,
metadata={
**metadata,
'chunk_index': i,
'total_chunks': len(chunks)
}
)
documents.append(doc)
return documents
def _chunk_text(self, text: str) -> List[str]:
"""Chunk text into overlapping segments"""
# Split by sentences first
sentences = re.split(r'[.!?]+', text)
chunks = []
current_chunk = ""
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
# Check if adding this sentence would exceed chunk size
if len(current_chunk) + len(sentence) <= self.chunk_size:
current_chunk += sentence + ". "
else:
# Start new chunk
if current_chunk:
chunks.append(current_chunk.strip())
# Handle overlap
if self.chunk_overlap > 0 and chunks:
overlap_text = current_chunk[-self.chunk_overlap:]
current_chunk = overlap_text + sentence + ". "
else:
current_chunk = sentence + ". "
# Add final chunk
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
class RAGApplication:
"""Complete RAG application"""
def __init__(self,
document_store: DocumentStore,
embedding_model: EmbeddingModel,
retriever: Retriever,
generator: Generator,
processor: DocumentProcessor = None):
self.document_store = document_store
self.embedding_model = embedding_model
self.retriever = retriever
self.generator = generator
self.processor = processor or DocumentProcessor()
# Setup logging
self.logger = logging.getLogger(__name__)
# Performance metrics
self.metrics = {
'total_queries': 0,
'successful_queries': 0,
'avg_response_time': 0.0,
'avg_confidence': 0.0
}
async def ingest_document(self,
text: str,
metadata: Dict[str, Any] = None) -> List[str]:
"""Ingest a document into the RAG system"""
# Process document into chunks
documents = await self.processor.process_text(text, metadata)
# Generate embeddings and store
doc_ids = []
for doc in documents:
# Generate embedding
embedding = await self.embedding_model.embed_text(doc.content)
doc.embedding = embedding
# Store document
doc_id = await self.document_store.add_document(doc)
doc_ids.append(doc_id)
self.logger.info(f"Ingested document with {len(doc_ids)} chunks")
return doc_ids
async def query(self,
question: str,
top_k: int = 5,
filters: Dict[str, Any] = None,
include_reasoning: bool = True) -> Dict[str, Any]:
"""Query the RAG system"""
start_time = asyncio.get_event_loop().time()
try:
# Retrieve relevant documents
self.logger.info(f"Retrieving documents for query: {question[:100]}...")
retrieved_docs = await self.retriever.retrieve(
question, top_k=top_k, filters=filters
)
if not retrieved_docs:
return {
"answer": "I couldn't find relevant information to answer your question.",
"sources": [],
"confidence": 0.0,
"reasoning": "No relevant documents found",
"retrieval_count": 0
}
# Generate response
self.logger.info(f"Generating response using {len(retrieved_docs)} documents")
response = await self.generator.generate(question, retrieved_docs)
# Update metrics
end_time = asyncio.get_event_loop().time()
response_time = end_time - start_time
self._update_metrics(response_time, response.confidence, success=True)
# Format final response
result = {
"answer": response.answer,
"sources": [
{
"content": source.document.content[:200] + "...",
"metadata": source.document.metadata,
"relevance_score": source.score
}
for source in response.sources
],
"confidence": response.confidence,
"retrieval_count": len(retrieved_docs),
"response_time": response_time
}
if include_reasoning and response.reasoning:
result["reasoning"] = response.reasoning
return result
except Exception as e:
self.logger.error(f"Error processing query: {e}")
end_time = asyncio.get_event_loop().time()
response_time = end_time - start_time
self._update_metrics(response_time, 0.0, success=False)
return {
"answer": "I encountered an error while processing your question.",
"sources": [],
"confidence": 0.0,
"error": str(e),
"response_time": response_time
}
async def batch_query(self,
questions: List[str],
**kwargs) -> List[Dict[str, Any]]:
"""Process multiple queries concurrently"""
tasks = [self.query(q, **kwargs) for q in questions]
return await asyncio.gather(*tasks)
async def get_system_stats(self) -> Dict[str, Any]:
"""Get system statistics"""
# Document store stats
total_docs = 0 # You'd implement this in your document store
return {
"metrics": self.metrics,
"document_count": total_docs,
"embedding_dimension": self.embedding_model.embedding_dimension
}
def _update_metrics(self, response_time: float, confidence: float, success: bool):
"""Update performance metrics"""
self.metrics['total_queries'] += 1
if success:
self.metrics['successful_queries'] += 1
# Update running averages
total = self.metrics['total_queries']
self.metrics['avg_response_time'] = (
(self.metrics['avg_response_time'] * (total - 1) + response_time) / total
)
if success:
successful = self.metrics['successful_queries']
self.metrics['avg_confidence'] = (
(self.metrics['avg_confidence'] * (successful - 1) + confidence) / successful
)
# Example usage
async def main():
# Initialize components
embedding_model = SentenceTransformerEmbedding()
document_store = FAISSDocumentStore(
dimension=embedding_model.embedding_dimension
)
retriever = AdvancedRetriever(document_store, embedding_model)
generator = RAGGenerator()
# Create RAG application
rag_app = RAGApplication(
document_store=document_store,
embedding_model=embedding_model,
retriever=retriever,
generator=generator
)
# Ingest some documents
sample_docs = [
{
"text": "Python is a high-level programming language known for its simplicity and readability. It supports multiple programming paradigms including procedural, object-oriented, and functional programming.",
"metadata": {"source": "programming_guide", "topic": "python"}
},
{
"text": "Machine learning is a subset of artificial intelligence that enables computers to learn and improve from experience without being explicitly programmed. It uses algorithms to build mathematical models based on training data.",
"metadata": {"source": "ml_textbook", "topic": "machine_learning"}
}
]
for doc in sample_docs:
await rag_app.ingest_document(doc["text"], doc["metadata"])
# Query the system
questions = [
"What is Python?",
"How does machine learning work?",
"What are the benefits of Python for beginners?"
]
for question in questions:
print(f"\nQuestion: {question}")
response = await rag_app.query(question)
print(f"Answer: {response['answer']}")
print(f"Confidence: {response['confidence']}")
print(f"Sources used: {len(response['sources'])}")
if __name__ == "__main__":
asyncio.run(main())
Production Deployment and Monitoring
RAG System Monitoring
# rag_monitoring.py
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

from rag_application import RAGApplication
@dataclass
class RAGMetrics:
query_count: int = 0
avg_response_time: float = 0.0
avg_confidence: float = 0.0
retrieval_accuracy: float = 0.0
user_satisfaction: float = 0.0
error_rate: float = 0.0
class RAGMonitor:
"""Comprehensive monitoring for RAG systems"""
def __init__(self, rag_app: RAGApplication):
self.rag_app = rag_app
self.metrics_history = []
self.alert_thresholds = {
'response_time': 5.0, # seconds
'confidence': 0.6, # minimum confidence
'error_rate': 0.1, # 10% error rate
'retrieval_accuracy': 0.7
}
async def track_query(self,
query: str,
response: Dict[str, Any],
user_feedback: Optional[float] = None):
"""Track individual query metrics"""
metrics = {
'timestamp': datetime.now(),
'query': query,
'response_time': response.get('response_time', 0),
'confidence': response.get('confidence', 0),
'retrieval_count': response.get('retrieval_count', 0),
'success': 'error' not in response,
'user_feedback': user_feedback
}
self.metrics_history.append(metrics)
# Keep only last 24 hours
cutoff = datetime.now() - timedelta(hours=24)
self.metrics_history = [
m for m in self.metrics_history
if m['timestamp'] > cutoff
]
# Check for alerts
await self._check_alerts(metrics)
async def _check_alerts(self, current_metrics: Dict[str, Any]):
"""Check for alert conditions"""
# Response time alert
if current_metrics['response_time'] > self.alert_thresholds['response_time']:
await self._send_alert('high_response_time', current_metrics)
# Confidence alert
if current_metrics['confidence'] < self.alert_thresholds['confidence']:
await self._send_alert('low_confidence', current_metrics)
# Error alert
if not current_metrics['success']:
await self._send_alert('query_error', current_metrics)
async def _send_alert(self, alert_type: str, metrics: Dict[str, Any]):
"""Send alert (implement your alerting system here)"""
print(f"ALERT [{alert_type}]: {metrics}")
def get_performance_report(self,
period_hours: int = 24) -> Dict[str, Any]:
"""Generate performance report"""
cutoff = datetime.now() - timedelta(hours=period_hours)
recent_metrics = [
m for m in self.metrics_history
if m['timestamp'] > cutoff
]
if not recent_metrics:
return {"error": "No data for specified period"}
# Calculate aggregated metrics
total_queries = len(recent_metrics)
successful_queries = [m for m in recent_metrics if m['success']]
avg_response_time = sum(m['response_time'] for m in recent_metrics) / total_queries
avg_confidence = sum(m['confidence'] for m in successful_queries) / len(successful_queries) if successful_queries else 0
error_rate = (total_queries - len(successful_queries)) / total_queries
# User satisfaction (if feedback available)
feedback_metrics = [m for m in recent_metrics if m['user_feedback'] is not None]
avg_satisfaction = sum(m['user_feedback'] for m in feedback_metrics) / len(feedback_metrics) if feedback_metrics else None
return {
'period_hours': period_hours,
'total_queries': total_queries,
'successful_queries': len(successful_queries),
'avg_response_time': avg_response_time,
'avg_confidence': avg_confidence,
'error_rate': error_rate,
'avg_user_satisfaction': avg_satisfaction,
'performance_trend': self._calculate_trend(recent_metrics)
}
def _calculate_trend(self, metrics: List[Dict[str, Any]]) -> str:
"""Calculate performance trend"""
if len(metrics) < 10:
return "insufficient_data"
# Split into two halves and compare
mid_point = len(metrics) // 2
first_half = metrics[:mid_point]
second_half = metrics[mid_point:]
first_avg_conf = sum(m['confidence'] for m in first_half) / len(first_half)
second_avg_conf = sum(m['confidence'] for m in second_half) / len(second_half)
if second_avg_conf > first_avg_conf * 1.05:
return "improving"
elif second_avg_conf < first_avg_conf * 0.95:
return "declining"
else:
return "stable"
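Wiring the monitor into the application amounts to recording each response as it is returned. Below is a minimal usage sketch, assuming a small answer_and_track wrapper that is not part of RAGMonitor itself:
# monitoring_usage.py (illustrative)
from typing import Any, Dict, Optional

from rag_application import RAGApplication
from rag_monitoring import RAGMonitor

async def answer_and_track(rag_app: RAGApplication,
                           monitor: RAGMonitor,
                           question: str,
                           user_feedback: Optional[float] = None) -> Dict[str, Any]:
    response = await rag_app.query(question)
    # Record timing, confidence, and success so alerts and reports stay current
    await monitor.track_query(question, response, user_feedback=user_feedback)
    return response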
Best Practices and Optimization
Performance Optimization
- Embedding Caching: Cache embeddings for frequently accessed documents (see the sketch after this list)
- Batch Processing: Process multiple queries in batches when possible
- Index Optimization: Regular index maintenance and optimization
- Context Window Management: Optimize context length based on model constraints
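As a concrete illustration of embedding caching, any EmbeddingModel can be wrapped with a hash-keyed cache. The wrapper below is a minimal in-memory sketch; the class name is an assumption, and a production deployment would more likely back the cache with Redis or a similar shared store.
# cached_embedding.py (illustrative sketch)
import hashlib
from typing import Dict, List

import numpy as np

from rag_architecture import EmbeddingModel

class CachedEmbeddingModel(EmbeddingModel):
    """Wraps another EmbeddingModel and memoizes embeddings by content hash."""

    def __init__(self, base_model: EmbeddingModel):
        self.base_model = base_model
        self._cache: Dict[str, np.ndarray] = {}

    def _key(self, text: str) -> str:
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    async def embed_text(self, text: str) -> np.ndarray:
        key = self._key(text)
        if key not in self._cache:
            self._cache[key] = await self.base_model.embed_text(text)
        return self._cache[key]

    async def embed_batch(self, texts: List[str]) -> List[np.ndarray]:
        # Only embed the texts that are not already cached
        missing = [t for t in texts if self._key(t) not in self._cache]
        if missing:
            embeddings = await self.base_model.embed_batch(missing)
            for text, emb in zip(missing, embeddings):
                self._cache[self._key(text)] = emb
        return [self._cache[self._key(t)] for t in texts]

    @property
    def embedding_dimension(self) -> int:
        return self.base_model.embedding_dimension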
Quality Assurance
- Evaluation Datasets: Maintain evaluation datasets for continuous testing (a recall@k sketch follows this list)
- Human Feedback Loop: Implement user feedback collection and analysis
- Source Verification: Implement source credibility scoring
- Bias Detection: Monitor for potential biases in retrieval and generation
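For the evaluation-dataset point, even a small labeled set of question/relevant-document pairs lets you track retrieval quality release over release. A minimal recall@k harness might look like the sketch below; the eval-set format and function name are assumptions.
# retrieval_eval.py (illustrative sketch)
from typing import Dict, List

from rag_architecture import Retriever

async def eval_retrieval(retriever: Retriever,
                         eval_set: List[Dict],
                         top_k: int = 5) -> float:
    """Compute recall@k over items shaped like {"question": str, "relevant_ids": [doc_id, ...]}."""
    hits = 0
    for item in eval_set:
        results = await retriever.retrieve(item["question"], top_k=top_k)
        retrieved_ids = {result.document.id for result in results}
        # Count a hit if any labeled relevant document appears in the top-k results
        if retrieved_ids & set(item["relevant_ids"]):
            hits += 1
    return hits / len(eval_set) if eval_set else 0.0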
Security Considerations
- Input Sanitization: Validate and sanitize all user inputs (see the sketch after this list)
- Access Control: Implement proper authentication and authorization
- Data Privacy: Ensure compliance with data protection regulations
- Audit Logging: Maintain comprehensive audit logs
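For input sanitization, a thin validation layer in front of RAGApplication.query catches the most common problem cases before they reach the retriever or the LLM. The checks below are a minimal sketch rather than a complete security policy; the length limit and control-character filter are illustrative assumptions.
# query_validation.py (illustrative sketch)
import re

MAX_QUERY_LENGTH = 2000  # illustrative limit

def sanitize_query(raw_query: str) -> str:
    """Validate and normalize a user query before it reaches the RAG pipeline."""
    if not isinstance(raw_query, str) or not raw_query.strip():
        raise ValueError("Query must be a non-empty string")
    if len(raw_query) > MAX_QUERY_LENGTH:
        raise ValueError(f"Query exceeds {MAX_QUERY_LENGTH} characters")
    # Strip ASCII control characters and collapse runs of whitespace
    cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", raw_query)
    return re.sub(r"\s+", " ", cleaned).strip()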
Conclusion
Building production-ready RAG applications requires careful consideration of architecture, implementation, and optimization. The patterns and techniques covered in this guide provide a solid foundation for creating robust, scalable, and reliable RAG systems.
Key takeaways:
- Architecture Matters: Design with modularity and scalability in mind
- Quality Control: Implement comprehensive testing and monitoring
- User Experience: Focus on response quality and relevance
- Continuous Improvement: Use metrics and feedback to iterate and improve
RAG technology is rapidly evolving, with new techniques and optimizations emerging regularly. Stay informed about the latest developments and be prepared to adapt your implementations as the field advances.