Production Computer Vision Systems

David Childs

Build scalable computer vision systems covering object detection, image classification, and real-time processing, with deployment patterns for production environments.

Computer vision has evolved from research experiments to production systems handling millions of images daily. After building CV systems for manufacturing, retail, and healthcare, I've learned that production computer vision requires careful model selection, efficient preprocessing pipelines, and robust deployment architectures. Here's your complete guide to production-ready visual AI.

Computer Vision Pipeline Architecture

Production CV System Design

# cv_pipeline.py
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from typing import Dict, List, Tuple, Any, Optional
from dataclasses import dataclass
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from abc import ABC, abstractmethod

@dataclass
class DetectionResult:
    class_id: int
    class_name: str
    confidence: float
    bbox: Tuple[int, int, int, int]  # x, y, width, height
    mask: Optional[np.ndarray] = None

@dataclass
class ProcessingResult:
    image_id: str
    detections: List[DetectionResult]
    processing_time_ms: float
    metadata: Dict[str, Any]

class CVProcessor(ABC):
    @abstractmethod
    async def process(self, image: np.ndarray, metadata: Dict) -> ProcessingResult:
        pass

class CVProcessingError(Exception):
    """Raised when any stage of the CV pipeline fails"""
    pass

class ProductionCVPipeline:
    def __init__(self, 
                 preprocessor,
                 model_ensemble: List[CVProcessor],
                 postprocessor,
                 batch_size: int = 8):
        
        self.preprocessor = preprocessor
        self.model_ensemble = model_ensemble
        self.postprocessor = postprocessor
        self.batch_size = batch_size
        
        # Performance monitoring
        self.performance_metrics = {
            'total_processed': 0,
            'average_latency_ms': 0,
            'error_count': 0,
            'throughput_fps': 0
        }
        
        # Async executor for parallel processing
        self.executor = ThreadPoolExecutor(max_workers=4)
    
    async def process_single(self, 
                           image: np.ndarray,
                           image_id: str,
                           metadata: Dict = None) -> ProcessingResult:
        """Process single image through complete pipeline"""
        
        start_time = time.time()
        metadata = metadata or {}  # Normalize so downstream processors can safely call metadata.get()
        
        try:
            # Preprocessing
            processed_image = await self._preprocess_image(image, metadata)
            
            # Model ensemble processing
            ensemble_results = []
            for model in self.model_ensemble:
                result = await model.process(processed_image, metadata)
                ensemble_results.append(result)
            
            # Combine ensemble results
            combined_result = await self._combine_ensemble_results(
                ensemble_results, image_id
            )
            
            # Postprocessing
            final_result = await self.postprocessor.process(
                combined_result, image, metadata
            )
            
            # Update performance metrics
            processing_time = (time.time() - start_time) * 1000
            self._update_metrics(processing_time, True)
            
            return ProcessingResult(
                image_id=image_id,
                detections=final_result,
                processing_time_ms=processing_time,
                metadata=metadata or {}
            )
            
        except Exception as e:
            self._update_metrics(0, False)
            raise CVProcessingError(f"Pipeline processing failed: {str(e)}") from e
    
    async def process_batch(self, 
                          images: List[Tuple[np.ndarray, str]],
                          metadata: Dict = None) -> List[ProcessingResult]:
        """Process batch of images efficiently"""
        
        results = []
        
        # Process in batches
        for i in range(0, len(images), self.batch_size):
            batch = images[i:i + self.batch_size]
            
            # Process batch in parallel
            tasks = [
                self.process_single(img, img_id, metadata)
                for img, img_id in batch
            ]
            
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Handle exceptions
            for result in batch_results:
                if isinstance(result, Exception):
                    # Log error but continue processing
                    print(f"Batch processing error: {result}")
                else:
                    results.append(result)
        
        return results
    
    async def _preprocess_image(self, 
                              image: np.ndarray,
                              metadata: Dict) -> np.ndarray:
        """Preprocess image for model input"""
        
        # Resize to standard input size
        if metadata and 'target_size' in metadata:
            target_size = metadata['target_size']
            image = cv2.resize(image, target_size)
        
        # Normalize pixel values
        image = image.astype(np.float32) / 255.0
        
        # Color space conversion if needed
        if len(image.shape) == 3 and image.shape[2] == 3:
            # Convert BGR to RGB for most models
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        return image
    
    async def _combine_ensemble_results(self,
                                      ensemble_results: List[ProcessingResult],
                                      image_id: str) -> List[DetectionResult]:
        """Combine results from model ensemble using voting"""
        
        if len(ensemble_results) == 1:
            return ensemble_results[0].detections
        
        # Implement Non-Maximum Suppression across ensemble
        all_detections = []
        for result in ensemble_results:
            all_detections.extend(result.detections)
        
        # Apply NMS
        final_detections = self._apply_ensemble_nms(all_detections)
        
        return final_detections
    
    def _apply_ensemble_nms(self, 
                          detections: List[DetectionResult],
                          iou_threshold: float = 0.5) -> List[DetectionResult]:
        """Apply Non-Maximum Suppression across ensemble results"""
        
        if not detections:
            return []
        
        # Group by class
        class_groups = {}
        for detection in detections:
            if detection.class_name not in class_groups:
                class_groups[detection.class_name] = []
            class_groups[detection.class_name].append(detection)
        
        final_detections = []
        
        for class_name, class_detections in class_groups.items():
            # Sort by confidence
            class_detections.sort(key=lambda x: x.confidence, reverse=True)
            
            # Apply NMS
            keep = []
            while class_detections:
                current = class_detections.pop(0)
                keep.append(current)
                
                # Remove overlapping detections
                remaining = []
                for detection in class_detections:
                    iou = self._calculate_iou(current.bbox, detection.bbox)
                    if iou < iou_threshold:
                        remaining.append(detection)
                
                class_detections = remaining
            
            final_detections.extend(keep)
        
        return final_detections
    
    def _calculate_iou(self, 
                      bbox1: Tuple[int, int, int, int],
                      bbox2: Tuple[int, int, int, int]) -> float:
        """Calculate Intersection over Union"""
        
        x1, y1, w1, h1 = bbox1
        x2, y2, w2, h2 = bbox2
        
        # Calculate intersection
        xi = max(x1, x2)
        yi = max(y1, y2)
        wi = min(x1 + w1, x2 + w2) - xi
        hi = min(y1 + h1, y2 + h2) - yi
        
        if wi <= 0 or hi <= 0:
            return 0.0
        
        intersection = wi * hi
        
        # Calculate union
        area1 = w1 * h1
        area2 = w2 * h2
        union = area1 + area2 - intersection
        
        return intersection / union if union > 0 else 0.0
    
    def _update_metrics(self, latency_ms: float, success: bool):
        """Update rolling latency, throughput, and error counters after each inference"""
        if not success:
            self.performance_metrics['error_count'] += 1
            return
        
        n = self.performance_metrics['total_processed']
        new_avg = (self.performance_metrics['average_latency_ms'] * n + latency_ms) / (n + 1)
        
        self.performance_metrics['total_processed'] = n + 1
        self.performance_metrics['average_latency_ms'] = new_avg
        self.performance_metrics['throughput_fps'] = 1000.0 / new_avg if new_avg > 0 else 0.0

class YOLOProcessor(CVProcessor):
    def __init__(self, model_path: str, class_names: List[str]):
        self.model = self._load_model(model_path)
        self.class_names = class_names
        self.input_size = 640
        
    def _load_model(self, model_path: str):
        """Load YOLO model"""
        try:
            import ultralytics
            model = ultralytics.YOLO(model_path)
            return model
        except ImportError:
            # Fallback to PyTorch loading
            model = torch.jit.load(model_path)
            model.eval()
            return model
    
    async def process(self, 
                     image: np.ndarray,
                     metadata: Dict) -> ProcessingResult:
        """Process image with YOLO model"""
        
        start_time = time.time()
        
        # Prepare input
        input_tensor = self._prepare_input(image)
        
        # Run inference
        with torch.no_grad():
            if hasattr(self.model, 'predict'):
                # Ultralytics YOLO
                results = self.model.predict(image, verbose=False)
                detections = self._parse_ultralytics_results(results[0])
            else:
                # PyTorch YOLO
                outputs = self.model(input_tensor)
                detections = self._parse_pytorch_results(outputs, image.shape)
        
        processing_time = (time.time() - start_time) * 1000
        
        return ProcessingResult(
            image_id=metadata.get('image_id', 'unknown'),
            detections=detections,
            processing_time_ms=processing_time,
            metadata=metadata
        )
    
    def _prepare_input(self, image: np.ndarray) -> torch.Tensor:
        """Prepare image for YOLO input"""
        
        # Resize maintaining aspect ratio
        h, w = image.shape[:2]
        scale = self.input_size / max(h, w)
        new_h, new_w = int(h * scale), int(w * scale)
        
        resized = cv2.resize(image, (new_w, new_h))
        
        # Pad to square
        padded = np.full((self.input_size, self.input_size, 3), 0.5, dtype=np.float32)
        padded[:new_h, :new_w] = resized
        
        # Convert to tensor
        tensor = torch.from_numpy(padded).permute(2, 0, 1).unsqueeze(0)
        
        return tensor
    
    def _parse_ultralytics_results(self, result) -> List[DetectionResult]:
        """Parse Ultralytics YOLO results"""
        
        detections = []
        
        if result.boxes is not None:
            boxes = result.boxes.xyxy.cpu().numpy()
            confidences = result.boxes.conf.cpu().numpy()
            class_ids = result.boxes.cls.cpu().numpy().astype(int)
            
            for i in range(len(boxes)):
                x1, y1, x2, y2 = boxes[i]
                
                detection = DetectionResult(
                    class_id=int(class_ids[i]),
                    class_name=self.class_names[int(class_ids[i])],
                    confidence=float(confidences[i]),
                    bbox=(int(x1), int(y1), int(x2-x1), int(y2-y1))
                )
                
                detections.append(detection)
        
        return detections
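
Wiring these pieces together looks like the sketch below. It's illustrative only: the pass-through postprocessor, placeholder class names, weights path, and sample image are assumptions rather than part of the pipeline above.

# pipeline_usage.py -- illustrative only; paths, class names, and the pass-through
# postprocessor are placeholder assumptions
import asyncio
import cv2

class PassthroughPostprocessor:
    async def process(self, detections, image, metadata):
        # No-op postprocessing: return the ensemble's detections unchanged
        return detections

async def main():
    # Placeholder label map; a real deployment needs the model's full class list (80 entries for COCO)
    class_names = [f"class_{i}" for i in range(80)]
    yolo = YOLOProcessor("yolov8n.pt", class_names=class_names)
    
    pipeline = ProductionCVPipeline(
        preprocessor=None,                      # resizing/normalization happens in _preprocess_image
        model_ensemble=[yolo],
        postprocessor=PassthroughPostprocessor(),
        batch_size=8
    )
    
    image = cv2.imread("sample.jpg")
    result = await pipeline.process_single(image, "sample-001", {"image_id": "sample-001"})
    print(f"{result.processing_time_ms:.1f} ms,", [d.class_name for d in result.detections])

asyncio.run(main())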

Real-time Video Processing

# video_processor.py
import asyncio
import time
from typing import Dict, List

import cv2

from cv_pipeline import ProductionCVPipeline

class RealTimeVideoProcessor:
    def __init__(self, 
                 cv_pipeline: ProductionCVPipeline,
                 buffer_size: int = 30,
                 skip_frames: int = 1):
        
        self.cv_pipeline = cv_pipeline
        self.buffer_size = buffer_size
        self.skip_frames = skip_frames
        
        # Video processing state
        self.frame_buffer = asyncio.Queue(maxsize=buffer_size)
        self.results_buffer = asyncio.Queue(maxsize=buffer_size)  # Bounded so stale results get dropped
        self.processing_stats = {
            'frames_processed': 0,
            'frames_dropped': 0,
            'average_fps': 0,
            'current_latency_ms': 0
        }
        
        # Background tasks
        self.processing_task = None
        self.is_running = False
    
    async def start_processing(self, video_source):
        """Start real-time video processing"""
        
        self.is_running = True
        
        # Start background processing task
        self.processing_task = asyncio.create_task(
            self._process_frames_continuously()
        )
        
        # Start frame capture
        await self._capture_frames(video_source)
    
    async def stop_processing(self):
        """Stop video processing"""
        
        self.is_running = False
        
        if self.processing_task:
            self.processing_task.cancel()
            try:
                await self.processing_task
            except asyncio.CancelledError:
                pass
    
    async def _capture_frames(self, video_source):
        """Capture frames from video source"""
        
        cap = cv2.VideoCapture(video_source)
        frame_count = 0
        
        try:
            while self.is_running and cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                
                # Skip frames for performance
                if frame_count % (self.skip_frames + 1) != 0:
                    frame_count += 1
                    continue
                
                # Add timestamp
                timestamp = time.time()
                frame_data = {
                    'frame': frame,
                    'timestamp': timestamp,
                    'frame_id': frame_count
                }
                
                try:
                    # Add to buffer (non-blocking)
                    self.frame_buffer.put_nowait(frame_data)
                except asyncio.QueueFull:
                    # Drop oldest frame if buffer full
                    try:
                        self.frame_buffer.get_nowait()
                        self.frame_buffer.put_nowait(frame_data)
                        self.processing_stats['frames_dropped'] += 1
                    except asyncio.QueueEmpty:
                        pass
                
                frame_count += 1
                
                # Small delay to prevent overwhelming
                await asyncio.sleep(0.001)
        
        finally:
            cap.release()
    
    async def _process_frames_continuously(self):
        """Process frames continuously in background"""
        
        while self.is_running:
            try:
                # Get frame from buffer
                frame_data = await asyncio.wait_for(
                    self.frame_buffer.get(),
                    timeout=1.0
                )
                
                # Process frame
                result = await self.cv_pipeline.process_single(
                    frame_data['frame'],
                    str(frame_data['frame_id']),
                    {'timestamp': frame_data['timestamp']}
                )
                
                # Calculate latency
                current_time = time.time()
                latency_ms = (current_time - frame_data['timestamp']) * 1000
                
                # Update stats
                self.processing_stats['frames_processed'] += 1
                self.processing_stats['current_latency_ms'] = latency_ms
                
                # Store result
                try:
                    self.results_buffer.put_nowait({
                        'frame_id': frame_data['frame_id'],
                        'result': result,
                        'latency_ms': latency_ms
                    })
                except asyncio.QueueFull:
                    # Remove oldest result
                    try:
                        self.results_buffer.get_nowait()
                        self.results_buffer.put_nowait({
                            'frame_id': frame_data['frame_id'],
                            'result': result,
                            'latency_ms': latency_ms
                        })
                    except asyncio.QueueEmpty:
                        pass
                
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Frame processing error: {e}")
                continue
    
    async def get_latest_results(self, count: int = 1) -> List[Dict]:
        """Get latest processing results"""
        
        results = []
        for _ in range(min(count, self.results_buffer.qsize())):
            try:
                result = self.results_buffer.get_nowait()
                results.append(result)
            except asyncio.QueueEmpty:
                break
        
        return results
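
Here's a minimal driver for the video processor. The webcam index, polling cadence, and fixed iteration count are illustrative assumptions.

# video_usage.py -- illustrative driver; source index and polling cadence are assumptions
import asyncio

async def run_video(pipeline: ProductionCVPipeline):
    processor = RealTimeVideoProcessor(pipeline, buffer_size=30, skip_frames=2)
    
    # Run capture in the background and poll results periodically
    capture_task = asyncio.create_task(processor.start_processing(0))  # 0 = default webcam
    try:
        for _ in range(10):
            await asyncio.sleep(2)
            for item in await processor.get_latest_results(count=5):
                detections = item['result'].detections
                print(f"frame {item['frame_id']}: {len(detections)} detections, "
                      f"{item['latency_ms']:.1f} ms behind live")
    finally:
        await processor.stop_processing()
        capture_task.cancel()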

Model Optimization and Deployment

Model Optimization Pipeline

# model_optimization.py
import copy
import time
from typing import Dict, List, Any

import torch
import torch.quantization as quant
import onnx
import onnxruntime as ort

try:
    import tensorrt as trt
except ImportError:
    trt = None  # TensorRT conversion is skipped when the library is unavailable

class ModelOptimizer:
    def __init__(self):
        self.optimization_configs = {
            'quantization': {
                'dynamic': True,
                'static': True,
                'qat': True  # Quantization Aware Training
            },
            'pruning': {
                'structured': True,
                'unstructured': True
            },
            'compilation': {
                'tensorrt': True,
                'onnx': True,
                'torchscript': True
            }
        }
    
    def optimize_model(self, 
                      model: torch.nn.Module,
                      sample_input: torch.Tensor,
                      optimization_level: str = 'balanced') -> Dict:
        """Apply comprehensive model optimization"""
        
        optimized_models = {}
        
        # Original model baseline
        original_size = self._get_model_size(model)
        original_latency = self._benchmark_model(model, sample_input)
        
        optimized_models['original'] = {
            'model': model,
            'size_mb': original_size,
            'latency_ms': original_latency,
            'accuracy': 1.0  # Baseline
        }
        
        if optimization_level in ['balanced', 'aggressive']:
            # Dynamic quantization
            quantized_model = self._apply_dynamic_quantization(model)
            quantized_size = self._get_model_size(quantized_model)
            quantized_latency = self._benchmark_model(quantized_model, sample_input)
            
            optimized_models['quantized'] = {
                'model': quantized_model,
                'size_mb': quantized_size,
                'latency_ms': quantized_latency,
                'compression_ratio': original_size / quantized_size
            }
        
        if optimization_level == 'aggressive':
            # Model pruning
            pruned_model = self._apply_pruning(model, sparsity=0.3)
            pruned_size = self._get_model_size(pruned_model)
            pruned_latency = self._benchmark_model(pruned_model, sample_input)
            
            optimized_models['pruned'] = {
                'model': pruned_model,
                'size_mb': pruned_size,
                'latency_ms': pruned_latency,
                'compression_ratio': original_size / pruned_size
            }
        
        # TensorRT optimization (if available)
        try:
            trt_engine = self._convert_to_tensorrt(model, sample_input)
            if trt_engine:
                optimized_models['tensorrt'] = {
                    'engine': trt_engine,
                    'size_mb': self._get_engine_size(trt_engine),
                    'latency_ms': self._benchmark_tensorrt(trt_engine, sample_input)
                }
        except Exception as e:
            print(f"TensorRT optimization failed: {e}")
        
        return optimized_models
    
    def _get_model_size(self, model: torch.nn.Module) -> float:
        """Approximate model size in MB from parameter and buffer bytes"""
        param_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
        buffer_bytes = sum(b.numel() * b.element_size() for b in model.buffers())
        return (param_bytes + buffer_bytes) / (1024 ** 2)
    
    def _apply_dynamic_quantization(self, model: torch.nn.Module):
        """Apply dynamic quantization"""
        
        # Dynamic quantization only covers weight-heavy layers such as Linear;
        # convolutions require static quantization or quantization-aware training
        quantized_model = quant.quantize_dynamic(
            model, 
            {torch.nn.Linear}, 
            dtype=torch.qint8
        )
        
        return quantized_model
    
    def _apply_pruning(self, 
                      model: torch.nn.Module, 
                      sparsity: float = 0.3):
        """Apply structured pruning"""
        
        import torch.nn.utils.prune as prune
        
        # Create a copy for pruning
        pruned_model = torch.nn.utils.deepcopy(model)
        
        # Apply pruning to Conv2d and Linear layers
        for name, module in pruned_model.named_modules():
            if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)):
                prune.l1_unstructured(module, name='weight', amount=sparsity)
                prune.remove(module, 'weight')
        
        return pruned_model
    
    def _convert_to_tensorrt(self, 
                           model: torch.nn.Module, 
                           sample_input: torch.Tensor):
        """Convert model to TensorRT engine"""
        
        try:
            # First convert to ONNX
            onnx_path = "/tmp/model.onnx"
            torch.onnx.export(
                model,
                sample_input,
                onnx_path,
                dynamic_axes={'input': {0: 'batch_size'}},
                do_constant_folding=True
            )
            
            # Convert ONNX to TensorRT
            logger = trt.Logger(trt.Logger.WARNING)
            builder = trt.Builder(logger)
            network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
            parser = trt.OnnxParser(network, logger)
            
            # Parse ONNX model
            with open(onnx_path, 'rb') as model_file:
                if not parser.parse(model_file.read()):
                    print("Failed to parse ONNX model")
                    return None
            
            # Build engine (TensorRT 8.x-style API; newer releases use
            # config.set_memory_pool_limit and builder.build_serialized_network)
            config = builder.create_builder_config()
            config.max_workspace_size = 1 << 28  # 256MB
            
            if builder.platform_has_fast_fp16:
                config.set_flag(trt.BuilderFlag.FP16)
            
            engine = builder.build_engine(network, config)
            
            return engine
            
        except Exception as e:
            print(f"TensorRT conversion error: {e}")
            return None
    
    def _benchmark_model(self, 
                        model: torch.nn.Module, 
                        sample_input: torch.Tensor,
                        iterations: int = 100) -> float:
        """Benchmark model inference time"""
        
        model.eval()
        
        # Warmup
        with torch.no_grad():
            for _ in range(10):
                _ = model(sample_input)
        
        # Benchmark
        start_time = time.time()
        with torch.no_grad():
            for _ in range(iterations):
                _ = model(sample_input)
        
        total_time = time.time() - start_time
        average_time_ms = (total_time / iterations) * 1000
        
        return average_time_ms
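
A short usage sketch follows, with torchvision's ResNet-18 standing in for whatever backbone you're actually optimizing.

# optimizer_usage.py -- illustrative; torchvision's resnet18 stands in for the real detector backbone
import torch
import torchvision

model = torchvision.models.resnet18(weights=None).eval()
sample_input = torch.randn(1, 3, 224, 224)

optimizer = ModelOptimizer()
variants = optimizer.optimize_model(model, sample_input, optimization_level='balanced')

# Compare each variant's footprint and latency against the baseline
for name, info in variants.items():
    if 'size_mb' in info and 'latency_ms' in info:
        print(f"{name}: {info['size_mb']:.1f} MB, {info['latency_ms']:.2f} ms")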

Edge Deployment Optimization

# edge_deployment.py
from typing import Dict

import torch

class EdgeDeploymentOptimizer:
    def __init__(self):
        self.deployment_targets = {
            'mobile': {
                'max_model_size_mb': 50,
                'max_latency_ms': 100,
                'quantization': 'int8'
            },
            'embedded': {
                'max_model_size_mb': 10,
                'max_latency_ms': 50,
                'quantization': 'int8'
            },
            'edge_server': {
                'max_model_size_mb': 500,
                'max_latency_ms': 20,
                'quantization': 'fp16'
            }
        }
    
    def optimize_for_target(self, 
                           model: torch.nn.Module,
                           target: str,
                           sample_data: torch.Tensor) -> Dict:
        """Optimize model for specific deployment target"""
        
        if target not in self.deployment_targets:
            raise ValueError(f"Unknown target: {target}")
        
        constraints = self.deployment_targets[target]
        
        # Start with baseline
        current_model = model
        optimizations_applied = []
        
        # Check if model meets constraints
        current_size = self._get_model_size(current_model)
        current_latency = self._benchmark_inference(current_model, sample_data)
        
        results = {
            'original_size_mb': current_size,
            'original_latency_ms': current_latency,
            'target_constraints': constraints,
            'optimizations_applied': []
        }
        
        # Apply optimizations if needed
        if current_size > constraints['max_model_size_mb']:
            # Apply quantization
            if constraints['quantization'] == 'int8':
                current_model = self._apply_int8_quantization(current_model)
                optimizations_applied.append('int8_quantization')
            elif constraints['quantization'] == 'fp16':
                current_model = self._apply_fp16_conversion(current_model)
                optimizations_applied.append('fp16_conversion')
        
        # If still too large, apply pruning
        current_size = self._get_model_size(current_model)
        if current_size > constraints['max_model_size_mb']:
            sparsity_levels = [0.3, 0.5, 0.7]
            for sparsity in sparsity_levels:
                pruned_model = self._apply_pruning(current_model, sparsity)
                pruned_size = self._get_model_size(pruned_model)
                
                if pruned_size <= constraints['max_model_size_mb']:
                    current_model = pruned_model
                    optimizations_applied.append(f'pruning_{sparsity}')
                    break
        
        # Check latency constraints
        current_latency = self._benchmark_inference(current_model, sample_data)
        if current_latency > constraints['max_latency_ms']:
            # Try model distillation or architecture search
            distilled_model = self._apply_knowledge_distillation(current_model)
            distilled_latency = self._benchmark_inference(distilled_model, sample_data)
            
            if distilled_latency <= constraints['max_latency_ms']:
                current_model = distilled_model
                optimizations_applied.append('knowledge_distillation')
        
        # Final metrics
        results.update({
            'final_model': current_model,
            'final_size_mb': self._get_model_size(current_model),
            'final_latency_ms': self._benchmark_inference(current_model, sample_data),
            'optimizations_applied': optimizations_applied,
            'meets_constraints': self._check_constraints(current_model, sample_data, constraints)
        })
        
        return results
    
    def create_deployment_package(self, 
                                 optimized_model: torch.nn.Module,
                                 target: str,
                                 metadata: Dict) -> Dict:
        """Create complete deployment package"""
        
        package = {
            'model': optimized_model,
            'target': target,
            'metadata': metadata,
            'inference_config': self._create_inference_config(target),
            'preprocessing_pipeline': self._create_preprocessing_config(target),
            'deployment_script': self._generate_deployment_script(target)
        }
        
        return package
    
    def _create_inference_config(self, target: str) -> Dict:
        """Create inference configuration for target"""
        
        base_config = {
            'batch_size': 1,
            'num_threads': 1,
            'use_gpu': False
        }
        
        if target == 'mobile':
            base_config.update({
                'num_threads': 4,
                'use_nnapi': True,  # Android
                'use_metal': True   # iOS
            })
        elif target == 'embedded':
            base_config.update({
                'num_threads': 2,
                'memory_fraction': 0.8
            })
        elif target == 'edge_server':
            base_config.update({
                'batch_size': 4,
                'num_threads': 8,
                'use_gpu': True
            })
        
        return base_config
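
Several helpers referenced above (such as _apply_fp16_conversion and _check_constraints) are omitted for brevity. The sketch below shows one way they might look, with deliberately simplified signatures.

# edge_helpers.py -- illustrative sketches, not the exact helpers used above
import torch

def apply_fp16_conversion(model: torch.nn.Module) -> torch.nn.Module:
    """Cast weights to half precision; requires fp16-capable hardware at inference time"""
    return model.half()

def check_constraints(size_mb: float, latency_ms: float, constraints: dict) -> bool:
    """True when the optimized model fits the target's size and latency budget"""
    return (size_mb <= constraints['max_model_size_mb']
            and latency_ms <= constraints['max_latency_ms'])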

Production Monitoring and Maintenance

CV Model Monitoring

# cv_monitoring.py
import time
from typing import Dict, List, Optional

import cv2
import numpy as np

from cv_pipeline import DetectionResult, ProcessingResult

class CVModelMonitor:
    def __init__(self):
        self.metrics_store = {}
        self.drift_detectors = {}
        self.alert_thresholds = {
            'accuracy_drop': 0.05,
            'latency_increase': 0.3,
            'error_rate_threshold': 0.02
        }
    
    def track_inference(self, 
                       model_id: str,
                       input_image: np.ndarray,
                       prediction: ProcessingResult,
                       ground_truth: Optional[List[DetectionResult]] = None):
        """Track single inference for monitoring"""
        
        timestamp = time.time()
        
        # Image statistics
        image_stats = self._analyze_image(input_image)
        
        # Prediction statistics
        pred_stats = self._analyze_predictions(prediction)
        
        # Accuracy metrics (if ground truth available)
        accuracy_metrics = None
        if ground_truth:
            accuracy_metrics = self._calculate_accuracy(
                prediction.detections,
                ground_truth
            )
        
        # Store metrics
        if model_id not in self.metrics_store:
            self.metrics_store[model_id] = []
        
        self.metrics_store[model_id].append({
            'timestamp': timestamp,
            'image_stats': image_stats,
            'prediction_stats': pred_stats,
            'accuracy_metrics': accuracy_metrics,
            'processing_time_ms': prediction.processing_time_ms
        })
        
        # Check for drift and anomalies
        self._check_drift(model_id)
        self._check_anomalies(model_id, image_stats, pred_stats)
    
    def _analyze_image(self, image: np.ndarray) -> Dict:
        """Analyze image characteristics"""
        
        # Basic statistics
        stats = {
            'mean_brightness': float(np.mean(image)),
            'std_brightness': float(np.std(image)),
            'height': image.shape[0],
            'width': image.shape[1],
            'channels': image.shape[2] if len(image.shape) > 2 else 1
        }
        
        # Color distribution
        if len(image.shape) == 3:
            stats['color_hist'] = [
                np.histogram(image[:,:,i], bins=16)[0].tolist()
                for i in range(3)
            ]
        
        # Edge density (complexity measure)
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) if len(image.shape) == 3 else image
        edges = cv2.Canny(gray.astype(np.uint8), 50, 150)
        stats['edge_density'] = float(np.sum(edges > 0) / edges.size)
        
        return stats
    
    def _analyze_predictions(self, result: ProcessingResult) -> Dict:
        """Analyze prediction characteristics"""
        
        stats = {
            'num_detections': len(result.detections),
            'avg_confidence': 0.0,
            'confidence_distribution': {},
            'class_distribution': {}
        }
        
        if result.detections:
            confidences = [d.confidence for d in result.detections]
            stats['avg_confidence'] = float(np.mean(confidences))
            stats['min_confidence'] = float(np.min(confidences))
            stats['max_confidence'] = float(np.max(confidences))
            
            # Confidence histogram
            hist, bins = np.histogram(confidences, bins=10, range=(0, 1))
            stats['confidence_distribution'] = {
                f'{bins[i]:.1f}-{bins[i+1]:.1f}': int(hist[i])
                for i in range(len(hist))
            }
            
            # Class distribution
            class_counts = {}
            for detection in result.detections:
                class_name = detection.class_name
                class_counts[class_name] = class_counts.get(class_name, 0) + 1
            stats['class_distribution'] = class_counts
        
        return stats
    
    def _check_drift(self, model_id: str):
        """Check for model drift"""
        
        if model_id not in self.metrics_store:
            return
        
        recent_metrics = self.metrics_store[model_id][-100:]  # Last 100 inferences
        
        if len(recent_metrics) < 50:
            return  # Not enough data
        
        # Split into two periods for comparison
        mid_point = len(recent_metrics) // 2
        period1 = recent_metrics[:mid_point]
        period2 = recent_metrics[mid_point:]
        
        # Compare distributions
        drift_detected = False
        drift_metrics = {}
        
        # Kolmogorov-Smirnov tests compare the two periods' distributions
        from scipy import stats
        
        # Check prediction confidence drift
        conf1 = [m['prediction_stats']['avg_confidence'] for m in period1 if m['prediction_stats']['avg_confidence'] > 0]
        conf2 = [m['prediction_stats']['avg_confidence'] for m in period2 if m['prediction_stats']['avg_confidence'] > 0]
        
        if conf1 and conf2:
            stat, p_value = stats.ks_2samp(conf1, conf2)
            
            if p_value < 0.05:  # Significant difference
                drift_detected = True
                drift_metrics['confidence_drift'] = {
                    'p_value': p_value,
                    'mean_change': np.mean(conf2) - np.mean(conf1)
                }
        
        # Check image characteristic drift
        brightness1 = [m['image_stats']['mean_brightness'] for m in period1]
        brightness2 = [m['image_stats']['mean_brightness'] for m in period2]
        
        if brightness1 and brightness2:
            stat, p_value = stats.ks_2samp(brightness1, brightness2)
            
            if p_value < 0.05:
                drift_detected = True
                drift_metrics['brightness_drift'] = {
                    'p_value': p_value,
                    'mean_change': np.mean(brightness2) - np.mean(brightness1)
                }
        
        if drift_detected:
            self._send_drift_alert(model_id, drift_metrics)
    
    def generate_performance_report(self, 
                                  model_id: str,
                                  time_range_hours: int = 24) -> Dict:
        """Generate comprehensive performance report"""
        
        if model_id not in self.metrics_store:
            return {'error': 'No data available'}
        
        # Filter data by time range
        current_time = time.time()
        time_threshold = current_time - (time_range_hours * 3600)
        
        relevant_metrics = [
            m for m in self.metrics_store[model_id]
            if m['timestamp'] >= time_threshold
        ]
        
        if not relevant_metrics:
            return {'error': 'No recent data available'}
        
        report = {
            'model_id': model_id,
            'time_range_hours': time_range_hours,
            'total_inferences': len(relevant_metrics),
            'performance_metrics': self._calculate_performance_metrics(relevant_metrics),
            'data_quality_metrics': self._calculate_data_quality_metrics(relevant_metrics),
            'error_analysis': self._analyze_errors(relevant_metrics),
            'recommendations': self._generate_recommendations(relevant_metrics)
        }
        
        return report
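
The _calculate_accuracy helper used in track_inference is omitted above. One common approach, sketched here with an assumed 0.5 IoU threshold, is greedy matching of predictions to labels by class and IoU.

# accuracy_sketch.py -- illustrative greedy IoU matching; not the exact helper used above
def _bbox_iou(b1, b2) -> float:
    """IoU for (x, y, w, h) boxes"""
    x1, y1, w1, h1 = b1
    x2, y2, w2, h2 = b2
    xi, yi = max(x1, x2), max(y1, y2)
    wi = min(x1 + w1, x2 + w2) - xi
    hi = min(y1 + h1, y2 + h2) - yi
    if wi <= 0 or hi <= 0:
        return 0.0
    inter = wi * hi
    union = w1 * h1 + w2 * h2 - inter
    return inter / union if union > 0 else 0.0

def calculate_detection_accuracy(predictions, ground_truth, iou_threshold: float = 0.5) -> dict:
    """Match predictions to ground truth greedily by confidence; return per-image precision/recall"""
    matched_gt = set()
    true_positives = 0
    for pred in sorted(predictions, key=lambda d: d.confidence, reverse=True):
        for idx, gt in enumerate(ground_truth):
            if idx in matched_gt or gt.class_name != pred.class_name:
                continue
            if _bbox_iou(pred.bbox, gt.bbox) >= iou_threshold:
                matched_gt.add(idx)
                true_positives += 1
                break
    return {
        'precision': true_positives / len(predictions) if predictions else 0.0,
        'recall': true_positives / len(ground_truth) if ground_truth else 0.0,
        'true_positives': true_positives
    }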

Best Practices Checklist

  • Implement efficient preprocessing pipelines
  • Use model ensembles for better accuracy
  • Apply non-maximum suppression properly
  • Optimize models for target deployment
  • Monitor for data drift and model degradation
  • Implement proper error handling
  • Use batch processing for throughput
  • Cache preprocessed data when appropriate
  • Implement proper logging and monitoring
  • Automate regular model retraining pipelines
  • A/B test model improvements
  • Validate model outputs continuously
  • Implement fallback mechanisms (see the sketch after this list)
  • Optimize for specific hardware
  • Document model architectures and decisions
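
For the fallback item above, here's a minimal sketch; the timeout value and the lightweight secondary pipeline are assumptions.

# fallback_sketch.py -- illustrative; the timeout and lightweight pipeline are assumptions
import asyncio

async def detect_with_fallback(primary: ProductionCVPipeline,
                               lightweight: ProductionCVPipeline,
                               image, image_id: str,
                               timeout_s: float = 0.2):
    """Try the full ensemble first; on timeout or pipeline error, fall back to a smaller model"""
    try:
        return await asyncio.wait_for(
            primary.process_single(image, image_id, {"image_id": image_id}),
            timeout=timeout_s
        )
    except (asyncio.TimeoutError, CVProcessingError):
        return await lightweight.process_single(image, image_id, {"image_id": image_id})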

Conclusion

Production computer vision requires more than accurate models—it demands efficient pipelines, robust deployment strategies, and comprehensive monitoring. By implementing proper preprocessing, model optimization, real-time processing capabilities, and continuous monitoring, you can build CV systems that perform reliably at scale. Remember, the goal is not just accuracy, but consistent performance under real-world conditions with varying data quality and computational constraints.
