Master vector databases and embeddings for AI applications, including similarity search, semantic retrieval, and production-ready optimization strategies.
Vector databases and embeddings form the backbone of modern AI applications, enabling semantic search, recommendation systems, RAG applications, and similarity matching at scale. Unlike traditional databases, which store records and look them up by exact match, vector databases work with high-dimensional embeddings that capture semantic meaning, allowing for nuanced similarity searches and intelligent information retrieval.
Understanding Vector Embeddings
Vector embeddings are numerical representations of data that capture semantic relationships in high-dimensional space. Points that lie close together in this space represent similar concepts, which lets machines compare items by context and meaning rather than by exact matches alone.
Embedding Fundamentals
# embedding_fundamentals.py
import numpy as np
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
from enum import Enum
import math
from abc import ABC, abstractmethod
class EmbeddingType(Enum):
    """Kind of source data an embedding was produced from.

    Member values are lowercase strings, so a member can be looked up by
    value, e.g. ``EmbeddingType("text") is EmbeddingType.TEXT``.
    """

    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    MULTIMODAL = "multimodal"
@dataclass
class EmbeddingMetadata:
    """Descriptive record for a set of embeddings.

    Fields are stored as given; nothing here checks that ``normalization`` or
    ``distance_metric`` is one of the values listed in the comments.
    """

    embedding_type: EmbeddingType  # modality the vectors came from
    dimension: int  # length of each embedding vector
    model_name: str  # identifier of the producing model
    creation_timestamp: float  # presumably seconds since the epoch — confirm with callers
    normalization: str = "l2"  # expected: "l2", "l1", "none"
    distance_metric: str = "cosine"  # expected: "cosine", "euclidean", "dot_product"
