Master Vector Databases for AI Apps

David Childs

Master vector databases and embeddings for AI applications, including similarity search, semantic retrieval, and production-ready optimization strategies.

Vector databases and embeddings form the backbone of modern AI applications, enabling semantic search, recommendation systems, retrieval-augmented generation (RAG), and similarity matching at scale. Unlike traditional databases, which are built around exact-match and range queries, vector databases index high-dimensional embeddings that capture semantic meaning, so a query can return the items closest in meaning rather than only those that match literally.

Understanding Vector Embeddings

Vector embeddings are numerical representations of data that capture semantic relationships in high-dimensional space. Points that are close together in this space represent similar concepts, so a system can compare items by context and meaning rather than by exact match.
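To make "close together" concrete, here is a minimal sketch that ranks a handful of items by cosine similarity to a query. The four-dimensional vectors and the `top_k` helper are made up for illustration and do not come from any real embedding model. A production system would use model-generated embeddings and, most likely, an approximate index rather than this brute-force scan.

```python
import numpy as np

# Hypothetical 4-dimensional embeddings, hand-written for illustration only.
# Real models produce hundreds or thousands of dimensions.
corpus = {
    "car":        np.array([0.90, 0.10, 0.00, 0.20]),
    "automobile": np.array([0.85, 0.15, 0.05, 0.25]),
    "bicycle":    np.array([0.60, 0.10, 0.10, 0.70]),
    "banana":     np.array([0.00, 0.90, 0.30, 0.10]),
}


def top_k(query: np.ndarray, items: dict, k: int = 2) -> list:
    """Return the k (name, score) pairs most similar to the query."""
    names = list(items)
    matrix = np.vstack([items[name] for name in names])

    # Normalize rows and query so the dot product equals cosine similarity.
    matrix = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)
    query = query / np.linalg.norm(query)

    scores = matrix @ query
    order = np.argsort(scores)[::-1][:k]  # highest similarity first
    return [(names[i], float(scores[i])) for i in order]


results = top_k(corpus["car"], corpus, k=3)
for name, score in results:
    print(f"{name}: {score:.3f}")
```

With these toy values, "automobile" ranks directly after "car" itself and ahead of "bicycle", even though the strings share no characters. An exact-match lookup cannot express that kind of ranking.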

Embedding Fundamentals

# embedding_fundamentals.py
import numpy as np
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
from enum import Enum
import math
from abc import ABC, abstractmethod

class EmbeddingType(Enum):
    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    MULTIMODAL = "multimodal"

@dataclass
class EmbeddingMetadata:
    embedding_type: EmbeddingType
    dimension: int
    model_name: str
    creation_timestamp: float
    normalization: str = "l2"  # l2, l1, max, none
    distance_metric: str = "cosine"  # cosine, euclidean, dot_product

class VectorOperations:
    """Core vector operations for embeddings"""
    
    @staticmethod
    def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors"""
        
        # Handle zero vectors
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        
        if norm1 == 0 or norm2 == 0:
            return 0.0
        
        return np.dot(vec1, vec2) / (norm1 * norm2)
    
    @staticmethod
    def euclidean_distance(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate Euclidean distance between two vectors"""
        return np.linalg.norm(vec1 - vec2)
    
    @staticmethod
    def manhattan_distance(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate Manhattan (L1) distance between two vectors"""
        return np.sum(np.abs(vec1 - vec2))
    
    @staticmethod
    def dot_product(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Calculate dot product between two vectors"""
        return np.dot(vec1, vec2)
    
    @staticmethod
    def normalize_vector(vector: np.ndarray, method: str = "l2") -> np.ndarray:
        """Normalize vector using specified method"""
        
        if method == "l2":
            norm = np.linalg.norm(vector)
            return vector / norm if norm > 0 else vector
        elif method == "l1":
            norm = np.sum(np.abs(vector))
            return vector / norm if norm > 0 else vector
        elif method == "max":
            max_val = np.max(np.abs(vector))
            return vector / max_val if max_val > 0 else vector
        else:
            return vector
    
    @staticmethod
    def calculate_centroid(vectors: List[np.ndarray]) -> np.ndarray:
        """Calculate centroid of multiple vectors"""
        
        if not vectors:
            raise ValueError("Cannot calculate centroid of empty vector list")
        
        return np.mean(vectors, axis=0)
    
    @staticmethod
    def find_outliers(vectors: List[np.ndarray], 
                     threshold: float = 2.0) -> List[int]:
        """Find outlier vectors using distance from centroid"""
        
        centroid = VectorOperations.calculate_centroid(vectors)
        distances = [
            VectorOperations.euclidean_distance(vec, centroid) 
            for vec in vectors
        ]
        
        mean_distance = np.mean(distances)
        std_distance = np.std(distances)
        
        outliers = []
        for i, distance in enumerate(distances):
            if abs(distance - mean_distance) > threshold * std_distance:
                outliers.append(i)
        
        return outliers
    
    @staticmethod
    def reduce_dimensionality_pca(vectors: List[np.ndarray], 
                                 target_dim: int) -> Tuple[List[np.ndarray], np.ndarray]:
        """Reduce dimensionality using PCA"""
        
        # Convert to matrix
        X = np.vstack(vectors)
        
        # Center the data
        X_centered = X - np.mean(X, axis=0)
        
        # Compute covariance matrix
        cov_matrix = np.cov(X_centered.T)
        
        # Compute eigenvalues and eigenvectors
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
        
        # Sort by eigenvalues (descending)
        idx = np.argsort(eigenvalues)[::-1]
        eigenvalues = eigenvalues[idx]
        eigenvectors = eigenvectors[:, idx]
        
        # Select top components
        components = eigenvectors[:, :target_dim]
        
        # Transform data
        X_reduced = X_centered @ components
        
        reduced_vectors = [X_reduced[i] for i in range(len(vectors))]
        
        return reduced_vectors, components

class EmbeddingQuality:
    """Tools for assessing embedding quality"""
    
    @staticmethod
    def calculate_intrinsic_dimension(vectors: List[np.ndarray], 
                                    sample_size: int = 1000) -> float:
        """Estimate intrinsic dimensionality using correlation dimension"""
        
        if len(vectors) < sample_size:
            sample_vectors = vectors
        else:
            indices = np.random.choice(len(vectors), sample_size, replace=False)
            sample_vectors = [vectors[i] for i in indices]
        
        # Calculate pairwise distances
        distances = []
        for i in range(len(sample_vectors)):
            for j in range(i + 1, len(sample_vectors)):
                dist = VectorOperations.euclidean_distance(
                    sample_vectors[i], sample_vectors[j]
                )
                distances.append(dist)
        
        distances = np.array(distances)
        
        # Drop zero distances (duplicate vectors); log10(0) would be -inf
        distances = distances[distances > 0]
        if distances.size < 2:
            return float(len(sample_vectors[0]))  # Fall back to embedding dimension
        
        # Use correlation dimension estimation
        # This is a simplified version of the Grassberger-Procaccia algorithm
        
        radii = np.logspace(np.log10(np.min(distances)), 
                           np.log10(np.max(distances)), 20)
        
        correlations = []
        for r in radii:
            correlation = np.sum(distances < r) / len(distances)
            correlations.append(correlation)
        
        # Fit line to log-log plot
        # Skip the first radius: no distance lies below the minimum, so C(r) = 0 there
        log_radii = np.log(radii[1:])
        log_correlations = np.log(np.array(correlations[1:]) + 1e-10)
        
        # Remove infinite values
        valid_mask = np.isfinite(log_radii) & np.isfinite(log_correlations)
        if np.sum(valid_mask) < 2:
            return float(len(sample_vectors[0]))  # Fall back to embedding dimension
        
        # Linear regression
        coeffs = np.polyfit(log_radii[valid_mask], log_correlations[valid_mask], 1)
        
        return abs(coeffs[0])  # Slope gives dimension estimate
    
    @staticmethod
    def calculate_embedding_coherence(vectors: List[np.ndarray],
                                    labels: Optional[List[str]] = None) -> float:
        """Calculate coherence of embeddings (how well similar items cluster)"""
        
        if labels is None:
            # Without labels, use distance-based coherence
            return EmbeddingQuality._distance_based_coherence(vectors)
        
        # With labels, calculate label-based coherence
        return EmbeddingQuality._label_based_coherence(vectors, labels)
    
    @staticmethod
    def _distance_based_coherence(vectors: List[np.ndarray]) -> float:
        """Calculate coherence based on distance distribution"""
        
        # Calculate all pairwise cosine similarities
        n = len(vectors)
        similarities = []
        
        for i in range(n):
            for j in range(i + 1, n):
                sim = VectorOperations.cosine_similarity(vectors[i], vectors[j])
                similarities.append(sim)
        
        # Fewer than two vectors means there are no pairs to compare
        if not similarities:
            return 0.0
        
        # Higher coherence means less variation in pairwise similarity
        return float(1.0 - np.std(similarities))
    
    @staticmethod
    def _label_based_coherence(vectors: List[np.ndarray], 
                              labels: List[str]) -> float:
        """Calculate coherence based on label clustering"""
        
        # Group vectors by label
        label_groups = {}
        for i, label in enumerate(labels):
            if label not in label_groups:
                label_groups[label] = []
            label_groups[label].append(vectors[i])
        
        # Calculate intra-cluster similarity
        intra_similarities = []
        for label, group_vectors in label_groups.items():
            if len(group_vectors) < 2:
                continue
            
            group_similarities = []
            for i in range(len(group_vectors)):
                for j in range(i + 1, len(group_vectors)):
                    sim = VectorOperations.cosine_similarity(
                        group_vectors[i], group_vectors[j]
                    )
                    group_similarities.append(sim)
            
            if group_similarities:
                intra_similarities.extend(group_similarities)
        
        # Calculate inter-cluster similarity
        inter_similarities = []
        label_list = list(label_groups.keys())
        for i in range(len(label_list)):
            for j in range(i + 1, len(label_list)):
                label1, label2 = label_list[i], label_list[j]
                
                for vec1 in label_groups[label1]:
                    for vec2 in label_groups[label2]:
                        sim = VectorOperations.cosine_similarity(vec1, vec2)
                        inter_similarities.append(sim)
        
        # Good coherence: high intra-cluster, low inter-cluster similarity
        avg_intra = np.mean(intra_similarities) if intra_similarities else 0
        avg_inter = np.mean(inter_similarities) if inter_similarities else 0
        
        # Coherence score
        return avg_intra - avg_inter
    
    @staticmethod
    def evaluate_embedding_space(vectors: List[np.ndarray],
                               labels: Optional[List[str]] = None) -> Dict[str, Any]:
        """Comprehensive evaluation of embedding space quality"""
        
        if not vectors:
            return {"error": "No vectors provided"}
        
        results = {}
        
        # Basic statistics
        dimensions = len(vectors[0])
        results["embedding_dimension"] = dimensions
        results["num_vectors"] = len(vectors)
        
        # Intrinsic dimensionality
        try:
            results["intrinsic_dimension"] = EmbeddingQuality.calculate_intrinsic_dimension(vectors)
            results["dimension_efficiency"] = results["intrinsic_dimension"] / dimensions
        except Exception:  # Avoid a bare except, which would also swallow KeyboardInterrupt
            results["intrinsic_dimension"] = None
            results["dimension_efficiency"] = None
        
        # Coherence
        results["coherence"] = EmbeddingQuality.calculate_embedding_coherence(vectors, labels)
        
        # Density analysis
        centroid = VectorOperations.calculate_centroid(vectors)
        distances_from_center = [
            VectorOperations.euclidean_distance(vec, centroid) 
            for vec in vectors
        ]
        
        results["mean_distance_from_center"] = np.mean(distances_from_center)
        results["std_distance_from_center"] = np.std(distances_from_center)
        results["density_uniformity"] = 1.0 / (1.0 + np.std(distances_from_center))
        
        # Outlier detection
        outliers = VectorOperations.find_outliers(vectors)
        results["outlier_count"] = len(outliers)
        results["outlier_percentage"] = len(outliers) / len(vectors) * 100
        
        return results

# Example usage and testing
def demonstrate_vector_operations():
    """Demonstrate vector operations and quality assessment"""
    
    # Create sample embeddings
    np.random.seed(42)
    
    # Create three clusters of vectors
    cluster1 = [np.random.normal([1, 1, 1], 0.1, 3) for _ in range(50)]
    cluster2 = [np.random.normal([-1, -1, 1], 0.1, 3) for _ in range(50)]
    cluster3 = [np.random.normal([0, 1, -1], 0.1, 3) for _ in range(50)]
    
    all_vectors = cluster1 + cluster2 + cluster3
    labels = ['A'] * 50 + ['B'] * 50 + ['C'] * 50
    
    # Normalize vectors
    normalized_vectors = [
        VectorOperations.normalize_vector(vec) for vec in all_vectors
    ]
    
    # Evaluate embedding quality
    quality_metrics = EmbeddingQuality.evaluate_embedding_space(
        normalized_vectors, labels
    )
    
    print("Embedding Quality Metrics:")
    for metric, value in quality_metrics.items():
        print(f"  {metric}: {value}")
    
    # Test similarity calculations
    vec1, vec2 = normalized_vectors[0], normalized_vectors[1]  # Same cluster
    vec3 = normalized_vectors[50]  # Different cluster
    
    print(f"\nSimilarity within cluster: {VectorOperations.cosine_similarity(vec1, vec2):.3f}")
    print(f"Similarity across clusters: {VectorOperations.cosine_similarity(vec1, vec3):.3f}")
    
    # Test dimensionality reduction
    reduced_vectors, components = VectorOperations.reduce_dimensionality_pca(
        normalized_vectors, target_dim=2
    )
    
    print(f"\nReduced from {len(normalized_vectors[0])} to {len(reduced_vectors[0])} dimensions")
    print(f"PCA components shape: {components.shape}")

if __name__ == "__main__":
    demonstrate_vector_operations()
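
The metadata class above lists cosine, Euclidean, and dot-product metrics alongside L2 normalization. One consequence is worth checking numerically: for L2-normalized vectors, the dot product equals cosine similarity, and squared Euclidean distance equals 2 - 2 * cosine, so all three metrics rank neighbors in the same order. The sketch below verifies this on random vectors, which stand in for real embeddings (an assumption for illustration; the dimension of 384 is arbitrary).

```python
import numpy as np

rng = np.random.default_rng(0)

# Random vectors standing in for embeddings (illustrative assumption).
a = rng.normal(size=384)
b = rng.normal(size=384)

# L2-normalize both vectors to unit length.
a_unit = a / np.linalg.norm(a)
b_unit = b / np.linalg.norm(b)

# Cosine similarity computed on the raw vectors.
cosine = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Dot product and squared Euclidean distance on the unit vectors.
dot_unit = float(np.dot(a_unit, b_unit))
sq_euclidean_unit = float(np.sum((a_unit - b_unit) ** 2))

print(f"cosine similarity:          {cosine:.6f}")
print(f"dot product (unit vectors): {dot_unit:.6f}")
print(f"squared Euclidean (unit):   {sq_euclidean_unit:.6f}")
print(f"2 - 2 * cosine:             {2 - 2 * cosine:.6f}")
```

In practice this means that once embeddings are stored L2-normalized, the choice among these three metrics is mostly a question of which one the index computes fastest, not of which results come back.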

Advanced Embedding Models

# advanced_embeddings.py
import asyncio
import aiohttp
import numpy as np
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
from abc import ABC, abstractmethod
import torch
from transformers import AutoTokenizer, AutoModel
import openai
import time
from tenacity import retry, stop_after_attempt, wait_exponential

@dataclass
class EmbeddingRequest:
    text: str
    metadata: Optional[Dict[str, Any]] = None
    model_params: Optional[Dict[str, Any]] = None

@dataclass
class EmbeddingResponse:
    embedding: np.ndarray
    model_name: str
    dimensions: int
    metadata: Optional[Dict[str, Any]] = None
    processing_time: float = 0.0

class EmbeddingModel(ABC):
    """Abstract base class for embedding models"""
    
    @abstractmethod
    async def embed_single(self, text: str) -> EmbeddingResponse:
        pass
    
    @abstractmethod
    async def embed_batch(self, texts: List[str]) -> List[EmbeddingResponse]:
        pass
    
    @property
    @abstractmethod
    def model_name(self) -> str:
        pass
    
    @property
    @abstractmethod
    def dimensions(self) -> int:
        pass
    
    @property
    @abstractmethod
    def max_sequence_length(self) -> int:
        pass

class OpenAIEmbeddingModel(EmbeddingModel):
    """OpenAI embedding model implementation"""
    
    def __init__(self, 
                 model: str = "text-embedding-3-large",
                 api_key: str = None,
                 dimensions: Optional[int] = None):
        
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self._model_name = model
        self._dimensions = dimensions
        
        # Model specifications
        self.model_specs = {
            "text-embedding-3-large": {"max_dim": 3072, "max_tokens": 8191},
            "text-embedding-3-small": {"max_dim": 1536, "max_tokens": 8191},
            "text-embedding-ada-002": {"max_dim": 1536, "max_tokens": 8191}
        }
    
    @property
    def model_name(self) -> str:
        return self._model_name
    
    @property
    def dimensions(self) -> int:
        if self._dimensions:
            return self._dimensions
        return self.model_specs.get(self._model_name, {}).get("max_dim", 1536)
    
    @property
    def max_sequence_length(self) -> int:
        return self.model_specs.get(self._model_name, {}).get("max_tokens", 8191)
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def embed_single(self, text: str) -> EmbeddingResponse:
        """Embed single text"""
        
        start_time = time.time()
        
        params = {"model": self._model_name, "input": text}
        if self._dimensions:
            params["dimensions"] = self._dimensions
        
        response = await self.client.embeddings.create(**params)
        
        processing_time = time.time() - start_time
        
        return EmbeddingResponse(
            embedding=np.array(response.data[0].embedding, dtype=np.float32),
            model_name=self._model_name,
            dimensions=len(response.data[0].embedding),
            processing_time=processing_time,
            metadata={
                "usage": response.usage.total_tokens,
                "model": response.model
            }
        )
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def embed_batch(self, texts: List[str]) -> List[EmbeddingResponse]:
        """Embed multiple texts in batch"""
        
        start_time = time.time()
        
        params = {"model": self._model_name, "input": texts}
        if self._dimensions:
            params["dimensions"] = self._dimensions
        
        response = await self.client.embeddings.create(**params)
        
        processing_time = time.time() - start_time
        
        results = []
        for i, data in enumerate(response.data):
            results.append(EmbeddingResponse(
                embedding=np.array(data.embedding, dtype=np.float32),
                model_name=self._model_name,
                dimensions=len(data.embedding),
                processing_time=processing_time / len(texts),
                metadata={
                    "usage": response.usage.total_tokens / len(texts),
                    "model": response.model,
                    "batch_index": i
                }
            ))
        
        return results

class HuggingFaceEmbeddingModel(EmbeddingModel):
    """HuggingFace transformer embedding model"""
    
    def __init__(self, 
                 model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
                 device: str = "auto",
                 normalize_embeddings: bool = True):
        
        self._model_name = model_name
        self.normalize_embeddings = normalize_embeddings
        
        # Set device
        if device == "auto":
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            self.device = torch.device(device)
        
        # Load model and tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).to(self.device)
        self.model.eval()
        
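        # Get model specifications
        self._dimensions = self.model.config.hidden_size
        # Some tokenizers report a very large sentinel for model_max_length,
        # so cap it at the model's position-embedding limit when available
        # (the 512 fallback is an assumption for configs without that field)
        self._max_length = min(
            self.tokenizer.model_max_length,
            getattr(self.model.config, "max_position_embeddings", 512)
        )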
    
    @property
    def model_name(self) -> str:
        return self._model_name
    
    @property
    def dimensions(self) -> int:
        return self._dimensions
    
    @property
    def max_sequence_length(self) -> int:
        return self._max_length
    
    async def embed_single(self, text: str) -> EmbeddingResponse:
        """Embed single text"""
        
        start_time = time.time()
        
        # Run in thread pool to avoid blocking the event loop
        loop = asyncio.get_running_loop()
        embedding = await loop.run_in_executor(None, self._encode_text, text)
        
        processing_time = time.time() - start_time
        
        return EmbeddingResponse(
            embedding=embedding,
            model_name=self._model_name,
            dimensions=len(embedding),
            processing_time=processing_time
        )
    
    async def embed_batch(self, texts: List[str]) -> List[EmbeddingResponse]:
        """Embed multiple texts in batch"""
        
        start_time = time.time()
        
        # Run in thread pool to avoid blocking the event loop
        loop = asyncio.get_running_loop()
        embeddings = await loop.run_in_executor(None, self._encode_batch, texts)
        
        processing_time = time.time() - start_time
        
        results = []
        for i, embedding in enumerate(embeddings):
            results.append(EmbeddingResponse(
                embedding=embedding,
                model_name=self._model_name,
                dimensions=len(embedding),
                processing_time=processing_time / len(texts),
                metadata={"batch_index": i}
            ))
        
        return results
    
    def _encode_text(self, text: str) -> np.ndarray:
        """Encode single text (runs in thread pool)"""
        
        inputs = self.tokenizer(
            text,
            truncation=True,
            padding=True,
            max_length=self._max_length,
            return_tensors="pt"
        ).to(self.device)
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            
            # Mean pooling
            token_embeddings = outputs.last_hidden_state
            input_mask_expanded = inputs.attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
            
            embedding = torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
            
            if self.normalize_embeddings:
                embedding = torch.nn.functional.normalize(embedding, p=2, dim=1)
            
            return embedding.cpu().numpy()[0].astype(np.float32)
    
    def _encode_batch(self, texts: List[str]) -> List[np.ndarray]:
        """Encode batch of texts (runs in thread pool)"""
        
        inputs = self.tokenizer(
            texts,
            truncation=True,
            padding=True,
            max_length=self._max_length,
            return_tensors="pt"
        ).to(self.device)
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            
            # Mean pooling
            token_embeddings = outputs.last_hidden_state
            input_mask_expanded = inputs.attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
            
            embeddings = torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
            
            if self.normalize_embeddings:
                embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
            
            return [emb.cpu().numpy().astype(np.float32) for emb in embeddings]

class MultiModalEmbeddingModel(EmbeddingModel):
    """Multi-modal embedding model for text and images"""
    
    def __init__(self, 
                 text_model: EmbeddingModel,
                 image_model: Optional[Any] = None,
                 fusion_method: str = "concatenate"):
        
        self.text_model = text_model
        self.image_model = image_model
        self.fusion_method = fusion_method
        
        # Calculate combined dimensions
        if fusion_method == "concatenate":
            self._dimensions = text_model.dimensions
            if image_model:
                self._dimensions += getattr(image_model, 'dimensions', 512)
        else:
            self._dimensions = text_model.dimensions
    
    @property
    def model_name(self) -> str:
        return f"multimodal_{self.text_model.model_name}"
    
    @property
    def dimensions(self) -> int:
        return self._dimensions
    
    @property
    def max_sequence_length(self) -> int:
        return self.text_model.max_sequence_length
    
    async def embed_single(self, text: str, image_data: Optional[Any] = None) -> EmbeddingResponse:
        """Embed text and optionally image"""
        
        start_time = time.time()
        
        # Get text embedding
        text_response = await self.text_model.embed_single(text)
        text_embedding = text_response.embedding
        
        # Get image embedding if provided
        # (compare against None: truth-testing a NumPy array raises ValueError)
        if image_data is not None and self.image_model is not None:
            image_embedding = await self._embed_image(image_data)
            
            if self.fusion_method == "concatenate":
                combined_embedding = np.concatenate([text_embedding, image_embedding])
            elif self.fusion_method == "average":
                # Ensure same dimensions
                min_dim = min(len(text_embedding), len(image_embedding))
                combined_embedding = (text_embedding[:min_dim] + image_embedding[:min_dim]) / 2
            else:
                combined_embedding = text_embedding
        else:
            combined_embedding = text_embedding
        
        processing_time = time.time() - start_time
        
        return EmbeddingResponse(
            embedding=combined_embedding,
            model_name=self.model_name,
            dimensions=len(combined_embedding),
            processing_time=processing_time,
            metadata={
                "has_image": image_data is not None,
                "fusion_method": self.fusion_method
            }
        )
    
    async def embed_batch(self, texts: List[str], images: Optional[List[Any]] = None) -> List[EmbeddingResponse]:
        """Embed batch of texts and optionally images"""
        
        # Get text embeddings
        text_responses = await self.text_model.embed_batch(texts)
        
        results = []
        for i, text_response in enumerate(text_responses):
            image_data = images[i] if images and i < len(images) else None
            
            if image_data is not None and self.image_model is not None:
                image_embedding = await self._embed_image(image_data)
                
                if self.fusion_method == "concatenate":
                    combined_embedding = np.concatenate([text_response.embedding, image_embedding])
                elif self.fusion_method == "average":
                    min_dim = min(len(text_response.embedding), len(image_embedding))
                    combined_embedding = (text_response.embedding[:min_dim] + image_embedding[:min_dim]) / 2
                else:
                    combined_embedding = text_response.embedding
            else:
                combined_embedding = text_response.embedding
            
            results.append(EmbeddingResponse(
                embedding=combined_embedding,
                model_name=self.model_name,
                dimensions=len(combined_embedding),
                processing_time=text_response.processing_time,
                metadata={
                    "has_image": image_data is not None,
                    "fusion_method": self.fusion_method,
                    "batch_index": i
                }
            ))
        
        return results
    
    async def _embed_image(self, image_data: Any) -> np.ndarray:
        """Embed image data (placeholder implementation)"""
        
        # This would integrate with actual image embedding models
        # like CLIP, ResNet features, etc.
        
        # Placeholder only: returns a random vector with no semantic meaning.
        # Replace with a real image encoder before indexing any data.
        return np.random.rand(512).astype(np.float32)

class HybridEmbeddingModel(EmbeddingModel):
    """Hybrid model combining multiple embedding approaches"""
    
    def __init__(self, 
                 models: List[EmbeddingModel],
                 weights: Optional[List[float]] = None,
                 combination_method: str = "weighted_average"):
        
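        if not models:
            raise ValueError("At least one embedding model is required")
        if weights is not None and len(weights) != len(models):
            raise ValueError("weights must have one entry per model")
        
        self.models = models
        self.weights = weights or [1.0] * len(models)
        self.combination_method = combination_method
        
        # Normalize weights
        total_weight = sum(self.weights)
        if total_weight <= 0:
            raise ValueError("weights must sum to a positive value")
        self.weights = [w / total_weight for w in self.weights]
        
        # Set dimensions based on combination method
        if combination_method == "concatenate":
            self._dimensions = sum(model.dimensions for model in models)
        else:
            # Non-concatenating methods truncate to the smallest model dimension
            self._dimensions = min(model.dimensions for model in models)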
    
    @property
    def model_name(self) -> str:
        return f"hybrid_{len(self.models)}_models"
    
    @property
    def dimensions(self) -> int:
        return self._dimensions
    
    @property
    def max_sequence_length(self) -> int:
        return min(model.max_sequence_length for model in self.models)
    
    async def embed_single(self, text: str) -> EmbeddingResponse:
        """Embed text using all models and combine"""
        
        start_time = time.time()
        
        # Get embeddings from all models concurrently
        tasks = [model.embed_single(text) for model in self.models]
        responses = await asyncio.gather(*tasks)
        
        # Combine embeddings
        combined_embedding = self._combine_embeddings(
            [resp.embedding for resp in responses]
        )
        
        processing_time = time.time() - start_time
        
        return EmbeddingResponse(
            embedding=combined_embedding,
            model_name=self.model_name,
            dimensions=len(combined_embedding),
            processing_time=processing_time,
            metadata={
                "component_models": [resp.model_name for resp in responses],
                "combination_method": self.combination_method
            }
        )
    
    async def embed_batch(self, texts: List[str]) -> List[EmbeddingResponse]:
        """Embed batch using all models and combine"""
        
        start_time = time.time()
        
        # Get embeddings from all models concurrently
        tasks = [model.embed_batch(texts) for model in self.models]
        model_responses = await asyncio.gather(*tasks)
        
        # Combine embeddings for each text
        results = []
        for i in range(len(texts)):
            embeddings_for_text = [
                model_resp[i].embedding for model_resp in model_responses
            ]
            
            combined_embedding = self._combine_embeddings(embeddings_for_text)
            
            results.append(EmbeddingResponse(
                embedding=combined_embedding,
                model_name=self.model_name,
                dimensions=len(combined_embedding),
                processing_time=(time.time() - start_time) / len(texts),
                metadata={
                    "component_models": [resp[i].model_name for resp in model_responses],
                    "combination_method": self.combination_method,
                    "batch_index": i
                }
            ))
        
        return results
    
    def _combine_embeddings(self, embeddings: List[np.ndarray]) -> np.ndarray:
        """Combine multiple embeddings using specified method"""
        
        if self.combination_method == "concatenate":
            return np.concatenate(embeddings)
        
        elif self.combination_method == "weighted_average":
            # Ensure all embeddings have same dimension
            min_dim = min(len(emb) for emb in embeddings)
            trimmed_embeddings = [emb[:min_dim] for emb in embeddings]
            
            weighted_sum = np.zeros(min_dim, dtype=np.float32)
            for embedding, weight in zip(trimmed_embeddings, self.weights):
                weighted_sum += embedding * weight
            
            return weighted_sum
        
        elif self.combination_method == "max_pooling":
            min_dim = min(len(emb) for emb in embeddings)
            stacked = np.stack([emb[:min_dim] for emb in embeddings])
            return np.max(stacked, axis=0)
        
        elif self.combination_method == "attention_weighted":
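            # Norm-based weighting (a simple heuristic, not learned attention)
            min_dim = min(len(emb) for emb in embeddings)
            trimmed_embeddings = [emb[:min_dim] for emb in embeddings]
            
            # Weight each embedding by its share of the total L2 norm;
            # fall back to equal weights if every embedding is a zero vector
            norms = np.array([np.linalg.norm(emb) for emb in trimmed_embeddings])
            total_norm = norms.sum()
            if total_norm > 0:
                attention_weights = norms / total_norm
            else:
                attention_weights = np.full(len(norms), 1.0 / len(norms))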
            
            weighted_sum = np.zeros(min_dim, dtype=np.float32)
            for embedding, weight in zip(trimmed_embeddings, attention_weights):
                weighted_sum += embedding * weight
            
            return weighted_sum
        
        else:
            # Default to average
            min_dim = min(len(emb) for emb in embeddings)
            stacked = np.stack([emb[:min_dim] for emb in embeddings])
            return np.mean(stacked, axis=0)

# Example usage and benchmarking
async def benchmark_embedding_models():
    """Benchmark different embedding models"""
    
    # Sample texts for testing
    texts = [
        "Machine learning is transforming the world of technology.",
        "Natural language processing enables computers to understand human language.",
        "Vector databases are essential for semantic search applications.",
        "Artificial intelligence is revolutionizing many industries.",
        "Deep learning models require large amounts of training data."
    ]
    
    # Initialize models
    models = {
        "OpenAI": OpenAIEmbeddingModel("text-embedding-3-small"),
        "HuggingFace": HuggingFaceEmbeddingModel("sentence-transformers/all-MiniLM-L6-v2")
    }
    
    # Benchmark each model
    results = {}
    
    for model_name, model in models.items():
        print(f"\nBenchmarking {model_name}...")
        
        start_time = time.time()
        
        # Test single embedding
        single_response = await model.embed_single(texts[0])
        single_time = time.time() - start_time
        
        # Test batch embedding
        start_time = time.time()
        batch_responses = await model.embed_batch(texts)
        batch_time = time.time() - start_time
        
        results[model_name] = {
            "model_name": model.model_name,
            "dimensions": model.dimensions,
            "max_sequence_length": model.max_sequence_length,
            "single_embedding_time": single_time,
            "batch_embedding_time": batch_time,
            "avg_batch_time_per_item": batch_time / len(texts),
            "embedding_sample": single_response.embedding[:5].tolist()  # First 5 dimensions
        }
    
    # Print results
    print("\nBenchmark Results:")
    print("-" * 80)
    
    for model_name, metrics in results.items():
        print(f"\n{model_name}:")
        for metric, value in metrics.items():
            if isinstance(value, float):
                print(f"  {metric}: {value:.4f}")
            else:
                print(f"  {metric}: {value}")

if __name__ == "__main__":
    asyncio.run(benchmark_embedding_models())
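
The combination strategies used by the ensemble model above can be checked in isolation. The following is a minimal sketch, assuming plain NumPy arrays as inputs; `combine_embeddings` is a hypothetical standalone helper written for illustration, not the article's class method. Note that every method except concatenation truncates all inputs to the shortest dimension, which discards information when the component models differ in size.

```python
import numpy as np

def combine_embeddings(embeddings, method="average", weights=None):
    """Combine several embeddings into one vector (illustrative helper)."""
    if method == "concatenate":
        return np.concatenate(embeddings)

    # All other methods need equal dimensions, so truncate to the shortest
    min_dim = min(len(e) for e in embeddings)
    stacked = np.stack(
        [np.asarray(e[:min_dim], dtype=np.float32) for e in embeddings]
    )

    if method == "weighted_average":
        w = np.asarray(weights, dtype=np.float32)
        w = w / w.sum()  # normalize so the weights sum to 1
        return (stacked * w[:, None]).sum(axis=0)
    if method == "max_pooling":
        return stacked.max(axis=0)
    return stacked.mean(axis=0)

a = np.array([1.0, 0.0, 2.0], dtype=np.float32)
b = np.array([0.0, 4.0], dtype=np.float32)

concat = combine_embeddings([a, b], "concatenate")                  # length 5
avg = combine_embeddings([a, b])                                    # [0.5, 2.0]
weighted = combine_embeddings([a, b], "weighted_average", [3, 1])   # [0.75, 1.0]
pooled = combine_embeddings([a, b], "max_pooling")                  # [1.0, 4.0]
```

Concatenation preserves everything each model produced at the cost of a larger index, while the averaging variants keep the dimension small but only make sense when the component models share a comparable vector space.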

Vector Database Implementations
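
The FAISS class in this section handles the "cosine" metric by L2-normalizing vectors and searching an inner-product index. The NumPy-only sketch below illustrates why that works: once vectors have unit length, their dot product equals their cosine similarity. `normalize_rows` and `brute_force_cosine_search` are hypothetical helper names introduced here, and the exhaustive scan stands in for what a flat inner-product index computes.

```python
import numpy as np

def normalize_rows(x, eps=1e-12):
    """Scale each row to unit L2 norm, guarding against zero vectors."""
    x = np.asarray(x, dtype=np.float32)
    norms = np.linalg.norm(x, axis=1, keepdims=True)
    return x / np.maximum(norms, eps)

def brute_force_cosine_search(corpus, query, top_k=3):
    """Return (indices, scores) of the top_k most similar corpus rows."""
    corpus_n = normalize_rows(corpus)
    query_n = normalize_rows(np.asarray(query).reshape(1, -1))[0]
    scores = corpus_n @ query_n          # inner product == cosine similarity
    order = np.argsort(-scores)[:top_k]  # best first
    return order, scores[order]

corpus = np.array([[1, 0], [0, 1], [1, 1], [-1, 0]], dtype=np.float32)
query = np.array([2.0, 0.0])

ids, scores = brute_force_cosine_search(corpus, query, top_k=2)
# ids -> [0, 2]; scores -> approximately [1.0, 0.7071]
```

The query's magnitude (2.0 here) has no effect on the ranking, which is the property that makes normalized inner product a drop-in replacement for cosine similarity.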

# vector_database_implementations.py
import asyncio
import numpy as np
import faiss
import sqlite3
import json
import pickle
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime
import uuid
from pathlib import Path
import logging
from abc import ABC, abstractmethod

@dataclass
class VectorDocument:
    id: str
    content: str
    embedding: np.ndarray
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class SearchResult:
    document: VectorDocument
    score: float
    rank: int

@dataclass
class SearchOptions:
    top_k: int = 10
    score_threshold: Optional[float] = None
    metadata_filters: Dict[str, Any] = field(default_factory=dict)
    include_metadata: bool = True
    include_content: bool = True

class VectorDatabase(ABC):
    """Abstract base class for vector databases"""
    
    @abstractmethod
    async def add_documents(self, documents: List[VectorDocument]) -> List[str]:
        """Add documents to the database"""
        pass
    
    @abstractmethod
    async def search(self, 
                    query_vector: np.ndarray, 
                    options: SearchOptions = None) -> List[SearchResult]:
        """Search for similar vectors"""
        pass
    
    @abstractmethod
    async def get_document(self, doc_id: str) -> Optional[VectorDocument]:
        """Get document by ID"""
        pass
    
    @abstractmethod
    async def delete_document(self, doc_id: str) -> bool:
        """Delete document by ID"""
        pass
    
    @abstractmethod
    async def update_document(self, doc_id: str, document: VectorDocument) -> bool:
        """Update existing document"""
        pass
    
    @abstractmethod
    def get_stats(self) -> Dict[str, Any]:
        """Get database statistics"""
        pass

class FAISSVectorDatabase(VectorDatabase):
    """FAISS-based vector database implementation"""
    
    def __init__(self, 
                 dimension: int,
                 index_type: str = "IVFFlat",
                 metric: str = "cosine",
                 db_path: str = "faiss_vector_db",
                 nlist: int = 100):
        
        self.dimension = dimension
        self.index_type = index_type
        self.metric = metric
        self.db_path = Path(db_path)
        self.nlist = nlist
        
        # Set up logging first, because _load_index() logs its outcome
        self.logger = logging.getLogger(__name__)
        
        # Initialize FAISS index
        self.index = self._create_index()
        self.is_trained = False
        
        # Initialize metadata storage
        self.db_path.mkdir(parents=True, exist_ok=True)
        self.metadata_db_path = self.db_path / "metadata.db"
        self._init_metadata_db()
        
        # Document tracking
        self.doc_id_to_faiss_id = {}
        self.faiss_id_to_doc_id = {}
        self.next_faiss_id = 0
        
        # Load existing index if available
        self._load_index()
    
    def _create_index(self) -> faiss.Index:
        """Create FAISS index based on configuration"""
        
        if self.index_type == "Flat":
            if self.metric == "cosine":
                index = faiss.IndexFlatIP(self.dimension)
            else:
                index = faiss.IndexFlatL2(self.dimension)
        
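        elif self.index_type == "IVFFlat":
            if self.metric == "cosine":
                # IndexIVFFlat defaults to L2, so request inner product explicitly
                quantizer = faiss.IndexFlatIP(self.dimension)
                index = faiss.IndexIVFFlat(
                    quantizer, self.dimension, self.nlist, faiss.METRIC_INNER_PRODUCT
                )
            else:
                quantizer = faiss.IndexFlatL2(self.dimension)
                index = faiss.IndexIVFFlat(quantizer, self.dimension, self.nlist)
        
        elif self.index_type == "HNSW":
            # 32 neighbors per node; match the FAISS metric to the configuration
            faiss_metric = (faiss.METRIC_INNER_PRODUCT if self.metric == "cosine"
                            else faiss.METRIC_L2)
            index = faiss.IndexHNSWFlat(self.dimension, 32, faiss_metric)
            index.hnsw.efConstruction = 40
            index.hnsw.efSearch = 16
        
        elif self.index_type == "IVFPQFlat":
            # Product quantization with 8 sub-vectors of 8 bits each, so
            # self.dimension must be divisible by 8. This index uses L2 distance,
            # so scores are distances (lower is better).
            quantizer = faiss.IndexFlatL2(self.dimension)
            index = faiss.IndexIVFPQ(quantizer, self.dimension, self.nlist, 8, 8)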
        
        else:
            raise ValueError(f"Unsupported index type: {self.index_type}")
        
        return index
    
    def _init_metadata_db(self):
        """Initialize SQLite database for metadata"""
        
        with sqlite3.connect(self.metadata_db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    doc_id TEXT PRIMARY KEY,
                    faiss_id INTEGER UNIQUE,
                    content TEXT,
                    metadata TEXT,
                    timestamp TEXT
                )
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_faiss_id ON documents(faiss_id)
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp ON documents(timestamp)
            """)
    
    def _load_index(self):
        """Load existing FAISS index and metadata"""
        
        index_path = self.db_path / "faiss.index"
        mapping_path = self.db_path / "id_mapping.pkl"
        
        if index_path.exists():
            try:
                self.index = faiss.read_index(str(index_path))
                self.is_trained = self.index.is_trained
                
                if mapping_path.exists():
                    with open(mapping_path, 'rb') as f:
                        mapping_data = pickle.load(f)
                        self.doc_id_to_faiss_id = mapping_data['doc_to_faiss']
                        self.faiss_id_to_doc_id = mapping_data['faiss_to_doc']
                        self.next_faiss_id = mapping_data['next_id']
                
                self.logger.info(f"Loaded FAISS index with {self.index.ntotal} vectors")
                
            except Exception as e:
                self.logger.error(f"Failed to load FAISS index: {e}")
                self.index = self._create_index()
    
    def _save_index(self):
        """Save FAISS index and metadata"""
        
        try:
            index_path = self.db_path / "faiss.index"
            mapping_path = self.db_path / "id_mapping.pkl"
            
            # Save FAISS index
            faiss.write_index(self.index, str(index_path))
            
            # Save ID mappings
            mapping_data = {
                'doc_to_faiss': self.doc_id_to_faiss_id,
                'faiss_to_doc': self.faiss_id_to_doc_id,
                'next_id': self.next_faiss_id
            }
            
            with open(mapping_path, 'wb') as f:
                pickle.dump(mapping_data, f)
                
        except Exception as e:
            self.logger.error(f"Failed to save FAISS index: {e}")
    
    async def add_documents(self, documents: List[VectorDocument]) -> List[str]:
        """Add documents to the database"""
        
        if not documents:
            return []
        
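        # Prepare embeddings (FAISS expects contiguous float32 arrays)
        embeddings = np.ascontiguousarray(
            np.vstack([doc.embedding for doc in documents]), dtype=np.float32
        )
        
        # Normalize for cosine similarity if needed, leaving zero vectors unchanged
        if self.metric == "cosine":
            norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
            norms[norms == 0] = 1.0
            embeddings = embeddings / norms
        
        # Train index if needed (IVF training needs at least nlist vectors)
        if not self.is_trained and self.index_type.startswith("IVF"):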
            self.index.train(embeddings)
            self.is_trained = True
        
        # Add to FAISS index
        start_faiss_id = self.next_faiss_id
        self.index.add(embeddings)
        
        # Store metadata and update mappings
        added_doc_ids = []
        
        with sqlite3.connect(self.metadata_db_path) as conn:
            for i, doc in enumerate(documents):
                faiss_id = start_faiss_id + i
                
                # Update mappings
                self.doc_id_to_faiss_id[doc.id] = faiss_id
                self.faiss_id_to_doc_id[faiss_id] = doc.id
                
                # Store in SQLite
                conn.execute("""
                    INSERT OR REPLACE INTO documents 
                    (doc_id, faiss_id, content, metadata, timestamp)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    doc.id,
                    faiss_id,
                    doc.content,
                    json.dumps(doc.metadata),
                    doc.timestamp.isoformat()
                ))
                
                added_doc_ids.append(doc.id)
        
        self.next_faiss_id += len(documents)
        
        # Save index periodically
        if len(documents) > 100 or self.index.ntotal % 1000 == 0:
            self._save_index()
        
        return added_doc_ids
    
    async def search(self, 
                    query_vector: np.ndarray, 
                    options: SearchOptions = None) -> List[SearchResult]:
        """Search for similar vectors"""
        
        if options is None:
            options = SearchOptions()
        
        if self.index.ntotal == 0:
            return []
        
        # Normalize query vector for cosine similarity (skip zero vectors)
        if self.metric == "cosine":
            query_norm = np.linalg.norm(query_vector)
            if query_norm > 0:
                query_vector = query_vector / query_norm
        
        # Search FAISS index
        scores, indices = self.index.search(
            query_vector.reshape(1, -1).astype(np.float32), 
            options.top_k
        )
        
        # Convert results
        results = []
        
        with sqlite3.connect(self.metadata_db_path) as conn:
            for rank, (score, faiss_id) in enumerate(zip(scores[0], indices[0])):
                if faiss_id == -1:  # FAISS returns -1 for invalid indices
                    continue
                
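                # Apply score threshold. This assumes a similarity score where
                # higher is better; L2 indexes return distances instead.
                if options.score_threshold is not None and score < options.score_threshold:
                    continue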
                
                # Get document metadata
                cursor = conn.execute("""
                    SELECT doc_id, content, metadata, timestamp 
                    FROM documents WHERE faiss_id = ?
                """, (int(faiss_id),))
                
                row = cursor.fetchone()
                if not row:
                    continue
                
                doc_id, content, metadata_json, timestamp = row
                metadata = json.loads(metadata_json)
                
                # Apply metadata filters
                if options.metadata_filters:
                    if not self._matches_filters(metadata, options.metadata_filters):
                        continue
                
                # Create document
                document = VectorDocument(
                    id=doc_id,
                    content=content if options.include_content else "",
                    embedding=np.array([]),  # Don't include embedding in results
                    metadata=metadata if options.include_metadata else {},
                    timestamp=datetime.fromisoformat(timestamp)
                )
                
                results.append(SearchResult(
                    document=document,
                    score=float(score),
                    rank=rank
                ))
        
        return results
    
    def _matches_filters(self, metadata: Dict[str, Any], 
                        filters: Dict[str, Any]) -> bool:
        """Check if metadata matches filters"""
        
        for key, expected_value in filters.items():
            if key not in metadata:
                return False
            
            if isinstance(expected_value, list):
                if metadata[key] not in expected_value:
                    return False
            else:
                if metadata[key] != expected_value:
                    return False
        
        return True
    
    async def get_document(self, doc_id: str) -> Optional[VectorDocument]:
        """Get document by ID"""
        
        with sqlite3.connect(self.metadata_db_path) as conn:
            cursor = conn.execute("""
                SELECT content, metadata, timestamp, faiss_id 
                FROM documents WHERE doc_id = ?
            """, (doc_id,))
            
            row = cursor.fetchone()
            if not row:
                return None
            
            content, metadata_json, timestamp, faiss_id = row
            
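            # Recover the stored vector from FAISS. reconstruct() works for flat
            # and HNSW-flat indexes; IVF indexes need a direct map first, so fall
            # back to an empty array when reconstruction is unavailable. With the
            # cosine metric the stored vector is the normalized one.
            embedding = np.array([])
            if faiss_id in self.faiss_id_to_doc_id:
                try:
                    embedding = self.index.reconstruct(int(faiss_id))
                except Exception as e:
                    self.logger.warning(f"Could not reconstruct vector {faiss_id}: {e}")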
            
            return VectorDocument(
                id=doc_id,
                content=content,
                embedding=embedding,
                metadata=json.loads(metadata_json),
                timestamp=datetime.fromisoformat(timestamp)
            )
    
    async def delete_document(self, doc_id: str) -> bool:
        """Delete document by ID"""
        
        if doc_id not in self.doc_id_to_faiss_id:
            return False
        
        faiss_id = self.doc_id_to_faiss_id[doc_id]
        
        # Remove from mappings
        del self.doc_id_to_faiss_id[doc_id]
        del self.faiss_id_to_doc_id[faiss_id]
        
        # Remove from SQLite
        with sqlite3.connect(self.metadata_db_path) as conn:
            cursor = conn.execute("DELETE FROM documents WHERE doc_id = ?", (doc_id,))
            deleted = cursor.rowcount > 0
        
        # Note: the vector itself stays in the FAISS index (a soft delete). Search
        # skips it because its metadata row is gone. Some FAISS index types support
        # remove_ids(), but a periodic rebuild is the simplest way to reclaim space.
        
        return deleted
    
    async def update_document(self, doc_id: str, document: VectorDocument) -> bool:
        """Update existing document"""
        
        # For FAISS, update means delete and re-add
        if await self.delete_document(doc_id):
            document.id = doc_id  # Ensure ID is preserved
            await self.add_documents([document])
            return True
        
        return False
    
    def get_stats(self) -> Dict[str, Any]:
        """Get database statistics"""
        
        with sqlite3.connect(self.metadata_db_path) as conn:
            cursor = conn.execute("SELECT COUNT(*) FROM documents")
            doc_count = cursor.fetchone()[0]
            
            cursor = conn.execute("""
                SELECT COUNT(DISTINCT json_extract(metadata, '$.category')) 
                FROM documents 
                WHERE json_extract(metadata, '$.category') IS NOT NULL
            """)
            category_count = cursor.fetchone()[0]
        
        return {
            "total_documents": doc_count,
            "faiss_vectors": self.index.ntotal,
            "dimension": self.dimension,
            "index_type": self.index_type,
            "metric": self.metric,
            "is_trained": self.is_trained,
            "unique_categories": category_count,
            "index_size_mb": self.index.ntotal * self.dimension * 4 / (1024 * 1024)
        }
    
    def __del__(self):
        """Cleanup when object is destroyed"""
        self._save_index()

class PineconeVectorDatabase(VectorDatabase):
    """Pinecone vector database implementation"""
    
    def __init__(self, 
                 api_key: str,
                 environment: str,
                 index_name: str,
                 dimension: int,
                 metric: str = "cosine"):
        
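        try:
            from pinecone import Pinecone, ServerlessSpec
        except ImportError:
            raise ImportError("pinecone-client is required for PineconeVectorDatabase")
        
        self.pc = Pinecone(api_key=api_key)
        self.index_name = index_name
        self.dimension = dimension
        self.metric = metric
        
        # Create or connect to index
        if index_name not in self.pc.list_indexes().names():
            # The Pinecone class API requires a spec when creating an index.
            # Assumption: a serverless index on AWS, with `environment`
            # interpreted as the region (for example "us-east-1").
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region=environment)
            )
        
        self.index = self.pc.Index(index_name)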
    
    async def add_documents(self, documents: List[VectorDocument]) -> List[str]:
        """Add documents to Pinecone"""
        
        vectors = []
        for doc in documents:
            vectors.append({
                "id": doc.id,
                "values": doc.embedding.tolist(),
                "metadata": {
                    "content": doc.content,
                    "timestamp": doc.timestamp.isoformat(),
                    **doc.metadata
                }
            })
        
        # Upsert in batches
        batch_size = 100
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch)
        
        return [doc.id for doc in documents]
    
    async def search(self, 
                    query_vector: np.ndarray, 
                    options: SearchOptions = None) -> List[SearchResult]:
        """Search Pinecone index"""
        
        if options is None:
            options = SearchOptions()
        
        # Build filter
        filter_dict = {}
        if options.metadata_filters:
            filter_dict = options.metadata_filters
        
        # Search
        response = self.index.query(
            vector=query_vector.tolist(),
            top_k=options.top_k,
            filter=filter_dict if filter_dict else None,
            include_metadata=True
        )
        
        results = []
        for rank, match in enumerate(response.matches):
            if options.score_threshold is not None and match.score < options.score_threshold:
                continue
            
            metadata = match.metadata or {}
            
            document = VectorDocument(
                id=match.id,
                content=metadata.get("content", "") if options.include_content else "",
                embedding=np.array([]),
                metadata={k: v for k, v in metadata.items() if k not in ["content", "timestamp"]} if options.include_metadata else {},
                timestamp=datetime.fromisoformat(metadata.get("timestamp", datetime.now().isoformat()))
            )
            
            results.append(SearchResult(
                document=document,
                score=match.score,
                rank=rank
            ))
        
        return results
    
    async def get_document(self, doc_id: str) -> Optional[VectorDocument]:
        """Get document from Pinecone"""
        
        response = self.index.fetch(ids=[doc_id])
        
        if doc_id not in response.vectors:
            return None
        
        vector_data = response.vectors[doc_id]
        metadata = vector_data.metadata or {}
        
        return VectorDocument(
            id=doc_id,
            content=metadata.get("content", ""),
            embedding=np.array(vector_data.values),
            metadata={k: v for k, v in metadata.items() if k not in ["content", "timestamp"]},
            timestamp=datetime.fromisoformat(metadata.get("timestamp", datetime.now().isoformat()))
        )
    
    async def delete_document(self, doc_id: str) -> bool:
        """Delete document from Pinecone"""
        
        self.index.delete(ids=[doc_id])
        return True
    
    async def update_document(self, doc_id: str, document: VectorDocument) -> bool:
        """Update document in Pinecone"""
        
        vector = {
            "id": doc_id,
            "values": document.embedding.tolist(),
            "metadata": {
                "content": document.content,
                "timestamp": document.timestamp.isoformat(),
                **document.metadata
            }
        }
        
        self.index.upsert(vectors=[vector])
        return True
    
    def get_stats(self) -> Dict[str, Any]:
        """Get Pinecone index stats"""
        
        stats = self.index.describe_index_stats()
        
        return {
            "total_documents": stats.total_vector_count,
            "dimension": self.dimension,
            "metric": self.metric,
            "index_name": self.index_name,
            "index_fullness": stats.index_fullness,
            "namespaces": len(stats.namespaces) if stats.namespaces else 0
        }

class ChromaVectorDatabase(VectorDatabase):
    """ChromaDB vector database implementation"""
    
    def __init__(self, 
                 collection_name: str = "default",
                 persist_directory: str = "./chroma_db",
                 embedding_function = None):
        
        try:
            import chromadb
            from chromadb.config import Settings
            
            self.client = chromadb.PersistentClient(
                path=persist_directory,
                settings=Settings(anonymized_telemetry=False)
            )
            
            self.collection = self.client.get_or_create_collection(
                name=collection_name,
                embedding_function=embedding_function
            )
            
        except ImportError:
            raise ImportError("chromadb is required for ChromaVectorDatabase")
    
    async def add_documents(self, documents: List[VectorDocument]) -> List[str]:
        """Add documents to ChromaDB"""
        
        ids = [doc.id for doc in documents]
        embeddings = [doc.embedding.tolist() for doc in documents]
        documents_content = [doc.content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        
        self.collection.add(
            ids=ids,
            embeddings=embeddings,
            documents=documents_content,
            metadatas=metadatas
        )
        
        return ids
    
    async def search(self, 
                    query_vector: np.ndarray, 
                    options: SearchOptions = None) -> List[SearchResult]:
        """Search ChromaDB"""
        
        if options is None:
            options = SearchOptions()
        
        # Build where clause for metadata filtering
        where_clause = options.metadata_filters if options.metadata_filters else None
        
        response = self.collection.query(
            query_embeddings=[query_vector.tolist()],
            n_results=options.top_k,
            where=where_clause,
            include=["documents", "metadatas", "distances"]
        )
        
        results = []
        
        if response['ids']:
            for rank, (doc_id, distance, content, metadata) in enumerate(zip(
                response['ids'][0],
                response['distances'][0],
                response['documents'][0],
                response['metadatas'][0]
            )):
                # Convert distance to similarity score (ChromaDB returns distances)
                score = 1.0 / (1.0 + distance)
                
                if options.score_threshold is not None and score < options.score_threshold:
                    continue
                
                document = VectorDocument(
                    id=doc_id,
                    content=content if options.include_content else "",
                    embedding=np.array([]),
                    metadata=metadata if options.include_metadata else {},
                    timestamp=datetime.now()  # ChromaDB doesn't store timestamps by default
                )
                
                results.append(SearchResult(
                    document=document,
                    score=score,
                    rank=rank
                ))
        
        return results
    
    async def get_document(self, doc_id: str) -> Optional[VectorDocument]:
        """Get document from ChromaDB"""
        
        response = self.collection.get(
            ids=[doc_id],
            include=["documents", "metadatas", "embeddings"]
        )
        
        if not response['ids']:
            return None
        
        return VectorDocument(
            id=response['ids'][0],
            content=response['documents'][0],
            embedding=np.array(response['embeddings'][0]),
            metadata=response['metadatas'][0],
            timestamp=datetime.now()
        )
    
    async def delete_document(self, doc_id: str) -> bool:
        """Delete document from ChromaDB"""
        
        self.collection.delete(ids=[doc_id])
        return True
    
    async def update_document(self, doc_id: str, document: VectorDocument) -> bool:
        """Update document in ChromaDB"""
        
        self.collection.update(
            ids=[doc_id],
            embeddings=[document.embedding.tolist()],
            documents=[document.content],
            metadatas=[document.metadata]
        )
        return True
    
    def get_stats(self) -> Dict[str, Any]:
        """Get ChromaDB collection stats"""
        
        count = self.collection.count()
        
        return {
            "total_documents": count,
            "collection_name": self.collection.name
        }

# Vector database factory
class VectorDatabaseFactory:
    """Factory for creating vector database instances"""
    
    @staticmethod
    def create_database(db_type: str, **kwargs) -> VectorDatabase:
        """Create vector database instance"""
        
        if db_type.lower() == "faiss":
            return FAISSVectorDatabase(**kwargs)
        elif db_type.lower() == "pinecone":
            return PineconeVectorDatabase(**kwargs)
        elif db_type.lower() == "chroma":
            return ChromaVectorDatabase(**kwargs)
        else:
            raise ValueError(f"Unsupported database type: {db_type}")

# Example usage and testing
async def test_vector_databases():
    """Test different vector database implementations"""
    
    # Create sample documents
    np.random.seed(42)
    documents = []
    
    for i in range(100):
        doc = VectorDocument(
            id=f"doc_{i}",
            content=f"This is document {i} with some sample content.",
            embedding=np.random.rand(384).astype(np.float32),
            metadata={"category": f"cat_{i % 5}", "importance": i % 3}
        )
        documents.append(doc)
    
    # Test FAISS database
    print("Testing FAISS Vector Database...")
    
    faiss_db = VectorDatabaseFactory.create_database(
        "faiss",
        dimension=384,
        index_type="Flat",
        db_path="./test_faiss_db"
    )
    
    # Add documents
    added_ids = await faiss_db.add_documents(documents[:50])
    print(f"Added {len(added_ids)} documents to FAISS")
    
    # Search
    query_vector = np.random.rand(384).astype(np.float32)
    search_options = SearchOptions(top_k=5, metadata_filters={"category": "cat_1"})
    results = await faiss_db.search(query_vector, search_options)
    
    print(f"Found {len(results)} results")
    for result in results:
        print(f"  Doc: {result.document.id}, Score: {result.score:.3f}")
    
    # Get stats
    stats = faiss_db.get_stats()
    print(f"Database stats: {stats}")

if __name__ == "__main__":
    asyncio.run(test_vector_databases())
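
One behavior of the FAISS implementation above is worth illustrating: metadata filters are applied after the index has already returned its top_k candidates, so a filtered search can return fewer than top_k results even when more matches exist. A common mitigation is to over-fetch candidates and filter until enough matches are found. The sketch below is a minimal pure-Python illustration under that assumption; `filtered_top_k` and the sample data are hypothetical, and `matches_filters` mirrors the filter semantics of `_matches_filters`.

```python
from typing import Any, Dict, List, Tuple

def matches_filters(metadata: Dict[str, Any], filters: Dict[str, Any]) -> bool:
    """Exact match per key; a list value means 'any of these'."""
    for key, expected in filters.items():
        if key not in metadata:
            return False
        if isinstance(expected, list):
            if metadata[key] not in expected:
                return False
        elif metadata[key] != expected:
            return False
    return True

def filtered_top_k(ranked: List[Tuple[str, float]],
                   metadata_by_id: Dict[str, Dict[str, Any]],
                   filters: Dict[str, Any],
                   top_k: int) -> List[Tuple[str, float]]:
    """Walk candidates best-first and keep the first top_k that pass the filter."""
    hits = []
    for doc_id, score in ranked:
        if matches_filters(metadata_by_id.get(doc_id, {}), filters):
            hits.append((doc_id, score))
            if len(hits) == top_k:
                break
    return hits

# Candidates as a vector index might return them, best score first
ranked = [("a", 0.9), ("b", 0.8), ("c", 0.7), ("d", 0.6)]
metadata_by_id = {
    "a": {"category": "x"}, "b": {"category": "y"},
    "c": {"category": "y"}, "d": {"category": "x"},
}
filters = {"category": "y"}

# Filtering only the top 2 candidates finds a single match
narrow = filtered_top_k(ranked[:2], metadata_by_id, filters, top_k=2)
# Over-fetching all 4 candidates finds both matches
wide = filtered_top_k(ranked, metadata_by_id, filters, top_k=2)
```

How far to over-fetch depends on how selective the filter is; a highly selective filter may need many multiples of top_k, at which point a database with native pre-filtering is usually the better fit.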

Production Optimization and Monitoring
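
The optimizer in this section tracks recent latencies in bounded deques and reports averages alongside p95 and p99 values. The sketch below isolates that idea, assuming latencies are recorded in seconds; `LatencyWindow` is a hypothetical class used only for illustration. It shows why percentiles matter: a single slow request barely moves the median but dominates the tail.

```python
from collections import deque
import numpy as np

class LatencyWindow:
    """Keep the most recent latency samples and summarize them."""

    def __init__(self, maxlen: int = 1000):
        # A bounded deque drops the oldest sample once maxlen is reached
        self.samples = deque(maxlen=maxlen)

    def record(self, seconds: float) -> None:
        self.samples.append(float(seconds))

    def summary(self) -> dict:
        if not self.samples:
            return {"count": 0, "mean": 0.0, "p95": 0.0, "p99": 0.0}
        data = np.fromiter(self.samples, dtype=np.float64)
        return {
            "count": int(data.size),
            "mean": float(data.mean()),
            "p95": float(np.percentile(data, 95)),
            "p99": float(np.percentile(data, 99)),
        }

window = LatencyWindow(maxlen=5)
for ms in [10, 20, 30, 40, 50, 1000]:
    window.record(ms / 1000)  # the 10 ms sample is evicted by the sixth record

stats = window.summary()
# stats["count"] == 5, and the single 1 s outlier pulls p95 far above the mean
```

With small windows the percentiles are interpolated between a handful of samples, so tail estimates only become stable once the window holds a few hundred measurements.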

# vector_db_optimization.py
import asyncio
import time
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
from collections import defaultdict, deque
import threading
from concurrent.futures import ThreadPoolExecutor
import psutil

@dataclass
class PerformanceMetrics:
    operation: str
    duration: float
    vectors_processed: int
    memory_usage_mb: float
    timestamp: datetime
    success: bool
    error_message: Optional[str] = None

class VectorDatabaseOptimizer:
    """Optimization and monitoring for vector databases"""
    
    def __init__(self, vector_db: VectorDatabase):
        self.vector_db = vector_db
        self.metrics_history = deque(maxlen=10000)
        self.performance_cache = {}
        self.optimization_suggestions = []
        
        # Performance tracking
        self.search_times = deque(maxlen=1000)
        self.insert_times = deque(maxlen=1000)
        self.memory_usage = deque(maxlen=1000)
        
        # Threading for background optimization
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.optimization_running = False
        
        self.logger = logging.getLogger(__name__)
    
    async def monitor_performance(self, operation_name: str):
        """Context manager for monitoring operation performance"""
        
        return PerformanceMonitor(self, operation_name)
    
    def record_metrics(self, metrics: PerformanceMetrics):
        """Record performance metrics"""
        
        self.metrics_history.append(metrics)
        
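        # Update specific metric queues. Substring matching lets prefixed names
        # such as "benchmark_search" or "bulk_insert" count toward the totals.
        if "search" in metrics.operation:
            self.search_times.append(metrics.duration)
        elif "insert" in metrics.operation:
            self.insert_times.append(metrics.duration)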
        
        self.memory_usage.append(metrics.memory_usage_mb)
        
        # Check for performance issues
        self._check_performance_issues(metrics)
    
    def _check_performance_issues(self, metrics: PerformanceMetrics):
        """Check for performance issues and generate suggestions"""
        
        # Slow search detection (also matches prefixed names such as "benchmark_search")
        if "search" in metrics.operation and metrics.duration > 1.0:
            self.optimization_suggestions.append({
                "type": "slow_search",
                "message": f"Search took {metrics.duration:.2f}s, consider index optimization",
                "timestamp": metrics.timestamp,
                "severity": "medium"
            })
        
        # High memory usage
        if metrics.memory_usage_mb > 1000:  # 1GB threshold
            self.optimization_suggestions.append({
                "type": "high_memory",
                "message": f"Memory usage at {metrics.memory_usage_mb:.0f}MB, consider optimization",
                "timestamp": metrics.timestamp,
                "severity": "high"
            })
        
        # Keep only recent suggestions
        cutoff = datetime.now() - timedelta(hours=1)
        self.optimization_suggestions = [
            s for s in self.optimization_suggestions 
            if s["timestamp"] > cutoff
        ]
    
    def get_performance_summary(self) -> Dict[str, Any]:
        """Get comprehensive performance summary"""
        
        # Calculate averages
        avg_search_time = np.mean(self.search_times) if self.search_times else 0
        avg_insert_time = np.mean(self.insert_times) if self.insert_times else 0
        avg_memory = np.mean(self.memory_usage) if self.memory_usage else 0
        
        # Calculate percentiles
        search_p95 = np.percentile(self.search_times, 95) if self.search_times else 0
        search_p99 = np.percentile(self.search_times, 99) if self.search_times else 0
        
        # Error rate
        recent_metrics = [m for m in self.metrics_history if m.timestamp > datetime.now() - timedelta(hours=1)]
        error_rate = sum(1 for m in recent_metrics if not m.success) / len(recent_metrics) if recent_metrics else 0
        
        return {
            "search_performance": {
                "avg_time_ms": avg_search_time * 1000,
                "p95_time_ms": search_p95 * 1000,
                "p99_time_ms": search_p99 * 1000,
                "total_searches": len(self.search_times)
            },
            "insert_performance": {
                "avg_time_ms": avg_insert_time * 1000,
                "total_inserts": len(self.insert_times)
            },
            "memory_usage": {
                "avg_mb": avg_memory,
                "current_mb": self.memory_usage[-1] if self.memory_usage else 0
            },
            "error_rate": error_rate,
            "optimization_suggestions": len(self.optimization_suggestions),
            "database_stats": self.vector_db.get_stats()
        }
    
    async def optimize_search_parameters(self, 
                                       test_queries: List[np.ndarray],
                                       optimization_target: str = "latency") -> Dict[str, Any]:
        """Optimize search parameters for better performance"""
        
        if not hasattr(self.vector_db, 'index') or not hasattr(self.vector_db.index, 'nprobe'):
            return {"message": "Index doesn't support parameter optimization"}
        
        self.logger.info("Starting search parameter optimization...")
        
        # Test different nprobe values for IVF indices
        best_params = {"nprobe": 1}
        best_score = float('inf')
        
        nprobe_values = [1, 2, 4, 8, 16, 32, 64, 128]
        
        for nprobe in nprobe_values:
            if hasattr(self.vector_db.index, 'nprobe'):
                self.vector_db.index.nprobe = nprobe
            
            # Run test searches
            total_time = 0.0
            total_results = 0
            successful_queries = 0
            
            for query in test_queries[:10]:  # Test with subset
                start_time = time.perf_counter()
                
                try:
                    results = await self.vector_db.search(query)
                except Exception as e:
                    self.logger.error(f"Search failed with nprobe={nprobe}: {e}")
                    continue
                
                total_time += time.perf_counter() - start_time
                total_results += len(results)
                successful_queries += 1
            
            if successful_queries > 0 and total_results > 0:
                # Every target is scored by mean time per successful query (lower
                # is better), so failed queries cannot make a setting look faster.
                # This measures speed only: recall is not checked, so the smallest
                # nprobe will usually win.
                score = total_time / successful_queries
                
                if score < best_score:
                    best_score = score
                    best_params["nprobe"] = nprobe
                
                self.logger.info(f"nprobe={nprobe}: avg_time={score:.3f}s")
        
        # Apply best parameters
        if hasattr(self.vector_db.index, 'nprobe'):
            self.vector_db.index.nprobe = best_params["nprobe"]
        
        self.logger.info(f"Optimization complete. Best parameters: {best_params}")
        
        return {
            "best_parameters": best_params,
            "best_score": best_score,
            "optimization_target": optimization_target
        }
    
    async def analyze_embedding_distribution(self, 
                                          sample_size: int = 1000) -> Dict[str, Any]:
        """Analyze embedding distribution for optimization insights"""
        
        # This would require access to embeddings, which depends on the DB implementation
        # For FAISS, we could sample from the index
        
        analysis = {
            "sample_size": sample_size,
            "analysis_timestamp": datetime.now().isoformat()
        }
        
        if hasattr(self.vector_db, 'index') and hasattr(self.vector_db.index, 'reconstruct'):
            try:
                # Sample embeddings from FAISS index
                embeddings = []
                total_vectors = self.vector_db.index.ntotal
                
                if total_vectors > 0:
                    sample_indices = np.random.choice(
                        total_vectors, 
                        min(sample_size, total_vectors), 
                        replace=False
                    )
                    
                    for idx in sample_indices:
                        try:
                            embedding = self.vector_db.index.reconstruct(int(idx))
                            embeddings.append(embedding)
                        except Exception:
                            # Some index types cannot reconstruct every id; skip those
                            continue
                    
                    if embeddings:
                        embeddings_matrix = np.vstack(embeddings)
                        
                        analysis.update({
                            "mean_norm": float(np.mean(np.linalg.norm(embeddings_matrix, axis=1))),
                            "std_norm": float(np.std(np.linalg.norm(embeddings_matrix, axis=1))),
                            "dimension": embeddings_matrix.shape[1],
                            "sparsity": float(np.mean(embeddings_matrix == 0)),
                            "dynamic_range": {
                                "min": float(np.min(embeddings_matrix)),
                                "max": float(np.max(embeddings_matrix)),
                                "mean": float(np.mean(embeddings_matrix)),
                                "std": float(np.std(embeddings_matrix))
                            }
                        })
                        
                        # Check for normalization
                        norms = np.linalg.norm(embeddings_matrix, axis=1)
                        # Cast to a plain bool so the result serializes cleanly to JSON
                        is_normalized = bool(np.allclose(norms, 1.0, rtol=1e-3))
                        analysis["is_normalized"] = is_normalized
                        
                        # Clustering analysis (skipped if scikit-learn is unavailable)
                        if len(embeddings) > 10:
                            try:
                                from sklearn.cluster import KMeans
                                
                                kmeans = KMeans(n_clusters=min(10, len(embeddings) // 2), n_init=10, random_state=42)
                                cluster_labels = kmeans.fit_predict(embeddings_matrix)
                                
                                analysis["clustering"] = {
                                    "n_clusters": len(np.unique(cluster_labels)),
                                    "silhouette_score": self._calculate_silhouette_score(embeddings_matrix, cluster_labels)
                                }
                            except Exception as e:
                                self.logger.warning(f"Clustering analysis skipped: {e}")
            
            except Exception as e:
                analysis["error"] = f"Failed to analyze embeddings: {str(e)}"
        
        return analysis
    
    def _calculate_silhouette_score(self, embeddings: np.ndarray, labels: np.ndarray) -> float:
        """Calculate silhouette score for clustering"""
        
        try:
            from sklearn.metrics import silhouette_score
            return float(silhouette_score(embeddings, labels))
        except Exception:
            # scikit-learn is missing or the labels are unsuitable for a silhouette score
            return 0.0
    
    async def suggest_optimizations(self) -> List[Dict[str, Any]]:
        """Generate optimization suggestions based on performance analysis"""
        
        suggestions = []
        stats = self.vector_db.get_stats()
        performance = self.get_performance_summary()
        
        # Index type suggestions
        if stats.get("index_type") == "Flat" and stats.get("total_documents", 0) > 10000:
            suggestions.append({
                "type": "index_optimization",
                "priority": "high",
                "message": "Consider using IVFFlat index for better search performance with large datasets",
                "action": "Change index_type from Flat to IVFFlat"
            })
        
        # Memory optimization
        if performance["memory_usage"]["avg_mb"] > 2000:
            suggestions.append({
                "type": "memory_optimization",
                "priority": "medium",
                "message": "High memory usage detected. Consider using quantization or dimension reduction",
                "action": "Implement PQ quantization or reduce embedding dimensions"
            })
        
        # Search performance
        if performance["search_performance"]["avg_time_ms"] > 100:
            suggestions.append({
                "type": "search_optimization",
                "priority": "medium",
                "message": "Search latency is high. Consider index tuning or caching",
                "action": "Optimize search parameters or implement result caching"
            })
        
        # Error rate
        if performance["error_rate"] > 0.01:  # 1% error rate
            suggestions.append({
                "type": "reliability",
                "priority": "high",
                "message": f"High error rate detected: {performance['error_rate']:.2%}",
                "action": "Investigate error patterns and implement better error handling"
            })
        
        return suggestions
    
    async def run_performance_benchmark(self, 
                                      num_search_queries: int = 100,
                                      num_insert_docs: int = 1000) -> Dict[str, Any]:
        """Run comprehensive performance benchmark"""
        
        self.logger.info("Starting performance benchmark...")
        
        benchmark_results = {
            "benchmark_timestamp": datetime.now().isoformat(),
            "search_benchmark": {},
            "insert_benchmark": {},
            "system_info": {
                "cpu_count": psutil.cpu_count(),
                "memory_total_gb": psutil.virtual_memory().total / (1024**3),
                "memory_available_gb": psutil.virtual_memory().available / (1024**3)
            }
        }
        
        # Search benchmark
        if hasattr(self.vector_db, 'dimension'):
            dimension = self.vector_db.dimension
        else:
            dimension = 384  # Default
        
        search_queries = [np.random.rand(dimension).astype(np.float32) for _ in range(num_search_queries)]
        
        search_times = []
        for query in search_queries:
            monitor = PerformanceMonitor(self, "benchmark_search")
            try:
                async with monitor:
                    await self.vector_db.search(query)
                # duration is only set in __aexit__, so read it after the block
                search_times.append(monitor.duration)
            except Exception as e:
                # The monitor has already recorded this query as a failure
                self.logger.error(f"Search benchmark error: {e}")
        
        if search_times:
            benchmark_results["search_benchmark"] = {
                "avg_latency_ms": np.mean(search_times) * 1000,
                "p50_latency_ms": np.percentile(search_times, 50) * 1000,
                "p95_latency_ms": np.percentile(search_times, 95) * 1000,
                "p99_latency_ms": np.percentile(search_times, 99) * 1000,
                "min_latency_ms": np.min(search_times) * 1000,
                "max_latency_ms": np.max(search_times) * 1000,
                "queries_per_second": len(search_times) / sum(search_times) if sum(search_times) > 0 else 0
            }
        
        # Insert benchmark
        insert_docs = []
        for i in range(num_insert_docs):
            doc = VectorDocument(
                id=f"benchmark_doc_{i}",
                content=f"Benchmark document {i}",
                embedding=np.random.rand(dimension).astype(np.float32),
                metadata={"benchmark": True, "batch": i // 100}
            )
            insert_docs.append(doc)
        
        # Insert in batches
        batch_size = 100
        insert_times = []
        docs_inserted = 0
        
        for i in range(0, len(insert_docs), batch_size):
            batch = insert_docs[i:i + batch_size]
            monitor = PerformanceMonitor(self, "benchmark_insert")
            
            try:
                async with monitor:
                    await self.vector_db.add_documents(batch)
                # duration is only set in __aexit__, so read it after the block
                insert_times.append(monitor.duration)
                docs_inserted += len(batch)
            except Exception as e:
                self.logger.error(f"Insert benchmark error: {e}")
        
        if insert_times:
            total_time = sum(insert_times)
            
            # Rates use only the documents from batches that actually succeeded
            benchmark_results["insert_benchmark"] = {
                "avg_batch_time_ms": np.mean(insert_times) * 1000,
                "total_time_s": total_time,
                "docs_per_second": docs_inserted / total_time if total_time > 0 else 0,
                "avg_time_per_doc_ms": (total_time / docs_inserted) * 1000 if docs_inserted > 0 else 0
            }
        
        # Cleanup benchmark documents (best effort: one failure must not stop the rest)
        for doc in insert_docs:
            try:
                await self.vector_db.delete_document(doc.id)
            except Exception:
                pass
        
        self.logger.info("Performance benchmark completed")
        return benchmark_results

class PerformanceMonitor:
    """Context manager for monitoring operation performance"""
    
    def __init__(self, optimizer: VectorDatabaseOptimizer, operation_name: str):
        self.optimizer = optimizer
        self.operation_name = operation_name
        self.start_time = None
        self.start_memory = None
        self.duration = 0
    
    async def __aenter__(self):
        self.start_time = time.perf_counter()  # monotonic, unaffected by clock changes
        self.start_memory = psutil.Process().memory_info().rss / (1024 * 1024)  # MB
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.duration = time.perf_counter() - self.start_time
        end_memory = psutil.Process().memory_info().rss / (1024 * 1024)  # MB
        
        metrics = PerformanceMetrics(
            operation=self.operation_name,
            duration=self.duration,
            vectors_processed=1,  # Default, can be overridden
            memory_usage_mb=end_memory,
            timestamp=datetime.now(),
            success=exc_type is None,
            error_message=str(exc_val) if exc_val else None
        )
        
        self.optimizer.record_metrics(metrics)

# Example usage
async def demonstrate_optimization():
    """Demonstrate vector database optimization"""
    
    # Create sample database
    from vector_database_implementations import VectorDatabaseFactory
    
    db = VectorDatabaseFactory.create_database(
        "faiss",
        dimension=384,
        index_type="IVFFlat",
        db_path="./optimization_test_db"
    )
    
    optimizer = VectorDatabaseOptimizer(db)
    
    # Add sample data
    documents = []
    for i in range(1000):
        doc = VectorDocument(
            id=f"doc_{i}",
            content=f"Sample document {i}",
            embedding=np.random.rand(384).astype(np.float32),
            metadata={"category": f"cat_{i % 5}"}
        )
        documents.append(doc)
    
    # Monitor insertion
    async with optimizer.monitor_performance("bulk_insert"):
        await db.add_documents(documents)
    
    # Run benchmark
    benchmark_results = await optimizer.run_performance_benchmark(
        num_search_queries=50,
        num_insert_docs=100
    )
    
    print("Benchmark Results:")
    print(json.dumps(benchmark_results, indent=2, default=str))
    
    # Get optimization suggestions
    suggestions = await optimizer.suggest_optimizations()
    print("\nOptimization Suggestions:")
    for suggestion in suggestions:
        print(f"- {suggestion['message']} (Priority: {suggestion['priority']})")
    
    # Get performance summary
    summary = optimizer.get_performance_summary()
    print("\nPerformance Summary:")
    print(json.dumps(summary, indent=2, default=str))

if __name__ == "__main__":
    asyncio.run(demonstrate_optimization())
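
The nprobe sweep in the optimizer scores each setting on speed alone, so the smallest value tends to win even when it returns poor neighbours. Here is a minimal sketch of adding a recall check, assuming you can get exact nearest-neighbour IDs (for example from a Flat index) for a handful of test queries. `recall_at_k` and `pick_nprobe` are hypothetical helpers written for this illustration; they are not part of FAISS or of the classes above.

```python
from typing import Dict, Sequence, Tuple


def recall_at_k(approx_ids: Sequence[int], exact_ids: Sequence[int]) -> float:
    """Fraction of the exact top-k neighbours that the approximate search found."""
    if not exact_ids:
        return 0.0
    return len(set(approx_ids) & set(exact_ids)) / len(exact_ids)


def pick_nprobe(measurements: Dict[int, Tuple[float, float]],
                min_recall: float = 0.95) -> int:
    """Pick the fastest nprobe whose mean recall meets the target.

    measurements maps nprobe -> (mean_latency_seconds, mean_recall).
    Falls back to the highest-recall setting if none reaches min_recall.
    """
    if not measurements:
        raise ValueError("measurements must not be empty")
    good = {n: m for n, m in measurements.items() if m[1] >= min_recall}
    if good:
        return min(good, key=lambda n: good[n][0])
    return max(measurements, key=lambda n: measurements[n][1])


# Illustrative numbers only, not benchmark results.
example = {1: (0.001, 0.62), 8: (0.003, 0.91), 32: (0.009, 0.97), 128: (0.030, 0.99)}
print(recall_at_k([3, 7, 9, 11], [3, 7, 8, 9]))  # 3 of the 4 exact neighbours found
print(pick_nprobe(example, min_recall=0.95))
```

Treating recall as a constraint and latency as the objective keeps the tuning honest: the sweep can no longer "win" by returning worse results faster.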

Production Best Practices

Scaling Strategies

  1. Horizontal Partitioning: Distribute vectors across multiple indices based on metadata
  2. Index Optimization: Choose appropriate index types based on data characteristics
  3. Batch Operations: Use batch inserts and searches for better throughput
  4. Asynchronous Processing: Implement async operations for better concurrency
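
To make the partitioning idea concrete, here is a toy sketch that routes vectors to shards by a metadata key, searches each shard, and merges the per-shard top-k. `ShardedFlatIndex` is a hypothetical class for illustration only. It uses brute-force cosine search in NumPy; a real deployment would put an ANN index behind each shard.

```python
from typing import Dict, List, Optional, Sequence, Tuple

import numpy as np


class ShardedFlatIndex:
    """Toy metadata-partitioned index using brute-force cosine search per shard."""

    def __init__(self, dimension: int):
        self.dimension = dimension
        self.shards: Dict[str, List[Tuple[str, np.ndarray]]] = {}

    def add(self, doc_id: str, embedding: Sequence[float], shard_key: str) -> None:
        vec = np.asarray(embedding, dtype=np.float32)
        if vec.shape != (self.dimension,):
            raise ValueError(f"expected shape ({self.dimension},), got {vec.shape}")
        norm = np.linalg.norm(vec)
        if norm > 0:
            vec = vec / norm  # store unit vectors so dot product equals cosine
        self.shards.setdefault(shard_key, []).append((doc_id, vec))

    def search(self, query: Sequence[float], k: int = 5,
               shard_keys: Optional[List[str]] = None) -> List[Tuple[str, float]]:
        q = np.asarray(query, dtype=np.float32)
        q_norm = np.linalg.norm(q)
        if q_norm > 0:
            q = q / q_norm
        # Search only the requested shards, or fan out to all of them
        keys = shard_keys if shard_keys is not None else list(self.shards)
        candidates: List[Tuple[str, float]] = []
        for key in keys:
            entries = self.shards.get(key, [])
            if not entries:
                continue
            matrix = np.stack([vec for _, vec in entries])
            scores = matrix @ q
            top = np.argsort(-scores)[:k]  # per-shard top-k
            candidates.extend((entries[i][0], float(scores[i])) for i in top)
        # Merge the per-shard results into a global top-k
        candidates.sort(key=lambda pair: pair[1], reverse=True)
        return candidates[:k]


index = ShardedFlatIndex(dimension=3)
index.add("a", [1, 0, 0], shard_key="news")
index.add("b", [0, 1, 0], shard_key="news")
index.add("c", [0.9, 0.1, 0], shard_key="docs")
print(index.search([1, 0, 0], k=2))
print(index.search([1, 0, 0], k=2, shard_keys=["docs"]))
```

When the query already carries the partition key, passing `shard_keys` skips the other shards entirely, which is where most of the benefit of metadata-based partitioning comes from.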

Monitoring and Alerting

  1. Performance Metrics: Track search latency, throughput, and resource usage
  2. Quality Metrics: Monitor embedding quality and search relevance
  3. System Health: Monitor memory usage, disk space, and error rates
  4. Automated Alerts: Set up alerts for performance degradation
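
As a minimal sketch of an automated alert, the class below keeps a rolling window of search latencies and reports when the p95 crosses a threshold. `LatencyAlert`, its parameters, and the default values are assumptions made for illustration. In production you would more likely export the metric to your monitoring system and define the alert rule there.

```python
from collections import deque
from typing import Deque, Optional

import numpy as np


class LatencyAlert:
    """Rolling-window p95 latency check."""

    def __init__(self, threshold_ms: float, window: int = 200, min_samples: int = 20):
        self.threshold_ms = threshold_ms
        self.min_samples = min_samples
        self.samples: Deque[float] = deque(maxlen=window)

    def observe(self, latency_ms: float) -> Optional[str]:
        """Record one latency; return an alert message if p95 is over the threshold."""
        self.samples.append(latency_ms)
        if len(self.samples) < self.min_samples:
            return None  # too few samples for a stable percentile
        p95 = float(np.percentile(list(self.samples), 95))
        if p95 > self.threshold_ms:
            return f"p95 search latency {p95:.1f} ms exceeds {self.threshold_ms:.1f} ms"
        return None


alert = LatencyAlert(threshold_ms=100, window=50, min_samples=10)
for latency in [20] * 10 + [500] * 3:
    message = alert.observe(latency)
    if message:
        print(message)
```

Alerting on a percentile rather than the mean keeps a few fast queries from hiding a slow tail, and the `min_samples` guard avoids firing on the first unlucky request after a restart.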

Security and Privacy

  1. Access Control: Implement proper authentication and authorization
  2. Data Encryption: Encrypt embeddings at rest and in transit
  3. Privacy Compliance: Ensure compliance with data protection regulations
  4. Audit Logging: Maintain comprehensive audit logs
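
For audit logging, one option is to record who ran which operation without copying raw embeddings into the logs. The sketch below uses only the standard library plus NumPy. `audit_record`, the logger name, and the field names are hypothetical. Hashing the query here only keeps the vector itself out of log files; it is not a substitute for encrypting stored embeddings.

```python
import hashlib
import json
import logging
import time
from typing import Any, Dict, Sequence

import numpy as np

audit_logger = logging.getLogger("vector_db.audit")


def audit_record(user_id: str, action: str,
                 query: Sequence[float], result_count: int) -> Dict[str, Any]:
    """Build and log an audit entry that identifies the query by its hash."""
    query_bytes = np.asarray(query, dtype=np.float32).tobytes()
    record = {
        "timestamp": time.time(),
        "user_id": user_id,
        "action": action,
        "query_sha256": hashlib.sha256(query_bytes).hexdigest(),
        "result_count": result_count,
    }
    # One JSON object per line is easy to ship to a log pipeline
    audit_logger.info(json.dumps(record, sort_keys=True))
    return record


entry = audit_record("user-42", "search", [0.1, 0.2, 0.3], result_count=5)
print(entry["action"], entry["result_count"], len(entry["query_sha256"]))
```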

Conclusion

Vector databases and embeddings are fundamental to modern AI applications, enabling semantic search, recommendation systems, and intelligent information retrieval. The implementations and optimization strategies covered in this guide provide a solid foundation for building production-ready vector database systems.

Key takeaways:

  1. Choose the Right Database: Select a vector database based on scale, performance, and feature requirements
  2. Optimize for Your Use Case: Tune indices and parameters based on your specific data and query patterns
  3. Monitor Performance: Implement comprehensive monitoring and optimization
  4. Plan for Scale: Design with horizontal scaling and performance optimization in mind

As vector database technology continues to evolve, staying current with new developments and optimization techniques will be crucial for maintaining high-performance AI applications.

David Childs

Consulting Systems Engineer with over 10 years of experience building scalable infrastructure and helping organizations optimize their technology stack.
