Master edge AI deployment including model compression, hardware-specific optimization, real-time inference, and resource-constrained deployment strategies.
Edge AI deployment represents the frontier of practical artificial intelligence—bringing sophisticated models to smartphones, IoT devices, and embedded systems. After deploying models to millions of edge devices across automotive, mobile, and industrial IoT applications, I've learned that edge AI requires fundamentally different approaches than cloud deployment. Here's your complete guide to successful edge AI deployment.
Edge AI Architecture and Constraints
Edge Deployment Framework
# edge_deployment.py
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass
from enum import Enum
import torch
import numpy as np
import time
import threading
import queue
from abc import ABC, abstractmethod
import logging
class EdgePlatform(Enum):
MOBILE_ANDROID = "android"
MOBILE_IOS = "ios"
RASPBERRY_PI = "raspberry_pi"
NVIDIA_JETSON = "nvidia_jetson"
INTEL_NUC = "intel_nuc"
ARM_CORTEX = "arm_cortex"
ESP32 = "esp32"
ARDUINO = "arduino"
class ResourceProfile(Enum):
ULTRA_LOW = "ultra_low" # < 100MB RAM, < 500MHz CPU
LOW = "low" # < 1GB RAM, < 1GHz CPU
MEDIUM = "medium" # < 4GB RAM, < 2GHz CPU
HIGH = "high" # > 4GB RAM, > 2GHz CPU
@dataclass
class EdgeConstraints:
max_memory_mb: int
max_model_size_mb: int
max_inference_time_ms: int
max_power_consumption_mw: int
cpu_cores: int
has_gpu: bool
has_npu: bool
storage_mb: int
network_connectivity: List[str] # ['wifi', 'cellular', 'bluetooth']
@dataclass
class EdgeDeploymentConfig:
platform: EdgePlatform
constraints: EdgeConstraints
optimization_level: str # 'speed', 'size', 'balanced'
quantization: str # 'int8', 'int4', 'fp16', 'none'
compilation_target: str # 'tflite', 'onnx', 'coreml', 'tensorrt'
batch_size: int = 1
enable_caching: bool = True
enable_threading: bool = True
class EdgeAIFramework:
def __init__(self, config: EdgeDeploymentConfig):
self.config = config
self.model_cache = {}
self.inference_queue = queue.Queue(maxsize=10)
self.performance_monitor = EdgePerformanceMonitor()
# Initialize platform-specific components
self.platform_adapter = self._create_platform_adapter()
self.model_optimizer = EdgeModelOptimizer(config)
self.inference_engine = EdgeInferenceEngine(config)
def _create_platform_adapter(self):
"""Create platform-specific adapter"""
if self.config.platform == EdgePlatform.MOBILE_ANDROID:
return AndroidAdapter(self.config)
elif self.config.platform == EdgePlatform.MOBILE_IOS:
return IOSAdapter(self.config)
elif self.config.platform == EdgePlatform.NVIDIA_JETSON:
return JetsonAdapter(self.config)
elif self.config.platform == EdgePlatform.RASPBERRY_PI:
return RaspberryPiAdapter(self.config)
else:
return GenericEdgeAdapter(self.config)
async def deploy_model(self,
model_path: str,
model_metadata: Dict) -> str:
"""Deploy model to edge device"""
deployment_id = f"edge_{int(time.time())}"
try:
# Step 1: Model optimization
optimized_model = await self.model_optimizer.optimize_model(
model_path,
self.config.constraints
)
# Step 2: Model compilation
compiled_model = await self.platform_adapter.compile_model(
optimized_model,
self.config.compilation_target
)
# Step 3: Model deployment
deployed_model = await self.platform_adapter.deploy_model(
compiled_model,
deployment_id
)
# Step 4: Performance validation
validation_results = await self._validate_deployment(
deployed_model,
self.config.constraints
)
if not validation_results['passes_constraints']:
raise ValueError(f"Deployment validation failed: {validation_results['issues']}")
# Store in cache
self.model_cache[deployment_id] = {
'model': deployed_model,
'metadata': model_metadata,
'deployment_time': time.time(),
'validation_results': validation_results
}
return deployment_id
except Exception as e:
logging.error(f"Model deployment failed: {e}")
raise
async def run_inference(self,
deployment_id: str,
input_data: np.ndarray,
timeout_ms: int = None) -> Dict:
"""Run inference on edge device"""
if deployment_id not in self.model_cache:
raise ValueError(f"Deployment {deployment_id} not found")
start_time = time.time()
# Get model
deployed_model = self.model_cache[deployment_id]['model']
# Apply timeout if specified
timeout_ms = timeout_ms or self.config.constraints.max_inference_time_ms
try:
# Run inference with monitoring
with self.performance_monitor.track_inference(deployment_id):
result = await self.inference_engine.infer(
deployed_model,
input_data,
timeout_ms
)
inference_time_ms = (time.time() - start_time) * 1000
return {
'prediction': result,
'inference_time_ms': inference_time_ms,
'deployment_id': deployment_id,
'timestamp': time.time()
}
except TimeoutError:
raise TimeoutError(f"Inference timeout after {timeout_ms}ms")
except Exception as e:
logging.error(f"Inference failed: {e}")
raise
class EdgeModelOptimizer:
def __init__(self, config: EdgeDeploymentConfig):
self.config = config
self.optimization_techniques = {
'quantization': True,
'pruning': True,
'knowledge_distillation': False,
'weight_sharing': True,
'layer_fusion': True
}
async def optimize_model(self,
model_path: str,
constraints: EdgeConstraints) -> str:
"""Apply comprehensive model optimization for edge deployment"""
# Load original model
model = torch.jit.load(model_path)
original_size = self._get_model_size_mb(model)
logging.info(f"Original model size: {original_size:.2f} MB")
# Apply optimizations based on constraints
optimized_model = model
optimizations_applied = []
# Step 1: Quantization
if original_size > constraints.max_model_size_mb:
if self.config.quantization == 'int8':
optimized_model = await self._apply_int8_quantization(optimized_model)
optimizations_applied.append('int8_quantization')
elif self.config.quantization == 'int4':
optimized_model = await self._apply_int4_quantization(optimized_model)
optimizations_applied.append('int4_quantization')
elif self.config.quantization == 'fp16':
optimized_model = await self._apply_fp16_quantization(optimized_model)
optimizations_applied.append('fp16_quantization')
# Step 2: Pruning (if still too large)
current_size = self._get_model_size_mb(optimized_model)
if current_size > constraints.max_model_size_mb:
sparsity_levels = [0.3, 0.5, 0.7, 0.9]
for sparsity in sparsity_levels:
pruned_model = await self._apply_pruning(optimized_model, sparsity)
pruned_size = self._get_model_size_mb(pruned_model)
if pruned_size <= constraints.max_model_size_mb:
optimized_model = pruned_model
optimizations_applied.append(f'pruning_{sparsity}')
break
# Step 3: Layer fusion
if self.optimization_techniques['layer_fusion']:
optimized_model = await self._apply_layer_fusion(optimized_model)
optimizations_applied.append('layer_fusion')
        # Step 4: Knowledge distillation (if the model is still over the size budget)
current_size = self._get_model_size_mb(optimized_model)
if current_size > constraints.max_model_size_mb:
distilled_model = await self._apply_knowledge_distillation(
model, # Teacher model
target_size_mb=constraints.max_model_size_mb
)
optimized_model = distilled_model
optimizations_applied.append('knowledge_distillation')
# Save optimized model
optimized_path = model_path.replace('.pt', '_optimized.pt')
torch.jit.save(optimized_model, optimized_path)
final_size = self._get_model_size_mb(optimized_model)
compression_ratio = original_size / final_size
logging.info(f"Optimized model size: {final_size:.2f} MB")
logging.info(f"Compression ratio: {compression_ratio:.2f}x")
logging.info(f"Optimizations applied: {optimizations_applied}")
return optimized_path
    async def _apply_int8_quantization(self, model: torch.nn.Module):
        """Apply dynamic INT8 quantization (expects an eager-mode nn.Module;
        quantize before scripting/tracing when targeting TorchScript)"""
        # Prepare model for quantization
        model.eval()
        # Dynamic quantization covers Linear (and recurrent) layers; Conv2d layers
        # require static post-training quantization with a calibration dataset
        quantized_model = torch.quantization.quantize_dynamic(
            model,
            {torch.nn.Linear},
            dtype=torch.qint8
        )
        return quantized_model
    async def _apply_pruning(self,
                             model: torch.nn.Module,
                             sparsity: float):
        """Apply unstructured L1 pruning (zeroed weights only shrink the model
        once stored in a sparse or compressed format)"""
        import copy
        import torch.nn.utils.prune as prune
        # Work on a copy so the caller's model is left untouched
        model_copy = copy.deepcopy(model)
        # Apply pruning to convolutional and linear layers
        for name, module in model_copy.named_modules():
            if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)):
                prune.l1_unstructured(module, name='weight', amount=sparsity)
                prune.remove(module, 'weight')
        return model_copy
async def _apply_knowledge_distillation(self,
teacher_model: torch.jit.ScriptModule,
target_size_mb: float):
"""Apply knowledge distillation to create smaller student model"""
# This would involve creating a smaller student architecture
# and training it to mimic the teacher model's outputs
# For brevity, returning a simplified version here
# Create student architecture (simplified)
student_model = self._create_student_architecture(
teacher_model,
target_size_mb
)
# Training loop would go here
# For now, return the student architecture
return student_model
def _create_student_architecture(self,
teacher_model: torch.jit.ScriptModule,
target_size_mb: float):
"""Create smaller student model architecture"""
# Simplified approach: reduce layer dimensions
# In practice, this would be more sophisticated
class StudentModel(torch.nn.Module):
def __init__(self):
super().__init__()
self.conv1 = torch.nn.Conv2d(3, 16, 3, padding=1)
self.conv2 = torch.nn.Conv2d(16, 32, 3, padding=1)
self.pool = torch.nn.AdaptiveAvgPool2d(1)
self.fc = torch.nn.Linear(32, 1000)
def forward(self, x):
x = torch.relu(self.conv1(x))
x = torch.relu(self.conv2(x))
x = self.pool(x)
x = x.view(x.size(0), -1)
x = self.fc(x)
return x
return StudentModel()
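Before moving on to platform specifics, here is a minimal usage sketch of the framework above. It is illustrative only: the constraint values, model path, and metadata are assumptions rather than recommendations, and it presumes the classes from edge_deployment.py (plus the referenced helper methods) are importable and filled in.
# usage_example.py (illustrative sketch; paths and constraint values are hypothetical)
import asyncio
import numpy as np

from edge_deployment import (
    EdgeAIFramework, EdgeConstraints, EdgeDeploymentConfig, EdgePlatform
)

async def main():
    # Constraints roughly matching a mid-range phone (ResourceProfile.MEDIUM)
    constraints = EdgeConstraints(
        max_memory_mb=2048,
        max_model_size_mb=50,
        max_inference_time_ms=100,
        max_power_consumption_mw=2000,
        cpu_cores=4,
        has_gpu=True,
        has_npu=False,
        storage_mb=4096,
        network_connectivity=['wifi', 'cellular']
    )
    config = EdgeDeploymentConfig(
        platform=EdgePlatform.MOBILE_ANDROID,
        constraints=constraints,
        optimization_level='balanced',
        quantization='int8',
        compilation_target='tflite'
    )
    framework = EdgeAIFramework(config)

    # Deploy once, then run a single inference on a dummy RGB frame
    deployment_id = await framework.deploy_model(
        'models/classifier.pt',  # hypothetical model path
        {'task': 'image_classification', 'version': '1.0'}
    )
    frame = np.random.rand(1, 224, 224, 3).astype(np.float32)
    result = await framework.run_inference(deployment_id, frame)
    print(f"latency: {result['inference_time_ms']:.1f} ms")

if __name__ == '__main__':
    asyncio.run(main())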
Platform-Specific Adapters
# platform_adapters.py
from abc import ABC, abstractmethod
from typing import Any, Dict

import torch

from edge_deployment import EdgeDeploymentConfig

class PlatformAdapter(ABC):
@abstractmethod
async def compile_model(self, model_path: str, target: str) -> str:
pass
@abstractmethod
async def deploy_model(self, compiled_model_path: str, deployment_id: str) -> Any:
pass
@abstractmethod
async def get_hardware_info(self) -> Dict:
pass
class AndroidAdapter(PlatformAdapter):
def __init__(self, config: EdgeDeploymentConfig):
self.config = config
async def compile_model(self, model_path: str, target: str = "tflite") -> str:
"""Compile model for Android deployment"""
if target == "tflite":
return await self._convert_to_tflite(model_path)
elif target == "onnx":
return await self._convert_to_onnx(model_path)
else:
raise ValueError(f"Unsupported compilation target: {target}")
async def _convert_to_tflite(self, model_path: str) -> str:
"""Convert PyTorch model to TensorFlow Lite"""
import tensorflow as tf
# Load PyTorch model
pytorch_model = torch.jit.load(model_path)
# Convert to ONNX first (intermediate step)
onnx_path = model_path.replace('.pt', '.onnx')
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
pytorch_model,
dummy_input,
onnx_path,
export_params=True,
opset_version=11,
do_constant_folding=True
)
        # Convert ONNX to a TensorFlow SavedModel via onnx-tf
        import onnx
        import onnx_tf
        onnx_model = onnx.load(onnx_path)
        tf_rep = onnx_tf.backend.prepare(onnx_model)
        saved_model_dir = model_path.replace('.pt', '_saved_model')
        tf_rep.export_graph(saved_model_dir)
        # Convert the SavedModel to TensorFlow Lite
        converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
# Apply optimizations
converter.optimizations = [tf.lite.Optimize.DEFAULT]
if self.config.quantization == 'int8':
converter.representative_dataset = self._get_representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_model = converter.convert()
# Save TFLite model
tflite_path = model_path.replace('.pt', '.tflite')
with open(tflite_path, 'wb') as f:
f.write(tflite_model)
return tflite_path
async def deploy_model(self, compiled_model_path: str, deployment_id: str):
"""Deploy TFLite model for Android inference"""
import tflite_runtime.interpreter as tflite
# Create TFLite interpreter
interpreter = tflite.Interpreter(
model_path=compiled_model_path,
num_threads=self.config.constraints.cpu_cores
)
interpreter.allocate_tensors()
return AndroidTFLiteModel(interpreter, deployment_id)
async def get_hardware_info(self) -> Dict:
"""Get Android device hardware information"""
# This would interface with Android APIs
# For now, return mock data
return {
'platform': 'Android',
'cpu_cores': 8,
'ram_mb': 6144,
'has_gpu': True,
'gpu_type': 'Adreno 640',
'has_npu': True,
'npu_type': 'Hexagon 685'
}
class IOSAdapter(PlatformAdapter):
def __init__(self, config: EdgeDeploymentConfig):
self.config = config
async def compile_model(self, model_path: str, target: str = "coreml") -> str:
"""Compile model for iOS deployment"""
if target == "coreml":
return await self._convert_to_coreml(model_path)
else:
raise ValueError(f"Unsupported compilation target for iOS: {target}")
async def _convert_to_coreml(self, model_path: str) -> str:
"""Convert PyTorch model to Core ML"""
try:
import coremltools as ct
# Load PyTorch model
pytorch_model = torch.jit.load(model_path)
pytorch_model.eval()
# Create dummy input
dummy_input = torch.randn(1, 3, 224, 224)
# Trace the model
traced_model = torch.jit.trace(pytorch_model, dummy_input)
# Convert to Core ML
coreml_model = ct.convert(
traced_model,
inputs=[ct.ImageType(
name="input",
shape=dummy_input.shape,
scale=1/255.0,
bias=[0, 0, 0]
)]
)
# Apply optimizations
if self.config.quantization == 'fp16':
coreml_model = ct.models.neural_network.quantization_utils.quantize_weights(
coreml_model, nbits=16
)
elif self.config.quantization == 'int8':
coreml_model = ct.models.neural_network.quantization_utils.quantize_weights(
coreml_model, nbits=8
)
# Save Core ML model
coreml_path = model_path.replace('.pt', '.mlmodel')
coreml_model.save(coreml_path)
return coreml_path
except ImportError:
raise ImportError("coremltools is required for iOS deployment")
async def deploy_model(self, compiled_model_path: str, deployment_id: str):
"""Deploy Core ML model for iOS inference"""
# This would use Core ML APIs
# For now, return a mock implementation
return IOSCoreMLModel(compiled_model_path, deployment_id)
class JetsonAdapter(PlatformAdapter):
def __init__(self, config: EdgeDeploymentConfig):
self.config = config
async def compile_model(self, model_path: str, target: str = "tensorrt") -> str:
"""Compile model for NVIDIA Jetson deployment"""
if target == "tensorrt":
return await self._convert_to_tensorrt(model_path)
else:
return await self._convert_to_onnx(model_path)
async def _convert_to_tensorrt(self, model_path: str) -> str:
"""Convert PyTorch model to TensorRT"""
try:
import tensorrt as trt
# First convert to ONNX
onnx_path = await self._convert_to_onnx(model_path)
# Create TensorRT engine
logger = trt.Logger(trt.Logger.WARNING)
builder = trt.Builder(logger)
network = builder.create_network(
1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)
)
parser = trt.OnnxParser(network, logger)
# Parse ONNX model
with open(onnx_path, 'rb') as model_file:
if not parser.parse(model_file.read()):
raise ValueError("Failed to parse ONNX model")
            # Build engine with optimizations (newer TensorRT releases replace these
            # calls with config.set_memory_pool_limit and builder.build_serialized_network)
            config = builder.create_builder_config()
            config.max_workspace_size = 1 << 28  # 256MB workspace
# Enable precision optimizations
if self.config.quantization == 'fp16':
if builder.platform_has_fast_fp16:
config.set_flag(trt.BuilderFlag.FP16)
elif self.config.quantization == 'int8':
if builder.platform_has_fast_int8:
config.set_flag(trt.BuilderFlag.INT8)
engine = builder.build_engine(network, config)
# Save engine
tensorrt_path = model_path.replace('.pt', '.trt')
with open(tensorrt_path, 'wb') as f:
f.write(engine.serialize())
return tensorrt_path
except ImportError:
raise ImportError("tensorrt is required for Jetson deployment")
async def _convert_to_onnx(self, model_path: str) -> str:
"""Convert PyTorch model to ONNX"""
# Load PyTorch model
model = torch.jit.load(model_path)
model.eval()
# Create dummy input
dummy_input = torch.randn(1, 3, 224, 224)
# Export to ONNX
onnx_path = model_path.replace('.pt', '.onnx')
torch.onnx.export(
model,
dummy_input,
onnx_path,
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
return onnx_path
async def deploy_model(self, compiled_model_path: str, deployment_id: str):
"""Deploy model for Jetson inference"""
if compiled_model_path.endswith('.trt'):
return JetsonTensorRTModel(compiled_model_path, deployment_id)
elif compiled_model_path.endswith('.onnx'):
return JetsonONNXModel(compiled_model_path, deployment_id)
else:
raise ValueError(f"Unsupported model format: {compiled_model_path}")
Real-time Inference Engine
# edge_inference_engine.py
import concurrent.futures
import time
from typing import Any, Dict, List

import numpy as np

from edge_deployment import EdgeDeploymentConfig

class EdgeInferenceEngine:
def __init__(self, config: EdgeDeploymentConfig):
self.config = config
self.inference_cache = {}
self.preprocessing_cache = {}
# Performance optimization
self.thread_pool = None
if config.enable_threading:
import concurrent.futures
self.thread_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=min(config.constraints.cpu_cores, 4)
)
async def infer(self,
model: Any,
input_data: np.ndarray,
timeout_ms: int) -> np.ndarray:
"""Run optimized inference with timeout"""
start_time = time.time()
# Input preprocessing
preprocessed_input = await self._preprocess_input(
input_data,
model.get_input_spec()
)
# Cache check
if self.config.enable_caching:
cache_key = self._generate_cache_key(preprocessed_input)
if cache_key in self.inference_cache:
cached_result = self.inference_cache[cache_key]
# Check cache age
if time.time() - cached_result['timestamp'] < 60: # 1 minute cache
return cached_result['result']
# Run inference with timeout
try:
if self.thread_pool:
# Async inference
future = self.thread_pool.submit(
model.predict,
preprocessed_input
)
result = future.result(timeout=timeout_ms/1000)
else:
# Synchronous inference
result = model.predict(preprocessed_input)
# Check timeout
if (time.time() - start_time) * 1000 > timeout_ms:
raise TimeoutError(f"Inference exceeded {timeout_ms}ms timeout")
# Postprocess result
postprocessed_result = await self._postprocess_output(
result,
model.get_output_spec()
)
# Cache result
if self.config.enable_caching:
self.inference_cache[cache_key] = {
'result': postprocessed_result,
'timestamp': time.time()
}
# Manage cache size
if len(self.inference_cache) > 100:
oldest_key = min(
self.inference_cache.keys(),
key=lambda k: self.inference_cache[k]['timestamp']
)
del self.inference_cache[oldest_key]
return postprocessed_result
        except (TimeoutError, concurrent.futures.TimeoutError):
            raise TimeoutError(f"Inference exceeded {timeout_ms}ms timeout")
        except Exception as e:
            raise RuntimeError(f"Inference failed: {e}")
async def _preprocess_input(self,
input_data: np.ndarray,
input_spec: Dict) -> np.ndarray:
"""Optimized input preprocessing"""
# Check preprocessing cache
input_hash = hash(input_data.tobytes())
if input_hash in self.preprocessing_cache:
return self.preprocessing_cache[input_hash]
processed = input_data.copy()
# Resize if needed
if 'shape' in input_spec:
target_shape = input_spec['shape']
if processed.shape != target_shape:
processed = self._resize_input(processed, target_shape)
# Normalize
if 'normalize' in input_spec:
normalize_config = input_spec['normalize']
processed = (processed - normalize_config['mean']) / normalize_config['std']
# Data type conversion
if 'dtype' in input_spec:
processed = processed.astype(input_spec['dtype'])
# Cache preprocessing result
self.preprocessing_cache[input_hash] = processed
# Manage cache size
if len(self.preprocessing_cache) > 50:
oldest_key = next(iter(self.preprocessing_cache))
del self.preprocessing_cache[oldest_key]
return processed
def _resize_input(self, input_data: np.ndarray, target_shape: tuple) -> np.ndarray:
"""Fast input resizing"""
if len(input_data.shape) == 4: # Batch of images
import cv2
resized_batch = []
for i in range(input_data.shape[0]):
img = input_data[i]
resized = cv2.resize(
img,
(target_shape[2], target_shape[1]),
interpolation=cv2.INTER_LINEAR
)
resized_batch.append(resized)
return np.stack(resized_batch)
else: # Single image
import cv2
return cv2.resize(
input_data,
(target_shape[1], target_shape[0]),
interpolation=cv2.INTER_LINEAR
)
class EdgePerformanceMonitor:
def __init__(self):
self.metrics = {}
self.resource_monitor = ResourceMonitor()
def track_inference(self, deployment_id: str):
"""Context manager for tracking inference performance"""
return InferenceTracker(deployment_id, self)
def record_inference(self,
deployment_id: str,
inference_time_ms: float,
memory_usage_mb: float,
cpu_usage_percent: float,
power_consumption_mw: float = None):
"""Record inference metrics"""
if deployment_id not in self.metrics:
self.metrics[deployment_id] = {
'inference_count': 0,
'total_inference_time_ms': 0,
'avg_inference_time_ms': 0,
'max_inference_time_ms': 0,
'avg_memory_usage_mb': 0,
'avg_cpu_usage_percent': 0,
'inference_history': []
}
metrics = self.metrics[deployment_id]
# Update counters
metrics['inference_count'] += 1
metrics['total_inference_time_ms'] += inference_time_ms
metrics['avg_inference_time_ms'] = (
metrics['total_inference_time_ms'] / metrics['inference_count']
)
metrics['max_inference_time_ms'] = max(
metrics['max_inference_time_ms'],
inference_time_ms
)
# Update resource usage
metrics['avg_memory_usage_mb'] = (
(metrics['avg_memory_usage_mb'] * (metrics['inference_count'] - 1) +
memory_usage_mb) / metrics['inference_count']
)
metrics['avg_cpu_usage_percent'] = (
(metrics['avg_cpu_usage_percent'] * (metrics['inference_count'] - 1) +
cpu_usage_percent) / metrics['inference_count']
)
# Store history (last 100 inferences)
inference_record = {
'timestamp': time.time(),
'inference_time_ms': inference_time_ms,
'memory_usage_mb': memory_usage_mb,
'cpu_usage_percent': cpu_usage_percent,
'power_consumption_mw': power_consumption_mw
}
metrics['inference_history'].append(inference_record)
if len(metrics['inference_history']) > 100:
metrics['inference_history'].pop(0)
def get_performance_report(self, deployment_id: str) -> Dict:
"""Generate performance report"""
if deployment_id not in self.metrics:
return {'error': 'No metrics available'}
metrics = self.metrics[deployment_id]
# Calculate additional statistics
recent_inferences = metrics['inference_history'][-10:]
recent_avg_time = (
sum(r['inference_time_ms'] for r in recent_inferences) /
len(recent_inferences) if recent_inferences else 0
)
return {
'deployment_id': deployment_id,
'total_inferences': metrics['inference_count'],
'avg_inference_time_ms': metrics['avg_inference_time_ms'],
'max_inference_time_ms': metrics['max_inference_time_ms'],
'recent_avg_inference_time_ms': recent_avg_time,
'avg_memory_usage_mb': metrics['avg_memory_usage_mb'],
'avg_cpu_usage_percent': metrics['avg_cpu_usage_percent'],
'throughput_fps': 1000 / metrics['avg_inference_time_ms'] if metrics['avg_inference_time_ms'] > 0 else 0,
'performance_trend': self._analyze_performance_trend(metrics['inference_history'])
}
def _analyze_performance_trend(self, history: List[Dict]) -> str:
"""Analyze performance trend"""
if len(history) < 10:
return 'insufficient_data'
# Compare first half with second half
midpoint = len(history) // 2
first_half_avg = sum(
r['inference_time_ms'] for r in history[:midpoint]
) / midpoint
second_half_avg = sum(
r['inference_time_ms'] for r in history[midpoint:]
) / (len(history) - midpoint)
if second_half_avg > first_half_avg * 1.1:
return 'degrading'
elif second_half_avg < first_half_avg * 0.9:
return 'improving'
else:
return 'stable'
class InferenceTracker:
def __init__(self, deployment_id: str, monitor: EdgePerformanceMonitor):
self.deployment_id = deployment_id
self.monitor = monitor
self.start_time = None
self.start_memory = None
self.start_cpu = None
def __enter__(self):
self.start_time = time.time()
self.start_memory = self.monitor.resource_monitor.get_memory_usage_mb()
self.start_cpu = self.monitor.resource_monitor.get_cpu_usage_percent()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
end_time = time.time()
end_memory = self.monitor.resource_monitor.get_memory_usage_mb()
end_cpu = self.monitor.resource_monitor.get_cpu_usage_percent()
inference_time_ms = (end_time - self.start_time) * 1000
avg_memory = (self.start_memory + end_memory) / 2
avg_cpu = (self.start_cpu + end_cpu) / 2
self.monitor.record_inference(
self.deployment_id,
inference_time_ms,
avg_memory,
avg_cpu
)
class ResourceMonitor:
    def __init__(self):
        import psutil
        self.psutil = psutil

    def get_memory_usage_mb(self) -> float:
        """Get current memory usage in MB"""
        try:
            process = self.psutil.Process()
            return process.memory_info().rss / 1024 / 1024
        except Exception:
            return 0.0

    def get_cpu_usage_percent(self) -> float:
        """Get current CPU usage percentage"""
        try:
            return self.psutil.cpu_percent(interval=0.1)
        except Exception:
            return 0.0

    def get_gpu_usage_percent(self) -> float:
        """Get GPU usage if available (requires GPUtil)"""
        try:
            import GPUtil
            gpus = GPUtil.getGPUs()
            if gpus:
                return gpus[0].load * 100
            return 0.0
        except Exception:
            return 0.0
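Here is a short sketch of how the monitor is meant to be used. It is an assumption-laden example: the deployment id is arbitrary, and the sleep stands in for a real model.predict call; psutil must be installed for ResourceMonitor.
# monitoring_example.py (illustrative; the workload is a placeholder for real inference)
import time

monitor = EdgePerformanceMonitor()

for _ in range(20):
    with monitor.track_inference('edge_demo'):
        time.sleep(0.02)  # placeholder for model.predict(...)

report = monitor.get_performance_report('edge_demo')
print(f"avg latency: {report['avg_inference_time_ms']:.1f} ms, "
      f"throughput: {report['throughput_fps']:.1f} FPS, "
      f"trend: {report['performance_trend']}")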
Model Compression and Optimization
Advanced Compression Techniques
# compression_techniques.py
from typing import Dict

import torch

class AdvancedModelCompression:
def __init__(self):
self.compression_methods = {
'weight_clustering': WeightClusteringCompressor(),
'low_rank_approximation': LowRankApproximation(),
'huffman_coding': HuffmanCodingCompressor(),
'binary_quantization': BinaryQuantizer(),
'mixed_precision': MixedPrecisionOptimizer()
}
async def compress_model(self,
model_path: str,
target_size_mb: float,
accuracy_threshold: float = 0.95) -> Dict:
"""Apply multiple compression techniques to achieve target size"""
original_model = torch.jit.load(model_path)
original_size = self._get_model_size_mb(original_model)
original_accuracy = await self._evaluate_model_accuracy(original_model)
compression_results = []
        current_model = original_model
        current_size = original_size
        current_accuracy = original_accuracy
# Apply compression techniques in order of effectiveness
techniques = [
('weight_clustering', {'clusters': 256}),
('low_rank_approximation', {'rank_reduction': 0.5}),
('huffman_coding', {}),
('binary_quantization', {'layers': ['conv', 'linear']}),
('mixed_precision', {'fp16_layers': ['conv'], 'int8_layers': ['linear']})
]
for technique_name, config in techniques:
if current_size <= target_size_mb:
break
# Apply compression technique
compressor = self.compression_methods[technique_name]
compressed_model = await compressor.compress(current_model, config)
# Evaluate compressed model
compressed_size = self._get_model_size_mb(compressed_model)
compressed_accuracy = await self._evaluate_model_accuracy(compressed_model)
# Check if accuracy is acceptable
accuracy_retention = compressed_accuracy / original_accuracy
            if accuracy_retention >= accuracy_threshold:
                size_before_mb = current_size
                current_model = compressed_model
                current_size = compressed_size
                current_accuracy = compressed_accuracy
                compression_results.append({
                    'technique': technique_name,
                    'size_before_mb': size_before_mb,
                    'size_after_mb': compressed_size,
                    'compression_ratio': size_before_mb / compressed_size,
                    'accuracy_retention': accuracy_retention,
                    'applied': True
                })
else:
compression_results.append({
'technique': technique_name,
'accuracy_retention': accuracy_retention,
'applied': False,
'reason': 'accuracy_below_threshold'
})
# Save compressed model
compressed_path = model_path.replace('.pt', '_compressed.pt')
torch.jit.save(current_model, compressed_path)
final_compression_ratio = original_size / current_size
return {
'compressed_model_path': compressed_path,
'original_size_mb': original_size,
'compressed_size_mb': current_size,
'total_compression_ratio': final_compression_ratio,
            'accuracy_retention': current_accuracy / original_accuracy,
'compression_techniques': compression_results,
'meets_target_size': current_size <= target_size_mb
}
class WeightClusteringCompressor:
async def compress(self, model: torch.nn.Module, config: Dict):
"""Apply weight clustering compression"""
num_clusters = config.get('clusters', 256)
compressed_model = self._create_model_copy(model)
# Apply clustering to each layer
for name, module in compressed_model.named_modules():
if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)):
compressed_weights = self._cluster_weights(
module.weight.data,
num_clusters
)
module.weight.data = compressed_weights
return compressed_model
def _cluster_weights(self, weights: torch.Tensor, num_clusters: int):
"""Cluster weights using k-means"""
from sklearn.cluster import KMeans
# Flatten weights
original_shape = weights.shape
flattened_weights = weights.flatten().cpu().numpy()
# Apply k-means clustering
kmeans = KMeans(n_clusters=num_clusters, random_state=42)
cluster_labels = kmeans.fit_predict(flattened_weights.reshape(-1, 1))
# Replace weights with cluster centroids
clustered_weights = kmeans.cluster_centers_[cluster_labels].flatten()
# Reshape back to original shape
clustered_tensor = torch.tensor(
clustered_weights.reshape(original_shape),
dtype=weights.dtype,
device=weights.device
)
return clustered_tensor
class LowRankApproximation:
async def compress(self, model: torch.nn.Module, config: Dict):
"""Apply low-rank approximation to linear layers"""
rank_reduction = config.get('rank_reduction', 0.5)
compressed_model = self._create_model_copy(model)
# Apply low-rank approximation to linear layers
for name, module in compressed_model.named_modules():
if isinstance(module, torch.nn.Linear):
compressed_layer = self._approximate_linear_layer(
module,
rank_reduction
)
# Replace layer in model
self._replace_module(compressed_model, name, compressed_layer)
return compressed_model
def _approximate_linear_layer(self, layer: torch.nn.Linear, rank_reduction: float):
"""Apply SVD-based low-rank approximation"""
# Get weight matrix
weight = layer.weight.data
# Apply SVD
U, S, Vt = torch.linalg.svd(weight, full_matrices=False)
# Determine rank
original_rank = min(weight.shape)
target_rank = int(original_rank * rank_reduction)
# Truncate SVD components
U_truncated = U[:, :target_rank]
S_truncated = S[:target_rank]
Vt_truncated = Vt[:target_rank, :]
# Create two linear layers
layer1 = torch.nn.Linear(
weight.shape[1],
target_rank,
bias=False
)
layer2 = torch.nn.Linear(
target_rank,
weight.shape[0],
bias=layer.bias is not None
)
# Set weights
layer1.weight.data = (Vt_truncated * S_truncated.unsqueeze(1))
layer2.weight.data = U_truncated
if layer.bias is not None:
layer2.bias.data = layer.bias.data
# Return sequential container
return torch.nn.Sequential(layer1, layer2)
class HuffmanCodingCompressor:
async def compress(self, model: torch.nn.Module, config: Dict):
"""Apply Huffman coding to weights"""
# This would implement Huffman coding for weight compression
# For brevity, returning the model as-is
# In practice, this would involve:
# 1. Quantizing weights to discrete values
# 2. Building Huffman tree based on weight frequency
# 3. Encoding weights using variable-length codes
# 4. Storing codebook with model
return model
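# The codebook-building step described above can be sketched with Python's heapq.
# This helper is an illustrative assumption (not part of the original listing):
# it expects a list or 1-D array of already-quantized weight values and returns a
# prefix code per value; bit-packing the weights and storing the codebook alongside
# the model are omitted.
import heapq
from collections import Counter

def build_huffman_codebook(quantized_values) -> dict:
    """Map each discrete weight value to a variable-length bit string"""
    freqs = Counter(quantized_values)
    if len(freqs) == 1:
        # Degenerate case: a single symbol still needs one bit
        return {value: '0' for value in freqs}
    # Each heap entry: (frequency, tie-breaker, {value: partial_code})
    heap = [(f, i, {v: ''}) for i, (v, f) in enumerate(freqs.items())]
    heapq.heapify(heap)
    counter = len(heap)
    while len(heap) > 1:
        f1, _, left = heapq.heappop(heap)
        f2, _, right = heapq.heappop(heap)
        merged = {v: '0' + code for v, code in left.items()}
        merged.update({v: '1' + code for v, code in right.items()})
        heapq.heappush(heap, (f1 + f2, counter, merged))
        counter += 1
    return heap[0][2]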
class MixedPrecisionOptimizer:
async def compress(self, model: torch.nn.Module, config: Dict):
"""Apply mixed precision optimization"""
fp16_layers = config.get('fp16_layers', ['conv'])
int8_layers = config.get('int8_layers', ['linear'])
compressed_model = self._create_model_copy(model)
# Apply mixed precision
for name, module in compressed_model.named_modules():
if any(layer_type in name.lower() for layer_type in fp16_layers):
# Convert to FP16
module.half()
elif any(layer_type in name.lower() for layer_type in int8_layers):
# Apply INT8 quantization
if isinstance(module, (torch.nn.Conv2d, torch.nn.Linear)):
quantized_module = torch.quantization.quantize_dynamic(
module,
{type(module)},
dtype=torch.qint8
)
self._replace_module(compressed_model, name, quantized_module)
return compressed_model
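Several helpers used throughout these classes (_get_model_size_mb, _create_model_copy, _replace_module, _evaluate_model_accuracy) are referenced but not shown. One possible implementation of the first three, assuming eager-mode PyTorch modules, is sketched below; accuracy evaluation depends on your validation data and metric, so it is left out.
# compression_helpers.py (one possible implementation of the helpers referenced above;
# module-level functions shown here, while the classes call them as methods)
import copy
import io

import torch

def get_model_size_mb(model) -> float:
    """Serialized size of the model in megabytes"""
    buffer = io.BytesIO()
    if isinstance(model, torch.jit.ScriptModule):
        torch.jit.save(model, buffer)
    else:
        torch.save(model.state_dict(), buffer)
    return buffer.getbuffer().nbytes / (1024 * 1024)

def create_model_copy(model: torch.nn.Module) -> torch.nn.Module:
    """Deep copy so compression never mutates the original model"""
    return copy.deepcopy(model)

def replace_module(model: torch.nn.Module, module_name: str, new_module: torch.nn.Module):
    """Replace a submodule addressed by its dotted name from named_modules()"""
    parts = module_name.split('.')
    parent = model
    for part in parts[:-1]:
        parent = getattr(parent, part)
    setattr(parent, parts[-1], new_module)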
Best Practices Checklist
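- Profile the target hardware first and encode its limits as explicit constraints (memory, model size, latency, power) before any optimization work.
- Pick the compilation target that matches the platform: TFLite for Android, Core ML for iOS, TensorRT for Jetson, ONNX as the portable fallback.
- Apply quantization before pruning, and escalate to knowledge distillation only when the size budget still is not met.
- Validate every optimization against an accuracy-retention threshold and reject any step that trades too much accuracy for size.
- Keep batch size at 1 and enable caching and threading only when the device has headroom for them.
- Monitor inference latency, memory, and CPU usage on-device, and watch the trend for degradation over time.
- Always validate the final deployment on actual target hardware, not an emulator or the development machine.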
Conclusion
Edge AI deployment requires a fundamentally different approach than cloud deployment, focusing on resource efficiency, real-time performance, and hardware-specific optimization. By implementing proper model compression, platform-specific compilation, efficient inference engines, and comprehensive monitoring, you can successfully deploy sophisticated AI models to resource-constrained edge devices. Remember that edge AI is about finding the optimal balance between model capability and hardware constraints—start with your requirements, optimize relentlessly, and always validate performance on actual target hardware.