class VectorOperations:
    """Core vector operations for embeddings.

    Every method is a ``staticmethod`` operating on NumPy arrays; vectors are
    expected to be 1-D arrays of equal length.
    """

    @staticmethod
    def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Return the cosine similarity between two vectors, in [-1.0, 1.0].

        Returns 0.0 when either vector has zero norm, since its direction is
        undefined. The ratio is clipped because floating-point rounding can
        otherwise produce values such as 1.0000000000000002 for identical
        vectors, which break ``np.arccos`` and ``<= 1`` checks.
        """
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        similarity = np.dot(vec1, vec2) / (norm1 * norm2)
        return float(np.clip(similarity, -1.0, 1.0))

    @staticmethod
    def euclidean_distance(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Return the Euclidean (L2) distance between two vectors."""
        return np.linalg.norm(vec1 - vec2)

    @staticmethod
    def manhattan_distance(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Return the Manhattan (L1) distance between two vectors."""
        return np.sum(np.abs(vec1 - vec2))

    @staticmethod
    def dot_product(vec1: np.ndarray, vec2: np.ndarray) -> float:
        """Return the dot product of two vectors."""
        return np.dot(vec1, vec2)

    @staticmethod
    def normalize_vector(vector: np.ndarray, method: str = "l2") -> np.ndarray:
        """Scale ``vector`` according to ``method``.

        Args:
            vector: Vector to scale.
            method: ``"l2"`` (unit Euclidean norm), ``"l1"`` (absolute values
                sum to 1) or ``"max"`` (largest absolute entry becomes 1).
                Any other value, including ``"none"``, returns ``vector``
                unchanged.

        Returns:
            The scaled vector, or ``vector`` itself when the chosen norm is 0.
        """
        if method == "l2":
            norm = np.linalg.norm(vector)
            return vector / norm if norm > 0 else vector
        elif method == "l1":
            norm = np.sum(np.abs(vector))
            return vector / norm if norm > 0 else vector
        elif method == "max":
            max_val = np.max(np.abs(vector))
            return vector / max_val if max_val > 0 else vector
        else:
            return vector

    @staticmethod
    def calculate_centroid(vectors: List[np.ndarray]) -> np.ndarray:
        """Return the element-wise mean of ``vectors``.

        Raises:
            ValueError: if ``vectors`` is empty.
        """
        if not vectors:
            raise ValueError("Cannot calculate centroid of empty vector list")
        return np.mean(vectors, axis=0)

    @staticmethod
    def find_outliers(vectors: List[np.ndarray],
                      threshold: float = 2.0) -> List[int]:
        """Return indices of vectors unusually far from (or close to) the centroid.

        A vector is an outlier when its distance to the centroid differs from
        the mean distance by more than ``threshold`` standard deviations.

        Raises:
            ValueError: if ``vectors`` is empty (from ``calculate_centroid``).
        """
        centroid = VectorOperations.calculate_centroid(vectors)
        distances = [
            VectorOperations.euclidean_distance(vec, centroid)
            for vec in vectors
        ]
        mean_distance = np.mean(distances)
        std_distance = np.std(distances)
        outliers = []
        for i, distance in enumerate(distances):
            if abs(distance - mean_distance) > threshold * std_distance:
                outliers.append(i)
        return outliers

    @staticmethod
    def reduce_dimensionality_pca(vectors: List[np.ndarray],
                                  target_dim: int) -> Tuple[List[np.ndarray], np.ndarray]:
        """Project ``vectors`` onto their top ``target_dim`` principal components.

        Args:
            vectors: Non-empty list of equal-length 1-D vectors.
            target_dim: Number of components to keep (>= 1). Values above the
                input dimension keep all available components.

        Returns:
            ``(reduced_vectors, components)``: one projected row per input
            vector, and a ``(dim, k)`` matrix whose columns are the principal
            axes ordered by decreasing variance.

        Raises:
            ValueError: if ``vectors`` is empty or ``target_dim`` < 1.
        """
        if len(vectors) == 0:
            raise ValueError("Cannot run PCA on an empty vector list")
        if target_dim < 1:
            raise ValueError(f"target_dim must be at least 1, got {target_dim}")

        X = np.vstack(vectors)
        X_centered = X - np.mean(X, axis=0)

        # Build the sample covariance explicitly instead of calling np.cov:
        # np.cov squeezes a 1x1 result to a 0-d array (which eigh rejects for
        # 1-D embeddings) and divides by zero when there is a single sample.
        centered64 = X_centered.astype(np.float64, copy=False)
        n_samples = X.shape[0]
        cov_matrix = centered64.T @ centered64 / max(n_samples - 1, 1)

        # eigh returns eigenvalues in ascending order; reverse for top components.
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
        order = np.argsort(eigenvalues)[::-1]
        components = eigenvectors[:, order][:, :target_dim]

        X_reduced = X_centered @ components
        return list(X_reduced), components
class EmbeddingQuality:
    """Tools for assessing embedding quality.

    Pairwise statistics are computed one row at a time with NumPy, so the
    arithmetic is still O(n^2) but without O(n^2) Python-level calls.
    """

    @staticmethod
    def _stack(vectors: List[np.ndarray]) -> np.ndarray:
        """Stack 1-D vectors into a float64 matrix with one row per vector."""
        return np.vstack(vectors).astype(np.float64)

    @staticmethod
    def _pairwise_euclidean(matrix: np.ndarray) -> np.ndarray:
        """Return the Euclidean distance of every row pair i < j, row-major."""
        n = matrix.shape[0]
        if n < 2:
            return np.empty(0)
        return np.concatenate(
            [np.linalg.norm(matrix[i + 1:] - matrix[i], axis=1) for i in range(n - 1)]
        )

    @staticmethod
    def _unit_rows(matrix: np.ndarray) -> np.ndarray:
        """Scale each row to unit L2 norm, leaving all-zero rows as zeros.

        Zero rows therefore get cosine similarity 0.0 with everything, which
        matches how ``VectorOperations.cosine_similarity`` treats zero vectors.
        """
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        return matrix / np.where(norms == 0, 1.0, norms)

    @staticmethod
    def _pairwise_cosine(matrix: np.ndarray) -> np.ndarray:
        """Return the cosine similarity of every row pair i < j, row-major."""
        unit = EmbeddingQuality._unit_rows(matrix)
        n = unit.shape[0]
        if n < 2:
            return np.empty(0)
        return np.concatenate([unit[i + 1:] @ unit[i] for i in range(n - 1)])

    @staticmethod
    def calculate_intrinsic_dimension(vectors: List[np.ndarray],
                                      sample_size: int = 1000) -> float:
        """Estimate intrinsic dimensionality via the correlation dimension.

        This is a simplified Grassberger-Procaccia estimate. It returns the
        absolute slope of log C(r) against log r, where C(r) is the fraction
        of vector pairs closer than r. The slope is sampled at 20 log-spaced
        radii between the smallest positive and the largest pairwise distance.

        Args:
            vectors: Embeddings to analyse.
            sample_size: At most this many vectors are used. When there are
                more, a subset is drawn without replacement via
                ``np.random.choice``, which bounds the quadratic pair count.

        Returns:
            The estimated dimension. Falls back to the embedding dimension
            when the distances are too degenerate to fit a line (all vectors
            identical, or every distinct pair equally far apart).

        Raises:
            ValueError: if fewer than two vectors are available.
        """
        if len(vectors) < sample_size:
            sample_vectors = vectors
        else:
            indices = np.random.choice(len(vectors), sample_size, replace=False)
            sample_vectors = [vectors[i] for i in indices]
        if len(sample_vectors) < 2:
            raise ValueError("Need at least two vectors to estimate intrinsic dimension")

        matrix = EmbeddingQuality._stack(sample_vectors)
        fallback = float(matrix.shape[1])
        distances = EmbeddingQuality._pairwise_euclidean(matrix)

        # Duplicate vectors give zero distances. Starting the radius grid at
        # log10(0) = -inf would turn every radius into NaN, so start the grid
        # at the smallest *positive* distance instead.
        positive = distances[distances > 0]
        if positive.size == 0 or positive.min() == positive.max():
            return fallback

        radii = np.logspace(np.log10(positive.min()), np.log10(positive.max()), 20)
        correlations = np.array([np.mean(distances < r) for r in radii])

        # Skip the first radius: with a strict "<", only exact duplicates fall
        # below the smallest positive distance. The epsilon keeps log finite.
        log_radii = np.log(radii[1:])
        log_correlations = np.log(correlations[1:] + 1e-10)
        valid_mask = np.isfinite(log_radii) & np.isfinite(log_correlations)
        if np.count_nonzero(valid_mask) < 2:
            return fallback

        slope = np.polyfit(log_radii[valid_mask], log_correlations[valid_mask], 1)[0]
        return float(abs(slope))

    @staticmethod
    def calculate_embedding_coherence(vectors: List[np.ndarray],
                                      labels: Optional[List[str]] = None) -> float:
        """Score how tightly related embeddings cluster.

        Without labels, this scores how uniform the pairwise cosine
        similarities are. With labels (one per vector), it returns the mean
        intra-label similarity minus the mean inter-label similarity.
        """
        if labels is None:
            return EmbeddingQuality._distance_based_coherence(vectors)
        return EmbeddingQuality._label_based_coherence(vectors, labels)

    @staticmethod
    def _distance_based_coherence(vectors: List[np.ndarray]) -> float:
        """Return ``1 - std`` of all pairwise cosine similarities.

        A narrow similarity distribution scores close to 1. Returns NaN when
        there are fewer than two vectors, because there are no pairs.
        """
        if len(vectors) < 2:
            return float("nan")
        similarities = EmbeddingQuality._pairwise_cosine(EmbeddingQuality._stack(vectors))
        return float(1.0 - np.std(similarities))

    @staticmethod
    def _label_based_coherence(vectors: List[np.ndarray],
                               labels: List[str]) -> float:
        """Return mean intra-label minus mean inter-label cosine similarity.

        Positive scores mean vectors that share a label are more similar to
        each other than to vectors with other labels. A side with no pairs
        (for example, when every label is used once) contributes 0.

        Raises:
            ValueError: if ``labels`` and ``vectors`` differ in length.
        """
        if len(labels) != len(vectors):
            raise ValueError(
                f"Got {len(labels)} labels for {len(vectors)} vectors; "
                "expected exactly one label per vector"
            )
        if len(vectors) < 2:
            return 0.0

        unit = EmbeddingQuality._unit_rows(EmbeddingQuality._stack(vectors))
        # Map labels to integer codes through a dict. Grouping then follows
        # dict-key equality, and the per-row comparisons stay vectorized.
        codes = {}
        label_ids = np.array([codes.setdefault(label, len(codes)) for label in labels])

        intra_parts, inter_parts = [], []
        for i in range(len(vectors) - 1):
            similarities = unit[i + 1:] @ unit[i]
            same_label = label_ids[i + 1:] == label_ids[i]
            intra_parts.append(similarities[same_label])
            inter_parts.append(similarities[~same_label])

        intra = np.concatenate(intra_parts)
        inter = np.concatenate(inter_parts)
        avg_intra = float(np.mean(intra)) if intra.size else 0.0
        avg_inter = float(np.mean(inter)) if inter.size else 0.0
        return avg_intra - avg_inter

    @staticmethod
    def evaluate_embedding_space(vectors: List[np.ndarray],
                                 labels: Optional[List[str]] = None) -> Dict[str, float]:
        """Compute a summary report of embedding-space quality.

        The report includes size and dimension, intrinsic dimension,
        coherence, spread around the centroid, and outlier counts. Returns
        ``{"error": ...}`` when ``vectors`` is empty. Intrinsic-dimension
        entries are ``None`` when that estimate cannot be computed.
        """
        if len(vectors) == 0:
            return {"error": "No vectors provided"}

        results = {}
        dimensions = len(vectors[0])
        results["embedding_dimension"] = dimensions
        results["num_vectors"] = len(vectors)

        try:
            intrinsic = EmbeddingQuality.calculate_intrinsic_dimension(vectors)
        except (ValueError, np.linalg.LinAlgError):
            # Too few or degenerate vectors: report the metric as unavailable
            # instead of failing the whole evaluation.
            results["intrinsic_dimension"] = None
            results["dimension_efficiency"] = None
        else:
            results["intrinsic_dimension"] = intrinsic
            results["dimension_efficiency"] = intrinsic / dimensions if dimensions else None

        results["coherence"] = EmbeddingQuality.calculate_embedding_coherence(vectors, labels)

        centroid = VectorOperations.calculate_centroid(vectors)
        distances_from_center = [
            VectorOperations.euclidean_distance(vec, centroid)
            for vec in vectors
        ]
        results["mean_distance_from_center"] = np.mean(distances_from_center)
        results["std_distance_from_center"] = np.std(distances_from_center)
        results["density_uniformity"] = 1.0 / (1.0 + np.std(distances_from_center))

        outliers = VectorOperations.find_outliers(vectors)
        results["outlier_count"] = len(outliers)
        results["outlier_percentage"] = len(outliers) / len(vectors) * 100
        return results
# Example usage and testing
def demonstrate_vector_operations():
    """Run a small end-to-end demo of the vector and quality helpers.

    The demo builds three tight synthetic 3-D clusters (50 points each,
    labelled A/B/C) and L2-normalizes them. It then prints the quality
    report, compares cosine similarity within a cluster against similarity
    across clusters, and projects the data onto two principal components.
    """
    np.random.seed(42)  # fixed seed so the printed numbers are reproducible

    centers = ([1, 1, 1], [-1, -1, 1], [0, 1, -1])
    per_cluster = 50
    raw = []
    tags = []
    for tag, center in zip("ABC", centers):
        raw.extend(np.random.normal(center, 0.1, 3) for _ in range(per_cluster))
        tags.extend([tag] * per_cluster)

    unit_vectors = [VectorOperations.normalize_vector(v) for v in raw]

    report = EmbeddingQuality.evaluate_embedding_space(unit_vectors, tags)
    print("Embedding Quality Metrics:")
    for metric, value in report.items():
        print(f" {metric}: {value}")

    anchor = unit_vectors[0]
    within = VectorOperations.cosine_similarity(anchor, unit_vectors[1])
    across = VectorOperations.cosine_similarity(anchor, unit_vectors[per_cluster])
    print(f"\nSimilarity within cluster: {within:.3f}")
    print(f"Similarity across clusters: {across:.3f}")

    projected, components = VectorOperations.reduce_dimensionality_pca(
        unit_vectors, target_dim=2
    )
    print(f"\nReduced from {len(unit_vectors[0])} to {len(projected[0])} dimensions")
    print(f"PCA components shape: {components.shape}")


if __name__ == "__main__":
    demonstrate_vector_operations()
Advanced Embedding Models
# advanced_embeddings.py
import asyncio
import aiohttp
import numpy as np
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
from abc import ABC, abstractmethod
import torch
from transformers import AutoTokenizer, AutoModel
import openai
import time
from tenacity import retry, stop_after_attempt, wait_exponential
@dataclass
class EmbeddingRequest:
    """A single text queued for embedding.

    ``metadata`` and ``model_params`` default to ``None`` rather than an
    empty dict, so check for ``None`` before reading them.
    """

    text: str  # input to embed
    metadata: Optional[Dict[str, Any]] = None  # optional free-form extra data
    model_params: Optional[Dict[str, Any]] = None  # optional per-request model options
@dataclass
class EmbeddingResponse:
    """Result of embedding one input."""

    embedding: np.ndarray  # the embedding vector
    model_name: str  # model that produced it
    dimensions: int  # length of ``embedding``
    metadata: Optional[Dict[str, Any]] = None  # backend extras, e.g. token usage or batch index
    processing_time: float = 0.0  # seconds attributed to producing this embedding
class EmbeddingModel(ABC):
    """Interface implemented by every embedding backend in this module.

    Subclasses provide async single and batch embedding, plus three
    read-only properties that describe the model.
    """

    @abstractmethod
    async def embed_single(self, text: str) -> EmbeddingResponse:
        """Embed one text."""

    @abstractmethod
    async def embed_batch(self, texts: List[str]) -> List[EmbeddingResponse]:
        """Embed several texts, returning a list of responses."""

    @property
    @abstractmethod
    def model_name(self) -> str:
        """Identifier of the underlying model."""

    @property
    @abstractmethod
    def dimensions(self) -> int:
        """Length of the vectors this model produces."""

    @property
    @abstractmethod
    def max_sequence_length(self) -> int:
        """Longest input the model accepts; the unit depends on the backend."""
class OpenAIEmbeddingModel(EmbeddingModel):
    """Embedding backend that calls the OpenAI embeddings API.

    ``embed_single`` and every request issued by ``embed_batch`` are retried
    up to three times, with exponential backoff of 4-10 s between attempts.
    ``embed_batch`` splits its input into requests of at most
    ``max_batch_size`` texts.
    """

    def __init__(self,
                 model: str = "text-embedding-3-large",
                 api_key: str = None,
                 dimensions: Optional[int] = None,
                 max_batch_size: int = 2048):
        """
        Args:
            model: OpenAI embedding model name.
            api_key: API key passed to ``openai.AsyncOpenAI``. When ``None``,
                the client resolves the key itself (``OPENAI_API_KEY``).
            dimensions: Optional output size to request. Only models that
                support shortened embeddings (the ``text-embedding-3``
                family) accept it.
            max_batch_size: Maximum number of inputs per request. The API
                rejects input arrays longer than 2048 items.

        Raises:
            ValueError: if ``max_batch_size`` is less than 1.
        """
        if max_batch_size < 1:
            raise ValueError(f"max_batch_size must be at least 1, got {max_batch_size}")
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self._model_name = model
        self._dimensions = dimensions
        self.max_batch_size = max_batch_size

        # Model specifications: native output size and input token limit.
        self.model_specs = {
            "text-embedding-3-large": {"max_dim": 3072, "max_tokens": 8191},
            "text-embedding-3-small": {"max_dim": 1536, "max_tokens": 8191},
            "text-embedding-ada-002": {"max_dim": 1536, "max_tokens": 8191}
        }

    @property
    def model_name(self) -> str:
        return self._model_name

    @property
    def dimensions(self) -> int:
        """Requested output size, else the model's native size (1536 if unknown)."""
        if self._dimensions:
            return self._dimensions
        return self.model_specs.get(self._model_name, {}).get("max_dim", 1536)

    @property
    def max_sequence_length(self) -> int:
        """Maximum input tokens for the model (8191 if unknown)."""
        return self.model_specs.get(self._model_name, {}).get("max_tokens", 8191)

    def _request_params(self, inputs: Union[str, List[str]]) -> Dict[str, Any]:
        """Build keyword arguments for ``client.embeddings.create``."""
        params = {"model": self._model_name, "input": inputs}
        if self._dimensions:
            params["dimensions"] = self._dimensions
        return params

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def embed_single(self, text: str) -> EmbeddingResponse:
        """Embed a single text (the whole call is retried on any exception)."""
        start_time = time.time()
        response = await self.client.embeddings.create(**self._request_params(text))
        processing_time = time.time() - start_time
        return EmbeddingResponse(
            embedding=np.array(response.data[0].embedding, dtype=np.float32),
            model_name=self._model_name,
            dimensions=len(response.data[0].embedding),
            processing_time=processing_time,
            metadata={
                "usage": response.usage.total_tokens,
                "model": response.model
            }
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True,
    )
    async def _embed_chunk(self, chunk: List[str]):
        """Send one batch request, retrying on any exception.

        ``reraise=True`` surfaces the underlying API error after the last
        attempt instead of tenacity's ``RetryError``.
        """
        return await self.client.embeddings.create(**self._request_params(chunk))

    async def embed_batch(self, texts: List[str]) -> List[EmbeddingResponse]:
        """Embed ``texts``, sending at most ``max_batch_size`` per request.

        Returns one response per input, in input order; an empty input
        returns ``[]`` without calling the API. Per-item ``processing_time``
        and ``usage`` are the request totals divided evenly over that
        request's inputs.
        """
        if not texts:
            return []

        results = []
        for offset in range(0, len(texts), self.max_batch_size):
            chunk = texts[offset:offset + self.max_batch_size]
            start_time = time.time()
            response = await self._embed_chunk(chunk)
            elapsed = time.time() - start_time

            # Each returned item carries its position in the request; sort on
            # it rather than relying on the response list order.
            for data in sorted(response.data, key=lambda item: item.index):
                results.append(EmbeddingResponse(
                    embedding=np.array(data.embedding, dtype=np.float32),
                    model_name=self._model_name,
                    dimensions=len(data.embedding),
                    processing_time=elapsed / len(chunk),
                    metadata={
                        "usage": response.usage.total_tokens / len(chunk),
                        "model": response.model,
                        "batch_index": offset + data.index
                    }
                ))
        return results
class HuggingFaceEmbeddingModel(EmbeddingModel):
    """Local HuggingFace transformer embeddings via attention-masked mean pooling.

    Encoding runs in the event loop's default executor, so the async methods
    do not block the loop during the forward pass.
    """

    def __init__(self,
                 model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
                 device: str = "auto",
                 normalize_embeddings: bool = True,
                 batch_size: int = 32):
        """
        Args:
            model_name: HuggingFace model id or local path.
            device: ``"auto"`` picks CUDA when available, otherwise CPU; any
                other value is passed to ``torch.device``.
            normalize_embeddings: L2-normalize the pooled embeddings.
            batch_size: Maximum number of texts per forward pass. This bounds
                peak memory when ``embed_batch`` receives many texts.

        Raises:
            ValueError: if ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")
        self._model_name = model_name
        self.normalize_embeddings = normalize_embeddings
        self.batch_size = batch_size

        if device == "auto":
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            self.device = torch.device(device)

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).to(self.device)
        self.model.eval()

        self._dimensions = self.model.config.hidden_size
        # Tokenizers without a configured limit report a huge sentinel as
        # model_max_length. That effectively disables truncation, and long
        # inputs would then overflow the position embeddings. Cap the length
        # at the model's position-embedding table when the config has one.
        # NOTE(review): some architectures (e.g. RoBERTa) reserve position
        # slots, so max_position_embeddings can exceed the usable length —
        # confirm for models whose tokenizer lacks model_max_length.
        max_length = self.tokenizer.model_max_length
        max_positions = getattr(self.model.config, "max_position_embeddings", None)
        if max_positions is not None:
            max_length = min(max_length, max_positions)
        self._max_length = max_length

    @property
    def model_name(self) -> str:
        return self._model_name

    @property
    def dimensions(self) -> int:
        return self._dimensions

    @property
    def max_sequence_length(self) -> int:
        return self._max_length

    async def embed_single(self, text: str) -> EmbeddingResponse:
        """Embed a single text in a worker thread."""
        start_time = time.time()
        loop = asyncio.get_running_loop()
        embedding = await loop.run_in_executor(None, self._encode_text, text)
        processing_time = time.time() - start_time
        return EmbeddingResponse(
            embedding=embedding,
            model_name=self._model_name,
            dimensions=len(embedding),
            processing_time=processing_time
        )

    async def embed_batch(self, texts: List[str]) -> List[EmbeddingResponse]:
        """Embed ``texts`` in a worker thread; returns ``[]`` for empty input."""
        if not texts:
            return []
        start_time = time.time()
        loop = asyncio.get_running_loop()
        embeddings = await loop.run_in_executor(None, self._encode_batch, texts)
        processing_time = time.time() - start_time

        return [
            EmbeddingResponse(
                embedding=embedding,
                model_name=self._model_name,
                dimensions=len(embedding),
                processing_time=processing_time / len(texts),
                metadata={"batch_index": i}
            )
            for i, embedding in enumerate(embeddings)
        ]

    def _encode_text(self, text: str) -> np.ndarray:
        """Encode one text (runs in a worker thread)."""
        return self._encode_batch([text])[0]

    def _encode_batch(self, texts: List[str]) -> List[np.ndarray]:
        """Encode ``texts`` in chunks of ``self.batch_size`` (runs in a worker thread)."""
        embeddings: List[np.ndarray] = []
        for start in range(0, len(texts), self.batch_size):
            embeddings.extend(self._encode_chunk(texts[start:start + self.batch_size]))
        return embeddings

    def _encode_chunk(self, texts: List[str]) -> List[np.ndarray]:
        """Run one forward pass and mean-pool token states over the attention mask."""
        inputs = self.tokenizer(
            texts,
            truncation=True,
            padding=True,
            max_length=self._max_length,
            return_tensors="pt"
        ).to(self.device)

        with torch.no_grad():
            outputs = self.model(**inputs)

        token_embeddings = outputs.last_hidden_state
        mask = inputs.attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        # Masked mean: padding positions contribute nothing. The clamp avoids
        # division by zero for an all-zero mask.
        pooled = torch.sum(token_embeddings * mask, 1) / torch.clamp(mask.sum(1), min=1e-9)

        if self.normalize_embeddings:
            pooled = torch.nn.functional.normalize(pooled, p=2, dim=1)

        # One device-to-host transfer for the whole chunk, then split into rows.
        return list(pooled.cpu().numpy().astype(np.float32))
class MultiModalEmbeddingModel(EmbeddingModel):
    """Multi-modal embedding model that fuses text and optional image embeddings.

    Fusion methods:
        ``"concatenate"``: text vector followed by image vector.
        ``"average"``: element-wise mean of both vectors, truncated to the
            shorter length.
        anything else: the image embedding is ignored and the text vector is
            returned.
    """

    def __init__(self,
                 text_model: EmbeddingModel,
                 image_model: Optional[Any] = None,
                 fusion_method: str = "concatenate"):
        self.text_model = text_model
        self.image_model = image_model
        self.fusion_method = fusion_method

        # Declared output size. Concatenation adds the image model's
        # ``dimensions`` attribute, or 512 when it has none.
        if fusion_method == "concatenate":
            self._dimensions = text_model.dimensions
            if image_model:
                self._dimensions += getattr(image_model, 'dimensions', 512)
        else:
            self._dimensions = text_model.dimensions

    @property
    def model_name(self) -> str:
        return f"multimodal_{self.text_model.model_name}"

    @property
    def dimensions(self) -> int:
        return self._dimensions

    @property
    def max_sequence_length(self) -> int:
        return self.text_model.max_sequence_length

    def _fuse(self, text_embedding: np.ndarray,
              image_embedding: Optional[np.ndarray]) -> np.ndarray:
        """Combine a text vector with an image vector per ``fusion_method``.

        Returns ``text_embedding`` unchanged when there is no image vector.
        """
        if image_embedding is None:
            return text_embedding
        if self.fusion_method == "concatenate":
            return np.concatenate([text_embedding, image_embedding])
        if self.fusion_method == "average":
            min_dim = min(len(text_embedding), len(image_embedding))
            return (text_embedding[:min_dim] + image_embedding[:min_dim]) / 2
        return text_embedding

    async def _maybe_embed_image(self, image_data: Optional[Any]) -> Optional[np.ndarray]:
        """Embed ``image_data``, or return None when there is no image or image model.

        Uses explicit ``is None`` checks. Truth-testing a NumPy image array
        raises ``ValueError`` ("truth value of an array ... is ambiguous").
        """
        if image_data is None or self.image_model is None:
            return None
        return await self._embed_image(image_data)

    async def embed_single(self, text: str, image_data: Optional[Any] = None) -> EmbeddingResponse:
        """Embed ``text`` and, when an image and image model exist, fuse in the image."""
        start_time = time.time()
        text_response = await self.text_model.embed_single(text)
        image_embedding = await self._maybe_embed_image(image_data)
        combined_embedding = self._fuse(text_response.embedding, image_embedding)
        processing_time = time.time() - start_time

        return EmbeddingResponse(
            embedding=combined_embedding,
            model_name=self.model_name,
            dimensions=len(combined_embedding),
            processing_time=processing_time,
            metadata={
                "has_image": image_data is not None,
                "fusion_method": self.fusion_method
            }
        )

    async def embed_batch(self, texts: List[str], images: Optional[List[Any]] = None) -> List[EmbeddingResponse]:
        """Embed ``texts``; ``images[i]`` (if present) is fused with ``texts[i]``."""
        text_responses = await self.text_model.embed_batch(texts)

        results = []
        for i, text_response in enumerate(text_responses):
            image_data = images[i] if images is not None and i < len(images) else None
            image_embedding = await self._maybe_embed_image(image_data)
            combined_embedding = self._fuse(text_response.embedding, image_embedding)

            results.append(EmbeddingResponse(
                embedding=combined_embedding,
                model_name=self.model_name,
                dimensions=len(combined_embedding),
                processing_time=text_response.processing_time,
                metadata={
                    "has_image": image_data is not None,
                    "fusion_method": self.fusion_method,
                    "batch_index": i
                }
            ))
        return results

    async def _embed_image(self, image_data: Any) -> np.ndarray:
        """Embed image data (placeholder implementation).

        NOTE(review): this ignores ``image_data`` and ``self.image_model`` and
        returns 512 uniform random floats. Fused embeddings are therefore
        noise until a real image encoder (e.g. CLIP) is wired in here.
        """
        return np.random.rand(512).astype(np.float32)
class HybridEmbeddingModel(EmbeddingModel):
    """Hybrid model that combines the embeddings of several models.

    ``"concatenate"`` joins the vectors end to end. Every other method first
    trims all vectors to the shortest length and then combines them
    element-wise:
        ``"weighted_average"``: weighted sum with the normalized ``weights``.
        ``"max_pooling"``: element-wise maximum.
        ``"attention_weighted"``: weighted by each vector's L2 norm.
        anything else: plain element-wise mean.
    """

    def __init__(self,
                 models: List[EmbeddingModel],
                 weights: Optional[List[float]] = None,
                 combination_method: str = "weighted_average"):
        """
        Args:
            models: Non-empty list of component models.
            weights: One weight per model; normalized to sum to 1. An empty
                or missing list means equal weights.
            combination_method: See the class docstring.

        Raises:
            ValueError: if ``models`` is empty, ``weights`` has a different
                length than ``models``, or the weights sum to zero.
        """
        if not models:
            raise ValueError("HybridEmbeddingModel needs at least one model")
        if weights and len(weights) != len(models):
            raise ValueError(
                f"Got {len(weights)} weights for {len(models)} models; "
                "expected one weight per model"
            )
        self.models = models
        self.weights = weights or [1.0] * len(models)
        self.combination_method = combination_method

        total_weight = sum(self.weights)
        if total_weight == 0:
            raise ValueError("weights must not sum to zero")
        self.weights = [w / total_weight for w in self.weights]

        if combination_method == "concatenate":
            self._dimensions = sum(model.dimensions for model in models)
        else:
            # All other methods trim to the shortest vector, so the output is
            # as long as the smallest component model's output.
            self._dimensions = min(model.dimensions for model in models)

    @property
    def model_name(self) -> str:
        return f"hybrid_{len(self.models)}_models"

    @property
    def dimensions(self) -> int:
        return self._dimensions

    @property
    def max_sequence_length(self) -> int:
        """The most restrictive limit among the component models."""
        return min(model.max_sequence_length for model in self.models)

    async def embed_single(self, text: str) -> EmbeddingResponse:
        """Embed ``text`` with every model concurrently and combine the results."""
        start_time = time.time()
        responses = await asyncio.gather(*(model.embed_single(text) for model in self.models))
        combined_embedding = self._combine_embeddings(
            [resp.embedding for resp in responses]
        )
        processing_time = time.time() - start_time

        return EmbeddingResponse(
            embedding=combined_embedding,
            model_name=self.model_name,
            dimensions=len(combined_embedding),
            processing_time=processing_time,
            metadata={
                "component_models": [resp.model_name for resp in responses],
                "combination_method": self.combination_method
            }
        )

    async def embed_batch(self, texts: List[str]) -> List[EmbeddingResponse]:
        """Embed ``texts`` with every model concurrently and combine per text.

        Returns ``[]`` for empty input. ``processing_time`` is the total wall
        time divided evenly over the texts.
        """
        if not texts:
            return []
        start_time = time.time()
        model_responses = await asyncio.gather(*(model.embed_batch(texts) for model in self.models))
        per_item_time = (time.time() - start_time) / len(texts)

        results = []
        for i in range(len(texts)):
            combined_embedding = self._combine_embeddings(
                [model_resp[i].embedding for model_resp in model_responses]
            )
            results.append(EmbeddingResponse(
                embedding=combined_embedding,
                model_name=self.model_name,
                dimensions=len(combined_embedding),
                processing_time=per_item_time,
                metadata={
                    "component_models": [resp[i].model_name for resp in model_responses],
                    "combination_method": self.combination_method,
                    "batch_index": i
                }
            ))
        return results

    @staticmethod
    def _weighted_sum(vectors: np.ndarray, weights) -> np.ndarray:
        """Accumulate ``sum(w * v)`` over the rows of ``vectors`` into a float32 vector."""
        total = np.zeros(vectors.shape[1], dtype=np.float32)
        for vector, weight in zip(vectors, weights):
            total += vector * weight
        return total

    def _combine_embeddings(self, embeddings: List[np.ndarray]) -> np.ndarray:
        """Combine one embedding per component model using ``combination_method``."""
        if self.combination_method == "concatenate":
            return np.concatenate(embeddings)

        # Every remaining method works element-wise, so trim to the shortest vector.
        min_dim = min(len(emb) for emb in embeddings)
        stacked = np.stack([emb[:min_dim] for emb in embeddings])

        if self.combination_method == "weighted_average":
            return self._weighted_sum(stacked, self.weights)
        if self.combination_method == "max_pooling":
            return np.max(stacked, axis=0)
        if self.combination_method == "attention_weighted":
            # Simple attention: each model is weighted by its vector's L2 norm.
            norms = np.linalg.norm(stacked, axis=1)
            norm_total = norms.sum()
            if norm_total == 0:
                # Every vector is zero, so any weighting yields the zero vector;
                # this avoids 0/0 NaN weights.
                return np.zeros(min_dim, dtype=np.float32)
            return self._weighted_sum(stacked, norms / norm_total)
        # Default to the plain element-wise mean.
        return np.mean(stacked, axis=0)
# Example usage and benchmarking
async def benchmark_embedding_models():
    """Time single and batch embedding for each configured backend and print a report.

    This instantiates an OpenAI client and a HuggingFace model, so it needs
    whatever those need (e.g. an API key and the model weights).
    """
    texts = [
        "Machine learning is transforming the world of technology.",
        "Natural language processing enables computers to understand human language.",
        "Vector databases are essential for semantic search applications.",
        "Artificial intelligence is revolutionizing many industries.",
        "Deep learning models require large amounts of training data."
    ]

    models = {
        "OpenAI": OpenAIEmbeddingModel("text-embedding-3-small"),
        "HuggingFace": HuggingFaceEmbeddingModel("sentence-transformers/all-MiniLM-L6-v2")
    }

    results = {}
    for model_name, backend in models.items():
        print(f"\nBenchmarking {model_name}...")

        t0 = time.time()
        first = await backend.embed_single(texts[0])
        single_elapsed = time.time() - t0

        t0 = time.time()
        await backend.embed_batch(texts)
        batch_elapsed = time.time() - t0

        results[model_name] = {
            "model_name": backend.model_name,
            "dimensions": backend.dimensions,
            "max_sequence_length": backend.max_sequence_length,
            "single_embedding_time": single_elapsed,
            "batch_embedding_time": batch_elapsed,
            "avg_batch_time_per_item": batch_elapsed / len(texts),
            "embedding_sample": first.embedding[:5].tolist()  # leading 5 components
        }

    print("\nBenchmark Results:")
    print("-" * 80)
    for model_name, metrics in results.items():
        print(f"\n{model_name}:")
        for metric, value in metrics.items():
            # Floats get 4 decimals; everything else prints as-is.
            shown = f"{value:.4f}" if isinstance(value, float) else value
            print(f" {metric}: {shown}")


if __name__ == "__main__":
    asyncio.run(benchmark_embedding_models())
Vector Database Implementations
# vector_database_implementations.py
import asyncio
import numpy as np
import faiss
import sqlite3
import json
import pickle
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from datetime import datetime
import uuid
from pathlib import Path
import logging
from abc import ABC, abstractmethod
@dataclass
class VectorDocument:
    """A stored item: text payload, embedding vector, and free-form metadata."""

    id: str  # unique key for the document
    content: str  # text payload stored alongside the vector
    embedding: np.ndarray  # vector used for similarity search
    metadata: Dict[str, Any] = field(default_factory=dict)  # fresh dict per instance
    timestamp: datetime = field(default_factory=datetime.now)  # naive local time at construction
@dataclass
class SearchResult:
    """One hit returned by a vector-database search."""

    document: VectorDocument  # the matched document
    score: float  # score reported by the index; its meaning depends on the metric
    rank: int  # position of this hit in the result list
@dataclass
class SearchOptions:
    """Tunable parameters for a similarity search."""

    top_k: int = 10  # number of nearest neighbours requested from the index
    score_threshold: Optional[float] = None  # optional score cut-off; presumably None disables it — confirm in search()
    metadata_filters: Dict[str, Any] = field(default_factory=dict)  # fresh dict per instance
    include_metadata: bool = True
    include_content: bool = True
class VectorDatabase(ABC):
    """Interface every vector-database backend implements.

    Data-access methods are async; ``get_stats`` is synchronous.
    """

    @abstractmethod
    async def add_documents(self, documents: List[VectorDocument]) -> List[str]:
        """Store ``documents`` and return the IDs that were added."""

    @abstractmethod
    async def search(self,
                     query_vector: np.ndarray,
                     options: SearchOptions = None) -> List[SearchResult]:
        """Return the documents most similar to ``query_vector``."""

    @abstractmethod
    async def get_document(self, doc_id: str) -> Optional[VectorDocument]:
        """Fetch a document by ID, or ``None`` if it is absent."""

    @abstractmethod
    async def delete_document(self, doc_id: str) -> bool:
        """Remove a document by ID; the bool reports success."""

    @abstractmethod
    async def update_document(self, doc_id: str, document: VectorDocument) -> bool:
        """Replace an existing document; the bool reports success."""

    @abstractmethod
    def get_stats(self) -> Dict[str, Any]:
        """Return backend-specific statistics."""
class FAISSVectorDatabase(VectorDatabase):
"""FAISS-based vector database implementation"""
def __init__(self,
dimension: int,
index_type: str = "IVFFlat",
metric: str = "cosine",
db_path: str = "faiss_vector_db",
nlist: int = 100):
self.dimension = dimension
self.index_type = index_type
self.metric = metric
self.db_path = Path(db_path)
self.nlist = nlist
# Initialize FAISS index
self.index = self._create_index()
self.is_trained = False
# Initialize metadata storage
self.db_path.mkdir(exist_ok=True)
self.metadata_db_path = self.db_path / "metadata.db"
self._init_metadata_db()
# Document tracking
self.doc_id_to_faiss_id = {}
self.faiss_id_to_doc_id = {}
self.next_faiss_id = 0
# Load existing index if available
self._load_index()
self.logger = logging.getLogger(__name__)
def _create_index(self) -> faiss.Index:
"""Create FAISS index based on configuration"""
if self.index_type == "Flat":
if self.metric == "cosine":
index = faiss.IndexFlatIP(self.dimension)
else:
index = faiss.IndexFlatL2(self.dimension)
elif self.index_type == "IVFFlat":
if self.metric == "cosine":
quantizer = faiss.IndexFlatIP(self.dimension)
index = faiss.IndexIVFFlat(quantizer, self.dimension, self.nlist)
else:
quantizer = faiss.IndexFlatL2(self.dimension)
index = faiss.IndexIVFFlat(quantizer, self.dimension, self.nlist)
elif self.index_type == "HNSW":
index = faiss.IndexHNSWFlat(self.dimension, 32)
index.hnsw.efConstruction = 40
index.hnsw.efSearch = 16
elif self.index_type == "IVFPQFlat":
quantizer = faiss.IndexFlatL2(self.dimension)
index = faiss.IndexIVFPQ(quantizer, self.dimension, self.nlist, 8, 8)
else:
raise ValueError(f"Unsupported index type: {self.index_type}")
return index
def _init_metadata_db(self):
"""Initialize SQLite database for metadata"""
with sqlite3.connect(self.metadata_db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS documents (
doc_id TEXT PRIMARY KEY,
faiss_id INTEGER UNIQUE,
content TEXT,
metadata TEXT,
timestamp TEXT
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_faiss_id ON documents(faiss_id)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp ON documents(timestamp)
""")
def _load_index(self):
"""Load existing FAISS index and metadata"""
index_path = self.db_path / "faiss.index"
mapping_path = self.db_path / "id_mapping.pkl"
if index_path.exists():
try:
self.index = faiss.read_index(str(index_path))
self.is_trained = True
if mapping_path.exists():
with open(mapping_path, 'rb') as f:
mapping_data = pickle.load(f)
self.doc_id_to_faiss_id = mapping_data['doc_to_faiss']
self.faiss_id_to_doc_id = mapping_data['faiss_to_doc']
self.next_faiss_id = mapping_data['next_id']
self.logger.info(f"Loaded FAISS index with {self.index.ntotal} vectors")
except Exception as e:
self.logger.error(f"Failed to load FAISS index: {e}")
self.index = self._create_index()
def _save_index(self):
"""Save FAISS index and metadata"""
try:
index_path = self.db_path / "faiss.index"
mapping_path = self.db_path / "id_mapping.pkl"
# Save FAISS index
faiss.write_index(self.index, str(index_path))
# Save ID mappings
mapping_data = {
'doc_to_faiss': self.doc_id_to_faiss_id,
'faiss_to_doc': self.faiss_id_to_doc_id,
'next_id': self.next_faiss_id
}
with open(mapping_path, 'wb') as f:
pickle.dump(mapping_data, f)
except Exception as e:
self.logger.error(f"Failed to save FAISS index: {e}")
async def add_documents(self, documents: List["VectorDocument"]) -> List[str]:
    """Add documents' vectors to the FAISS index and their data to SQLite.

    The embeddings are stacked into a float32 matrix. When ``metric`` is
    ``"cosine"`` each row is L2-normalised; all-zero rows are left as-is. An
    untrained IVF index is trained on the first batch. Each document is
    stored under the FAISS id its vector receives. Re-adding an existing id
    replaces the SQLite row; the old vector stays in FAISS but is no longer
    reachable through the mappings.

    Args:
        documents: Documents to add (an empty list is a no-op).

    Returns:
        The ids of ``documents`` in input order.
    """
    if not documents:
        return []
    from contextlib import closing

    # FAISS only accepts float32 input; callers may supply float64 embeddings.
    embeddings = np.vstack([doc.embedding for doc in documents]).astype(np.float32)
    if self.metric == "cosine":
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # avoid 0/0 = NaN for all-zero rows
        embeddings = embeddings / norms

    # FAISS indexes without an ID map number vectors sequentially from ntotal,
    # so derive ids from the index rather than a separately persisted counter.
    start_faiss_id = self.index.ntotal
    # Serialise rows before touching the index: non-JSON-serialisable metadata
    # then fails without leaving orphan vectors behind.
    rows = [
        (
            doc.id,
            start_faiss_id + offset,
            doc.content,
            json.dumps(doc.metadata),
            doc.timestamp.isoformat(),
        )
        for offset, doc in enumerate(documents)
    ]

    if not self.is_trained and self.index_type.startswith("IVF"):
        self.index.train(embeddings)
        self.is_trained = True
    self.index.add(embeddings)

    with closing(sqlite3.connect(self.metadata_db_path)) as conn, conn:
        conn.executemany("""
            INSERT OR REPLACE INTO documents
            (doc_id, faiss_id, content, metadata, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, rows)

    # Update the in-memory mappings only after the rows are committed, and
    # drop the reverse entry of any vector a re-added document supersedes.
    for doc_id, faiss_id, *_ in rows:
        previous = self.doc_id_to_faiss_id.get(doc_id)
        if previous is not None:
            self.faiss_id_to_doc_id.pop(previous, None)
        self.doc_id_to_faiss_id[doc_id] = faiss_id
        self.faiss_id_to_doc_id[faiss_id] = doc_id
    self.next_faiss_id = start_faiss_id + len(documents)

    # Persist after large batches or whenever the index size hits a multiple of 1000.
    if len(documents) > 100 or self.index.ntotal % 1000 == 0:
        self._save_index()
    return [doc.id for doc in documents]
async def search(self,
                 query_vector: np.ndarray,
                 options: "SearchOptions" = None) -> List["SearchResult"]:
    """Return stored documents whose vectors are most similar to ``query_vector``.

    Args:
        query_vector: Query embedding; it is cast to float32 and reshaped to a
            single row for FAISS. When ``metric`` is ``"cosine"`` it is
            L2-normalised, unless it is all zeros.
        options: Search options; ``SearchOptions()`` is used when omitted.

    Returns:
        Matching documents, without embeddings, in FAISS result order.
        Metadata filters are applied after the FAISS search, so fewer than
        ``top_k`` results may come back. ``rank`` is the position in the raw
        FAISS result list.
    """
    if options is None:
        options = SearchOptions()
    if self.index.ntotal == 0:
        return []
    from contextlib import closing

    query = np.asarray(query_vector, dtype=np.float32).reshape(1, -1)
    if self.metric == "cosine":
        norm = np.linalg.norm(query)
        # A zero query has no direction; normalising it would produce NaNs.
        if norm > 0:
            query = query / norm

    scores, indices = self.index.search(query, options.top_k)

    results = []
    with closing(sqlite3.connect(self.metadata_db_path)) as conn:
        for rank, (score, faiss_id) in enumerate(zip(scores[0], indices[0])):
            if faiss_id == -1:  # FAISS pads with -1 when it has fewer hits than top_k
                continue
            if options.score_threshold and score < options.score_threshold:
                continue
            row = conn.execute("""
                SELECT doc_id, content, metadata, timestamp
                FROM documents WHERE faiss_id = ?
            """, (int(faiss_id),)).fetchone()
            if row is None:
                # Vector of a deleted or replaced document: still in FAISS, no row.
                continue
            doc_id, content, metadata_json, timestamp = row
            metadata = json.loads(metadata_json)
            if options.metadata_filters and not self._matches_filters(metadata, options.metadata_filters):
                continue
            document = VectorDocument(
                id=doc_id,
                content=content if options.include_content else "",
                embedding=np.array([]),  # embeddings are not returned from search
                metadata=metadata if options.include_metadata else {},
                timestamp=datetime.fromisoformat(timestamp)
            )
            results.append(SearchResult(
                document=document,
                score=float(score),
                rank=rank
            ))
    return results
def _matches_filters(self, metadata: Dict[str, Any],
                     filters: Dict[str, Any]) -> bool:
    """Tell whether ``metadata`` satisfies every entry of ``filters``.

    A list-valued filter is a membership test (the metadata value must be one
    of its elements). Any other filter value must compare equal. A filtered
    key that is absent from ``metadata`` fails the match.
    """
    def _field_ok(field, wanted):
        if field not in metadata:
            return False
        if isinstance(wanted, list):
            return metadata[field] in wanted
        return not (metadata[field] != wanted)

    return all(_field_ok(field, wanted) for field, wanted in filters.items())
async def get_document(self, doc_id: str) -> Optional["VectorDocument"]:
    """Fetch a stored document by id, or return None if there is no such row.

    The embedding is reconstructed from the FAISS index. It is the vector as
    it was stored, which means L2-normalised when ``metric`` is ``"cosine"``;
    for quantised index types it may be an approximation. If the index
    cannot reconstruct it, an empty array is returned rather than a made-up
    vector. IVF indexes without a direct map are one such case.
    """
    from contextlib import closing

    with closing(sqlite3.connect(self.metadata_db_path)) as conn:
        row = conn.execute("""
            SELECT content, metadata, timestamp, faiss_id
            FROM documents WHERE doc_id = ?
        """, (doc_id,)).fetchone()
    if row is None:
        return None
    content, metadata_json, timestamp, faiss_id = row

    embedding = np.array([])
    if faiss_id in self.faiss_id_to_doc_id:
        try:
            embedding = self.index.reconstruct(int(faiss_id))
        except RuntimeError as e:
            self.logger.debug(f"Cannot reconstruct vector {faiss_id} of {doc_id}: {e}")

    return VectorDocument(
        id=doc_id,
        content=content,
        embedding=embedding,
        metadata=json.loads(metadata_json),
        timestamp=datetime.fromisoformat(timestamp)
    )
async def delete_document(self, doc_id: str) -> bool:
    """Delete a document's SQLite row and forget its FAISS id mapping.

    The vector itself is not removed from the FAISS index. search() skips it
    because its SQLite row is gone; rebuild the index periodically to reclaim
    the space. The SQLite delete runs even when ``doc_id`` is missing from the
    in-memory mappings, e.g. after the mapping file could not be loaded.

    Returns:
        True if a stored document with ``doc_id`` was deleted.
    """
    from contextlib import closing

    faiss_id = self.doc_id_to_faiss_id.pop(doc_id, None)
    if faiss_id is not None:
        self.faiss_id_to_doc_id.pop(faiss_id, None)

    with closing(sqlite3.connect(self.metadata_db_path)) as conn, conn:
        deleted = conn.execute("DELETE FROM documents WHERE doc_id = ?", (doc_id,)).rowcount
    return deleted > 0
async def update_document(self, doc_id: str, document: VectorDocument) -> bool:
    """Replace the stored document ``doc_id`` with ``document``.

    On FAISS an update is a delete followed by a re-add. ``document.id`` is
    overwritten with ``doc_id`` so the id stays stable. Nothing is added and
    False is returned when ``doc_id`` could not be deleted.
    """
    was_deleted = await self.delete_document(doc_id)
    if not was_deleted:
        return False
    document.id = doc_id
    await self.add_documents([document])
    return True
def get_stats(self) -> Dict[str, Any]:
    """Summarise document counts, index configuration and approximate index size."""
    from contextlib import closing

    with closing(sqlite3.connect(self.metadata_db_path)) as conn:
        (doc_count,) = conn.execute("SELECT COUNT(*) FROM documents").fetchone()
        (category_count,) = conn.execute("""
            SELECT COUNT(DISTINCT json_extract(metadata, '$.category'))
            FROM documents
            WHERE json_extract(metadata, '$.category') IS NOT NULL
        """).fetchone()

    n_vectors = self.index.ntotal
    return {
        "total_documents": doc_count,
        # Includes vectors of deleted/replaced documents, which stay in FAISS.
        "faiss_vectors": n_vectors,
        "dimension": self.dimension,
        "index_type": self.index_type,
        "metric": self.metric,
        "is_trained": self.is_trained,
        "unique_categories": category_count,
        # Raw float32 size (4 bytes per component); quantised indexes store less.
        "index_size_mb": n_vectors * self.dimension * 4 / (1024 * 1024)
    }
def __del__(self):
    """Best-effort save of the index when the object is garbage-collected.

    Skipped when construction failed before an index was assigned, so a
    half-initialised object does not raise from its finaliser.
    """
    if getattr(self, "index", None) is None:
        return
    self._save_index()
class PineconeVectorDatabase(VectorDatabase):
    """Vector database backed by a Pinecone index.

    Each document's content and ISO-8601 timestamp are stored in the vector's
    Pinecone metadata under the reserved keys ``content`` and ``timestamp``.
    They sit next to the document's own metadata and are separated out again
    when documents are read back.
    """

    # Metadata keys used for document fields. User metadata under these keys
    # is overridden on write and dropped on read.
    _RESERVED_KEYS = ("content", "timestamp")

    def __init__(self,
                 api_key: str,
                 environment: str,
                 index_name: str,
                 dimension: int,
                 metric: str = "cosine",
                 spec=None):
        """Connect to ``index_name``, creating the index first if it does not exist.

        Args:
            api_key: Pinecone API key.
            environment: Pod environment name. It is only used to build a
                default ``PodSpec`` when the index must be created and ``spec``
                is None.
            index_name: Name of the index to use or create.
            dimension: Embedding dimension of the index.
            metric: Similarity metric used when creating the index.
            spec: Optional ``ServerlessSpec``/``PodSpec`` for index creation.

        Raises:
            ImportError: If the Pinecone client is not installed.
        """
        try:
            from pinecone import Pinecone, PodSpec
        except ImportError as e:
            raise ImportError("pinecone-client is required for PineconeVectorDatabase") from e

        self.pc = Pinecone(api_key=api_key)
        self.index_name = index_name
        self.dimension = dimension
        self.metric = metric

        if index_name not in self.pc.list_indexes().names():
            # The Pinecone client (v3+) requires an explicit deployment spec.
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=spec if spec is not None else PodSpec(environment=environment)
            )
        self.index = self.pc.Index(index_name)

    @staticmethod
    def _build_filter(filters: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Translate simple metadata filters into a Pinecone filter (None if empty).

        List values become ``$in``. This gives them the same membership
        meaning that FAISSVectorDatabase._matches_filters gives lists.
        Operator keys (``$and``, ``$or``, ...) and non-list values pass through.
        """
        if not filters:
            return None
        return {
            key: ({"$in": value} if isinstance(value, list) and not key.startswith("$") else value)
            for key, value in filters.items()
        }

    def _to_vector(self, doc_id: str, document: VectorDocument) -> Dict[str, Any]:
        """Build the upsert payload for ``document`` stored under ``doc_id``."""
        return {
            "id": doc_id,
            "values": document.embedding.tolist(),
            # Reserved keys go last so user metadata cannot overwrite them.
            "metadata": {
                **document.metadata,
                "content": document.content,
                "timestamp": document.timestamp.isoformat()
            }
        }

    def _to_document(self,
                     doc_id: str,
                     metadata: Optional[Dict[str, Any]],
                     embedding: np.ndarray,
                     include_content: bool = True,
                     include_metadata: bool = True) -> VectorDocument:
        """Rebuild a VectorDocument from Pinecone metadata written by ``_to_vector``."""
        metadata = metadata or {}
        timestamp = metadata.get("timestamp")
        return VectorDocument(
            id=doc_id,
            content=metadata.get("content", "") if include_content else "",
            embedding=embedding,
            metadata={k: v for k, v in metadata.items() if k not in self._RESERVED_KEYS} if include_metadata else {},
            timestamp=datetime.fromisoformat(timestamp) if timestamp else datetime.now()
        )

    async def add_documents(self, documents: List[VectorDocument]) -> List[str]:
        """Upsert ``documents`` in batches of 100 and return their ids."""
        vectors = [self._to_vector(doc.id, doc) for doc in documents]
        batch_size = 100
        for start in range(0, len(vectors), batch_size):
            self.index.upsert(vectors=vectors[start:start + batch_size])
        return [doc.id for doc in documents]

    async def search(self,
                     query_vector: np.ndarray,
                     options: SearchOptions = None) -> List[SearchResult]:
        """Query the index and return matches above ``score_threshold``.

        Metadata filters are applied server-side by Pinecone. ``rank`` is the
        position in Pinecone's match list.
        """
        if options is None:
            options = SearchOptions()

        response = self.index.query(
            vector=query_vector.tolist(),
            top_k=options.top_k,
            filter=self._build_filter(options.metadata_filters),
            include_metadata=True
        )

        results = []
        for rank, match in enumerate(response.matches):
            if options.score_threshold and match.score < options.score_threshold:
                continue
            document = self._to_document(
                match.id,
                match.metadata,
                np.array([]),
                include_content=options.include_content,
                include_metadata=options.include_metadata
            )
            results.append(SearchResult(
                document=document,
                score=match.score,
                rank=rank
            ))
        return results

    async def get_document(self, doc_id: str) -> Optional[VectorDocument]:
        """Fetch one document (with its embedding) by id, or None if absent."""
        response = self.index.fetch(ids=[doc_id])
        if doc_id not in response.vectors:
            return None
        vector_data = response.vectors[doc_id]
        return self._to_document(doc_id, vector_data.metadata, np.array(vector_data.values))

    async def delete_document(self, doc_id: str) -> bool:
        """Delete ``doc_id``; always returns True (Pinecone reports no per-id result here)."""
        self.index.delete(ids=[doc_id])
        return True

    async def update_document(self, doc_id: str, document: VectorDocument) -> bool:
        """Overwrite the vector stored under ``doc_id`` with ``document`` (upsert)."""
        self.index.upsert(vectors=[self._to_vector(doc_id, document)])
        return True

    def get_stats(self) -> Dict[str, Any]:
        """Return vector count, configuration and fullness of the index."""
        stats = self.index.describe_index_stats()
        return {
            "total_documents": stats.total_vector_count,
            "dimension": self.dimension,
            "metric": self.metric,
            "index_name": self.index_name,
            "index_fullness": stats.index_fullness,
            "namespaces": len(stats.namespaces) if stats.namespaces else 0
        }
class ChromaVectorDatabase(VectorDatabase):
    """Vector database backed by a persistent ChromaDB collection.

    Ids, embeddings, document text and metadata are stored in ChromaDB.
    Document timestamps are not stored, so documents read back carry the time
    they were read.
    """

    def __init__(self,
                 collection_name: str = "default",
                 persist_directory: str = "./chroma_db",
                 embedding_function=None):
        """Open (creating if needed) ``collection_name`` under ``persist_directory``.

        Raises:
            ImportError: If the ``chromadb`` package is not installed.
        """
        try:
            import chromadb
            from chromadb.config import Settings
        except ImportError as e:
            raise ImportError("chromadb is required for ChromaVectorDatabase") from e

        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=embedding_function
        )

    @staticmethod
    def _build_where(filters: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Translate simple metadata filters into a ChromaDB ``where`` clause.

        List values become ``$in``, matching the membership meaning that
        FAISSVectorDatabase._matches_filters gives lists. Several fields are
        combined with ``$and``, because ChromaDB allows only one top-level
        key per ``where``. Operator keys (``$and``, ``$or``, ...) and
        non-list values pass through unchanged.
        """
        if not filters:
            return None
        clauses = []
        for key, value in filters.items():
            if isinstance(value, list) and not key.startswith("$"):
                value = {"$in": value}
            clauses.append({key: value})
        return clauses[0] if len(clauses) == 1 else {"$and": clauses}

    async def add_documents(self, documents: List[VectorDocument]) -> List[str]:
        """Add ``documents`` to the collection and return their ids."""
        if not documents:
            # ChromaDB rejects an empty id list; an empty batch is a no-op here.
            return []
        ids = [doc.id for doc in documents]
        self.collection.add(
            ids=ids,
            embeddings=[doc.embedding.tolist() for doc in documents],
            documents=[doc.content for doc in documents],
            metadatas=[doc.metadata for doc in documents]
        )
        return ids

    async def search(self,
                     query_vector: np.ndarray,
                     options: SearchOptions = None) -> List[SearchResult]:
        """Query the collection and return matches above ``score_threshold``.

        ChromaDB returns distances; they are mapped to a similarity score in
        (0, 1] via ``1 / (1 + distance)``.
        """
        if options is None:
            options = SearchOptions()

        response = self.collection.query(
            query_embeddings=[query_vector.tolist()],
            n_results=options.top_k,
            where=self._build_where(options.metadata_filters),
            include=["documents", "metadatas", "distances"]
        )

        results = []
        if response['ids']:
            hits = zip(
                response['ids'][0],
                response['distances'][0],
                response['documents'][0],
                response['metadatas'][0]
            )
            for rank, (doc_id, distance, content, metadata) in enumerate(hits):
                score = 1.0 / (1.0 + distance)
                if options.score_threshold and score < options.score_threshold:
                    continue
                document = VectorDocument(
                    id=doc_id,
                    content=content if options.include_content else "",
                    embedding=np.array([]),
                    # ChromaDB may return None for documents without metadata.
                    metadata=(metadata or {}) if options.include_metadata else {},
                    timestamp=datetime.now()  # timestamps are not stored in ChromaDB
                )
                results.append(SearchResult(
                    document=document,
                    score=score,
                    rank=rank
                ))
        return results

    async def get_document(self, doc_id: str) -> Optional[VectorDocument]:
        """Fetch one document (with its embedding) by id, or None if absent."""
        response = self.collection.get(
            ids=[doc_id],
            include=["documents", "metadatas", "embeddings"]
        )
        if not response['ids']:
            return None
        return VectorDocument(
            id=response['ids'][0],
            content=response['documents'][0],
            embedding=np.array(response['embeddings'][0]),
            metadata=response['metadatas'][0] or {},
            timestamp=datetime.now()
        )

    async def delete_document(self, doc_id: str) -> bool:
        """Delete ``doc_id`` from the collection; always returns True."""
        self.collection.delete(ids=[doc_id])
        return True

    async def update_document(self, doc_id: str, document: VectorDocument) -> bool:
        """Overwrite embedding, text and metadata of ``doc_id``; always returns True."""
        self.collection.update(
            ids=[doc_id],
            embeddings=[document.embedding.tolist()],
            documents=[document.content],
            metadatas=[document.metadata]
        )
        return True

    def get_stats(self) -> Dict[str, Any]:
        """Return the document count and the collection name."""
        return {
            "total_documents": self.collection.count(),
            "collection_name": self.collection.name
        }
# Vector database factory
class VectorDatabaseFactory:
    """Builds a vector database backend from a short type name."""

    @staticmethod
    def create_database(db_type: str, **kwargs) -> VectorDatabase:
        """Instantiate the backend named by ``db_type`` (case-insensitive).

        Supported names are ``"faiss"``, ``"pinecone"`` and ``"chroma"``.
        ``kwargs`` are forwarded unchanged to the backend's constructor.

        Raises:
            ValueError: If ``db_type`` names no supported backend.
        """
        kind = db_type.lower()
        if kind == "faiss":
            backend_cls = FAISSVectorDatabase
        elif kind == "pinecone":
            backend_cls = PineconeVectorDatabase
        elif kind == "chroma":
            backend_cls = ChromaVectorDatabase
        else:
            raise ValueError(f"Unsupported database type: {db_type}")
        return backend_cls(**kwargs)
# Example usage and testing
async def test_vector_databases():
    """Smoke-test the FAISS backend: insert 50 documents, run a filtered search, print stats."""
    np.random.seed(42)
    documents = [
        VectorDocument(
            id=f"doc_{n}",
            content=f"This is document {n} with some sample content.",
            embedding=np.random.rand(384).astype(np.float32),
            metadata={"category": f"cat_{n % 5}", "importance": n % 3}
        )
        for n in range(100)
    ]

    print("Testing FAISS Vector Database...")
    faiss_db = VectorDatabaseFactory.create_database(
        "faiss",
        dimension=384,
        index_type="Flat",
        db_path="./test_faiss_db"
    )

    added_ids = await faiss_db.add_documents(documents[:50])
    print(f"Added {len(added_ids)} documents to FAISS")

    query_vector = np.random.rand(384).astype(np.float32)
    hits = await faiss_db.search(
        query_vector,
        SearchOptions(top_k=5, metadata_filters={"category": "cat_1"})
    )
    print(f"Found {len(hits)} results")
    for hit in hits:
        print(f" Doc: {hit.document.id}, Score: {hit.score:.3f}")

    stats = faiss_db.get_stats()
    print(f"Database stats: {stats}")


if __name__ == "__main__":
    # Run the smoke test when this listing is executed as a script.
    asyncio.run(test_vector_databases())
Production Optimization and Monitoring
# vector_db_optimization.py
import asyncio
import time
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
from collections import defaultdict, deque
import threading
from concurrent.futures import ThreadPoolExecutor
import psutil
@dataclass
class PerformanceMetrics:
    """Timing and memory record for one monitored operation.

    Instances are built by PerformanceMonitor when its ``async with`` block
    exits, and are passed to VectorDatabaseOptimizer.record_metrics.
    """
    operation: str  # operation label, e.g. "search", "insert", "benchmark_search"
    duration: float  # elapsed seconds of the monitored block
    vectors_processed: int  # number of vectors the operation handled, as reported by the monitor
    memory_usage_mb: float  # process RSS in MB measured when the block finished
    timestamp: datetime  # creation time of the record (naive local time)
    success: bool  # False when the monitored block raised
    error_message: Optional[str] = None  # str() of the raised exception, if any
class VectorDatabaseOptimizer:
    """Performance monitoring and tuning helpers for a vector database.

    Metrics reach the optimizer through ``record_metrics``, normally from the
    monitor returned by ``monitor_performance``. They are kept in bounded
    deques that feed ``get_performance_summary`` and ``suggest_optimizations``.
    ``optimize_search_parameters`` and ``analyze_embedding_distribution`` only
    do real work when ``vector_db.index`` exposes FAISS-style attributes
    (``nprobe``, ``reconstruct``, ``ntotal``).
    """

    def __init__(self, vector_db: "VectorDatabase"):
        self.vector_db = vector_db
        self.metrics_history = deque(maxlen=10000)
        self.performance_cache = {}
        self.optimization_suggestions = []

        # Rolling windows of the most recent samples per metric
        self.search_times = deque(maxlen=1000)
        self.insert_times = deque(maxlen=1000)
        self.memory_usage = deque(maxlen=1000)

        # Background-optimisation hooks; no method below submits work to them yet.
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.optimization_running = False
        self.logger = logging.getLogger(__name__)

    def monitor_performance(self, operation_name: str) -> "PerformanceMonitor":
        """Return an async context manager that times one operation.

        Use as ``async with optimizer.monitor_performance("search") as m:``.
        This is deliberately a plain method. An ``async def`` would return a
        coroutine, and a coroutine cannot be used with ``async with``.
        """
        return PerformanceMonitor(self, operation_name)

    def record_metrics(self, metrics: "PerformanceMetrics"):
        """Store ``metrics`` and update the rolling windows.

        Only operations named exactly "search" or "insert" feed the latency
        windows. Every record adds a memory sample and is checked for issues.
        """
        self.metrics_history.append(metrics)
        if metrics.operation == "search":
            self.search_times.append(metrics.duration)
        elif metrics.operation == "insert":
            self.insert_times.append(metrics.duration)
        self.memory_usage.append(metrics.memory_usage_mb)
        self._check_performance_issues(metrics)

    def _check_performance_issues(self, metrics: "PerformanceMetrics"):
        """Flag slow searches (> 1 s) and high memory (> 1000 MB); keep only the last hour's flags."""
        if metrics.operation == "search" and metrics.duration > 1.0:
            self.optimization_suggestions.append({
                "type": "slow_search",
                "message": f"Search took {metrics.duration:.2f}s, consider index optimization",
                "timestamp": metrics.timestamp,
                "severity": "medium"
            })
        if metrics.memory_usage_mb > 1000:  # 1GB threshold
            self.optimization_suggestions.append({
                "type": "high_memory",
                "message": f"Memory usage at {metrics.memory_usage_mb:.0f}MB, consider optimization",
                "timestamp": metrics.timestamp,
                "severity": "high"
            })
        cutoff = datetime.now() - timedelta(hours=1)
        self.optimization_suggestions = [
            s for s in self.optimization_suggestions
            if s["timestamp"] > cutoff
        ]

    def get_performance_summary(self) -> Dict[str, Any]:
        """Summarise latency (ms), memory (MB), last-hour error rate and database stats."""
        avg_search_time = np.mean(self.search_times) if self.search_times else 0
        avg_insert_time = np.mean(self.insert_times) if self.insert_times else 0
        avg_memory = np.mean(self.memory_usage) if self.memory_usage else 0

        search_p95 = np.percentile(self.search_times, 95) if self.search_times else 0
        search_p99 = np.percentile(self.search_times, 99) if self.search_times else 0

        recent_metrics = [m for m in self.metrics_history if m.timestamp > datetime.now() - timedelta(hours=1)]
        error_rate = sum(1 for m in recent_metrics if not m.success) / len(recent_metrics) if recent_metrics else 0

        return {
            "search_performance": {
                "avg_time_ms": avg_search_time * 1000,
                "p95_time_ms": search_p95 * 1000,
                "p99_time_ms": search_p99 * 1000,
                "total_searches": len(self.search_times)
            },
            "insert_performance": {
                "avg_time_ms": avg_insert_time * 1000,
                "total_inserts": len(self.insert_times)
            },
            "memory_usage": {
                "avg_mb": avg_memory,
                "current_mb": self.memory_usage[-1] if self.memory_usage else 0
            },
            "error_rate": error_rate,
            "optimization_suggestions": len(self.optimization_suggestions),
            "database_stats": self.vector_db.get_stats()
        }

    async def optimize_search_parameters(self,
                                         test_queries: List[np.ndarray],
                                         optimization_target: str = "latency") -> Dict[str, Any]:
        """Pick the IVF ``nprobe`` value with the lowest measured search time.

        Tries nprobe in {1, 2, 4, ..., 128} on the first 10 test queries and
        leaves the index set to the best value. Only time is measured, not
        recall, so small nprobe values are naturally favoured. If no setting
        returned any results, the index's original nprobe is restored.

        Args:
            test_queries: Query vectors to time (only the first 10 are used).
            optimization_target: "latency" scores by mean time per successful
                query; any other value scores by total time.

        Returns:
            ``best_parameters``, ``best_score`` (inf when nothing succeeded)
            and ``optimization_target``; or a ``message`` dict if the index
            has no ``nprobe``.
        """
        index = getattr(self.vector_db, "index", None)
        if index is None or not hasattr(index, "nprobe"):
            return {"message": "Index doesn't support parameter optimization"}

        self.logger.info("Starting search parameter optimization...")
        sample = test_queries[:10]
        best_params = {"nprobe": index.nprobe}  # fallback: keep the current value
        best_score = float('inf')

        for nprobe in (1, 2, 4, 8, 16, 32, 64, 128):
            index.nprobe = nprobe
            total_time = 0.0
            total_results = 0
            succeeded = 0
            for query in sample:
                start_time = time.perf_counter()
                try:
                    results = await self.vector_db.search(query)
                except Exception as e:
                    self.logger.error(f"Search failed with nprobe={nprobe}: {e}")
                    continue
                total_time += time.perf_counter() - start_time
                total_results += len(results)
                succeeded += 1

            if total_results == 0:
                continue
            score = total_time / succeeded if optimization_target == "latency" else total_time
            if score < best_score:
                best_score = score
                best_params["nprobe"] = nprobe
            self.logger.info(f"nprobe={nprobe}: avg_time={score:.3f}s")

        index.nprobe = best_params["nprobe"]
        self.logger.info(f"Optimization complete. Best parameters: {best_params}")
        return {
            "best_parameters": best_params,
            "best_score": best_score,
            "optimization_target": optimization_target
        }

    async def analyze_embedding_distribution(self,
                                             sample_size: int = 1000) -> Dict[str, Any]:
        """Describe norms, sparsity, value range and (optionally) clustering of stored vectors.

        Vectors are sampled via ``index.reconstruct``, so this only reports
        statistics when the database exposes a FAISS-style index. Clustering
        is added when more than 10 vectors were sampled and scikit-learn is
        installed. Unexpected failures are reported under ``"error"``.
        """
        analysis = {
            "sample_size": sample_size,
            "analysis_timestamp": datetime.now().isoformat()
        }
        index = getattr(self.vector_db, "index", None)
        if index is None or not hasattr(index, "reconstruct"):
            return analysis

        try:
            embeddings = self._sample_embeddings(index, sample_size)
            if embeddings:
                embeddings_matrix = np.vstack(embeddings)
                norms = np.linalg.norm(embeddings_matrix, axis=1)
                analysis.update({
                    "mean_norm": float(np.mean(norms)),
                    "std_norm": float(np.std(norms)),
                    "dimension": embeddings_matrix.shape[1],
                    "sparsity": float(np.mean(embeddings_matrix == 0)),
                    "dynamic_range": {
                        "min": float(np.min(embeddings_matrix)),
                        "max": float(np.max(embeddings_matrix)),
                        "mean": float(np.mean(embeddings_matrix)),
                        "std": float(np.std(embeddings_matrix))
                    }
                })
                analysis["is_normalized"] = np.allclose(norms, 1.0, rtol=1e-3)
                if len(embeddings) > 10:
                    clustering = self._cluster_summary(embeddings_matrix)
                    if clustering is not None:
                        analysis["clustering"] = clustering
        except Exception as e:
            analysis["error"] = f"Failed to analyze embeddings: {str(e)}"
        return analysis

    def _sample_embeddings(self, index, sample_size: int) -> List[np.ndarray]:
        """Reconstruct up to ``sample_size`` randomly chosen vectors from ``index``."""
        total_vectors = index.ntotal
        if total_vectors <= 0:
            return []
        picks = np.random.choice(total_vectors, min(sample_size, total_vectors), replace=False)
        sampled = []
        for idx in picks:
            try:
                sampled.append(index.reconstruct(int(idx)))
            except RuntimeError:
                # FAISS raises e.g. for IVF indexes without a direct map.
                continue
        return sampled

    def _cluster_summary(self, embeddings_matrix: np.ndarray) -> Optional[Dict[str, Any]]:
        """Run KMeans (up to 10 clusters) and return cluster count and silhouette, or None."""
        try:
            from sklearn.cluster import KMeans
        except ImportError:
            self.logger.info("scikit-learn not installed; skipping clustering analysis")
            return None
        try:
            n_clusters = min(10, len(embeddings_matrix) // 2)
            cluster_labels = KMeans(n_clusters=n_clusters, random_state=42).fit_predict(embeddings_matrix)
        except Exception as e:
            self.logger.warning(f"Clustering analysis failed: {e}")
            return None
        return {
            "n_clusters": len(np.unique(cluster_labels)),
            "silhouette_score": self._calculate_silhouette_score(embeddings_matrix, cluster_labels)
        }

    def _calculate_silhouette_score(self, embeddings: np.ndarray, labels: np.ndarray) -> float:
        """Return the silhouette score of ``labels``, or 0.0 if it cannot be computed."""
        try:
            from sklearn.metrics import silhouette_score
            return float(silhouette_score(embeddings, labels))
        except Exception:
            # Missing scikit-learn, or a single cluster (silhouette undefined).
            return 0.0

    async def suggest_optimizations(self) -> List[Dict[str, Any]]:
        """Turn database stats and the performance summary into prioritised suggestions."""
        suggestions = []
        stats = self.vector_db.get_stats()
        performance = self.get_performance_summary()

        if stats.get("index_type") == "Flat" and stats.get("total_documents", 0) > 10000:
            suggestions.append({
                "type": "index_optimization",
                "priority": "high",
                "message": "Consider using IVFFlat index for better search performance with large datasets",
                "action": "Change index_type from Flat to IVFFlat"
            })
        if performance["memory_usage"]["avg_mb"] > 2000:
            suggestions.append({
                "type": "memory_optimization",
                "priority": "medium",
                "message": "High memory usage detected. Consider using quantization or dimension reduction",
                "action": "Implement PQ quantization or reduce embedding dimensions"
            })
        if performance["search_performance"]["avg_time_ms"] > 100:
            suggestions.append({
                "type": "search_optimization",
                "priority": "medium",
                "message": "Search latency is high. Consider index tuning or caching",
                "action": "Optimize search parameters or implement result caching"
            })
        if performance["error_rate"] > 0.01:  # 1% error rate
            suggestions.append({
                "type": "reliability",
                "priority": "high",
                "message": f"High error rate detected: {performance['error_rate']:.2%}",
                "action": "Investigate error patterns and implement better error handling"
            })
        return suggestions

    async def run_performance_benchmark(self,
                                        num_search_queries: int = 100,
                                        num_insert_docs: int = 1000) -> Dict[str, Any]:
        """Benchmark random-vector searches and batched inserts against the database.

        Inserts documents with ids ``benchmark_doc_{i}`` and deletes them
        afterwards, best effort. On FAISS, deletion leaves the vectors in the
        index. Each timed operation is also recorded through a
        PerformanceMonitor, so failures show up in the error rate.
        """
        self.logger.info("Starting performance benchmark...")
        memory = psutil.virtual_memory()
        benchmark_results = {
            "benchmark_timestamp": datetime.now().isoformat(),
            "search_benchmark": {},
            "insert_benchmark": {},
            "system_info": {
                "cpu_count": psutil.cpu_count(),
                "memory_total_gb": memory.total / (1024**3),
                "memory_available_gb": memory.available / (1024**3)
            }
        }

        dimension = getattr(self.vector_db, "dimension", 384)

        search_times = await self._benchmark_search(dimension, num_search_queries)
        if search_times:
            total_search = sum(search_times)
            benchmark_results["search_benchmark"] = {
                "avg_latency_ms": np.mean(search_times) * 1000,
                "p50_latency_ms": np.percentile(search_times, 50) * 1000,
                "p95_latency_ms": np.percentile(search_times, 95) * 1000,
                "p99_latency_ms": np.percentile(search_times, 99) * 1000,
                "min_latency_ms": np.min(search_times) * 1000,
                "max_latency_ms": np.max(search_times) * 1000,
                "queries_per_second": len(search_times) / total_search if total_search > 0 else 0
            }

        insert_docs = [
            VectorDocument(
                id=f"benchmark_doc_{i}",
                content=f"Benchmark document {i}",
                embedding=np.random.rand(dimension).astype(np.float32),
                metadata={"benchmark": True, "batch": i // 100}
            )
            for i in range(num_insert_docs)
        ]
        insert_times, inserted = await self._benchmark_insert(insert_docs, batch_size=100)
        if insert_times:
            total_time = sum(insert_times)
            benchmark_results["insert_benchmark"] = {
                "avg_batch_time_ms": np.mean(insert_times) * 1000,
                "total_time_s": total_time,
                "docs_per_second": inserted / total_time if total_time > 0 else 0,
                "avg_time_per_doc_ms": (total_time / inserted) * 1000 if inserted > 0 else 0
            }

        await self._cleanup_documents(insert_docs)
        self.logger.info("Performance benchmark completed")
        return benchmark_results

    async def _benchmark_search(self, dimension: int, num_queries: int) -> List[float]:
        """Time ``num_queries`` random-vector searches; return seconds for each successful one."""
        queries = [np.random.rand(dimension).astype(np.float32) for _ in range(num_queries)]
        times = []
        for query in queries:
            monitor = self.monitor_performance("benchmark_search")
            try:
                # Errors propagate through the monitor so it records them as failures.
                async with monitor:
                    await self.vector_db.search(query)
            except Exception as e:
                self.logger.error(f"Search benchmark error: {e}")
                continue
            # duration is only set once the monitor has exited.
            times.append(monitor.duration)
        return times

    async def _benchmark_insert(self, documents: List[Any], batch_size: int = 100) -> Tuple[List[float], int]:
        """Insert ``documents`` in batches; return per-batch seconds and docs inserted successfully."""
        times = []
        inserted = 0
        for start in range(0, len(documents), batch_size):
            batch = documents[start:start + batch_size]
            monitor = self.monitor_performance("benchmark_insert")
            monitor.vectors_processed = len(batch)
            try:
                async with monitor:
                    await self.vector_db.add_documents(batch)
            except Exception as e:
                self.logger.error(f"Insert benchmark error: {e}")
                continue
            times.append(monitor.duration)
            inserted += len(batch)
        return times, inserted

    async def _cleanup_documents(self, documents: List[Any]) -> None:
        """Best-effort deletion of ``documents``; one failure does not stop the rest."""
        for doc in documents:
            try:
                await self.vector_db.delete_document(doc.id)
            except Exception as e:
                self.logger.debug(f"Failed to delete benchmark document {doc.id}: {e}")
class PerformanceMonitor:
    """Async context manager that times a block and reports it to an optimizer.

    On exit it builds a PerformanceMetrics record and passes it to
    ``optimizer.record_metrics``. The record holds the elapsed time, the
    process RSS at exit and whether the block raised. Exceptions raised
    inside the block are recorded and then propagate unchanged.
    """

    def __init__(self, optimizer: "VectorDatabaseOptimizer", operation_name: str,
                 vectors_processed: int = 1):
        self.optimizer = optimizer
        self.operation_name = operation_name
        # May be reassigned inside the ``async with`` body (e.g. to a batch size).
        self.vectors_processed = vectors_processed
        self.start_time = None  # wall-clock epoch seconds at entry
        self.start_memory = None
        self.duration = 0
        self._t0 = None  # monotonic clock reading used for duration

    async def __aenter__(self):
        self.start_time = time.time()
        self._t0 = time.perf_counter()
        self.start_memory = psutil.Process().memory_info().rss / (1024 * 1024)  # MB
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # perf_counter is monotonic, so durations are immune to clock adjustments.
        self.duration = time.perf_counter() - self._t0
        end_memory = psutil.Process().memory_info().rss / (1024 * 1024)  # MB
        metrics = PerformanceMetrics(
            operation=self.operation_name,
            duration=self.duration,
            vectors_processed=self.vectors_processed,
            memory_usage_mb=end_memory,
            timestamp=datetime.now(),
            success=exc_type is None,
            error_message=str(exc_val) if exc_val is not None else None
        )
        self.optimizer.record_metrics(metrics)
        return False  # never suppress the block's exception
# Example usage
async def demonstrate_optimization():
    """Run the optimizer end to end against a local FAISS IVFFlat database.

    Inserts 1000 random documents (timed as "bulk_insert") and runs a small
    benchmark. It then prints the benchmark results, the optimization
    suggestions and the performance summary.
    """
    import json

    # VectorDocument is used below, so import it alongside the factory; this
    # script does not otherwise have it in scope.
    from vector_database_implementations import VectorDatabaseFactory, VectorDocument

    db = VectorDatabaseFactory.create_database(
        "faiss",
        dimension=384,
        index_type="IVFFlat",
        db_path="./optimization_test_db"
    )
    optimizer = VectorDatabaseOptimizer(db)

    documents = [
        VectorDocument(
            id=f"doc_{i}",
            content=f"Sample document {i}",
            embedding=np.random.rand(384).astype(np.float32),
            metadata={"category": f"cat_{i % 5}"}
        )
        for i in range(1000)
    ]

    async with optimizer.monitor_performance("bulk_insert"):
        await db.add_documents(documents)

    benchmark_results = await optimizer.run_performance_benchmark(
        num_search_queries=50,
        num_insert_docs=100
    )
    print("Benchmark Results:")
    print(json.dumps(benchmark_results, indent=2, default=str))

    suggestions = await optimizer.suggest_optimizations()
    print("\nOptimization Suggestions:")
    for suggestion in suggestions:
        print(f"- {suggestion['message']} (Priority: {suggestion['priority']})")

    summary = optimizer.get_performance_summary()
    print("\nPerformance Summary:")
    print(json.dumps(summary, indent=2, default=str))


if __name__ == "__main__":
    asyncio.run(demonstrate_optimization())
Production Best Practices
Scaling Strategies
- Horizontal Partitioning: Distribute vectors across multiple indices based on metadata
- Index Optimization: Choose appropriate index types based on data characteristics
- Batch Operations: Use batch inserts and searches for better throughput
- Asynchronous Processing: Implement async operations for better concurrency
Monitoring and Alerting
- Performance Metrics: Track search latency, throughput, and resource usage
- Quality Metrics: Monitor embedding quality and search relevance
- System Health: Monitor memory usage, disk space, and error rates
- Automated Alerts: Set up alerts for performance degradation
Security and Privacy
- Access Control: Implement proper authentication and authorization
- Data Encryption: Encrypt embeddings at rest and in transit
- Privacy Compliance: Ensure compliance with data protection regulations
- Audit Logging: Maintain comprehensive audit logs
Conclusion
Vector databases and embeddings are fundamental to modern AI applications, enabling semantic search, recommendation systems, and intelligent information retrieval. The implementations and optimization strategies covered in this guide provide a solid foundation for building production-ready vector database systems.
Key takeaways:
- Choose the Right Database: Select a vector database based on scale, performance, and feature requirements
- Optimize for Your Use Case: Tune indices and parameters based on your specific data and query patterns
- Monitor Performance: Implement comprehensive monitoring and optimization
- Plan for Scale: Design with horizontal scaling and performance optimization in mind
As vector database technology continues to evolve, staying current with new developments and optimization techniques will be crucial for maintaining high-performance AI applications.