Deploy AI on Mobile and IoT Devices

David Childs

Master edge AI deployment including model compression, hardware-specific optimization, real-time inference, and resource-constrained deployment strategies.

Edge AI deployment represents the frontier of practical artificial intelligence—bringing sophisticated models to smartphones, IoT devices, and embedded systems. After deploying models to millions of edge devices across automotive, mobile, and industrial IoT applications, I've learned that edge AI requires fundamentally different approaches than cloud deployment. Here's your complete guide to successful edge AI deployment.

Edge AI Architecture and Constraints

Edge Deployment Framework

# edge_deployment.py
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
from enum import Enum
import torch
import numpy as np
import time
import threading
import queue
from abc import ABC, abstractmethod
import logging

class EdgePlatform(Enum):
    MOBILE_ANDROID = "android"
    MOBILE_IOS = "ios"
    RASPBERRY_PI = "raspberry_pi"
    NVIDIA_JETSON = "nvidia_jetson"
    INTEL_NUC = "intel_nuc"
    ARM_CORTEX = "arm_cortex"
    ESP32 = "esp32"
    ARDUINO = "arduino"

class ResourceProfile(Enum):
    ULTRA_LOW = "ultra_low"    # < 100MB RAM, < 500MHz CPU
    LOW = "low"                # < 1GB RAM, < 1GHz CPU  
    MEDIUM = "medium"          # < 4GB RAM, < 2GHz CPU
    HIGH = "high"              # > 4GB RAM, > 2GHz CPU

@dataclass
class EdgeConstraints:
    max_memory_mb: int
    max_model_size_mb: int
    max_inference_time_ms: int
    max_power_consumption_mw: int
    cpu_cores: int
    has_gpu: bool
    has_npu: bool
    storage_mb: int
    network_connectivity: List[str]  # ['wifi', 'cellular', 'bluetooth']

@dataclass
class EdgeDeploymentConfig:
    platform: EdgePlatform
    constraints: EdgeConstraints
    optimization_level: str  # 'speed', 'size', 'balanced'
    quantization: str  # 'int8', 'int4', 'fp16', 'none'
    compilation_target: str  # 'tflite', 'onnx', 'coreml', 'tensorrt'
    batch_size: int = 1
    enable_caching: bool = True
    enable_threading: bool = True
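
# Illustrative example (values are my assumptions, not benchmarks): a
# configuration for a Raspberry Pi-class device targeting ONNX with
# INT8 quantization and a 200 ms latency budget.
EXAMPLE_PI_CONFIG = EdgeDeploymentConfig(
    platform=EdgePlatform.RASPBERRY_PI,
    constraints=EdgeConstraints(
        max_memory_mb=1024,
        max_model_size_mb=50,
        max_inference_time_ms=200,
        max_power_consumption_mw=2500,
        cpu_cores=4,
        has_gpu=False,
        has_npu=False,
        storage_mb=8192,
        network_connectivity=['wifi', 'bluetooth']
    ),
    optimization_level='balanced',
    quantization='int8',
    compilation_target='onnx'
)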

class EdgeAIFramework:
    def __init__(self, config: EdgeDeploymentConfig):
        self.config = config
        self.model_cache = {}
        self.inference_queue = queue.Queue(maxsize=10)
        self.performance_monitor = EdgePerformanceMonitor()
        
        # Initialize platform-specific components
        self.platform_adapter = self._create_platform_adapter()
        self.model_optimizer = EdgeModelOptimizer(config)
        self.inference_engine = EdgeInferenceEngine(config)
        
    def _create_platform_adapter(self):
        """Create platform-specific adapter"""
        
        if self.config.platform == EdgePlatform.MOBILE_ANDROID:
            return AndroidAdapter(self.config)
        elif self.config.platform == EdgePlatform.MOBILE_IOS:
            return IOSAdapter(self.config)
        elif self.config.platform == EdgePlatform.NVIDIA_JETSON:
            return JetsonAdapter(self.config)
        elif self.config.platform == EdgePlatform.RASPBERRY_PI:
            return RaspberryPiAdapter(self.config)
        else:
            return GenericEdgeAdapter(self.config)
    
    async def deploy_model(self, 
                         model_path: str,
                         model_metadata: Dict) -> str:
        """Deploy model to edge device"""
        
        deployment_id = f"edge_{int(time.time())}"
        
        try:
            # Step 1: Model optimization
            optimized_model = await self.model_optimizer.optimize_model(
                model_path,
                self.config.constraints
            )
            
            # Step 2: Model compilation
            compiled_model = await self.platform_adapter.compile_model(
                optimized_model,
                self.config.compilation_target
            )
            
            # Step 3: Model deployment
            deployed_model = await self.platform_adapter.deploy_model(
                compiled_model,
                deployment_id
            )
            
            # Step 4: Performance validation
            validation_results = await self._validate_deployment(
                deployed_model,
                self.config.constraints
            )
            
            if not validation_results['passes_constraints']:
                raise ValueError(f"Deployment validation failed: {validation_results['issues']}")
            
            # Store in cache
            self.model_cache[deployment_id] = {
                'model': deployed_model,
                'metadata': model_metadata,
                'deployment_time': time.time(),
                'validation_results': validation_results
            }
            
            return deployment_id
            
        except Exception as e:
            logging.error(f"Model deployment failed: {e}")
            raise
    
    async def run_inference(self,
                          deployment_id: str,
                          input_data: np.ndarray,
                          timeout_ms: Optional[int] = None) -> Dict:
        """Run inference on edge device"""
        
        if deployment_id not in self.model_cache:
            raise ValueError(f"Deployment {deployment_id} not found")
        
        start_time = time.time()
        
        # Get model
        deployed_model = self.model_cache[deployment_id]['model']
        
        # Apply timeout if specified
        timeout_ms = timeout_ms or self.config.constraints.max_inference_time_ms
        
        try:
            # Run inference with monitoring
            with self.performance_monitor.track_inference(deployment_id):
                result = await self.inference_engine.infer(
                    deployed_model,
                    input_data,
                    timeout_ms
                )
            
            inference_time_ms = (time.time() - start_time) * 1000
            
            return {
                'prediction': result,
                'inference_time_ms': inference_time_ms,
                'deployment_id': deployment_id,
                'timestamp': time.time()
            }
            
        except TimeoutError:
            raise TimeoutError(f"Inference timeout after {timeout_ms}ms")
        except Exception as e:
            logging.error(f"Inference failed: {e}")
            raise

class EdgeModelOptimizer:
    def __init__(self, config: EdgeDeploymentConfig):
        self.config = config
        self.optimization_techniques = {
            'quantization': True,
            'pruning': True,
            'knowledge_distillation': False,
            'weight_sharing': True,
            'layer_fusion': True
        }
    
    async def optimize_model(self, 
                           model_path: str,
                           constraints: EdgeConstraints) -> str:
        """Apply comprehensive model optimization for edge deployment"""
        
        # Load original model
        model = torch.jit.load(model_path)
        original_size = self._get_model_size_mb(model)
        
        logging.info(f"Original model size: {original_size:.2f} MB")
        
        # Apply optimizations based on constraints
        optimized_model = model
        optimizations_applied = []
        
        # Step 1: Quantization
        if original_size > constraints.max_model_size_mb:
            if self.config.quantization == 'int8':
                optimized_model = await self._apply_int8_quantization(optimized_model)
                optimizations_applied.append('int8_quantization')
            elif self.config.quantization == 'int4':
                optimized_model = await self._apply_int4_quantization(optimized_model)
                optimizations_applied.append('int4_quantization')
            elif self.config.quantization == 'fp16':
                optimized_model = await self._apply_fp16_quantization(optimized_model)
                optimizations_applied.append('fp16_quantization')
        
        # Step 2: Pruning (if still too large)
        current_size = self._get_model_size_mb(optimized_model)
        if current_size > constraints.max_model_size_mb:
            sparsity_levels = [0.3, 0.5, 0.7, 0.9]
            
            for sparsity in sparsity_levels:
                pruned_model = await self._apply_pruning(optimized_model, sparsity)
                pruned_size = self._get_model_size_mb(pruned_model)
                
                if pruned_size <= constraints.max_model_size_mb:
                    optimized_model = pruned_model
                    optimizations_applied.append(f'pruning_{sparsity}')
                    break
        
        # Step 3: Layer fusion
        if self.optimization_techniques['layer_fusion']:
            optimized_model = await self._apply_layer_fusion(optimized_model)
            optimizations_applied.append('layer_fusion')
        
        # Step 4: Knowledge distillation (if the model is still over the size budget)
        current_size = self._get_model_size_mb(optimized_model)
        if current_size > constraints.max_model_size_mb:
            distilled_model = await self._apply_knowledge_distillation(
                model,  # Teacher model
                target_size_mb=constraints.max_model_size_mb
            )
            optimized_model = distilled_model
            optimizations_applied.append('knowledge_distillation')
        
        # Save optimized model
        optimized_path = model_path.replace('.pt', '_optimized.pt')
        torch.jit.save(optimized_model, optimized_path)
        
        final_size = self._get_model_size_mb(optimized_model)
        compression_ratio = original_size / final_size
        
        logging.info(f"Optimized model size: {final_size:.2f} MB")
        logging.info(f"Compression ratio: {compression_ratio:.2f}x")
        logging.info(f"Optimizations applied: {optimizations_applied}")
        
        return optimized_path
    
    async def _apply_int8_quantization(self, model: torch.jit.ScriptModule):
        """Apply INT8 dynamic quantization"""
        
        # Prepare model for quantization
        model.eval()
        
        # Dynamic quantization only covers Linear (and recurrent) layers;
        # Conv2d layers need static quantization with calibration data instead.
        quantized_model = torch.quantization.quantize_dynamic(
            model,
            {torch.nn.Linear},
            dtype=torch.qint8
        )
        
        return quantized_model
    
    async def _apply_pruning(self, 
                           model: torch.nn.Module,
                           sparsity: float):
        """Apply unstructured L1-magnitude pruning"""
        
        import copy
        import torch.nn.utils.prune as prune
        
        # Pruning utilities expect an eager nn.Module, so this step should
        # run before the model is traced/scripted for export.
        model_copy = copy.deepcopy(model)
        
        # Apply pruning to convolutional and linear layers
        for name, module in model_copy.named_modules():
            if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)):
                prune.l1_unstructured(module, name='weight', amount=sparsity)
                prune.remove(module, 'weight')
        
        return model_copy
    
    async def _apply_knowledge_distillation(self,
                                          teacher_model: torch.jit.ScriptModule,
                                          target_size_mb: float):
        """Apply knowledge distillation to create smaller student model"""
        
        # This would involve creating a smaller student architecture
        # and training it to mimic the teacher model's outputs
        # For brevity, returning a simplified version here
        
        # Create student architecture (simplified)
        student_model = self._create_student_architecture(
            teacher_model,
            target_size_mb
        )
        
        # Training loop would go here
        # For now, return the student architecture
        return student_model
    
    def _create_student_architecture(self,
                                   teacher_model: torch.jit.ScriptModule,
                                   target_size_mb: float):
        """Create smaller student model architecture"""
        
        # Simplified approach: reduce layer dimensions
        # In practice, this would be more sophisticated
        
        class StudentModel(torch.nn.Module):
            def __init__(self):
                super().__init__()
                self.conv1 = torch.nn.Conv2d(3, 16, 3, padding=1)
                self.conv2 = torch.nn.Conv2d(16, 32, 3, padding=1)
                self.pool = torch.nn.AdaptiveAvgPool2d(1)
                self.fc = torch.nn.Linear(32, 1000)
                
            def forward(self, x):
                x = torch.relu(self.conv1(x))
                x = torch.relu(self.conv2(x))
                x = self.pool(x)
                x = x.view(x.size(0), -1)
                x = self.fc(x)
                return x
        
        return StudentModel()
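
The distillation training loop elided above pairs the frozen teacher with the student and minimizes a weighted sum of a soft-target (KL) loss and a hard-label loss. The sketch below is a minimal, hypothetical version of that loop; the temperature, alpha, epochs, and dataloader are illustrative assumptions, not values from a real deployment.

# knowledge_distillation_sketch.py (illustrative only)
import torch
import torch.nn.functional as F

def distill(teacher: torch.nn.Module,
            student: torch.nn.Module,
            dataloader,
            epochs: int = 5,
            temperature: float = 4.0,
            alpha: float = 0.7):
    """Train the student to mimic the teacher's softened outputs."""
    teacher.eval()
    optimizer = torch.optim.Adam(student.parameters(), lr=1e-3)
    
    for _ in range(epochs):
        for images, labels in dataloader:
            with torch.no_grad():
                teacher_logits = teacher(images)
            
            student_logits = student(images)
            
            # Soft-target loss: match the teacher's temperature-scaled distribution
            soft_loss = F.kl_div(
                F.log_softmax(student_logits / temperature, dim=1),
                F.softmax(teacher_logits / temperature, dim=1),
                reduction='batchmean'
            ) * (temperature ** 2)
            
            # Hard-label loss on the ground-truth targets
            hard_loss = F.cross_entropy(student_logits, labels)
            
            loss = alpha * soft_loss + (1 - alpha) * hard_loss
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
    return student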

Platform-Specific Adapters

# platform_adapters.py
from abc import ABC, abstractmethod
from typing import Any, Dict

import torch

from edge_deployment import EdgeDeploymentConfig

class PlatformAdapter(ABC):
    @abstractmethod
    async def compile_model(self, model_path: str, target: str) -> str:
        pass
    
    @abstractmethod
    async def deploy_model(self, compiled_model_path: str, deployment_id: str) -> Any:
        pass
    
    @abstractmethod
    async def get_hardware_info(self) -> Dict:
        pass

class AndroidAdapter(PlatformAdapter):
    def __init__(self, config: EdgeDeploymentConfig):
        self.config = config
    
    async def compile_model(self, model_path: str, target: str = "tflite") -> str:
        """Compile model for Android deployment"""
        
        if target == "tflite":
            return await self._convert_to_tflite(model_path)
        elif target == "onnx":
            return await self._convert_to_onnx(model_path)
        else:
            raise ValueError(f"Unsupported compilation target: {target}")
    
    async def _convert_to_tflite(self, model_path: str) -> str:
        """Convert PyTorch model to TensorFlow Lite"""
        
        import onnx
        import tensorflow as tf
        
        # Load PyTorch model
        pytorch_model = torch.jit.load(model_path)
        
        # Convert to ONNX first (intermediate step)
        onnx_path = model_path.replace('.pt', '.onnx')
        dummy_input = torch.randn(1, 3, 224, 224)
        
        torch.onnx.export(
            pytorch_model,
            dummy_input,
            onnx_path,
            export_params=True,
            opset_version=11,
            do_constant_folding=True
        )
        
        # Convert ONNX to a TensorFlow SavedModel
        import onnx_tf
        onnx_model = onnx.load(onnx_path)
        tf_rep = onnx_tf.backend.prepare(onnx_model)
        saved_model_dir = model_path.replace('.pt', '_saved_model')
        tf_rep.export_graph(saved_model_dir)
        
        # Convert the SavedModel to TensorFlow Lite
        converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
        
        # Apply optimizations
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        
        if self.config.quantization == 'int8':
            converter.representative_dataset = self._get_representative_dataset
            converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
            converter.inference_input_type = tf.int8
            converter.inference_output_type = tf.int8
        
        tflite_model = converter.convert()
        
        # Save TFLite model
        tflite_path = model_path.replace('.pt', '.tflite')
        with open(tflite_path, 'wb') as f:
            f.write(tflite_model)
        
        return tflite_path
    
    async def deploy_model(self, compiled_model_path: str, deployment_id: str):
        """Deploy TFLite model for Android inference"""
        
        import tflite_runtime.interpreter as tflite
        
        # Create TFLite interpreter
        interpreter = tflite.Interpreter(
            model_path=compiled_model_path,
            num_threads=self.config.constraints.cpu_cores
        )
        
        interpreter.allocate_tensors()
        
        return AndroidTFLiteModel(interpreter, deployment_id)
    
    async def get_hardware_info(self) -> Dict:
        """Get Android device hardware information"""
        
        # This would interface with Android APIs
        # For now, return mock data
        return {
            'platform': 'Android',
            'cpu_cores': 8,
            'ram_mb': 6144,
            'has_gpu': True,
            'gpu_type': 'Adreno 640',
            'has_npu': True,
            'npu_type': 'Hexagon 685'
        }

class IOSAdapter(PlatformAdapter):
    def __init__(self, config: EdgeDeploymentConfig):
        self.config = config
    
    async def compile_model(self, model_path: str, target: str = "coreml") -> str:
        """Compile model for iOS deployment"""
        
        if target == "coreml":
            return await self._convert_to_coreml(model_path)
        else:
            raise ValueError(f"Unsupported compilation target for iOS: {target}")
    
    async def _convert_to_coreml(self, model_path: str) -> str:
        """Convert PyTorch model to Core ML"""
        
        try:
            import coremltools as ct
            
            # Load PyTorch model
            pytorch_model = torch.jit.load(model_path)
            pytorch_model.eval()
            
            # Create dummy input
            dummy_input = torch.randn(1, 3, 224, 224)
            
            # Trace the model
            traced_model = torch.jit.trace(pytorch_model, dummy_input)
            
            # Convert to Core ML
            coreml_model = ct.convert(
                traced_model,
                inputs=[ct.ImageType(
                    name="input",
                    shape=dummy_input.shape,
                    scale=1/255.0,
                    bias=[0, 0, 0]
                )]
            )
            
            # Apply optimizations
            if self.config.quantization == 'fp16':
                coreml_model = ct.models.neural_network.quantization_utils.quantize_weights(
                    coreml_model, nbits=16
                )
            elif self.config.quantization == 'int8':
                coreml_model = ct.models.neural_network.quantization_utils.quantize_weights(
                    coreml_model, nbits=8
                )
            
            # Save Core ML model
            coreml_path = model_path.replace('.pt', '.mlmodel')
            coreml_model.save(coreml_path)
            
            return coreml_path
            
        except ImportError:
            raise ImportError("coremltools is required for iOS deployment")
    
    async def deploy_model(self, compiled_model_path: str, deployment_id: str):
        """Deploy Core ML model for iOS inference"""
        
        # This would use Core ML APIs
        # For now, return a mock implementation
        return IOSCoreMLModel(compiled_model_path, deployment_id)
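
# Hypothetical sketch of the IOSCoreMLModel wrapper referenced above
# (my assumption, useful for host-side testing with coremltools; on
# device, inference would go through the Core ML framework from Swift).
class IOSCoreMLModel:
    def __init__(self, model_path: str, deployment_id: str):
        import coremltools as ct
        self.deployment_id = deployment_id
        self.model = ct.models.MLModel(model_path)
    
    def predict(self, input_data):
        # Core ML inputs are keyed by name; "input" matches the name
        # chosen during conversion in _convert_to_coreml.
        return self.model.predict({'input': input_data})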

class JetsonAdapter(PlatformAdapter):
    def __init__(self, config: EdgeDeploymentConfig):
        self.config = config
    
    async def compile_model(self, model_path: str, target: str = "tensorrt") -> str:
        """Compile model for NVIDIA Jetson deployment"""
        
        if target == "tensorrt":
            return await self._convert_to_tensorrt(model_path)
        else:
            return await self._convert_to_onnx(model_path)
    
    async def _convert_to_tensorrt(self, model_path: str) -> str:
        """Convert PyTorch model to TensorRT"""
        
        try:
            import tensorrt as trt
            
            # First convert to ONNX
            onnx_path = await self._convert_to_onnx(model_path)
            
            # Create TensorRT engine
            logger = trt.Logger(trt.Logger.WARNING)
            builder = trt.Builder(logger)
            network = builder.create_network(
                1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
            )
            parser = trt.OnnxParser(network, logger)
            
            # Parse ONNX model
            with open(onnx_path, 'rb') as model_file:
                if not parser.parse(model_file.read()):
                    raise ValueError("Failed to parse ONNX model")
            
            # Build engine with optimizations. Note: newer TensorRT releases
            # deprecate max_workspace_size and build_engine in favor of
            # memory-pool limits and build_serialized_network.
            config = builder.create_builder_config()
            config.max_workspace_size = 1 << 28  # 256MB
            
            # Enable precision optimizations
            if self.config.quantization == 'fp16':
                if builder.platform_has_fast_fp16:
                    config.set_flag(trt.BuilderFlag.FP16)
            elif self.config.quantization == 'int8':
                if builder.platform_has_fast_int8:
                    config.set_flag(trt.BuilderFlag.INT8)
            
            engine = builder.build_engine(network, config)
            
            # Save engine
            tensorrt_path = model_path.replace('.pt', '.trt')
            with open(tensorrt_path, 'wb') as f:
                f.write(engine.serialize())
            
            return tensorrt_path
            
        except ImportError:
            raise ImportError("tensorrt is required for Jetson deployment")
    
    async def _convert_to_onnx(self, model_path: str) -> str:
        """Convert PyTorch model to ONNX"""
        
        # Load PyTorch model
        model = torch.jit.load(model_path)
        model.eval()
        
        # Create dummy input
        dummy_input = torch.randn(1, 3, 224, 224)
        
        # Export to ONNX
        onnx_path = model_path.replace('.pt', '.onnx')
        torch.onnx.export(
            model,
            dummy_input,
            onnx_path,
            export_params=True,
            opset_version=11,
            do_constant_folding=True,
            input_names=['input'],
            output_names=['output'],
            dynamic_axes={
                'input': {0: 'batch_size'},
                'output': {0: 'batch_size'}
            }
        )
        
        return onnx_path
    
    async def deploy_model(self, compiled_model_path: str, deployment_id: str):
        """Deploy model for Jetson inference"""
        
        if compiled_model_path.endswith('.trt'):
            return JetsonTensorRTModel(compiled_model_path, deployment_id)
        elif compiled_model_path.endswith('.onnx'):
            return JetsonONNXModel(compiled_model_path, deployment_id)
        else:
            raise ValueError(f"Unsupported model format: {compiled_model_path}")

Real-time Inference Engine

# edge_inference_engine.py
import time
from typing import Any, Dict, List

import numpy as np

from edge_deployment import EdgeDeploymentConfig

class EdgeInferenceEngine:
    def __init__(self, config: EdgeDeploymentConfig):
        self.config = config
        self.inference_cache = {}
        self.preprocessing_cache = {}
        
        # Performance optimization
        self.thread_pool = None
        if config.enable_threading:
            import concurrent.futures
            self.thread_pool = concurrent.futures.ThreadPoolExecutor(
                max_workers=min(config.constraints.cpu_cores, 4)
            )
    
    async def infer(self,
                   model: Any,
                   input_data: np.ndarray,
                   timeout_ms: int) -> np.ndarray:
        """Run optimized inference with timeout"""
        
        start_time = time.time()
        
        # Input preprocessing
        preprocessed_input = await self._preprocess_input(
            input_data,
            model.get_input_spec()
        )
        
        # Cache check
        if self.config.enable_caching:
            cache_key = self._generate_cache_key(preprocessed_input)
            if cache_key in self.inference_cache:
                cached_result = self.inference_cache[cache_key]
                # Check cache age
                if time.time() - cached_result['timestamp'] < 60:  # 1 minute cache
                    return cached_result['result']
        
        # Run inference with timeout
        try:
            if self.thread_pool:
                # Async inference
                future = self.thread_pool.submit(
                    model.predict,
                    preprocessed_input
                )
                result = future.result(timeout=timeout_ms/1000)
            else:
                # Synchronous inference
                result = model.predict(preprocessed_input)
            
            # Check timeout
            if (time.time() - start_time) * 1000 > timeout_ms:
                raise TimeoutError(f"Inference exceeded {timeout_ms}ms timeout")
            
            # Postprocess result
            postprocessed_result = await self._postprocess_output(
                result,
                model.get_output_spec()
            )
            
            # Cache result
            if self.config.enable_caching:
                self.inference_cache[cache_key] = {
                    'result': postprocessed_result,
                    'timestamp': time.time()
                }
                
                # Manage cache size
                if len(self.inference_cache) > 100:
                    oldest_key = min(
                        self.inference_cache.keys(),
                        key=lambda k: self.inference_cache[k]['timestamp']
                    )
                    del self.inference_cache[oldest_key]
            
            return postprocessed_result
            
        except Exception as e:
            # future.result raises concurrent.futures.TimeoutError, which is
            # only an alias of the built-in TimeoutError on Python >= 3.11,
            # so normalize both to TimeoutError for the caller.
            import concurrent.futures
            if isinstance(e, (TimeoutError, concurrent.futures.TimeoutError)):
                raise TimeoutError(f"Inference exceeded {timeout_ms}ms timeout")
            raise RuntimeError(f"Inference failed: {e}")
    
    async def _preprocess_input(self,
                              input_data: np.ndarray,
                              input_spec: Dict) -> np.ndarray:
        """Optimized input preprocessing"""
        
        # Check preprocessing cache
        input_hash = hash(input_data.tobytes())
        if input_hash in self.preprocessing_cache:
            return self.preprocessing_cache[input_hash]
        
        processed = input_data.copy()
        
        # Resize if needed
        if 'shape' in input_spec:
            target_shape = input_spec['shape']
            if processed.shape != target_shape:
                processed = self._resize_input(processed, target_shape)
        
        # Normalize
        if 'normalize' in input_spec:
            normalize_config = input_spec['normalize']
            processed = (processed - normalize_config['mean']) / normalize_config['std']
        
        # Data type conversion
        if 'dtype' in input_spec:
            processed = processed.astype(input_spec['dtype'])
        
        # Cache preprocessing result
        self.preprocessing_cache[input_hash] = processed
        
        # Manage cache size
        if len(self.preprocessing_cache) > 50:
            oldest_key = next(iter(self.preprocessing_cache))
            del self.preprocessing_cache[oldest_key]
        
        return processed
    
    def _resize_input(self, input_data: np.ndarray, target_shape: tuple) -> np.ndarray:
        """Fast input resizing (assumes channel-last HWC/NHWC image layouts)"""
        
        if len(input_data.shape) == 4:  # Batch of images
            import cv2
            resized_batch = []
            
            for i in range(input_data.shape[0]):
                img = input_data[i]
                resized = cv2.resize(
                    img,
                    (target_shape[2], target_shape[1]),
                    interpolation=cv2.INTER_LINEAR
                )
                resized_batch.append(resized)
            
            return np.stack(resized_batch)
        
        else:  # Single image
            import cv2
            return cv2.resize(
                input_data,
                (target_shape[1], target_shape[0]),
                interpolation=cv2.INTER_LINEAR
            )

class EdgePerformanceMonitor:
    def __init__(self):
        self.metrics = {}
        self.resource_monitor = ResourceMonitor()
    
    def track_inference(self, deployment_id: str):
        """Context manager for tracking inference performance"""
        
        return InferenceTracker(deployment_id, self)
    
    def record_inference(self,
                        deployment_id: str,
                        inference_time_ms: float,
                        memory_usage_mb: float,
                        cpu_usage_percent: float,
                        power_consumption_mw: float = None):
        """Record inference metrics"""
        
        if deployment_id not in self.metrics:
            self.metrics[deployment_id] = {
                'inference_count': 0,
                'total_inference_time_ms': 0,
                'avg_inference_time_ms': 0,
                'max_inference_time_ms': 0,
                'avg_memory_usage_mb': 0,
                'avg_cpu_usage_percent': 0,
                'inference_history': []
            }
        
        metrics = self.metrics[deployment_id]
        
        # Update counters
        metrics['inference_count'] += 1
        metrics['total_inference_time_ms'] += inference_time_ms
        metrics['avg_inference_time_ms'] = (
            metrics['total_inference_time_ms'] / metrics['inference_count']
        )
        metrics['max_inference_time_ms'] = max(
            metrics['max_inference_time_ms'],
            inference_time_ms
        )
        
        # Update resource usage
        metrics['avg_memory_usage_mb'] = (
            (metrics['avg_memory_usage_mb'] * (metrics['inference_count'] - 1) + 
             memory_usage_mb) / metrics['inference_count']
        )
        
        metrics['avg_cpu_usage_percent'] = (
            (metrics['avg_cpu_usage_percent'] * (metrics['inference_count'] - 1) + 
             cpu_usage_percent) / metrics['inference_count']
        )
        
        # Store history (last 100 inferences)
        inference_record = {
            'timestamp': time.time(),
            'inference_time_ms': inference_time_ms,
            'memory_usage_mb': memory_usage_mb,
            'cpu_usage_percent': cpu_usage_percent,
            'power_consumption_mw': power_consumption_mw
        }
        
        metrics['inference_history'].append(inference_record)
        
        if len(metrics['inference_history']) > 100:
            metrics['inference_history'].pop(0)
    
    def get_performance_report(self, deployment_id: str) -> Dict:
        """Generate performance report"""
        
        if deployment_id not in self.metrics:
            return {'error': 'No metrics available'}
        
        metrics = self.metrics[deployment_id]
        
        # Calculate additional statistics
        recent_inferences = metrics['inference_history'][-10:]
        recent_avg_time = (
            sum(r['inference_time_ms'] for r in recent_inferences) / 
            len(recent_inferences) if recent_inferences else 0
        )
        
        return {
            'deployment_id': deployment_id,
            'total_inferences': metrics['inference_count'],
            'avg_inference_time_ms': metrics['avg_inference_time_ms'],
            'max_inference_time_ms': metrics['max_inference_time_ms'],
            'recent_avg_inference_time_ms': recent_avg_time,
            'avg_memory_usage_mb': metrics['avg_memory_usage_mb'],
            'avg_cpu_usage_percent': metrics['avg_cpu_usage_percent'],
            'throughput_fps': 1000 / metrics['avg_inference_time_ms'] if metrics['avg_inference_time_ms'] > 0 else 0,
            'performance_trend': self._analyze_performance_trend(metrics['inference_history'])
        }
    
    def _analyze_performance_trend(self, history: List[Dict]) -> str:
        """Analyze performance trend"""
        
        if len(history) < 10:
            return 'insufficient_data'
        
        # Compare first half with second half
        midpoint = len(history) // 2
        first_half_avg = sum(
            r['inference_time_ms'] for r in history[:midpoint]
        ) / midpoint
        
        second_half_avg = sum(
            r['inference_time_ms'] for r in history[midpoint:]
        ) / (len(history) - midpoint)
        
        if second_half_avg > first_half_avg * 1.1:
            return 'degrading'
        elif second_half_avg < first_half_avg * 0.9:
            return 'improving'
        else:
            return 'stable'

class InferenceTracker:
    def __init__(self, deployment_id: str, monitor: EdgePerformanceMonitor):
        self.deployment_id = deployment_id
        self.monitor = monitor
        self.start_time = None
        self.start_memory = None
        self.start_cpu = None
    
    def __enter__(self):
        self.start_time = time.time()
        self.start_memory = self.monitor.resource_monitor.get_memory_usage_mb()
        self.start_cpu = self.monitor.resource_monitor.get_cpu_usage_percent()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        end_time = time.time()
        end_memory = self.monitor.resource_monitor.get_memory_usage_mb()
        end_cpu = self.monitor.resource_monitor.get_cpu_usage_percent()
        
        inference_time_ms = (end_time - self.start_time) * 1000
        avg_memory = (self.start_memory + end_memory) / 2
        avg_cpu = (self.start_cpu + end_cpu) / 2
        
        self.monitor.record_inference(
            self.deployment_id,
            inference_time_ms,
            avg_memory,
            avg_cpu
        )

class ResourceMonitor:
    def __init__(self):
        import psutil
        self.psutil = psutil
    
    def get_memory_usage_mb(self) -> float:
        """Get current memory usage in MB"""
        try:
            process = self.psutil.Process()
            return process.memory_info().rss / 1024 / 1024
        except Exception:
            return 0.0
    
    def get_cpu_usage_percent(self) -> float:
        """Get current CPU usage percentage"""
        try:
            return self.psutil.cpu_percent(interval=0.1)
        except Exception:
            return 0.0
    
    def get_gpu_usage_percent(self) -> float:
        """Get GPU usage if available"""
        try:
            import GPUtil
            gpus = GPUtil.getGPUs()
            if gpus:
                return gpus[0].load * 100
            return 0.0
        except Exception:
            return 0.0
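
Putting the pieces together, a host-side driver looks roughly like the sketch below. The model path, metadata, and dummy input are placeholders of mine, and it assumes the EXAMPLE_PI_CONFIG shown earlier; the calls themselves mirror the classes defined above.

# run_edge_pipeline.py (illustrative usage sketch)
import asyncio
import numpy as np

from edge_deployment import EdgeAIFramework, EXAMPLE_PI_CONFIG

async def main():
    framework = EdgeAIFramework(EXAMPLE_PI_CONFIG)
    
    deployment_id = await framework.deploy_model(
        model_path='models/classifier.pt',   # placeholder path
        model_metadata={'task': 'image_classification', 'version': '1.0'}
    )
    
    # Run a single inference on a dummy image and inspect the result
    dummy_image = np.random.rand(1, 224, 224, 3).astype(np.float32)
    result = await framework.run_inference(deployment_id, dummy_image)
    print(f"Prediction returned in {result['inference_time_ms']:.1f} ms")
    
    # Pull the rolling performance report for this deployment
    report = framework.performance_monitor.get_performance_report(deployment_id)
    print(report['throughput_fps'], report['performance_trend'])

if __name__ == '__main__':
    asyncio.run(main())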

Model Compression and Optimization

Advanced Compression Techniques

# compression_techniques.py
from typing import Dict

import torch

class AdvancedModelCompression:
    def __init__(self):
        self.compression_methods = {
            'weight_clustering': WeightClusteringCompressor(),
            'low_rank_approximation': LowRankApproximation(),
            'huffman_coding': HuffmanCodingCompressor(),
            'binary_quantization': BinaryQuantizer(),
            'mixed_precision': MixedPrecisionOptimizer()
        }
    
    async def compress_model(self,
                           model_path: str,
                           target_size_mb: float,
                           accuracy_threshold: float = 0.95) -> Dict:
        """Apply multiple compression techniques to achieve target size"""
        
        original_model = torch.jit.load(model_path)
        original_size = self._get_model_size_mb(original_model)
        original_accuracy = await self._evaluate_model_accuracy(original_model)
        
        compression_results = []
        current_model = original_model
        current_size = original_size
        
        # Apply compression techniques in order of effectiveness
        techniques = [
            ('weight_clustering', {'clusters': 256}),
            ('low_rank_approximation', {'rank_reduction': 0.5}),
            ('huffman_coding', {}),
            ('binary_quantization', {'layers': ['conv', 'linear']}),
            ('mixed_precision', {'fp16_layers': ['conv'], 'int8_layers': ['linear']})
        ]
        
        for technique_name, config in techniques:
            if current_size <= target_size_mb:
                break
            
            # Apply compression technique
            compressor = self.compression_methods[technique_name]
            compressed_model = await compressor.compress(current_model, config)
            
            # Evaluate compressed model
            compressed_size = self._get_model_size_mb(compressed_model)
            compressed_accuracy = await self._evaluate_model_accuracy(compressed_model)
            
            # Check if accuracy is acceptable
            accuracy_retention = compressed_accuracy / original_accuracy
            
            if accuracy_retention >= accuracy_threshold:
                size_before_mb = current_size
                current_model = compressed_model
                current_size = compressed_size
                
                compression_results.append({
                    'technique': technique_name,
                    'size_before_mb': size_before_mb,
                    'size_after_mb': compressed_size,
                    'compression_ratio': size_before_mb / compressed_size,
                    'accuracy_retention': accuracy_retention,
                    'applied': True
                })
            else:
                compression_results.append({
                    'technique': technique_name,
                    'accuracy_retention': accuracy_retention,
                    'applied': False,
                    'reason': 'accuracy_below_threshold'
                })
        
        # Save compressed model
        compressed_path = model_path.replace('.pt', '_compressed.pt')
        torch.jit.save(current_model, compressed_path)
        
        final_compression_ratio = original_size / current_size
        final_accuracy = await self._evaluate_model_accuracy(current_model)
        
        return {
            'compressed_model_path': compressed_path,
            'original_size_mb': original_size,
            'compressed_size_mb': current_size,
            'total_compression_ratio': final_compression_ratio,
            'accuracy_retention': final_accuracy / original_accuracy,
            'compression_techniques': compression_results,
            'meets_target_size': current_size <= target_size_mb
        }

class WeightClusteringCompressor:
    async def compress(self, model: torch.nn.Module, config: Dict):
        """Apply weight clustering compression"""
        
        num_clusters = config.get('clusters', 256)
        
        compressed_model = self._create_model_copy(model)
        
        # Apply clustering to each layer
        for name, module in compressed_model.named_modules():
            if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)):
                compressed_weights = self._cluster_weights(
                    module.weight.data,
                    num_clusters
                )
                module.weight.data = compressed_weights
        
        return compressed_model
    
    def _cluster_weights(self, weights: torch.Tensor, num_clusters: int):
        """Cluster weights using k-means"""
        
        from sklearn.cluster import KMeans
        
        # Flatten weights
        original_shape = weights.shape
        flattened_weights = weights.flatten().cpu().numpy()
        
        # Apply k-means clustering
        kmeans = KMeans(n_clusters=num_clusters, random_state=42)
        cluster_labels = kmeans.fit_predict(flattened_weights.reshape(-1, 1))
        
        # Replace weights with cluster centroids
        clustered_weights = kmeans.cluster_centers_[cluster_labels].flatten()
        
        # Reshape back to original shape
        clustered_tensor = torch.tensor(
            clustered_weights.reshape(original_shape),
            dtype=weights.dtype,
            device=weights.device
        )
        
        return clustered_tensor

class LowRankApproximation:
    async def compress(self, model: torch.nn.Module, config: Dict):
        """Apply low-rank approximation to linear layers"""
        
        rank_reduction = config.get('rank_reduction', 0.5)
        
        compressed_model = self._create_model_copy(model)
        
        # Apply low-rank approximation to linear layers
        for name, module in compressed_model.named_modules():
            if isinstance(module, torch.nn.Linear):
                compressed_layer = self._approximate_linear_layer(
                    module,
                    rank_reduction
                )
                # Replace layer in model
                self._replace_module(compressed_model, name, compressed_layer)
        
        return compressed_model
    
    def _approximate_linear_layer(self, layer: torch.nn.Linear, rank_reduction: float):
        """Apply SVD-based low-rank approximation"""
        
        # Get weight matrix
        weight = layer.weight.data
        
        # Apply SVD
        U, S, Vt = torch.linalg.svd(weight, full_matrices=False)
        
        # Determine rank
        original_rank = min(weight.shape)
        target_rank = int(original_rank * rank_reduction)
        
        # Truncate SVD components
        U_truncated = U[:, :target_rank]
        S_truncated = S[:target_rank]
        Vt_truncated = Vt[:target_rank, :]
        
        # Create two linear layers
        layer1 = torch.nn.Linear(
            weight.shape[1],
            target_rank,
            bias=False
        )
        layer2 = torch.nn.Linear(
            target_rank,
            weight.shape[0],
            bias=layer.bias is not None
        )
        
        # Set weights
        layer1.weight.data = (Vt_truncated * S_truncated.unsqueeze(1))
        layer2.weight.data = U_truncated
        
        if layer.bias is not None:
            layer2.bias.data = layer.bias.data
        
        # Return sequential container
        return torch.nn.Sequential(layer1, layer2)

class HuffmanCodingCompressor:
    async def compress(self, model: torch.nn.Module, config: Dict):
        """Apply Huffman coding to weights"""
        
        # This would implement Huffman coding for weight compression
        # For brevity, returning the model as-is
        # In practice, this would involve:
        # 1. Quantizing weights to discrete values
        # 2. Building Huffman tree based on weight frequency
        # 3. Encoding weights using variable-length codes
        # 4. Storing codebook with model
        
        return model

class MixedPrecisionOptimizer:
    async def compress(self, model: torch.nn.Module, config: Dict):
        """Apply mixed precision optimization"""
        
        fp16_layers = config.get('fp16_layers', ['conv'])
        int8_layers = config.get('int8_layers', ['linear'])
        
        compressed_model = self._create_model_copy(model)
        
        # Apply mixed precision
        for name, module in compressed_model.named_modules():
            if any(layer_type in name.lower() for layer_type in fp16_layers):
                # Convert to FP16
                module.half()
            elif any(layer_type in name.lower() for layer_type in int8_layers):
                # Apply INT8 quantization
                if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)):
                    quantized_module = torch.quantization.quantize_dynamic(
                        module,
                        {type(module)},
                        dtype=torch.qint8
                    )
                    self._replace_module(compressed_model, name, quantized_module)
        
        return compressed_model
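
The HuffmanCodingCompressor above is left as a stub. The sketch below shows the core idea on a single weight tensor: quantize to a small set of discrete values, build a Huffman code from their frequencies, and estimate the encoded size. It is a hypothetical illustration, not the article's implementation; a real deployment would also store the codebook and a bit-packed payload alongside the model.

# huffman_sketch.py (illustrative only)
import heapq
from collections import Counter

import numpy as np

def huffman_code_lengths(values: np.ndarray) -> dict:
    """Return {symbol: code_length_in_bits} for the given 1-D array."""
    freqs = Counter(values.tolist())
    if len(freqs) == 1:
        return {next(iter(freqs)): 1}
    
    # Each heap entry: (frequency, tie_breaker, {symbol: depth_so_far})
    heap = [(f, i, {s: 0}) for i, (s, f) in enumerate(freqs.items())]
    heapq.heapify(heap)
    counter = len(heap)
    
    while len(heap) > 1:
        f1, _, d1 = heapq.heappop(heap)
        f2, _, d2 = heapq.heappop(heap)
        # Merging two subtrees pushes every contained symbol one level deeper
        merged = {s: d + 1 for s, d in {**d1, **d2}.items()}
        heapq.heappush(heap, (f1 + f2, counter, merged))
        counter += 1
    
    return heap[0][2]

def estimate_compressed_bits(weights: np.ndarray, num_levels: int = 256) -> int:
    """Quantize weights to num_levels values, then size the Huffman stream."""
    edges = np.linspace(weights.min(), weights.max(), num_levels)
    quantized = np.digitize(weights.ravel(), edges)
    lengths = huffman_code_lengths(quantized)
    freqs = Counter(quantized.tolist())
    return sum(freqs[s] * lengths[s] for s in freqs)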

Best Practices Checklist

  • Profile target hardware constraints thoroughly
  • Apply appropriate quantization for platform
  • Implement model compression techniques
  • Optimize preprocessing pipelines
  • Use platform-specific compilation targets
  • Implement efficient caching strategies
  • Monitor inference performance continuously
  • Set up proper timeout handling
  • Implement graceful degradation (see the sketch after this list)
  • Use batch processing when possible
  • Optimize memory allocation patterns
  • Implement model versioning for edge devices
  • Set up remote monitoring and updates
  • Test across different device conditions
  • Implement proper error handling and recovery
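
As a concrete example of graceful degradation, the hypothetical wrapper below falls back to a smaller, faster model when the primary deployment misses its latency budget; the budget value and the two deployment IDs are assumptions for illustration.

# graceful_degradation_sketch.py (illustrative only)
async def run_with_fallback(framework,
                            primary_id: str,
                            fallback_id: str,
                            input_data,
                            budget_ms: int = 100):
    """Try the primary model first; degrade to the fallback on timeout."""
    try:
        return await framework.run_inference(
            primary_id, input_data, timeout_ms=budget_ms
        )
    except TimeoutError:
        # Degrade: the fallback model is smaller and should fit the budget;
        # callers can inspect 'deployment_id' to see which model answered.
        return await framework.run_inference(
            fallback_id, input_data, timeout_ms=budget_ms
        )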

Conclusion

Edge AI deployment requires a fundamentally different approach than cloud deployment, focusing on resource efficiency, real-time performance, and hardware-specific optimization. By implementing proper model compression, platform-specific compilation, efficient inference engines, and comprehensive monitoring, you can successfully deploy sophisticated AI models to resource-constrained edge devices. Remember that edge AI is about finding the optimal balance between model capability and hardware constraints—start with your requirements, optimize relentlessly, and always validate performance on actual target hardware.
