Build RAG Systems with Vector Databases

David Childs

Build production-ready RAG systems using vector databases, embedding optimization, and advanced retrieval patterns for intelligent applications.

Retrieval-Augmented Generation (RAG) has transformed how we build AI applications that need access to specific knowledge. After building RAG systems that process millions of documents, I've learned that success requires far more than connecting an LLM to a vector database. This guide walks through the pieces that matter in a production-grade RAG system: chunking, indexing, hybrid retrieval, reranking, generation, evaluation, and deployment.

RAG Architecture Fundamentals

Complete RAG System Design

# rag_system.py
from typing import List, Dict, Optional, Any
import numpy as np
from dataclasses import dataclass
import asyncio
from enum import Enum

class VectorDBProvider(Enum):
    PINECONE = "pinecone"
    WEAVIATE = "weaviate"
    QDRANT = "qdrant"
    MILVUS = "milvus"
    CHROMA = "chroma"
    FAISS = "faiss"

@dataclass
class Document:
    id: str
    content: str
    metadata: Dict[str, Any]
    embedding: Optional[np.ndarray] = None
    chunks: Optional[List['DocumentChunk']] = None

@dataclass
class DocumentChunk:
    id: str
    document_id: str
    content: str
    embedding: Optional[np.ndarray]
    metadata: Dict[str, Any]
    position: int

class RAGSystem:
    def __init__(self, 
                 vector_db: VectorDBProvider,
                 embedding_model: str = "text-embedding-ada-002",
                 llm_model: str = "gpt-4",
                 chunk_size: int = 512,
                 chunk_overlap: int = 50):
        
        # These factory helpers (implementations not shown) map the provider
        # enum and the model name onto concrete clients
        self.vector_db = self._initialize_vector_db(vector_db)
        self.embedding_model = self._initialize_embedding_model(embedding_model)
        self.llm_model = llm_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # Initialize pipeline components (HybridRetriever, Reranker and
        # ResponseGenerator are defined in the later modules of this guide)
        self.document_processor = DocumentProcessor(chunk_size, chunk_overlap)
        self.retriever = HybridRetriever(self.vector_db)
        self.reranker = Reranker()
        self.generator = ResponseGenerator(llm_model)
        
    async def index_documents(self, documents: List[Document]) -> Dict:
        """Index documents into vector database"""
        
        indexed_count = 0
        chunk_count = 0
        errors = []
        
        for doc in documents:
            try:
                # Process document into chunks
                chunks = self.document_processor.process(doc)
                
                # Generate embeddings
                for chunk in chunks:
                    chunk.embedding = await self._generate_embedding(chunk.content)
                
                # Store in vector database
                await self.vector_db.upsert(chunks)
                
                indexed_count += 1
                chunk_count += len(chunks)
                
            except Exception as e:
                errors.append({
                    'document_id': doc.id,
                    'error': str(e)
                })
        
        return {
            'indexed_documents': indexed_count,
            'total_chunks': chunk_count,
            'errors': errors
        }
    
    async def query(self, 
                   query: str,
                   top_k: int = 10,
                   filters: Optional[Dict] = None,
                   rerank: bool = True) -> Dict:
        """Query RAG system"""
        
        # Generate query embedding
        query_embedding = await self._generate_embedding(query)
        
        # Retrieve relevant chunks
        retrieved_chunks = await self.retriever.retrieve(
            query_embedding,
            query,  # For hybrid search
            top_k=top_k * 2 if rerank else top_k,
            filters=filters
        )
        
        # Rerank if enabled
        if rerank:
            retrieved_chunks = await self.reranker.rerank(
                query,
                retrieved_chunks,
                top_k=top_k
            )
        
        # Generate response
        response = await self.generator.generate(
            query,
            retrieved_chunks,
            self.llm_model
        )
        
        return {
            'response': response['text'],
            'sources': [chunk.metadata for chunk in retrieved_chunks],
            'confidence': response.get('confidence', 0.0)
        }
    
    async def _generate_embedding(self, text: str) -> np.ndarray:
        """Generate text embedding"""
        
        # In production, batch these requests
        embedding = await self.embedding_model.encode(text)
        return np.array(embedding)

class DocumentProcessor:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
    def process(self, document: Document) -> List[DocumentChunk]:
        """Process document into chunks"""
        
        chunks = []
        text = document.content
        
        # Smart chunking with sentence boundaries
        sentences = self._split_into_sentences(text)
        current_chunk = []
        current_size = 0
        position = 0
        
        for sentence in sentences:
            sentence_size = len(sentence.split())
            
            if current_size + sentence_size > self.chunk_size and current_chunk:
                # Create chunk
                chunk_text = ' '.join(current_chunk)
                chunk = DocumentChunk(
                    id=f"{document.id}_chunk_{position}",
                    document_id=document.id,
                    content=chunk_text,
                    embedding=None,  # Will be generated later
                    metadata={
                        **document.metadata,
                        'chunk_position': position,
                        'chunk_size': len(chunk_text)
                    },
                    position=position
                )
                chunks.append(chunk)
                
                # Overlap handling: carry trailing sentences totalling up to
                # chunk_overlap words into the next chunk
                overlap_sentences = []
                overlap_size = 0
                for prev_sentence in reversed(current_chunk):
                    prev_words = len(prev_sentence.split())
                    if overlap_size + prev_words > self.chunk_overlap:
                        break
                    overlap_sentences.insert(0, prev_sentence)
                    overlap_size += prev_words
                current_chunk = overlap_sentences
                current_size = overlap_size
                position += 1
            
            current_chunk.append(sentence)
            current_size += sentence_size
        
        # Add remaining chunk
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunk = DocumentChunk(
                id=f"{document.id}_chunk_{position}",
                document_id=document.id,
                content=chunk_text,
                embedding=None,
                metadata={
                    **document.metadata,
                    'chunk_position': position,
                    'chunk_size': len(chunk_text)
                },
                position=position
            )
            chunks.append(chunk)
        
        return chunks
    
    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences"""
        
        # Simple implementation - use NLTK or spaCy in production
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
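
The `_generate_embedding` helper above embeds one chunk at a time, and the in-code note about batching matters: embedding APIs enforce per-request rate limits and each call adds a network round-trip, so indexing large corpora one chunk per call is slow and expensive. A minimal sketch of a batched variant, assuming the official `openai` async client (which is not part of the code above):

# batched_embeddings.py (sketch; assumes the openai Python SDK v1+)
from typing import List
import numpy as np
from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def embed_batch(texts: List[str],
                      model: str = "text-embedding-ada-002",
                      batch_size: int = 100) -> List[np.ndarray]:
    """Embed many texts with a handful of requests instead of one per chunk."""
    embeddings: List[np.ndarray] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        response = await client.embeddings.create(model=model, input=batch)
        # The API returns one embedding per input, in the same order
        embeddings.extend(np.array(item.embedding) for item in response.data)
    return embeddings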

Vector Database Implementation

# vector_database.py
import numpy as np
from typing import List, Dict, Optional, Tuple
import faiss
import pickle
from abc import ABC, abstractmethod

from rag_system import DocumentChunk  # chunk dataclass defined earlier

class VectorDatabase(ABC):
    @abstractmethod
    async def upsert(self, chunks: List[DocumentChunk]):
        pass
    
    @abstractmethod
    async def search(self,
                     embedding: np.ndarray,
                     top_k: int,
                     filters: Optional[Dict] = None) -> List[Tuple[DocumentChunk, float]]:
        pass
    
    @abstractmethod
    async def delete(self, ids: List[str]):
        pass

class FAISSVectorDB(VectorDatabase):
    def __init__(self, dimension: int = 1536, index_type: str = "IVF"):
        self.dimension = dimension
        self.index_type = index_type
        
        # Initialize FAISS index
        if index_type == "IVF":
            quantizer = faiss.IndexFlatL2(dimension)
            self.index = faiss.IndexIVFFlat(quantizer, dimension, 100)
        elif index_type == "HNSW":
            self.index = faiss.IndexHNSWFlat(dimension, 32)
        else:
            self.index = faiss.IndexFlatL2(dimension)
        
        # Metadata storage
        self.metadata = {}
        self.id_to_index = {}
        self.index_to_id = {}
        self.current_index = 0
        
    async def upsert(self, chunks: List[DocumentChunk]):
        """Insert or update chunks"""
        
        embeddings = []
        
        for chunk in chunks:
            # Store metadata
            self.metadata[chunk.id] = chunk
            
            # Map IDs to indices
            if chunk.id not in self.id_to_index:
                self.id_to_index[chunk.id] = self.current_index
                self.index_to_id[self.current_index] = chunk.id
                self.current_index += 1
            
            embeddings.append(chunk.embedding)
        
        # Add to FAISS index
        embeddings_array = np.array(embeddings).astype('float32')
        
        if self.index_type == "IVF" and not self.index.is_trained:
            self.index.train(embeddings_array)
        
        self.index.add(embeddings_array)
    
    async def search(self, 
                    embedding: np.ndarray,
                    top_k: int = 10,
                    filters: Optional[Dict] = None) -> List[Tuple[DocumentChunk, float]]:
        """Search for similar vectors"""
        
        # Search in FAISS
        query_embedding = embedding.reshape(1, -1).astype('float32')
        distances, indices = self.index.search(query_embedding, top_k)
        
        results = []
        for idx, distance in zip(indices[0], distances[0]):
            if idx == -1:
                continue
            
            chunk_id = self.index_to_id.get(idx)
            if chunk_id and chunk_id in self.metadata:
                chunk = self.metadata[chunk_id]
                
                # Apply filters
                if filters:
                    if not self._match_filters(chunk, filters):
                        continue
                
                results.append((chunk, float(distance)))
        
        return results
    
    def _match_filters(self, chunk: DocumentChunk, filters: Dict) -> bool:
        """Check if chunk matches filters"""
        
        for key, value in filters.items():
            if key not in chunk.metadata:
                return False
            
            if isinstance(value, list):
                if chunk.metadata[key] not in value:
                    return False
            else:
                if chunk.metadata[key] != value:
                    return False
        
        return True
    
    async def delete(self, ids: List[str]):
        """Delete chunk metadata by ID.

        Note: the vectors themselves stay in the FAISS index; rebuild the
        index periodically (or use an ID-mapped index with remove_ids) to
        reclaim the space.
        """
        
        for chunk_id in ids:
            if chunk_id in self.metadata:
                del self.metadata[chunk_id]
            
            if chunk_id in self.id_to_index:
                index = self.id_to_index[chunk_id]
                del self.id_to_index[chunk_id]
                del self.index_to_id[index]
    
    def save(self, path: str):
        """Save index to disk"""
        
        faiss.write_index(self.index, f"{path}.index")
        
        with open(f"{path}.metadata", 'wb') as f:
            pickle.dump({
                'metadata': self.metadata,
                'id_to_index': self.id_to_index,
                'index_to_id': self.index_to_id,
                'current_index': self.current_index
            }, f)
    
    def load(self, path: str):
        """Load index from disk"""
        
        self.index = faiss.read_index(f"{path}.index")
        
        with open(f"{path}.metadata", 'rb') as f:
            data = pickle.load(f)
            self.metadata = data['metadata']
            self.id_to_index = data['id_to_index']
            self.index_to_id = data['index_to_id']
            self.current_index = data['current_index']
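
A quick way to sanity-check the FAISS backend in isolation is to index a few random vectors in place of real embeddings (the 1536-dimension default matches text-embedding-ada-002); this is a throwaway sketch, not part of the system above:

# faiss_smoke_test.py (sketch; random vectors stand in for real embeddings)
import asyncio
import numpy as np
from rag_system import DocumentChunk
from vector_database import FAISSVectorDB

async def main():
    db = FAISSVectorDB(dimension=1536, index_type="Flat")
    chunks = [
        DocumentChunk(
            id=f"doc1_chunk_{i}",
            document_id="doc1",
            content=f"chunk {i}",
            embedding=np.random.rand(1536).astype("float32"),
            metadata={"source": "doc1"},
            position=i,
        )
        for i in range(10)
    ]
    await db.upsert(chunks)

    results = await db.search(chunks[0].embedding, top_k=3)
    for chunk, distance in results:
        print(chunk.id, distance)  # the query chunk itself comes back at distance 0

    db.save("rag_index")  # persists both the FAISS index and the metadata maps

asyncio.run(main())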

Advanced Retrieval Techniques

Hybrid Search Implementation

# hybrid_retrieval.py
from typing import Dict, List, Optional, Tuple
import numpy as np
from rank_bm25 import BM25Okapi

from rag_system import DocumentChunk
from vector_database import VectorDatabase

class HybridRetriever:
    def __init__(self, 
                 vector_db: VectorDatabase,
                 alpha: float = 0.7):  # Weight for vector search
        self.vector_db = vector_db
        self.alpha = alpha
        self.bm25_index = None
        self.documents = []
        
    def build_bm25_index(self, documents: List[str]):
        """Build BM25 index for keyword search"""
        
        # Tokenize documents
        tokenized_docs = [doc.lower().split() for doc in documents]
        
        # Build BM25 index
        self.bm25_index = BM25Okapi(tokenized_docs)
        self.documents = documents
    
    async def retrieve(self,
                      query_embedding: np.ndarray,
                      query_text: str,
                      top_k: int = 10,
                      filters: Optional[Dict] = None) -> List[DocumentChunk]:
        """Hybrid retrieval combining vector and keyword search"""
        
        # Vector search
        vector_results = await self.vector_db.search(
            query_embedding,
            top_k=top_k * 2,
            filters=filters
        )
        
        # BM25 keyword search
        if self.bm25_index:
            query_tokens = query_text.lower().split()
            bm25_scores = self.bm25_index.get_scores(query_tokens)
            
            # Get top BM25 results
            top_indices = np.argsort(bm25_scores)[-top_k * 2:][::-1]
            bm25_results = [(self.documents[i], bm25_scores[i]) 
                           for i in top_indices]
        else:
            bm25_results = []
        
        # Combine and rerank
        combined_results = self._combine_results(
            vector_results,
            bm25_results,
            self.alpha
        )
        
        # Sort by combined score
        combined_results.sort(key=lambda x: x[1], reverse=True)
        
        return [chunk for chunk, _ in combined_results[:top_k]]
    
    def _combine_results(self,
                        vector_results: List[Tuple[DocumentChunk, float]],
                        bm25_results: List[Tuple[str, float]],
                        alpha: float) -> List[Tuple[DocumentChunk, float]]:
        """Combine vector and BM25 results"""
        
        combined = {}
        
        # Normalize vector scores. FAISS returns L2 distances (lower is
        # better), so convert them into a similarity where higher is better.
        if vector_results:
            max_vector_score = max(score for _, score in vector_results)
            for chunk, score in vector_results:
                similarity = 1 - (score / max_vector_score) if max_vector_score > 0 else 0
                combined[chunk.id] = (chunk, alpha * similarity)
        
        # Normalize and add BM25 scores
        if bm25_results:
            max_bm25_score = max(score for _, score in bm25_results)
            for doc_text, score in bm25_results:
                normalized_score = score / max_bm25_score if max_bm25_score > 0 else 0
                
                # Find corresponding chunk
                for chunk_id, (chunk, vec_score) in combined.items():
                    if doc_text in chunk.content:
                        combined[chunk_id] = (
                            chunk,
                            vec_score + (1 - alpha) * normalized_score
                        )
        
        return list(combined.values())

class Reranker:
    def __init__(self, model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model)
        
    async def rerank(self,
                    query: str,
                    chunks: List[DocumentChunk],
                    top_k: int = 5) -> List[DocumentChunk]:
        """Rerank chunks using cross-encoder"""
        
        if not chunks:
            return []
        
        # Prepare pairs
        pairs = [(query, chunk.content) for chunk in chunks]
        
        # Get scores
        scores = self.model.predict(pairs)
        
        # Sort by score
        chunk_scores = list(zip(chunks, scores))
        chunk_scores.sort(key=lambda x: x[1], reverse=True)
        
        return [chunk for chunk, _ in chunk_scores[:top_k]]
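
One detail that is easy to miss: `build_bm25_index` has to be called explicitly with the chunk texts, otherwise the retriever silently degrades to vector-only search. A sketch of the wiring, under the assumption that `all_chunks` holds already-embedded `DocumentChunk` objects:

# hybrid_wiring.py (sketch)
import numpy as np
from vector_database import FAISSVectorDB
from hybrid_retrieval import HybridRetriever

async def build_hybrid_retriever(all_chunks) -> HybridRetriever:
    vector_db = FAISSVectorDB(dimension=1536, index_type="Flat")
    await vector_db.upsert(all_chunks)

    retriever = HybridRetriever(vector_db, alpha=0.7)
    # BM25 needs the raw chunk texts; without this call only the vector side contributes
    retriever.build_bm25_index([chunk.content for chunk in all_chunks])
    return retriever

async def hybrid_query(retriever: HybridRetriever, query_embedding: np.ndarray, query_text: str):
    # alpha weights vector similarity, (1 - alpha) weights BM25
    return await retriever.retrieve(query_embedding, query_text, top_k=10)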

Contextual Compression

# contextual_compression.py
from typing import List

from rag_system import DocumentChunk

class ContextualCompressor:
    def __init__(self, llm_model: str = "gpt-3.5-turbo"):
        self.llm_model = llm_model
        
    async def compress_documents(self,
                                query: str,
                                documents: List[DocumentChunk],
                                max_tokens: int = 2000) -> str:
        """Compress documents to relevant information"""
        
        compressed_chunks = []
        
        for doc in documents:
            # Extract relevant sentences
            relevant = await self._extract_relevant(query, doc.content)
            
            if relevant:
                compressed_chunks.append(relevant)
        
        # Combine compressed chunks
        combined = self._combine_chunks(compressed_chunks, max_tokens)
        
        return combined
    
    async def _extract_relevant(self, query: str, content: str) -> str:
        """Extract query-relevant information from content"""
        
        prompt = f"""
        Query: {query}
        
        Document: {content}
        
        Extract only the information from the document that is directly relevant to answering the query.
        If nothing is relevant, return "NOT_RELEVANT".
        """
        
        # Call LLM for extraction
        response = await self._call_llm(prompt)
        
        if response != "NOT_RELEVANT":
            return response
        return ""
    
    def _combine_chunks(self, chunks: List[str], max_tokens: int) -> str:
        """Combine chunks within token limit"""
        
        combined = []
        current_tokens = 0
        
        for chunk in chunks:
            chunk_tokens = len(chunk.split())
            
            if current_tokens + chunk_tokens <= max_tokens:
                combined.append(chunk)
                current_tokens += chunk_tokens
            else:
                break
        
        return "\n\n".join(combined)
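
The compressor is not wired into `RAGSystem.query` above; one natural place for it is between retrieval and generation, trading an extra LLM round-trip per chunk for a tighter context window. A sketch of that wiring (an assumption, not how the system above behaves by default):

# Sketch: compress retrieved chunks before they reach the generator
async def compressed_context_for(rag, compressor, query: str, top_k: int = 10) -> str:
    query_embedding = await rag._generate_embedding(query)
    chunks = await rag.retriever.retrieve(query_embedding, query, top_k=top_k)

    # Keep only the sentences relevant to the query, within a token budget
    return await compressor.compress_documents(query, chunks, max_tokens=2000)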

Query Enhancement

Query Expansion and Optimization

# query_enhancement.py
from typing import Dict, List

class QueryEnhancer:
    def __init__(self, llm_model: str = "gpt-3.5-turbo"):
        self.llm_model = llm_model
        self.query_cache = {}
        
    async def enhance_query(self, query: str) -> Dict:
        """Enhance query for better retrieval"""
        
        # Check cache
        if query in self.query_cache:
            return self.query_cache[query]
        
        enhanced = {
            'original': query,
            'expanded': await self._expand_query(query),
            'decomposed': await self._decompose_query(query),
            'hypothetical_answer': await self._generate_hypothetical_answer(query),
            'keywords': self._extract_keywords(query)
        }
        
        self.query_cache[query] = enhanced
        return enhanced
    
    async def _expand_query(self, query: str) -> str:
        """Expand query with synonyms and related terms"""
        
        prompt = f"""
        Expand the following query with related terms and synonyms that would help find relevant information:
        
        Query: {query}
        
        Expanded query (include original + related terms):
        """
        
        return await self._call_llm(prompt)
    
    async def _decompose_query(self, query: str) -> List[str]:
        """Decompose complex query into sub-queries"""
        
        prompt = f"""
        Break down this complex query into simpler sub-queries:
        
        Query: {query}
        
        Sub-queries (one per line):
        """
        
        response = await self._call_llm(prompt)
        return [q.strip() for q in response.split('\n') if q.strip()]
    
    async def _generate_hypothetical_answer(self, query: str) -> str:
        """Generate hypothetical answer for HyDE technique"""
        
        prompt = f"""
        Generate a hypothetical but realistic answer to this question:
        
        Question: {query}
        
        Hypothetical answer:
        """
        
        return await self._call_llm(prompt)
    
    def _extract_keywords(self, query: str) -> List[str]:
        """Extract key terms from query"""
        
        # Simple implementation - use NLP library in production
        import re
        
        # Remove common words
        stopwords = {'the', 'is', 'at', 'which', 'on', 'a', 'an', 'and', 'or', 'but'}
        
        words = re.findall(r'\w+', query.lower())
        keywords = [w for w in words if w not in stopwords and len(w) > 2]
        
        return keywords
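
The hypothetical answer above is the core of HyDE (Hypothetical Document Embeddings): instead of embedding a short query, you embed an LLM-generated pseudo-answer, which usually lands closer to real answer passages in embedding space. A sketch of plugging it into retrieval, where `rag` and `enhancer` are assumed to be instances of the classes above:

# Sketch: HyDE-style retrieval using the enhanced query
async def hyde_retrieve(rag, enhancer, query: str, top_k: int = 10):
    enhanced = await enhancer.enhance_query(query)

    # Embed the pseudo-answer rather than the raw query (HyDE)
    hyde_embedding = await rag._generate_embedding(enhanced['hypothetical_answer'])

    # The keyword side of hybrid search still uses the original query text
    return await rag.retriever.retrieve(hyde_embedding, query, top_k=top_k)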

Response Generation

Advanced Response Generation

# response_generation.py
from typing import Dict, List

from rag_system import DocumentChunk

class ResponseGenerator:
    def __init__(self, llm_model: str = "gpt-4"):
        self.llm_model = llm_model
        self.citation_formatter = CitationFormatter()
        
    async def generate(self,
                      query: str,
                      context_chunks: List[DocumentChunk],
                      model: str = None) -> Dict:
        """Generate response with citations"""
        
        # Format context
        context = self._format_context(context_chunks)
        
        # Generate response
        prompt = self._build_prompt(query, context)
        
        response = await self._call_llm(prompt, model or self.llm_model)
        
        # Post-process response
        processed = self._post_process(response, context_chunks)
        
        # Add citations
        with_citations = self.citation_formatter.add_citations(
            processed,
            context_chunks
        )
        
        # Calculate confidence
        confidence = self._calculate_confidence(response, context_chunks)
        
        return {
            'text': with_citations,
            'confidence': confidence,
            'chunks_used': len(context_chunks)
        }
    
    def _format_context(self, chunks: List[DocumentChunk]) -> str:
        """Format context chunks for prompt"""
        
        formatted = []
        
        for i, chunk in enumerate(chunks):
            formatted.append(f"[{i+1}] {chunk.content}")
        
        return "\n\n".join(formatted)
    
    def _build_prompt(self, query: str, context: str) -> str:
        """Build generation prompt"""
        
        return f"""Answer the following question based on the provided context. 
        If the answer cannot be found in the context, say so.
        Include reference numbers [1], [2], etc. for your sources.
        
        Context:
        {context}
        
        Question: {query}
        
        Answer:"""
    
    def _calculate_confidence(self, 
                             response: str,
                             chunks: List[DocumentChunk]) -> float:
        """Calculate response confidence score"""
        
        confidence = 1.0
        
        # Reduce confidence if response indicates uncertainty
        uncertainty_phrases = [
            "i'm not sure",
            "it's unclear",
            "cannot be determined",
            "no information",
            "not found in context"
        ]
        
        response_lower = response.lower()
        for phrase in uncertainty_phrases:
            if phrase in response_lower:
                confidence *= 0.5
        
        # Increase confidence based on number of citations
        import re
        citations = re.findall(r'\[\d+\]', response)
        confidence = min(1.0, confidence + len(citations) * 0.1)
        
        return confidence

class CitationFormatter:
    def add_citations(self, 
                      text: str,
                      chunks: List[DocumentChunk]) -> str:
        """Add proper citations to response"""
        
        # Map citation numbers to sources
        citation_map = {}
        
        for i, chunk in enumerate(chunks):
            citation_num = i + 1
            source = chunk.metadata.get('source', 'Unknown')
            page = chunk.metadata.get('page', '')
            
            citation_map[citation_num] = {
                'source': source,
                'page': page
            }
        
        # Add citation list at end
        if citation_map:
            citations_text = "\n\nSources:\n"
            for num, info in citation_map.items():
                citations_text += f"[{num}] {info['source']}"
                if info['page']:
                    citations_text += f", page {info['page']}"
                citations_text += "\n"
            
            text += citations_text
        
        return text

Evaluation and Optimization

RAG Evaluation Framework

# rag_evaluation.py
from typing import Dict, List
import numpy as np

from rag_system import RAGSystem

class RAGEvaluator:
    def __init__(self):
        self.metrics = {
            'retrieval': RetrievalMetrics(),
            'generation': GenerationMetrics(),
            'end_to_end': EndToEndMetrics()
        }
        
    async def evaluate(self,
                       rag_system: RAGSystem,
                       test_dataset: List[Dict]) -> Dict:
        """Evaluate RAG system performance"""
        
        results = {
            'retrieval_metrics': {},
            'generation_metrics': {},
            'end_to_end_metrics': {},
            'detailed_results': []
        }
        
        for test_case in test_dataset:
            query = test_case['query']
            expected_answer = test_case.get('answer')
            relevant_docs = test_case.get('relevant_docs', [])
            
            # Get RAG response
            response = await rag_system.query(query)
            
            # Evaluate retrieval
            if relevant_docs:
                retrieval_score = self.metrics['retrieval'].evaluate(
                    response['sources'],
                    relevant_docs
                )
            else:
                retrieval_score = None
            
            # Evaluate generation
            if expected_answer:
                generation_score = await self.metrics['generation'].evaluate(
                    response['response'],
                    expected_answer,
                    query
                )
            else:
                generation_score = None
            
            # Store results
            results['detailed_results'].append({
                'query': query,
                'response': response['response'],
                'retrieval_score': retrieval_score,
                'generation_score': generation_score
            })
        
        # Aggregate metrics
        results['retrieval_metrics'] = self._aggregate_scores(
            [r['retrieval_score'] for r in results['detailed_results'] 
             if r['retrieval_score']]
        )
        
        results['generation_metrics'] = self._aggregate_scores(
            [r['generation_score'] for r in results['detailed_results']
             if r['generation_score']]
        )
        
        return results
    
    def _aggregate_scores(self, scores: List[float]) -> Dict:
        """Aggregate evaluation scores"""
        
        if not scores:
            return {}
        
        return {
            'mean': np.mean(scores),
            'std': np.std(scores),
            'min': np.min(scores),
            'max': np.max(scores),
            'median': np.median(scores)
        }

class RetrievalMetrics:
    def evaluate(self, retrieved: List[Dict], relevant: List[str]) -> float:
        """Calculate retrieval F1 over document IDs"""

        # Assumes each retrieved source's metadata carries an 'id' field
        retrieved_ids = [doc.get('id') for doc in retrieved]
        
        # Precision
        relevant_retrieved = len(set(retrieved_ids) & set(relevant))
        precision = relevant_retrieved / len(retrieved_ids) if retrieved_ids else 0
        
        # Recall
        recall = relevant_retrieved / len(relevant) if relevant else 0
        
        # F1 Score
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        
        return f1
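
The evaluator references `GenerationMetrics` and `EndToEndMetrics` without defining them. A minimal stand-in for `GenerationMetrics`, using token-overlap F1 (swap in ROUGE or an LLM-as-judge scorer for anything serious), can live in the same rag_evaluation.py; `EndToEndMetrics` is stubbed here to reuse it:

class GenerationMetrics:
    async def evaluate(self, generated: str, expected: str, query: str) -> float:
        """Token-overlap F1 between the generated and expected answers."""
        gen_tokens = set(generated.lower().split())
        exp_tokens = set(expected.lower().split())

        if not gen_tokens or not exp_tokens:
            return 0.0

        overlap = len(gen_tokens & exp_tokens)
        precision = overlap / len(gen_tokens)
        recall = overlap / len(exp_tokens)

        if precision + recall == 0:
            return 0.0
        return 2 * precision * recall / (precision + recall)

class EndToEndMetrics(GenerationMetrics):
    """Placeholder: end-to-end scoring reuses the same overlap metric here."""
    pass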

Production Deployment

RAG API Service

# rag_api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional, Dict
import asyncio

from rag_system import RAGSystem, VectorDBProvider, Document

app = FastAPI()

class IndexRequest(BaseModel):
    documents: List[Dict]
    metadata: Optional[Dict] = {}

class QueryRequest(BaseModel):
    query: str
    top_k: Optional[int] = 5
    filters: Optional[Dict] = None
    include_sources: Optional[bool] = True

class RAGService:
    def __init__(self):
        self.rag_system = None
        self.index_lock = asyncio.Lock()
        self.query_semaphore = asyncio.Semaphore(10)  # Limit concurrent queries
        
    async def initialize(self):
        """Initialize RAG system"""
        
        self.rag_system = RAGSystem(
            vector_db=VectorDBProvider.FAISS,
            embedding_model="text-embedding-ada-002",
            llm_model="gpt-4"
        )
        
        # Load an existing index from disk if one has been saved
        try:
            self.rag_system.vector_db.load("./data/index")
        except Exception:
            # No saved index yet; start with an empty one
            pass

rag_service = RAGService()

@app.on_event("startup")
async def startup():
    await rag_service.initialize()

@app.post("/index")
async def index_documents(request: IndexRequest):
    """Index new documents"""
    
    async with rag_service.index_lock:
        try:
            # Convert to Document objects
            documents = [
                Document(
                    id=doc['id'],
                    content=doc['content'],
                    metadata=doc.get('metadata', {})
                )
                for doc in request.documents
            ]
            
            # Index documents
            result = await rag_service.rag_system.index_documents(documents)
            
            # Save index
            rag_service.rag_system.vector_db.save("./data/index")
            
            return result
            
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

@app.post("/query")
async def query(request: QueryRequest):
    """Query RAG system"""
    
    async with rag_service.query_semaphore:
        try:
            result = await rag_service.rag_system.query(
                query=request.query,
                top_k=request.top_k,
                filters=request.filters
            )
            
            if not request.include_sources:
                del result['sources']
            
            return result
            
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    """Health check endpoint"""
    
    return {
        "status": "healthy",
        "vector_db": rag_service.rag_system is not None
    }
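
Once the service is running (for example with `uvicorn rag_api:app --port 8000`), indexing and querying can be exercised with a small client script; the payloads below are purely illustrative:

# rag_client.py (sketch; assumes the API is listening on localhost:8000)
import requests

BASE_URL = "http://localhost:8000"

# Index one document
requests.post(f"{BASE_URL}/index", json={
    "documents": [{
        "id": "runbook-001",
        "content": "To rotate API keys, open the admin console and ...",
        "metadata": {"source": "internal-runbook"}
    }]
}).raise_for_status()

# Query it
response = requests.post(f"{BASE_URL}/query", json={
    "query": "How do I rotate API keys?",
    "top_k": 5
})
print(response.json()["response"])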

Best Practices Checklist

  • Choose appropriate chunking strategy
  • Implement hybrid search (vector + keyword)
  • Use query enhancement techniques
  • Set up document preprocessing pipeline
  • Implement caching for embeddings (see the sketch after this list)
  • Use reranking for better results
  • Monitor retrieval quality metrics
  • Implement proper error handling
  • Set up incremental indexing
  • Use metadata filtering effectively
  • Implement citation tracking
  • Monitor and optimize costs
  • Set up A/B testing for improvements
  • Regular index maintenance
  • Implement security and access control
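
For the embedding-cache item above, a minimal sketch: key the cache on a hash of the chunk text and model name so that re-indexing unchanged documents never re-embeds them. The on-disk layout here is an assumption, not part of the system above:

# embedding_cache.py (sketch; content-hash keyed cache for embeddings)
import hashlib
from pathlib import Path
from typing import Optional
import numpy as np

class EmbeddingCache:
    def __init__(self, cache_dir: str = "./embedding_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _key(self, text: str, model: str) -> Path:
        digest = hashlib.sha256(f"{model}:{text}".encode("utf-8")).hexdigest()
        return self.cache_dir / f"{digest}.npy"

    def get(self, text: str, model: str) -> Optional[np.ndarray]:
        path = self._key(text, model)
        return np.load(path) if path.exists() else None

    def put(self, text: str, model: str, embedding: np.ndarray) -> None:
        np.save(self._key(text, model), embedding)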

Conclusion

Building production RAG systems requires careful orchestration of retrieval, generation, and optimization components. Success comes from understanding the interplay between chunking strategies, retrieval methods, and response generation. Start with a solid foundation, measure everything, and continuously iterate based on real user queries. Remember, the best RAG system is one that consistently delivers accurate, relevant, and well-sourced responses.
