Build scalable computer vision systems covering object detection, image classification, real-time video processing, model optimization, and deployment to production environments.
Computer vision has evolved from research experiments to production systems handling millions of images daily. After building CV systems for manufacturing, retail, and healthcare, I've learned that production computer vision requires careful model selection, efficient preprocessing pipelines, and robust deployment architectures. Here's your complete guide to production-ready visual AI.
Computer Vision Pipeline Architecture
Production CV System Design
# cv_pipeline.py
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from typing import Dict, List, Tuple, Any, Optional
from dataclasses import dataclass
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from abc import ABC, abstractmethod
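
# CVProcessingError is raised by the pipeline below but was not defined in the
# original listing; this minimal definition is an assumption to keep the module self-contained.
class CVProcessingError(Exception):
    """Raised when an image fails to pass through the pipeline."""
    pass
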
@dataclass
class DetectionResult:
class_id: int
class_name: str
confidence: float
bbox: Tuple[int, int, int, int] # x, y, width, height
mask: Optional[np.ndarray] = None
@dataclass
class ProcessingResult:
image_id: str
detections: List[DetectionResult]
processing_time_ms: float
metadata: Dict[str, Any]
class CVProcessor(ABC):
@abstractmethod
async def process(self, image: np.ndarray, metadata: Dict) -> ProcessingResult:
pass
class ProductionCVPipeline:
def __init__(self,
preprocessor,
model_ensemble: List[CVProcessor],
postprocessor,
batch_size: int = 8):
self.preprocessor = preprocessor
self.model_ensemble = model_ensemble
self.postprocessor = postprocessor
self.batch_size = batch_size
# Performance monitoring
self.performance_metrics = {
'total_processed': 0,
'average_latency_ms': 0,
'error_count': 0,
'throughput_fps': 0
}
# Async executor for parallel processing
self.executor = ThreadPoolExecutor(max_workers=4)
async def process_single(self,
image: np.ndarray,
image_id: str,
metadata: Dict = None) -> ProcessingResult:
"""Process single image through complete pipeline"""
start_time = time.time()
try:
# Preprocessing
processed_image = await self._preprocess_image(image, metadata)
# Model ensemble processing
ensemble_results = []
for model in self.model_ensemble:
result = await model.process(processed_image, metadata)
ensemble_results.append(result)
# Combine ensemble results
combined_result = await self._combine_ensemble_results(
ensemble_results, image_id
)
# Postprocessing
final_result = await self.postprocessor.process(
combined_result, image, metadata
)
# Update performance metrics
processing_time = (time.time() - start_time) * 1000
self._update_metrics(processing_time, True)
return ProcessingResult(
image_id=image_id,
detections=final_result,
processing_time_ms=processing_time,
metadata=metadata or {}
)
except Exception as e:
self._update_metrics(0, False)
raise CVProcessingError(f"Pipeline processing failed: {str(e)}")
async def process_batch(self,
images: List[Tuple[np.ndarray, str]],
metadata: Dict = None) -> List[ProcessingResult]:
"""Process batch of images efficiently"""
results = []
# Process in batches
for i in range(0, len(images), self.batch_size):
batch = images[i:i + self.batch_size]
# Process batch in parallel
tasks = [
self.process_single(img, img_id, metadata)
for img, img_id in batch
]
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle exceptions
for result in batch_results:
if isinstance(result, Exception):
# Log error but continue processing
print(f"Batch processing error: {result}")
else:
results.append(result)
return results
async def _preprocess_image(self,
image: np.ndarray,
metadata: Dict) -> np.ndarray:
"""Preprocess image for model input"""
# Resize to standard input size
if metadata and 'target_size' in metadata:
target_size = metadata['target_size']
image = cv2.resize(image, target_size)
# Normalize pixel values
image = image.astype(np.float32) / 255.0
# Color space conversion if needed
if len(image.shape) == 3 and image.shape[2] == 3:
# Convert BGR to RGB for most models
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
return image
async def _combine_ensemble_results(self,
ensemble_results: List[ProcessingResult],
image_id: str) -> List[DetectionResult]:
"""Combine results from model ensemble using voting"""
if len(ensemble_results) == 1:
return ensemble_results[0].detections
# Implement Non-Maximum Suppression across ensemble
all_detections = []
for result in ensemble_results:
all_detections.extend(result.detections)
# Apply NMS
final_detections = self._apply_ensemble_nms(all_detections)
return final_detections
def _apply_ensemble_nms(self,
detections: List[DetectionResult],
iou_threshold: float = 0.5) -> List[DetectionResult]:
"""Apply Non-Maximum Suppression across ensemble results"""
if not detections:
return []
# Group by class
class_groups = {}
for detection in detections:
if detection.class_name not in class_groups:
class_groups[detection.class_name] = []
class_groups[detection.class_name].append(detection)
final_detections = []
for class_name, class_detections in class_groups.items():
# Sort by confidence
class_detections.sort(key=lambda x: x.confidence, reverse=True)
# Apply NMS
keep = []
while class_detections:
current = class_detections.pop(0)
keep.append(current)
# Remove overlapping detections
remaining = []
for detection in class_detections:
iou = self._calculate_iou(current.bbox, detection.bbox)
if iou < iou_threshold:
remaining.append(detection)
class_detections = remaining
final_detections.extend(keep)
return final_detections
def _calculate_iou(self,
bbox1: Tuple[int, int, int, int],
bbox2: Tuple[int, int, int, int]) -> float:
"""Calculate Intersection over Union"""
x1, y1, w1, h1 = bbox1
x2, y2, w2, h2 = bbox2
# Calculate intersection
xi = max(x1, x2)
yi = max(y1, y2)
wi = min(x1 + w1, x2 + w2) - xi
hi = min(y1 + h1, y2 + h2) - yi
if wi <= 0 or hi <= 0:
return 0.0
intersection = wi * hi
# Calculate union
area1 = w1 * h1
area2 = w2 * h2
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0.0
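
    # _update_metrics is referenced above but was not shown in the original listing;
    # this minimal running-average implementation is an assumption, not the original code.
    def _update_metrics(self, latency_ms: float, success: bool):
        """Update rolling performance metrics for the pipeline"""
        if not success:
            self.performance_metrics['error_count'] += 1
            return
        n = self.performance_metrics['total_processed']
        prev_avg = self.performance_metrics['average_latency_ms']
        # Incremental mean keeps memory usage constant
        new_avg = (prev_avg * n + latency_ms) / (n + 1)
        self.performance_metrics['total_processed'] = n + 1
        self.performance_metrics['average_latency_ms'] = new_avg
        if new_avg > 0:
            self.performance_metrics['throughput_fps'] = 1000.0 / new_avg
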
class YOLOProcessor(CVProcessor):
def __init__(self, model_path: str, class_names: List[str]):
self.model = self._load_model(model_path)
self.class_names = class_names
self.input_size = 640
def _load_model(self, model_path: str):
"""Load YOLO model"""
try:
import ultralytics
model = ultralytics.YOLO(model_path)
return model
except ImportError:
# Fallback to PyTorch loading
model = torch.jit.load(model_path)
model.eval()
return model
async def process(self,
image: np.ndarray,
metadata: Dict) -> ProcessingResult:
"""Process image with YOLO model"""
        start_time = time.time()
        # Run inference
        with torch.no_grad():
            if hasattr(self.model, 'predict'):
                # Ultralytics YOLO handles its own preprocessing
                results = self.model.predict(image, verbose=False)
                detections = self._parse_ultralytics_results(results[0])
            else:
                # TorchScript YOLO: prepare the input tensor manually
                input_tensor = self._prepare_input(image)
                outputs = self.model(input_tensor)
                detections = self._parse_pytorch_results(outputs, image.shape)
        processing_time = (time.time() - start_time) * 1000
        # metadata may be None when called directly from the pipeline
        return ProcessingResult(
            image_id=(metadata or {}).get('image_id', 'unknown'),
            detections=detections,
            processing_time_ms=processing_time,
            metadata=metadata or {}
        )
def _prepare_input(self, image: np.ndarray) -> torch.Tensor:
"""Prepare image for YOLO input"""
# Resize maintaining aspect ratio
h, w = image.shape[:2]
scale = self.input_size / max(h, w)
new_h, new_w = int(h * scale), int(w * scale)
resized = cv2.resize(image, (new_w, new_h))
        # Pad to square with mid-gray (assumes a [0, 1]-normalized float image)
        padded = np.full((self.input_size, self.input_size, 3), 0.5, dtype=np.float32)
padded[:new_h, :new_w] = resized
# Convert to tensor
tensor = torch.from_numpy(padded).permute(2, 0, 1).unsqueeze(0)
return tensor
def _parse_ultralytics_results(self, result) -> List[DetectionResult]:
"""Parse Ultralytics YOLO results"""
detections = []
if result.boxes is not None:
boxes = result.boxes.xyxy.cpu().numpy()
confidences = result.boxes.conf.cpu().numpy()
class_ids = result.boxes.cls.cpu().numpy().astype(int)
for i in range(len(boxes)):
x1, y1, x2, y2 = boxes[i]
detection = DetectionResult(
class_id=int(class_ids[i]),
class_name=self.class_names[int(class_ids[i])],
confidence=float(confidences[i]),
bbox=(int(x1), int(y1), int(x2-x1), int(y2-y1))
)
detections.append(detection)
return detections
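Here's a minimal sketch of wiring these pieces together. The model path, the truncated class list, and the pass-through postprocessor are illustrative assumptions rather than part of the pipeline above:

# pipeline_example.py (illustrative usage sketch)
import asyncio
import cv2

from cv_pipeline import ProductionCVPipeline, YOLOProcessor

class PassthroughPostprocessor:
    """Assumed postprocessor that returns the combined detections unchanged"""
    async def process(self, detections, image, metadata):
        return detections

async def main():
    # Class list truncated for brevity; use the full list your model was trained on
    detector = YOLOProcessor("yolov8n.pt", class_names=["person", "bicycle", "car"])
    pipeline = ProductionCVPipeline(
        preprocessor=None,  # the pipeline's built-in _preprocess_image is used
        model_ensemble=[detector],
        postprocessor=PassthroughPostprocessor(),
        batch_size=8
    )
    image = cv2.imread("sample.jpg")
    result = await pipeline.process_single(image, image_id="sample-001")
    for det in result.detections:
        print(det.class_name, det.confidence, det.bbox)

if __name__ == "__main__":
    asyncio.run(main())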
Real-time Video Processing
# video_processor.py
import asyncio
import time
import cv2
from typing import Dict, List

from cv_pipeline import ProductionCVPipeline

class RealTimeVideoProcessor:
def __init__(self,
cv_pipeline: ProductionCVPipeline,
buffer_size: int = 30,
skip_frames: int = 1):
self.cv_pipeline = cv_pipeline
self.buffer_size = buffer_size
self.skip_frames = skip_frames
# Video processing state
self.frame_buffer = asyncio.Queue(maxsize=buffer_size)
        self.results_buffer = asyncio.Queue(maxsize=buffer_size)
self.processing_stats = {
'frames_processed': 0,
'frames_dropped': 0,
'average_fps': 0,
'current_latency_ms': 0
}
# Background tasks
self.processing_task = None
self.is_running = False
async def start_processing(self, video_source):
"""Start real-time video processing"""
self.is_running = True
# Start background processing task
self.processing_task = asyncio.create_task(
self._process_frames_continuously()
)
# Start frame capture
await self._capture_frames(video_source)
async def stop_processing(self):
"""Stop video processing"""
self.is_running = False
if self.processing_task:
self.processing_task.cancel()
try:
await self.processing_task
except asyncio.CancelledError:
pass
async def _capture_frames(self, video_source):
"""Capture frames from video source"""
cap = cv2.VideoCapture(video_source)
frame_count = 0
try:
            while self.is_running and cap.isOpened():
                # Note: cap.read() blocks the event loop; for high-FPS sources,
                # consider offloading capture via loop.run_in_executor
                ret, frame = cap.read()
if not ret:
break
# Skip frames for performance
if frame_count % (self.skip_frames + 1) != 0:
frame_count += 1
continue
# Add timestamp
timestamp = time.time()
frame_data = {
'frame': frame,
'timestamp': timestamp,
'frame_id': frame_count
}
try:
# Add to buffer (non-blocking)
self.frame_buffer.put_nowait(frame_data)
except asyncio.QueueFull:
# Drop oldest frame if buffer full
try:
self.frame_buffer.get_nowait()
self.frame_buffer.put_nowait(frame_data)
self.processing_stats['frames_dropped'] += 1
except asyncio.QueueEmpty:
pass
frame_count += 1
# Small delay to prevent overwhelming
await asyncio.sleep(0.001)
finally:
cap.release()
async def _process_frames_continuously(self):
"""Process frames continuously in background"""
while self.is_running:
try:
# Get frame from buffer
frame_data = await asyncio.wait_for(
self.frame_buffer.get(),
timeout=1.0
)
# Process frame
result = await self.cv_pipeline.process_single(
frame_data['frame'],
str(frame_data['frame_id']),
{'timestamp': frame_data['timestamp']}
)
# Calculate latency
current_time = time.time()
latency_ms = (current_time - frame_data['timestamp']) * 1000
# Update stats
self.processing_stats['frames_processed'] += 1
self.processing_stats['current_latency_ms'] = latency_ms
# Store result
try:
self.results_buffer.put_nowait({
'frame_id': frame_data['frame_id'],
'result': result,
'latency_ms': latency_ms
})
except asyncio.QueueFull:
# Remove oldest result
try:
self.results_buffer.get_nowait()
self.results_buffer.put_nowait({
'frame_id': frame_data['frame_id'],
'result': result,
'latency_ms': latency_ms
})
except asyncio.QueueEmpty:
pass
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"Frame processing error: {e}")
continue
async def get_latest_results(self, count: int = 1) -> List[Dict]:
"""Get latest processing results"""
results = []
for _ in range(min(count, self.results_buffer.qsize())):
try:
result = self.results_buffer.get_nowait()
results.append(result)
except asyncio.QueueEmpty:
break
return results
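To drive the processor end to end, one option (a sketch, assuming a local webcam at device index 0) is to run capture as a background task and poll the results buffer:

# video_example.py (illustrative usage sketch)
import asyncio

async def run_video_demo(cv_pipeline):
    processor = RealTimeVideoProcessor(cv_pipeline, buffer_size=30, skip_frames=1)
    capture_task = asyncio.create_task(processor.start_processing(0))  # webcam index 0
    try:
        for _ in range(60):  # poll results for ~30 seconds
            await asyncio.sleep(0.5)
            for item in await processor.get_latest_results(count=5):
                result = item['result']
                print(f"frame {item['frame_id']}: {len(result.detections)} detections, "
                      f"{item['latency_ms']:.1f} ms latency")
    finally:
        await processor.stop_processing()
        capture_task.cancel()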
Model Optimization and Deployment
Model Optimization Pipeline
# model_optimization.py
import time
import copy
import torch
import torch.quantization as quant
from typing import Dict, List, Any
import onnx
import onnxruntime as ort

# TensorRT is optional; conversion is skipped gracefully when it is unavailable
try:
    import tensorrt as trt
except ImportError:
    trt = None
class ModelOptimizer:
def __init__(self):
self.optimization_configs = {
'quantization': {
'dynamic': True,
'static': True,
'qat': True # Quantization Aware Training
},
'pruning': {
'structured': True,
'unstructured': True
},
'compilation': {
'tensorrt': True,
'onnx': True,
'torchscript': True
}
}
def optimize_model(self,
model: torch.nn.Module,
sample_input: torch.Tensor,
optimization_level: str = 'balanced') -> Dict:
"""Apply comprehensive model optimization"""
optimized_models = {}
# Original model baseline
original_size = self._get_model_size(model)
original_latency = self._benchmark_model(model, sample_input)
optimized_models['original'] = {
'model': model,
'size_mb': original_size,
'latency_ms': original_latency,
'accuracy': 1.0 # Baseline
}
if optimization_level in ['balanced', 'aggressive']:
# Dynamic quantization
quantized_model = self._apply_dynamic_quantization(model)
quantized_size = self._get_model_size(quantized_model)
quantized_latency = self._benchmark_model(quantized_model, sample_input)
optimized_models['quantized'] = {
'model': quantized_model,
'size_mb': quantized_size,
'latency_ms': quantized_latency,
'compression_ratio': original_size / quantized_size
}
if optimization_level == 'aggressive':
# Model pruning
pruned_model = self._apply_pruning(model, sparsity=0.3)
pruned_size = self._get_model_size(pruned_model)
pruned_latency = self._benchmark_model(pruned_model, sample_input)
optimized_models['pruned'] = {
'model': pruned_model,
'size_mb': pruned_size,
'latency_ms': pruned_latency,
'compression_ratio': original_size / pruned_size
}
# TensorRT optimization (if available)
try:
trt_engine = self._convert_to_tensorrt(model, sample_input)
if trt_engine:
optimized_models['tensorrt'] = {
'engine': trt_engine,
'size_mb': self._get_engine_size(trt_engine),
'latency_ms': self._benchmark_tensorrt(trt_engine, sample_input)
}
except Exception as e:
print(f"TensorRT optimization failed: {e}")
return optimized_models
    def _apply_dynamic_quantization(self, model: torch.nn.Module):
        """Apply dynamic quantization"""
        # Dynamic quantization only covers certain layer types (e.g. Linear, LSTM);
        # Conv2d layers require static or quantization-aware quantization instead
        quantized_model = quant.quantize_dynamic(
            model,
            {torch.nn.Linear},
            dtype=torch.qint8
        )
        return quantized_model
def _apply_pruning(self,
model: torch.nn.Module,
sparsity: float = 0.3):
"""Apply structured pruning"""
        import torch.nn.utils.prune as prune
        # Work on a deep copy so the original model is left untouched
        pruned_model = copy.deepcopy(model)
# Apply pruning to Conv2d and Linear layers
for name, module in pruned_model.named_modules():
if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)):
prune.l1_unstructured(module, name='weight', amount=sparsity)
prune.remove(module, 'weight')
return pruned_model
def _convert_to_tensorrt(self,
model: torch.nn.Module,
sample_input: torch.Tensor):
"""Convert model to TensorRT engine"""
try:
            # First convert to ONNX
            onnx_path = "/tmp/model.onnx"
            torch.onnx.export(
                model,
                sample_input,
                onnx_path,
                input_names=['input'],
                output_names=['output'],
                dynamic_axes={'input': {0: 'batch_size'}},
                do_constant_folding=True
            )
# Convert ONNX to TensorRT
logger = trt.Logger(trt.Logger.WARNING)
builder = trt.Builder(logger)
network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
parser = trt.OnnxParser(network, logger)
# Parse ONNX model
with open(onnx_path, 'rb') as model_file:
if not parser.parse(model_file.read()):
print("Failed to parse ONNX model")
return None
            # Build engine (TensorRT 8.x API; newer releases replace max_workspace_size
            # with config.set_memory_pool_limit and build_engine with build_serialized_network)
            config = builder.create_builder_config()
            config.max_workspace_size = 1 << 28  # 256MB
            if builder.platform_has_fast_fp16:
                config.set_flag(trt.BuilderFlag.FP16)
            engine = builder.build_engine(network, config)
            return engine
except Exception as e:
print(f"TensorRT conversion error: {e}")
return None
def _benchmark_model(self,
model: torch.nn.Module,
sample_input: torch.Tensor,
iterations: int = 100) -> float:
"""Benchmark model inference time"""
model.eval()
# Warmup
with torch.no_grad():
for _ in range(10):
_ = model(sample_input)
# Benchmark
start_time = time.time()
with torch.no_grad():
for _ in range(iterations):
_ = model(sample_input)
total_time = time.time() - start_time
average_time_ms = (total_time / iterations) * 1000
return average_time_ms
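
    # _get_model_size is used above but was not shown in the original listing; this
    # estimate is an assumption: serialize the state_dict to memory and report megabytes.
    def _get_model_size(self, model: torch.nn.Module) -> float:
        """Estimate model size in MB from its serialized state_dict"""
        import io
        buffer = io.BytesIO()
        torch.save(model.state_dict(), buffer)
        return buffer.getbuffer().nbytes / (1024 * 1024)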
Edge Deployment Optimization
# edge_deployment.py
import torch
from typing import Dict

class EdgeDeploymentOptimizer:
def __init__(self):
self.deployment_targets = {
'mobile': {
'max_model_size_mb': 50,
'max_latency_ms': 100,
'quantization': 'int8'
},
'embedded': {
'max_model_size_mb': 10,
'max_latency_ms': 50,
'quantization': 'int8'
},
'edge_server': {
'max_model_size_mb': 500,
'max_latency_ms': 20,
'quantization': 'fp16'
}
}
def optimize_for_target(self,
model: torch.nn.Module,
target: str,
sample_data: torch.Tensor) -> Dict:
"""Optimize model for specific deployment target"""
if target not in self.deployment_targets:
raise ValueError(f"Unknown target: {target}")
constraints = self.deployment_targets[target]
# Start with baseline
current_model = model
optimizations_applied = []
# Check if model meets constraints
current_size = self._get_model_size(current_model)
current_latency = self._benchmark_inference(current_model, sample_data)
results = {
'original_size_mb': current_size,
'original_latency_ms': current_latency,
'target_constraints': constraints,
'optimizations_applied': []
}
# Apply optimizations if needed
if current_size > constraints['max_model_size_mb']:
# Apply quantization
if constraints['quantization'] == 'int8':
current_model = self._apply_int8_quantization(current_model)
optimizations_applied.append('int8_quantization')
elif constraints['quantization'] == 'fp16':
current_model = self._apply_fp16_conversion(current_model)
optimizations_applied.append('fp16_conversion')
# If still too large, apply pruning
current_size = self._get_model_size(current_model)
if current_size > constraints['max_model_size_mb']:
sparsity_levels = [0.3, 0.5, 0.7]
for sparsity in sparsity_levels:
pruned_model = self._apply_pruning(current_model, sparsity)
pruned_size = self._get_model_size(pruned_model)
if pruned_size <= constraints['max_model_size_mb']:
current_model = pruned_model
optimizations_applied.append(f'pruning_{sparsity}')
break
# Check latency constraints
current_latency = self._benchmark_inference(current_model, sample_data)
if current_latency > constraints['max_latency_ms']:
# Try model distillation or architecture search
distilled_model = self._apply_knowledge_distillation(current_model)
distilled_latency = self._benchmark_inference(distilled_model, sample_data)
if distilled_latency <= constraints['max_latency_ms']:
current_model = distilled_model
optimizations_applied.append('knowledge_distillation')
# Final metrics
results.update({
'final_model': current_model,
'final_size_mb': self._get_model_size(current_model),
'final_latency_ms': self._benchmark_inference(current_model, sample_data),
'optimizations_applied': optimizations_applied,
'meets_constraints': self._check_constraints(current_model, sample_data, constraints)
})
return results
def create_deployment_package(self,
optimized_model: torch.nn.Module,
target: str,
metadata: Dict) -> Dict:
"""Create complete deployment package"""
package = {
'model': optimized_model,
'target': target,
'metadata': metadata,
'inference_config': self._create_inference_config(target),
'preprocessing_pipeline': self._create_preprocessing_config(target),
'deployment_script': self._generate_deployment_script(target)
}
return package
def _create_inference_config(self, target: str) -> Dict:
"""Create inference configuration for target"""
base_config = {
'batch_size': 1,
'num_threads': 1,
'use_gpu': False
}
if target == 'mobile':
base_config.update({
'num_threads': 4,
'use_nnapi': True, # Android
'use_metal': True # iOS
})
elif target == 'embedded':
base_config.update({
'num_threads': 2,
'memory_fraction': 0.8
})
elif target == 'edge_server':
base_config.update({
'batch_size': 4,
'num_threads': 8,
'use_gpu': True
})
return base_config
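A usage sketch follows. The torchvision backbone, the random calibration tensor, and the helper methods referenced above (_get_model_size, _benchmark_inference, the quantization and distillation helpers) are assumptions for illustration:

# edge_example.py (illustrative usage sketch)
import torch
import torchvision

from edge_deployment import EdgeDeploymentOptimizer

model = torchvision.models.mobilenet_v3_small(weights=None)
sample = torch.randn(1, 3, 224, 224)

optimizer = EdgeDeploymentOptimizer()
report = optimizer.optimize_for_target(model, target='mobile', sample_data=sample)
print(report['optimizations_applied'], report['final_size_mb'], report['final_latency_ms'])

package = optimizer.create_deployment_package(report['final_model'], 'mobile', {'version': '1.0'})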
Production Monitoring and Maintenance
CV Model Monitoring
# cv_monitoring.py
import time
import numpy as np
import cv2
from typing import Dict, List, Optional

from cv_pipeline import DetectionResult, ProcessingResult

class CVModelMonitor:
def __init__(self):
self.metrics_store = {}
self.drift_detectors = {}
self.alert_thresholds = {
'accuracy_drop': 0.05,
'latency_increase': 0.3,
'error_rate_threshold': 0.02
}
def track_inference(self,
model_id: str,
input_image: np.ndarray,
prediction: ProcessingResult,
ground_truth: Optional[List[DetectionResult]] = None):
"""Track single inference for monitoring"""
timestamp = time.time()
# Image statistics
image_stats = self._analyze_image(input_image)
# Prediction statistics
pred_stats = self._analyze_predictions(prediction)
# Accuracy metrics (if ground truth available)
accuracy_metrics = None
if ground_truth:
accuracy_metrics = self._calculate_accuracy(
prediction.detections,
ground_truth
)
# Store metrics
if model_id not in self.metrics_store:
self.metrics_store[model_id] = []
self.metrics_store[model_id].append({
'timestamp': timestamp,
'image_stats': image_stats,
'prediction_stats': pred_stats,
'accuracy_metrics': accuracy_metrics,
'processing_time_ms': prediction.processing_time_ms
})
# Check for drift and anomalies
self._check_drift(model_id)
self._check_anomalies(model_id, image_stats, pred_stats)
def _analyze_image(self, image: np.ndarray) -> Dict:
"""Analyze image characteristics"""
# Basic statistics
stats = {
'mean_brightness': float(np.mean(image)),
'std_brightness': float(np.std(image)),
'height': image.shape[0],
'width': image.shape[1],
'channels': image.shape[2] if len(image.shape) > 2 else 1
}
# Color distribution
if len(image.shape) == 3:
stats['color_hist'] = [
np.histogram(image[:,:,i], bins=16)[0].tolist()
for i in range(3)
]
        # Edge density (complexity measure); Canny expects an 8-bit image
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape) == 3 else image
        if gray.dtype != np.uint8:
            gray = (gray * 255).astype(np.uint8) if gray.max() <= 1.0 else gray.astype(np.uint8)
        edges = cv2.Canny(gray, 50, 150)
        stats['edge_density'] = float(np.sum(edges > 0) / edges.size)
return stats
def _analyze_predictions(self, result: ProcessingResult) -> Dict:
"""Analyze prediction characteristics"""
stats = {
'num_detections': len(result.detections),
'avg_confidence': 0.0,
'confidence_distribution': {},
'class_distribution': {}
}
if result.detections:
confidences = [d.confidence for d in result.detections]
stats['avg_confidence'] = float(np.mean(confidences))
stats['min_confidence'] = float(np.min(confidences))
stats['max_confidence'] = float(np.max(confidences))
# Confidence histogram
hist, bins = np.histogram(confidences, bins=10, range=(0, 1))
stats['confidence_distribution'] = {
f'{bins[i]:.1f}-{bins[i+1]:.1f}': int(hist[i])
for i in range(len(hist))
}
# Class distribution
class_counts = {}
for detection in result.detections:
class_name = detection.class_name
class_counts[class_name] = class_counts.get(class_name, 0) + 1
stats['class_distribution'] = class_counts
return stats
def _check_drift(self, model_id: str):
"""Check for model drift"""
if model_id not in self.metrics_store:
return
recent_metrics = self.metrics_store[model_id][-100:] # Last 100 inferences
if len(recent_metrics) < 50:
return # Not enough data
# Split into two periods for comparison
mid_point = len(recent_metrics) // 2
period1 = recent_metrics[:mid_point]
period2 = recent_metrics[mid_point:]
        # Compare distributions with a two-sample Kolmogorov-Smirnov test
        from scipy import stats
        drift_detected = False
        drift_metrics = {}
        # Check prediction confidence drift
        conf1 = [m['prediction_stats']['avg_confidence'] for m in period1 if m['prediction_stats']['avg_confidence'] > 0]
        conf2 = [m['prediction_stats']['avg_confidence'] for m in period2 if m['prediction_stats']['avg_confidence'] > 0]
        if conf1 and conf2:
            stat, p_value = stats.ks_2samp(conf1, conf2)
if p_value < 0.05: # Significant difference
drift_detected = True
drift_metrics['confidence_drift'] = {
'p_value': p_value,
'mean_change': np.mean(conf2) - np.mean(conf1)
}
# Check image characteristic drift
brightness1 = [m['image_stats']['mean_brightness'] for m in period1]
brightness2 = [m['image_stats']['mean_brightness'] for m in period2]
if brightness1 and brightness2:
stat, p_value = stats.ks_2samp(brightness1, brightness2)
if p_value < 0.05:
drift_detected = True
drift_metrics['brightness_drift'] = {
'p_value': p_value,
'mean_change': np.mean(brightness2) - np.mean(brightness1)
}
if drift_detected:
self._send_drift_alert(model_id, drift_metrics)
def generate_performance_report(self,
model_id: str,
time_range_hours: int = 24) -> Dict:
"""Generate comprehensive performance report"""
if model_id not in self.metrics_store:
return {'error': 'No data available'}
# Filter data by time range
current_time = time.time()
time_threshold = current_time - (time_range_hours * 3600)
relevant_metrics = [
m for m in self.metrics_store[model_id]
if m['timestamp'] >= time_threshold
]
if not relevant_metrics:
return {'error': 'No recent data available'}
report = {
'model_id': model_id,
'time_range_hours': time_range_hours,
'total_inferences': len(relevant_metrics),
'performance_metrics': self._calculate_performance_metrics(relevant_metrics),
'data_quality_metrics': self._calculate_data_quality_metrics(relevant_metrics),
'error_analysis': self._analyze_errors(relevant_metrics),
'recommendations': self._generate_recommendations(relevant_metrics)
}
return report
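Hooking the monitor into the serving path is straightforward. The model identifier and wrapper function below are illustrative, and the accuracy, anomaly, and reporting helpers referenced inside the class are assumed to be implemented:

# monitoring_example.py (illustrative usage sketch)
monitor = CVModelMonitor()

async def serve_image(pipeline, image, image_id):
    result = await pipeline.process_single(image, image_id)
    # Track every inference; pass ground truth when labeled data is available
    monitor.track_inference('yolo-prod-v1', image, result, ground_truth=None)
    return result

# Periodically pull a report for dashboards or alerting
report = monitor.generate_performance_report('yolo-prod-v1', time_range_hours=24)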
Best Practices Checklist
- Match preprocessing (resizing, normalization, BGR-to-RGB conversion) to what the model was trained on.
- Merge ensemble outputs with cross-model NMS rather than naively concatenating detections.
- Use bounded frame buffers, frame skipping, and per-frame latency tracking for real-time video paths.
- Quantize, prune, or compile (TensorRT/ONNX) models against the size and latency budget of the deployment target.
- Monitor deployed models for confidence drift, input drift, and error-rate spikes, with alert thresholds and regular performance reports.
Conclusion
Production computer vision requires more than accurate models—it demands efficient pipelines, robust deployment strategies, and comprehensive monitoring. By implementing proper preprocessing, model optimization, real-time processing capabilities, and continuous monitoring, you can build CV systems that perform reliably at scale. Remember, the goal is not just accuracy, but consistent performance under real-world conditions with varying data quality and computational constraints.