Master MLOps practices for deploying AI models at scale, including CI/CD pipelines, monitoring, versioning, and production infrastructure patterns.
MLOps (Machine Learning Operations) bridges the gap between machine learning development and production deployment, ensuring AI models remain reliable, scalable, and maintainable once they are serving traffic. This guide covers the complete MLOps lifecycle, from model development to production monitoring and maintenance.
MLOps Fundamentals and Architecture
MLOps encompasses the practices, tools, and cultural philosophies that combine machine learning development with IT operations. The goal is to shorten the ML development lifecycle while delivering high-quality models continuously.
Core MLOps Components
# mlops_architecture.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Union, Callable
from enum import Enum
from datetime import datetime, timedelta
import json
import asyncio
import logging
from pathlib import Path
class ModelStage(Enum):
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
ARCHIVED = "archived"
class DeploymentStrategy(Enum):
BLUE_GREEN = "blue_green"
CANARY = "canary"
ROLLING = "rolling"
A_B_TEST = "a_b_test"
@dataclass
class ModelMetadata:
model_id: str
name: str
version: str
framework: str
algorithm: str
training_data_version: str
performance_metrics: Dict[str, float]
resource_requirements: Dict[str, Any]
created_at: datetime
created_by: str
tags: List[str] = field(default_factory=list)
description: str = ""
@dataclass
class DeploymentConfig:
model_metadata: ModelMetadata
target_environment: str
strategy: DeploymentStrategy
resource_allocation: Dict[str, Any]
scaling_config: Dict[str, Any]
monitoring_config: Dict[str, Any]
rollback_threshold: float = 0.95
traffic_split: Dict[str, float] = field(default_factory=dict)
class ModelRegistry(ABC):
"""Abstract model registry interface"""
@abstractmethod
async def register_model(self, model_metadata: ModelMetadata, model_artifacts: Dict[str, Any]) -> str:
"""Register a new model version"""
pass
@abstractmethod
async def get_model(self, model_id: str, version: Optional[str] = None) -> Optional[ModelMetadata]:
"""Get model metadata"""
pass
@abstractmethod
async def list_models(self, stage: Optional[ModelStage] = None) -> List[ModelMetadata]:
"""List available models"""
pass
@abstractmethod
async def promote_model(self, model_id: str, version: str, target_stage: ModelStage) -> bool:
"""Promote model to different stage"""
pass
@abstractmethod
async def archive_model(self, model_id: str, version: str) -> bool:
"""Archive a model version"""
pass
class ModelDeployer(ABC):
"""Abstract model deployer interface"""
@abstractmethod
async def deploy(self, config: DeploymentConfig) -> Dict[str, Any]:
"""Deploy model to target environment"""
pass
@abstractmethod
async def rollback(self, deployment_id: str) -> Dict[str, Any]:
"""Rollback deployment"""
pass
@abstractmethod
async def scale(self, deployment_id: str, replica_count: int) -> Dict[str, Any]:
"""Scale deployment"""
pass
@abstractmethod
async def get_deployment_status(self, deployment_id: str) -> Dict[str, Any]:
"""Get deployment status"""
pass
class ModelMonitor(ABC):
"""Abstract model monitor interface"""
@abstractmethod
async def track_prediction(self, model_id: str,
input_data: Any,
prediction: Any,
metadata: Dict[str, Any] = None):
"""Track a model prediction"""
pass
@abstractmethod
async def detect_drift(self, model_id: str,
window_size: int = 1000) -> Dict[str, Any]:
"""Detect data/concept drift"""
pass
@abstractmethod
async def get_performance_metrics(self, model_id: str,
time_range: tuple) -> Dict[str, Any]:
"""Get model performance metrics"""
pass
class MLOpsOrchestrator:
"""Main MLOps orchestrator"""
def __init__(self,
model_registry: ModelRegistry,
deployer: ModelDeployer,
monitor: ModelMonitor,
config: Dict[str, Any] = None):
self.model_registry = model_registry
self.deployer = deployer
self.monitor = monitor
self.config = config or {}
self.logger = logging.getLogger(__name__)
# Pipeline state
self.active_deployments: Dict[str, DeploymentConfig] = {}
self.deployment_history: List[Dict[str, Any]] = []
# Event callbacks
self.deployment_callbacks: List[Callable] = []
self.monitoring_callbacks: List[Callable] = []
async def deploy_model(self,
model_id: str,
version: str,
target_environment: str,
strategy: DeploymentStrategy = DeploymentStrategy.ROLLING,
**kwargs) -> Dict[str, Any]:
"""Deploy model with specified strategy"""
deployment_id = f"{model_id}_{version}_{target_environment}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
try:
# Get model metadata
model_metadata = await self.model_registry.get_model(model_id, version)
if not model_metadata:
raise ValueError(f"Model {model_id}:{version} not found in registry")
# Create deployment configuration
deployment_config = DeploymentConfig(
model_metadata=model_metadata,
target_environment=target_environment,
strategy=strategy,
resource_allocation=kwargs.get('resource_allocation', self._get_default_resources()),
scaling_config=kwargs.get('scaling_config', self._get_default_scaling()),
monitoring_config=kwargs.get('monitoring_config', self._get_default_monitoring()),
rollback_threshold=kwargs.get('rollback_threshold', 0.95),
traffic_split=kwargs.get('traffic_split', {})
)
# Validate deployment
validation_result = await self._validate_deployment(deployment_config)
if not validation_result['valid']:
raise ValueError(f"Deployment validation failed: {validation_result['errors']}")
# Execute deployment
self.logger.info(f"Starting deployment {deployment_id}")
deployment_result = await self.deployer.deploy(deployment_config)
# Track deployment
self.active_deployments[deployment_id] = deployment_config
self.deployment_history.append({
'deployment_id': deployment_id,
'model_id': model_id,
'version': version,
'environment': target_environment,
'strategy': strategy.value,
'timestamp': datetime.now(),
'status': 'deployed',
'result': deployment_result
})
# Set up monitoring
await self._setup_monitoring(deployment_id, deployment_config)
# Notify callbacks
for callback in self.deployment_callbacks:
await callback('deployed', deployment_id, deployment_config)
self.logger.info(f"Deployment {deployment_id} completed successfully")
return {
'deployment_id': deployment_id,
'status': 'success',
'endpoint': deployment_result.get('endpoint'),
'metrics': deployment_result.get('metrics', {})
}
except Exception as e:
self.logger.error(f"Deployment {deployment_id} failed: {e}")
# Record failure
self.deployment_history.append({
'deployment_id': deployment_id,
'model_id': model_id,
'version': version,
'environment': target_environment,
'timestamp': datetime.now(),
'status': 'failed',
'error': str(e)
})
return {
'deployment_id': deployment_id,
'status': 'failed',
'error': str(e)
}
async def _validate_deployment(self, config: DeploymentConfig) -> Dict[str, Any]:
"""Validate deployment configuration"""
errors = []
warnings = []
# Check resource requirements
if not config.resource_allocation.get('cpu'):
errors.append("CPU allocation not specified")
if not config.resource_allocation.get('memory'):
errors.append("Memory allocation not specified")
# Check model performance thresholds
performance_metrics = config.model_metadata.performance_metrics
min_accuracy = self.config.get('min_accuracy_threshold', 0.8)
if 'accuracy' in performance_metrics:
if performance_metrics['accuracy'] < min_accuracy:
warnings.append(f"Model accuracy {performance_metrics['accuracy']:.3f} below threshold {min_accuracy}")
# Check environment compatibility
target_env = config.target_environment
supported_envs = self.config.get('supported_environments', ['development', 'staging', 'production'])
if target_env not in supported_envs:
errors.append(f"Unsupported target environment: {target_env}")
return {
'valid': len(errors) == 0,
'errors': errors,
'warnings': warnings
}
async def _setup_monitoring(self, deployment_id: str, config: DeploymentConfig):
"""Set up monitoring for deployment"""
monitoring_config = config.monitoring_config
# Set up drift detection
if monitoring_config.get('drift_detection', True):
asyncio.create_task(
self._monitor_drift(deployment_id, config.model_metadata.model_id)
)
# Set up performance monitoring
if monitoring_config.get('performance_monitoring', True):
asyncio.create_task(
self._monitor_performance(deployment_id, config.model_metadata.model_id)
)
async def _monitor_drift(self, deployment_id: str, model_id: str):
"""Monitor for data drift"""
while deployment_id in self.active_deployments:
try:
drift_result = await self.monitor.detect_drift(model_id)
if drift_result.get('drift_detected'):
self.logger.warning(f"Drift detected for deployment {deployment_id}")
# Notify callbacks
for callback in self.monitoring_callbacks:
await callback('drift_detected', deployment_id, drift_result)
# Wait before next check
await asyncio.sleep(3600) # Check hourly
except Exception as e:
self.logger.error(f"Drift monitoring error for {deployment_id}: {e}")
await asyncio.sleep(3600)
async def _monitor_performance(self, deployment_id: str, model_id: str):
"""Monitor model performance"""
while deployment_id in self.active_deployments:
try:
                # Get performance metrics for the last hour
                end_time = datetime.now()
                start_time = end_time - timedelta(hours=1)
performance = await self.monitor.get_performance_metrics(
model_id, (start_time, end_time)
)
# Check against thresholds
config = self.active_deployments[deployment_id]
threshold = config.rollback_threshold
current_performance = performance.get('accuracy', 1.0)
if current_performance < threshold:
self.logger.warning(
f"Performance degradation detected for {deployment_id}: "
f"{current_performance:.3f} < {threshold:.3f}"
)
# Trigger rollback
await self._trigger_automatic_rollback(deployment_id, performance)
await asyncio.sleep(1800) # Check every 30 minutes
except Exception as e:
self.logger.error(f"Performance monitoring error for {deployment_id}: {e}")
await asyncio.sleep(1800)
async def _trigger_automatic_rollback(self, deployment_id: str, performance_data: Dict[str, Any]):
"""Trigger automatic rollback due to performance issues"""
self.logger.info(f"Triggering automatic rollback for {deployment_id}")
try:
rollback_result = await self.deployer.rollback(deployment_id)
# Update deployment status
if deployment_id in self.active_deployments:
del self.active_deployments[deployment_id]
# Record rollback
self.deployment_history.append({
'deployment_id': deployment_id,
'action': 'rollback',
'reason': 'performance_degradation',
'performance_data': performance_data,
'timestamp': datetime.now(),
'result': rollback_result
})
# Notify callbacks
for callback in self.deployment_callbacks:
await callback('rollback', deployment_id, {'reason': 'performance_degradation'})
except Exception as e:
self.logger.error(f"Automatic rollback failed for {deployment_id}: {e}")
def _get_default_resources(self) -> Dict[str, Any]:
"""Get default resource allocation"""
return {
'cpu': '1000m',
'memory': '2Gi',
'gpu': 0
}
def _get_default_scaling(self) -> Dict[str, Any]:
"""Get default scaling configuration"""
return {
'min_replicas': 1,
'max_replicas': 10,
'target_cpu_utilization': 70,
'scale_up_threshold': 80,
'scale_down_threshold': 30
}
def _get_default_monitoring(self) -> Dict[str, Any]:
"""Get default monitoring configuration"""
return {
'drift_detection': True,
'performance_monitoring': True,
'logging_enabled': True,
'metrics_collection': True
}
def add_deployment_callback(self, callback: Callable):
"""Add deployment event callback"""
self.deployment_callbacks.append(callback)
def add_monitoring_callback(self, callback: Callable):
"""Add monitoring event callback"""
self.monitoring_callbacks.append(callback)
async def get_deployment_status(self, deployment_id: str) -> Dict[str, Any]:
"""Get comprehensive deployment status"""
if deployment_id not in self.active_deployments:
return {'status': 'not_found'}
config = self.active_deployments[deployment_id]
# Get status from deployer
deployer_status = await self.deployer.get_deployment_status(deployment_id)
        # Get performance metrics for the last 24 hours
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=24)
performance = await self.monitor.get_performance_metrics(
config.model_metadata.model_id, (start_time, end_time)
)
return {
'deployment_id': deployment_id,
'model_id': config.model_metadata.model_id,
'version': config.model_metadata.version,
'environment': config.target_environment,
'strategy': config.strategy.value,
'deployer_status': deployer_status,
'performance_metrics': performance,
'created_at': [h for h in self.deployment_history if h['deployment_id'] == deployment_id][0]['timestamp']
}
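The orchestrator raises deployment and monitoring events through the callback lists above; each callback is awaited with an event name, the deployment ID, and an event payload. A minimal sketch of plugging alerting into those hooks, assuming the classes above are importable from mlops_architecture.py:
# alerting_hooks.py -- illustrative sketch; assumes mlops_architecture.py is on the import path
import logging
from mlops_architecture import MLOpsOrchestrator

logger = logging.getLogger("mlops.alerts")

async def on_deployment_event(event, deployment_id, payload):
    # Receives 'deployed' (payload: DeploymentConfig) and 'rollback' (payload: dict) events
    if event == "rollback":
        logger.error("Deployment %s rolled back: %s", deployment_id, payload)
    else:
        logger.info("Deployment event '%s' for %s", event, deployment_id)

async def on_monitoring_event(event, deployment_id, payload):
    # Receives 'drift_detected' events with the drift report as payload
    if event == "drift_detected":
        logger.warning("Drift on %s: %s", deployment_id, payload.get("drift_types"))

def register_alerting(orchestrator: MLOpsOrchestrator) -> None:
    """Attach the alerting callbacks to an existing orchestrator."""
    orchestrator.add_deployment_callback(on_deployment_event)
    orchestrator.add_monitoring_callback(on_monitoring_event)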
Model Registry Implementation
# model_registry.py
import asyncio
import json
import logging
import sqlite3
import pickle
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import shutil
import boto3
from botocore.exceptions import ClientError
from mlops_architecture import ModelMetadata, ModelRegistry, ModelStage
class FileSystemModelRegistry(ModelRegistry):
"""File system-based model registry"""
def __init__(self, registry_path: str = "./model_registry"):
self.registry_path = Path(registry_path)
self.registry_path.mkdir(exist_ok=True)
# Initialize metadata database
self.db_path = self.registry_path / "registry.db"
self._init_database()
self.logger = logging.getLogger(__name__)
def _init_database(self):
"""Initialize SQLite database for metadata"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS models (
model_id TEXT,
version TEXT,
name TEXT,
framework TEXT,
algorithm TEXT,
training_data_version TEXT,
performance_metrics TEXT,
resource_requirements TEXT,
created_at TEXT,
created_by TEXT,
tags TEXT,
description TEXT,
stage TEXT,
artifacts_path TEXT,
PRIMARY KEY (model_id, version)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_model_stage ON models(model_id, stage)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_created_at ON models(created_at)
""")
async def register_model(self, model_metadata: ModelMetadata, model_artifacts: Dict[str, Any]) -> str:
"""Register a new model version"""
# Create version-specific directory
model_dir = self.registry_path / model_metadata.model_id / model_metadata.version
model_dir.mkdir(parents=True, exist_ok=True)
# Save artifacts
artifacts_path = model_dir / "artifacts"
artifacts_path.mkdir(exist_ok=True)
for artifact_name, artifact_data in model_artifacts.items():
artifact_file = artifacts_path / artifact_name
if isinstance(artifact_data, (str, Path)):
# Copy file
shutil.copy2(artifact_data, artifact_file)
elif hasattr(artifact_data, 'save'):
# Model object with save method
artifact_data.save(str(artifact_file))
else:
# Pickle other objects
with open(artifact_file, 'wb') as f:
pickle.dump(artifact_data, f)
# Save metadata
metadata_file = model_dir / "metadata.json"
with open(metadata_file, 'w') as f:
json.dump({
'model_id': model_metadata.model_id,
'name': model_metadata.name,
'version': model_metadata.version,
'framework': model_metadata.framework,
'algorithm': model_metadata.algorithm,
'training_data_version': model_metadata.training_data_version,
'performance_metrics': model_metadata.performance_metrics,
'resource_requirements': model_metadata.resource_requirements,
'created_at': model_metadata.created_at.isoformat(),
'created_by': model_metadata.created_by,
'tags': model_metadata.tags,
'description': model_metadata.description
}, f, indent=2)
# Store in database
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT OR REPLACE INTO models
(model_id, version, name, framework, algorithm, training_data_version,
performance_metrics, resource_requirements, created_at, created_by,
tags, description, stage, artifacts_path)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
model_metadata.model_id,
model_metadata.version,
model_metadata.name,
model_metadata.framework,
model_metadata.algorithm,
model_metadata.training_data_version,
json.dumps(model_metadata.performance_metrics),
json.dumps(model_metadata.resource_requirements),
model_metadata.created_at.isoformat(),
model_metadata.created_by,
json.dumps(model_metadata.tags),
model_metadata.description,
ModelStage.DEVELOPMENT.value,
str(artifacts_path)
))
self.logger.info(f"Registered model {model_metadata.model_id}:{model_metadata.version}")
return f"{model_metadata.model_id}:{model_metadata.version}"
async def get_model(self, model_id: str, version: Optional[str] = None) -> Optional[ModelMetadata]:
"""Get model metadata"""
with sqlite3.connect(self.db_path) as conn:
if version:
cursor = conn.execute("""
SELECT * FROM models WHERE model_id = ? AND version = ?
""", (model_id, version))
else:
# Get latest version
cursor = conn.execute("""
SELECT * FROM models WHERE model_id = ?
ORDER BY created_at DESC LIMIT 1
""", (model_id,))
row = cursor.fetchone()
if not row:
return None
(model_id, version, name, framework, algorithm, training_data_version,
performance_metrics, resource_requirements, created_at, created_by,
tags, description, stage, artifacts_path) = row
return ModelMetadata(
model_id=model_id,
name=name,
version=version,
framework=framework,
algorithm=algorithm,
training_data_version=training_data_version,
performance_metrics=json.loads(performance_metrics),
resource_requirements=json.loads(resource_requirements),
created_at=datetime.fromisoformat(created_at),
created_by=created_by,
tags=json.loads(tags),
description=description
)
async def list_models(self, stage: Optional[ModelStage] = None) -> List[ModelMetadata]:
"""List available models"""
with sqlite3.connect(self.db_path) as conn:
if stage:
cursor = conn.execute("""
SELECT * FROM models WHERE stage = ? ORDER BY created_at DESC
""", (stage.value,))
else:
cursor = conn.execute("""
SELECT * FROM models ORDER BY created_at DESC
""")
models = []
for row in cursor.fetchall():
(model_id, version, name, framework, algorithm, training_data_version,
performance_metrics, resource_requirements, created_at, created_by,
tags, description, stage, artifacts_path) = row
models.append(ModelMetadata(
model_id=model_id,
name=name,
version=version,
framework=framework,
algorithm=algorithm,
training_data_version=training_data_version,
performance_metrics=json.loads(performance_metrics),
resource_requirements=json.loads(resource_requirements),
created_at=datetime.fromisoformat(created_at),
created_by=created_by,
tags=json.loads(tags),
description=description
))
return models
async def promote_model(self, model_id: str, version: str, target_stage: ModelStage) -> bool:
"""Promote model to different stage"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
UPDATE models SET stage = ? WHERE model_id = ? AND version = ?
""", (target_stage.value, model_id, version))
if cursor.rowcount > 0:
self.logger.info(f"Promoted model {model_id}:{version} to {target_stage.value}")
return True
return False
async def archive_model(self, model_id: str, version: str) -> bool:
"""Archive a model version"""
return await self.promote_model(model_id, version, ModelStage.ARCHIVED)
def get_model_artifacts_path(self, model_id: str, version: str) -> Optional[Path]:
"""Get path to model artifacts"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT artifacts_path FROM models WHERE model_id = ? AND version = ?
""", (model_id, version))
row = cursor.fetchone()
if row:
return Path(row[0])
return None
class S3ModelRegistry(ModelRegistry):
"""S3-based model registry with DynamoDB metadata"""
def __init__(self,
s3_bucket: str,
dynamodb_table: str,
aws_region: str = "us-east-1"):
self.s3_bucket = s3_bucket
self.dynamodb_table = dynamodb_table
self.aws_region = aws_region
# Initialize AWS clients
self.s3_client = boto3.client('s3', region_name=aws_region)
self.dynamodb = boto3.resource('dynamodb', region_name=aws_region)
self.table = self.dynamodb.Table(dynamodb_table)
self.logger = logging.getLogger(__name__)
async def register_model(self, model_metadata: ModelMetadata, model_artifacts: Dict[str, Any]) -> str:
"""Register model in S3 and DynamoDB"""
model_key = f"{model_metadata.model_id}/{model_metadata.version}"
# Upload artifacts to S3
artifact_keys = {}
for artifact_name, artifact_data in model_artifacts.items():
s3_key = f"{model_key}/artifacts/{artifact_name}"
if isinstance(artifact_data, (str, Path)):
# Upload file
self.s3_client.upload_file(str(artifact_data), self.s3_bucket, s3_key)
else:
# Serialize and upload object
import tempfile
with tempfile.NamedTemporaryFile() as tmp_file:
if hasattr(artifact_data, 'save'):
artifact_data.save(tmp_file.name)
else:
with open(tmp_file.name, 'wb') as f:
pickle.dump(artifact_data, f)
self.s3_client.upload_file(tmp_file.name, self.s3_bucket, s3_key)
artifact_keys[artifact_name] = s3_key
# Store metadata in DynamoDB
item = {
'ModelId': model_metadata.model_id,
'Version': model_metadata.version,
'Name': model_metadata.name,
'Framework': model_metadata.framework,
'Algorithm': model_metadata.algorithm,
'TrainingDataVersion': model_metadata.training_data_version,
'PerformanceMetrics': model_metadata.performance_metrics,
'ResourceRequirements': model_metadata.resource_requirements,
'CreatedAt': model_metadata.created_at.isoformat(),
'CreatedBy': model_metadata.created_by,
'Tags': model_metadata.tags,
'Description': model_metadata.description,
'Stage': ModelStage.DEVELOPMENT.value,
'ArtifactKeys': artifact_keys,
'S3Bucket': self.s3_bucket
}
self.table.put_item(Item=item)
self.logger.info(f"Registered model {model_metadata.model_id}:{model_metadata.version} in S3")
return f"{model_metadata.model_id}:{model_metadata.version}"
    async def get_model(self, model_id: str, version: Optional[str] = None) -> Optional[ModelMetadata]:
        """Get model metadata from DynamoDB"""
        try:
            if version:
                response = self.table.get_item(
                    Key={'ModelId': model_id, 'Version': version}
                )
                item = response.get('Item')
            else:
                # Query for the latest version (relies on the Version sort key ordering)
                response = self.table.query(
                    KeyConditionExpression='ModelId = :model_id',
                    ExpressionAttributeValues={':model_id': model_id},
                    ScanIndexForward=False,
                    Limit=1
                )
                items = response.get('Items', [])
                item = items[0] if items else None
            if not item:
                return None
return ModelMetadata(
model_id=item['ModelId'],
name=item['Name'],
version=item['Version'],
framework=item['Framework'],
algorithm=item['Algorithm'],
training_data_version=item['TrainingDataVersion'],
performance_metrics=item['PerformanceMetrics'],
resource_requirements=item['ResourceRequirements'],
created_at=datetime.fromisoformat(item['CreatedAt']),
created_by=item['CreatedBy'],
tags=item['Tags'],
description=item['Description']
)
except ClientError as e:
self.logger.error(f"Error retrieving model {model_id}:{version}: {e}")
return None
async def list_models(self, stage: Optional[ModelStage] = None) -> List[ModelMetadata]:
"""List models from DynamoDB"""
try:
if stage:
response = self.table.scan(
FilterExpression='Stage = :stage',
ExpressionAttributeValues={':stage': stage.value}
)
else:
response = self.table.scan()
models = []
for item in response.get('Items', []):
models.append(ModelMetadata(
model_id=item['ModelId'],
name=item['Name'],
version=item['Version'],
framework=item['Framework'],
algorithm=item['Algorithm'],
training_data_version=item['TrainingDataVersion'],
performance_metrics=item['PerformanceMetrics'],
resource_requirements=item['ResourceRequirements'],
created_at=datetime.fromisoformat(item['CreatedAt']),
created_by=item['CreatedBy'],
tags=item['Tags'],
description=item['Description']
))
return models
except ClientError as e:
self.logger.error(f"Error listing models: {e}")
return []
async def promote_model(self, model_id: str, version: str, target_stage: ModelStage) -> bool:
"""Promote model stage in DynamoDB"""
try:
self.table.update_item(
Key={'ModelId': model_id, 'Version': version},
UpdateExpression='SET Stage = :stage',
ExpressionAttributeValues={':stage': target_stage.value}
)
self.logger.info(f"Promoted model {model_id}:{version} to {target_stage.value}")
return True
except ClientError as e:
self.logger.error(f"Error promoting model {model_id}:{version}: {e}")
return False
async def archive_model(self, model_id: str, version: str) -> bool:
"""Archive model version"""
return await self.promote_model(model_id, version, ModelStage.ARCHIVED)
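With either backend, registration and promotion look the same from the caller's side. A short sketch using the file-system registry, assuming the modules above are importable (the model ID, metrics, and artifact path are hypothetical placeholders):
# promotion_example.py -- illustrative sketch; model ID, metrics, and artifact path are hypothetical
import asyncio
from datetime import datetime
from mlops_architecture import ModelMetadata, ModelStage
from model_registry import FileSystemModelRegistry

async def register_and_promote() -> None:
    registry = FileSystemModelRegistry("./model_registry")

    metadata = ModelMetadata(
        model_id="churn-model",
        name="Churn Prediction",
        version="v2.1.0",
        framework="sklearn",
        algorithm="gradient_boosting",
        training_data_version="2024-10",
        performance_metrics={"accuracy": 0.91, "f1_score": 0.88},
        resource_requirements={"cpu": "500m", "memory": "1Gi"},
        created_at=datetime.now(),
        created_by="ml_engineer",
    )
    await registry.register_model(metadata, {"model.pkl": "path/to/model.pkl"})

    # Promote to staging only if the registered metrics clear a threshold
    model = await registry.get_model("churn-model", "v2.1.0")
    if model and model.performance_metrics.get("accuracy", 0.0) >= 0.9:
        await registry.promote_model("churn-model", "v2.1.0", ModelStage.STAGING)

    staged = await registry.list_models(stage=ModelStage.STAGING)
    print([f"{m.model_id}:{m.version}" for m in staged])

if __name__ == "__main__":
    asyncio.run(register_and_promote())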
Kubernetes Model Deployment
# kubernetes_deployer.py
import asyncio
import logging
import subprocess
from typing import Dict, List, Any, Optional
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from mlops_architecture import DeploymentConfig, DeploymentStrategy, ModelDeployer
class KubernetesModelDeployer(ModelDeployer):
"""Kubernetes-based model deployer"""
def __init__(self,
namespace: str = "default",
kubeconfig_path: Optional[str] = None,
registry_url: str = "localhost:5000"):
self.namespace = namespace
self.registry_url = registry_url
# Load Kubernetes configuration
if kubeconfig_path:
config.load_kube_config(config_file=kubeconfig_path)
else:
            try:
                config.load_incluster_config()
            except config.ConfigException:
                config.load_kube_config()
# Initialize Kubernetes clients
self.apps_v1 = client.AppsV1Api()
self.core_v1 = client.CoreV1Api()
self.autoscaling_v1 = client.AutoscalingV1Api()
self.networking_v1 = client.NetworkingV1Api()
self.logger = logging.getLogger(__name__)
async def deploy(self, config: DeploymentConfig) -> Dict[str, Any]:
"""Deploy model to Kubernetes"""
        # Kubernetes object names must be DNS-compatible: lowercase, no underscores or dots
        deployment_name = (
            f"{config.model_metadata.model_id}-{config.model_metadata.version}"
            .lower().replace('_', '-').replace('.', '-')
        )
try:
# Create Docker image (simplified - in practice, use CI/CD pipeline)
image_name = await self._build_model_image(config)
# Create Kubernetes resources
deployment = await self._create_deployment(config, deployment_name, image_name)
service = await self._create_service(config, deployment_name)
if config.strategy == DeploymentStrategy.CANARY:
await self._setup_canary_deployment(config, deployment_name)
elif config.strategy == DeploymentStrategy.A_B_TEST:
await self._setup_ab_test(config, deployment_name)
# Create HPA if autoscaling is enabled
if config.scaling_config.get('enabled', True):
await self._create_hpa(config, deployment_name)
# Wait for deployment to be ready
await self._wait_for_deployment_ready(deployment_name)
endpoint = f"http://{service.status.load_balancer.ingress[0].ip}" if service.status.load_balancer.ingress else f"http://{deployment_name}.{self.namespace}.svc.cluster.local"
return {
'deployment_name': deployment_name,
'endpoint': endpoint,
'image': image_name,
'replicas': deployment.spec.replicas,
'status': 'deployed'
}
except Exception as e:
self.logger.error(f"Deployment failed: {e}")
raise
async def _build_model_image(self, config: DeploymentConfig) -> str:
"""Build Docker image for model (simplified)"""
# In production, this would integrate with CI/CD pipeline
model_metadata = config.model_metadata
image_name = f"{self.registry_url}/{model_metadata.model_id}:{model_metadata.version}"
# Create Dockerfile
dockerfile_content = self._generate_dockerfile(config)
# This would typically use Docker API or buildah
# For now, return the image name
self.logger.info(f"Built image: {image_name}")
return image_name
def _generate_dockerfile(self, config: DeploymentConfig) -> str:
"""Generate Dockerfile for model"""
model_metadata = config.model_metadata
base_images = {
'tensorflow': 'tensorflow/serving:latest',
'pytorch': 'pytorch/torchserve:latest',
'sklearn': 'python:3.9-slim',
'xgboost': 'python:3.9-slim'
}
base_image = base_images.get(model_metadata.framework.lower(), 'python:3.9-slim')
dockerfile = f"""
FROM {base_image}
WORKDIR /app
# Copy model artifacts
COPY model/ /app/model/
# Copy serving code
COPY serve.py /app/
COPY requirements.txt /app/
# Install dependencies
RUN pip install -r requirements.txt
# Expose port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \\
CMD curl -f http://localhost:8080/health || exit 1
# Run server
CMD ["python", "serve.py"]
"""
return dockerfile
async def _create_deployment(self, config: DeploymentConfig,
deployment_name: str, image_name: str) -> client.V1Deployment:
"""Create Kubernetes deployment"""
model_metadata = config.model_metadata
resource_allocation = config.resource_allocation
# Container specification
container = client.V1Container(
name="model-server",
image=image_name,
ports=[client.V1ContainerPort(container_port=8080)],
resources=client.V1ResourceRequirements(
requests={
"cpu": resource_allocation.get('cpu', '500m'),
"memory": resource_allocation.get('memory', '1Gi')
},
limits={
"cpu": resource_allocation.get('cpu_limit', resource_allocation.get('cpu', '1000m')),
"memory": resource_allocation.get('memory_limit', resource_allocation.get('memory', '2Gi'))
}
),
env=[
client.V1EnvVar(name="MODEL_ID", value=model_metadata.model_id),
client.V1EnvVar(name="MODEL_VERSION", value=model_metadata.version),
client.V1EnvVar(name="FRAMEWORK", value=model_metadata.framework)
],
liveness_probe=client.V1Probe(
http_get=client.V1HTTPGetAction(path="/health", port=8080),
initial_delay_seconds=30,
period_seconds=10
),
readiness_probe=client.V1Probe(
http_get=client.V1HTTPGetAction(path="/ready", port=8080),
initial_delay_seconds=5,
period_seconds=5
)
)
# Add GPU resources if needed
if resource_allocation.get('gpu', 0) > 0:
if not container.resources.requests:
container.resources.requests = {}
if not container.resources.limits:
container.resources.limits = {}
gpu_count = str(resource_allocation['gpu'])
container.resources.requests['nvidia.com/gpu'] = gpu_count
container.resources.limits['nvidia.com/gpu'] = gpu_count
# Pod template
pod_template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels={
"app": deployment_name,
"model-id": model_metadata.model_id,
"model-version": model_metadata.version,
"framework": model_metadata.framework
}
),
spec=client.V1PodSpec(containers=[container])
)
# Deployment specification
deployment_spec = client.V1DeploymentSpec(
replicas=config.scaling_config.get('min_replicas', 1),
selector=client.V1LabelSelector(
match_labels={"app": deployment_name}
),
template=pod_template,
strategy=client.V1DeploymentStrategy(
type="RollingUpdate",
rolling_update=client.V1RollingUpdateDeployment(
max_surge=1,
max_unavailable=0
)
)
)
# Create deployment
deployment = client.V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=client.V1ObjectMeta(
name=deployment_name,
namespace=self.namespace,
labels={
"app": deployment_name,
"model-id": model_metadata.model_id,
"model-version": model_metadata.version
}
),
spec=deployment_spec
)
try:
result = self.apps_v1.create_namespaced_deployment(
namespace=self.namespace,
body=deployment
)
self.logger.info(f"Created deployment: {deployment_name}")
return result
except ApiException as e:
if e.status == 409: # Already exists
result = self.apps_v1.patch_namespaced_deployment(
name=deployment_name,
namespace=self.namespace,
body=deployment
)
self.logger.info(f"Updated deployment: {deployment_name}")
return result
else:
raise
async def _create_service(self, config: DeploymentConfig,
deployment_name: str) -> client.V1Service:
"""Create Kubernetes service"""
service_spec = client.V1ServiceSpec(
selector={"app": deployment_name},
ports=[
client.V1ServicePort(
port=80,
target_port=8080,
protocol="TCP"
)
],
type="LoadBalancer"
)
service = client.V1Service(
api_version="v1",
kind="Service",
metadata=client.V1ObjectMeta(
name=f"{deployment_name}-service",
namespace=self.namespace
),
spec=service_spec
)
try:
result = self.core_v1.create_namespaced_service(
namespace=self.namespace,
body=service
)
self.logger.info(f"Created service: {deployment_name}-service")
return result
except ApiException as e:
if e.status == 409: # Already exists
result = self.core_v1.patch_namespaced_service(
name=f"{deployment_name}-service",
namespace=self.namespace,
body=service
)
self.logger.info(f"Updated service: {deployment_name}-service")
return result
else:
raise
async def _create_hpa(self, config: DeploymentConfig, deployment_name: str):
"""Create Horizontal Pod Autoscaler"""
scaling_config = config.scaling_config
hpa_spec = client.V1HorizontalPodAutoscalerSpec(
scale_target_ref=client.V1CrossVersionObjectReference(
api_version="apps/v1",
kind="Deployment",
name=deployment_name
),
min_replicas=scaling_config.get('min_replicas', 1),
max_replicas=scaling_config.get('max_replicas', 10),
target_cpu_utilization_percentage=scaling_config.get('target_cpu_utilization', 70)
)
hpa = client.V1HorizontalPodAutoscaler(
api_version="autoscaling/v1",
kind="HorizontalPodAutoscaler",
metadata=client.V1ObjectMeta(
name=f"{deployment_name}-hpa",
namespace=self.namespace
),
spec=hpa_spec
)
try:
self.autoscaling_v1.create_namespaced_horizontal_pod_autoscaler(
namespace=self.namespace,
body=hpa
)
self.logger.info(f"Created HPA: {deployment_name}-hpa")
except ApiException as e:
if e.status == 409: # Already exists
self.autoscaling_v1.patch_namespaced_horizontal_pod_autoscaler(
name=f"{deployment_name}-hpa",
namespace=self.namespace,
body=hpa
)
self.logger.info(f"Updated HPA: {deployment_name}-hpa")
else:
self.logger.error(f"Failed to create HPA: {e}")
async def _setup_canary_deployment(self, config: DeploymentConfig, deployment_name: str):
"""Set up canary deployment with traffic splitting"""
traffic_split = config.traffic_split
canary_percentage = traffic_split.get('canary', 10) # Default 10%
# Create Istio VirtualService for traffic splitting
virtual_service = {
"apiVersion": "networking.istio.io/v1alpha3",
"kind": "VirtualService",
"metadata": {
"name": f"{deployment_name}-vs",
"namespace": self.namespace
},
"spec": {
"http": [
{
"match": [{"headers": {"canary": {"exact": "true"}}}],
"route": [{"destination": {"host": f"{deployment_name}-canary"}}]
},
{
"route": [
{
"destination": {"host": f"{deployment_name}-stable"},
"weight": 100 - canary_percentage
},
{
"destination": {"host": f"{deployment_name}-canary"},
"weight": canary_percentage
}
]
}
]
}
}
# Apply using kubectl (simplified)
self.logger.info(f"Set up canary deployment with {canary_percentage}% traffic")
async def _setup_ab_test(self, config: DeploymentConfig, deployment_name: str):
"""Set up A/B test deployment"""
traffic_split = config.traffic_split
# Create separate deployments for A and B variants
# This would involve creating multiple deployments with different configurations
self.logger.info("Set up A/B test deployment")
async def _wait_for_deployment_ready(self, deployment_name: str, timeout: int = 300):
"""Wait for deployment to be ready"""
import time
start_time = time.time()
while time.time() - start_time < timeout:
try:
deployment = self.apps_v1.read_namespaced_deployment(
name=deployment_name,
namespace=self.namespace
)
if (deployment.status.ready_replicas and
deployment.status.ready_replicas == deployment.spec.replicas):
self.logger.info(f"Deployment {deployment_name} is ready")
return
await asyncio.sleep(10)
except ApiException as e:
self.logger.error(f"Error checking deployment status: {e}")
await asyncio.sleep(10)
raise TimeoutError(f"Deployment {deployment_name} not ready within {timeout} seconds")
    def _deployment_name_from_id(self, deployment_id: str) -> str:
        """Recover the Kubernetes deployment name from an orchestrator deployment ID.

        Assumes IDs follow "<model_id>_<version>_<environment>_<timestamp>" and that
        model_id and version themselves contain no underscores.
        """
        model_id, version = deployment_id.split('_')[:2]
        return f"{model_id}-{version}".lower().replace('_', '-').replace('.', '-')

    async def rollback(self, deployment_id: str) -> Dict[str, Any]:
        """Rollback deployment to its previous revision"""
        try:
            deployment_name = self._deployment_name_from_id(deployment_id)
            # apps/v1 exposes no rollback subresource, so shell out to
            # `kubectl rollout undo` (simplified; a production setup would
            # track ReplicaSet revisions explicitly)
            subprocess.run(
                ["kubectl", "rollout", "undo", f"deployment/{deployment_name}",
                 "-n", self.namespace],
                check=True,
                capture_output=True
            )
            self.logger.info(f"Rolled back deployment: {deployment_name}")
            return {
                'status': 'success',
                'deployment_name': deployment_name,
                'action': 'rollback'
            }
        except Exception as e:
            self.logger.error(f"Rollback failed: {e}")
            return {
                'status': 'failed',
                'error': str(e)
            }
async def scale(self, deployment_id: str, replica_count: int) -> Dict[str, Any]:
"""Scale deployment"""
try:
            deployment_name = self._deployment_name_from_id(deployment_id)
# Scale deployment
self.apps_v1.patch_namespaced_deployment_scale(
name=deployment_name,
namespace=self.namespace,
body=client.V1Scale(
spec=client.V1ScaleSpec(replicas=replica_count)
)
)
self.logger.info(f"Scaled deployment {deployment_name} to {replica_count} replicas")
return {
'status': 'success',
'deployment_name': deployment_name,
'replicas': replica_count
}
except ApiException as e:
self.logger.error(f"Scaling failed: {e}")
return {
'status': 'failed',
'error': str(e)
}
async def get_deployment_status(self, deployment_id: str) -> Dict[str, Any]:
"""Get deployment status"""
try:
            deployment_name = self._deployment_name_from_id(deployment_id)
deployment = self.apps_v1.read_namespaced_deployment(
name=deployment_name,
namespace=self.namespace
)
return {
'deployment_name': deployment_name,
'replicas': deployment.spec.replicas,
'ready_replicas': deployment.status.ready_replicas or 0,
'available_replicas': deployment.status.available_replicas or 0,
'conditions': [
{
'type': condition.type,
'status': condition.status,
'reason': condition.reason,
'message': condition.message
}
for condition in (deployment.status.conditions or [])
],
'status': 'ready' if deployment.status.ready_replicas == deployment.spec.replicas else 'pending'
}
except ApiException as e:
return {
'status': 'error',
'error': str(e)
}
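Day-2 operations go through the same deployer interface. A hedged sketch of scaling a running deployment, checking its status, and falling back to a rollback, assuming kubernetes_deployer.py is importable (the namespace and deployment ID are hypothetical):
# deployer_ops.py -- illustrative sketch; namespace and deployment ID are hypothetical
import asyncio
from kubernetes_deployer import KubernetesModelDeployer

async def scale_and_inspect(deployment_id: str) -> None:
    deployer = KubernetesModelDeployer(namespace="ml-serving")

    # Scale out ahead of expected traffic, then confirm replicas are ready
    await deployer.scale(deployment_id, replica_count=4)
    status = await deployer.get_deployment_status(deployment_id)
    print(status.get("status"), status.get("ready_replicas"), "of", status.get("replicas"))

    # Roll back if the deployment reports an error state
    if status.get("status") == "error":
        await deployer.rollback(deployment_id)

if __name__ == "__main__":
    asyncio.run(scale_and_inspect("churn-model_v1.2.0_production_20241101_120000"))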
Model Monitoring and Observability
# model_monitoring.py
import asyncio
import numpy as np
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from collections import deque, defaultdict
import json
import logging
import sqlite3
from mlops_architecture import ModelMonitor
@dataclass
class PredictionLog:
model_id: str
prediction_id: str
timestamp: datetime
input_data: Any
prediction: Any
confidence: Optional[float] = None
latency_ms: Optional[float] = None
metadata: Dict[str, Any] = None
@dataclass
class DriftDetectionResult:
model_id: str
drift_detected: bool
drift_score: float
drift_type: str # "data", "concept", "prior"
affected_features: List[str]
recommendation: str
timestamp: datetime
class DatabaseModelMonitor(ModelMonitor):
"""Database-based model monitoring implementation"""
def __init__(self, db_path: str = "./model_monitoring.db"):
self.db_path = db_path
self._init_database()
# In-memory buffers for real-time monitoring
self.prediction_buffer = defaultdict(lambda: deque(maxlen=10000))
self.performance_cache = defaultdict(dict)
# Drift detection parameters
self.drift_detection_window = 1000
self.drift_threshold = 0.05
self.logger = logging.getLogger(__name__)
def _init_database(self):
"""Initialize monitoring database"""
with sqlite3.connect(self.db_path) as conn:
# Predictions table
conn.execute("""
CREATE TABLE IF NOT EXISTS predictions (
prediction_id TEXT PRIMARY KEY,
model_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
input_data TEXT NOT NULL,
prediction TEXT NOT NULL,
confidence REAL,
latency_ms REAL,
metadata TEXT
)
""")
# Performance metrics table
conn.execute("""
CREATE TABLE IF NOT EXISTS performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
model_id TEXT NOT NULL,
metric_name TEXT NOT NULL,
metric_value REAL NOT NULL,
timestamp TEXT NOT NULL,
time_window_start TEXT,
time_window_end TEXT
)
""")
# Drift detection results table
conn.execute("""
CREATE TABLE IF NOT EXISTS drift_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
model_id TEXT NOT NULL,
drift_type TEXT NOT NULL,
drift_score REAL NOT NULL,
affected_features TEXT,
recommendation TEXT,
timestamp TEXT NOT NULL
)
""")
# Create indices
conn.execute("CREATE INDEX IF NOT EXISTS idx_predictions_model_timestamp ON predictions(model_id, timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_performance_model_timestamp ON performance_metrics(model_id, timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_drift_model_timestamp ON drift_events(model_id, timestamp)")
async def track_prediction(self, model_id: str,
input_data: Any,
prediction: Any,
metadata: Dict[str, Any] = None):
"""Track a model prediction"""
import uuid
prediction_id = str(uuid.uuid4())
timestamp = datetime.now()
# Create prediction log
prediction_log = PredictionLog(
model_id=model_id,
prediction_id=prediction_id,
timestamp=timestamp,
input_data=input_data,
prediction=prediction,
confidence=metadata.get('confidence') if metadata else None,
latency_ms=metadata.get('latency_ms') if metadata else None,
metadata=metadata or {}
)
# Add to buffer for real-time monitoring
self.prediction_buffer[model_id].append(prediction_log)
# Store in database
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO predictions
(prediction_id, model_id, timestamp, input_data, prediction, confidence, latency_ms, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
prediction_id,
model_id,
timestamp.isoformat(),
json.dumps(input_data, default=str),
json.dumps(prediction, default=str),
prediction_log.confidence,
prediction_log.latency_ms,
json.dumps(metadata or {})
))
# Trigger drift detection if buffer is full
if len(self.prediction_buffer[model_id]) >= self.drift_detection_window:
asyncio.create_task(self._background_drift_detection(model_id))
async def detect_drift(self, model_id: str, window_size: int = 1000) -> Dict[str, Any]:
"""Detect data/concept drift"""
# Get recent predictions
recent_predictions = await self._get_recent_predictions(model_id, window_size)
if len(recent_predictions) < window_size // 2:
return {
'drift_detected': False,
'reason': 'insufficient_data',
'predictions_count': len(recent_predictions)
}
# Get reference data (older predictions for comparison)
reference_predictions = await self._get_reference_predictions(model_id, window_size)
if len(reference_predictions) < window_size // 2:
return {
'drift_detected': False,
'reason': 'insufficient_reference_data',
'reference_count': len(reference_predictions)
}
# Perform drift detection
drift_results = []
# Data drift detection (input distribution changes)
data_drift = await self._detect_data_drift(recent_predictions, reference_predictions)
if data_drift['drift_detected']:
drift_results.append(data_drift)
# Concept drift detection (prediction distribution changes)
concept_drift = await self._detect_concept_drift(recent_predictions, reference_predictions)
if concept_drift['drift_detected']:
drift_results.append(concept_drift)
# Prior drift detection (class distribution changes for classification)
prior_drift = await self._detect_prior_drift(recent_predictions, reference_predictions)
if prior_drift['drift_detected']:
drift_results.append(prior_drift)
# Overall drift assessment
overall_drift_detected = len(drift_results) > 0
result = {
'model_id': model_id,
'drift_detected': overall_drift_detected,
'drift_types': [d['drift_type'] for d in drift_results],
'drift_details': drift_results,
'timestamp': datetime.now().isoformat(),
'analysis_window': window_size
}
# Store drift events
if overall_drift_detected:
await self._store_drift_events(drift_results)
return result
async def _detect_data_drift(self, recent_data: List[PredictionLog],
reference_data: List[PredictionLog]) -> Dict[str, Any]:
"""Detect data drift using statistical tests"""
try:
# Extract features from input data
recent_features = self._extract_numerical_features(recent_data)
reference_features = self._extract_numerical_features(reference_data)
if not recent_features or not reference_features:
return {'drift_detected': False, 'reason': 'no_numerical_features'}
# Perform KS test for each feature
from scipy import stats
drift_scores = {}
p_values = {}
for feature_name in recent_features.keys():
if feature_name in reference_features:
recent_values = recent_features[feature_name]
reference_values = reference_features[feature_name]
# Kolmogorov-Smirnov test
ks_stat, p_value = stats.ks_2samp(recent_values, reference_values)
drift_scores[feature_name] = ks_stat
p_values[feature_name] = p_value
# Determine if drift is detected
significant_drifts = [
feature for feature, p_val in p_values.items()
if p_val < self.drift_threshold
]
drift_detected = len(significant_drifts) > 0
result = {
'drift_detected': drift_detected,
'drift_type': 'data',
'drift_score': max(drift_scores.values()) if drift_scores else 0,
'affected_features': significant_drifts,
'p_values': p_values,
'recommendation': 'Consider retraining model with recent data' if drift_detected else 'No action needed'
}
return result
except Exception as e:
self.logger.error(f"Data drift detection failed: {e}")
return {'drift_detected': False, 'error': str(e)}
async def _detect_concept_drift(self, recent_data: List[PredictionLog],
reference_data: List[PredictionLog]) -> Dict[str, Any]:
"""Detect concept drift (prediction distribution changes)"""
try:
# Extract predictions
recent_predictions = [log.prediction for log in recent_data]
reference_predictions = [log.prediction for log in reference_data]
# Handle different prediction types
if all(isinstance(p, (int, float)) for p in recent_predictions):
# Regression: compare distributions
from scipy import stats
ks_stat, p_value = stats.ks_2samp(recent_predictions, reference_predictions)
drift_detected = p_value < self.drift_threshold
result = {
'drift_detected': drift_detected,
'drift_type': 'concept',
'drift_score': ks_stat,
'p_value': p_value,
'recommendation': 'Model predictions have shifted significantly' if drift_detected else 'Predictions stable'
}
elif all(isinstance(p, (list, dict)) for p in recent_predictions):
# Classification: compare class distributions
recent_classes = self._extract_predicted_classes(recent_predictions)
reference_classes = self._extract_predicted_classes(reference_predictions)
# Chi-square test for class distribution
from collections import Counter
recent_counts = Counter(recent_classes)
reference_counts = Counter(reference_classes)
# Align class counts
all_classes = set(recent_counts.keys()) | set(reference_counts.keys())
recent_freq = [recent_counts.get(cls, 0) for cls in all_classes]
reference_freq = [reference_counts.get(cls, 0) for cls in all_classes]
                if sum(reference_freq) > 0:
                    from scipy.stats import chisquare
                    # chisquare() expects observed and expected totals to match,
                    # so rescale the reference counts and guard against zero cells
                    scale = sum(recent_freq) / sum(reference_freq)
                    expected_freq = [max(f * scale, 1e-6) for f in reference_freq]
                    chi2_stat, p_value = chisquare(recent_freq, f_exp=expected_freq)
                    drift_detected = p_value < self.drift_threshold
result = {
'drift_detected': drift_detected,
'drift_type': 'concept',
'drift_score': chi2_stat,
'p_value': p_value,
'class_distributions': {
'recent': dict(recent_counts),
'reference': dict(reference_counts)
},
'recommendation': 'Class prediction patterns have changed' if drift_detected else 'Prediction patterns stable'
}
else:
result = {'drift_detected': False, 'reason': 'insufficient_reference_data'}
else:
result = {'drift_detected': False, 'reason': 'unsupported_prediction_type'}
return result
except Exception as e:
self.logger.error(f"Concept drift detection failed: {e}")
return {'drift_detected': False, 'error': str(e)}
async def _detect_prior_drift(self, recent_data: List[PredictionLog],
reference_data: List[PredictionLog]) -> Dict[str, Any]:
"""Detect prior drift (changes in input label distribution)"""
# This would require ground truth labels, which we typically don't have for predictions
# For now, we'll use confidence scores as a proxy
try:
recent_confidences = [log.confidence for log in recent_data if log.confidence is not None]
reference_confidences = [log.confidence for log in reference_data if log.confidence is not None]
if len(recent_confidences) < 10 or len(reference_confidences) < 10:
return {'drift_detected': False, 'reason': 'insufficient_confidence_data'}
# Compare confidence distributions
from scipy import stats
ks_stat, p_value = stats.ks_2samp(recent_confidences, reference_confidences)
drift_detected = p_value < self.drift_threshold
return {
'drift_detected': drift_detected,
'drift_type': 'prior',
'drift_score': ks_stat,
'p_value': p_value,
'recent_avg_confidence': np.mean(recent_confidences),
'reference_avg_confidence': np.mean(reference_confidences),
'recommendation': 'Model confidence patterns have changed' if drift_detected else 'Confidence patterns stable'
}
except Exception as e:
self.logger.error(f"Prior drift detection failed: {e}")
return {'drift_detected': False, 'error': str(e)}
def _extract_numerical_features(self, predictions: List[PredictionLog]) -> Dict[str, List[float]]:
"""Extract numerical features from input data"""
features = defaultdict(list)
for pred_log in predictions:
input_data = pred_log.input_data
if isinstance(input_data, dict):
for key, value in input_data.items():
if isinstance(value, (int, float)):
features[key].append(float(value))
elif isinstance(input_data, (list, tuple)):
for i, value in enumerate(input_data):
if isinstance(value, (int, float)):
features[f'feature_{i}'].append(float(value))
elif isinstance(input_data, (int, float)):
features['value'].append(float(input_data))
return dict(features)
def _extract_predicted_classes(self, predictions: List[Any]) -> List[str]:
"""Extract predicted classes from predictions"""
classes = []
for prediction in predictions:
if isinstance(prediction, dict):
# Assume class with highest probability
if 'class' in prediction:
classes.append(str(prediction['class']))
elif 'label' in prediction:
classes.append(str(prediction['label']))
else:
# Find key with max value
max_key = max(prediction.keys(), key=lambda k: prediction[k] if isinstance(prediction[k], (int, float)) else 0)
classes.append(str(max_key))
elif isinstance(prediction, list):
# Assume index of max value is the class
max_idx = np.argmax(prediction)
classes.append(str(max_idx))
else:
classes.append(str(prediction))
return classes
async def _get_recent_predictions(self, model_id: str, window_size: int) -> List[PredictionLog]:
"""Get recent predictions from database"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT prediction_id, timestamp, input_data, prediction, confidence, latency_ms, metadata
FROM predictions
WHERE model_id = ?
ORDER BY timestamp DESC
LIMIT ?
""", (model_id, window_size))
predictions = []
for row in cursor.fetchall():
pred_id, timestamp, input_data, prediction, confidence, latency_ms, metadata = row
predictions.append(PredictionLog(
model_id=model_id,
prediction_id=pred_id,
timestamp=datetime.fromisoformat(timestamp),
input_data=json.loads(input_data),
prediction=json.loads(prediction),
confidence=confidence,
latency_ms=latency_ms,
metadata=json.loads(metadata) if metadata else {}
))
return predictions
async def _get_reference_predictions(self, model_id: str, window_size: int) -> List[PredictionLog]:
"""Get reference predictions (older data) from database"""
# Get predictions from 2-3 windows ago as reference
offset = window_size * 2
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT prediction_id, timestamp, input_data, prediction, confidence, latency_ms, metadata
FROM predictions
WHERE model_id = ?
ORDER BY timestamp DESC
LIMIT ? OFFSET ?
""", (model_id, window_size, offset))
predictions = []
for row in cursor.fetchall():
pred_id, timestamp, input_data, prediction, confidence, latency_ms, metadata = row
predictions.append(PredictionLog(
model_id=model_id,
prediction_id=pred_id,
timestamp=datetime.fromisoformat(timestamp),
input_data=json.loads(input_data),
prediction=json.loads(prediction),
confidence=confidence,
latency_ms=latency_ms,
metadata=json.loads(metadata) if metadata else {}
))
return predictions
async def _store_drift_events(self, drift_results: List[Dict[str, Any]]):
"""Store drift detection events"""
with sqlite3.connect(self.db_path) as conn:
for drift_result in drift_results:
conn.execute("""
INSERT INTO drift_events
(model_id, drift_type, drift_score, affected_features, recommendation, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
""", (
drift_result.get('model_id', ''),
drift_result['drift_type'],
drift_result['drift_score'],
json.dumps(drift_result.get('affected_features', [])),
drift_result.get('recommendation', ''),
datetime.now().isoformat()
))
async def _background_drift_detection(self, model_id: str):
"""Background drift detection task"""
try:
drift_result = await self.detect_drift(model_id)
if drift_result['drift_detected']:
self.logger.warning(f"Drift detected for model {model_id}: {drift_result}")
except Exception as e:
self.logger.error(f"Background drift detection failed for {model_id}: {e}")
async def get_performance_metrics(self, model_id: str,
time_range: tuple) -> Dict[str, Any]:
"""Get model performance metrics for time range"""
start_time, end_time = time_range
with sqlite3.connect(self.db_path) as conn:
# Get predictions in time range
cursor = conn.execute("""
SELECT prediction, confidence, latency_ms, timestamp
FROM predictions
WHERE model_id = ? AND timestamp BETWEEN ? AND ?
ORDER BY timestamp
""", (model_id, start_time.isoformat(), end_time.isoformat()))
predictions = []
confidences = []
latencies = []
timestamps = []
for row in cursor.fetchall():
prediction, confidence, latency_ms, timestamp = row
predictions.append(json.loads(prediction))
if confidence is not None:
confidences.append(confidence)
if latency_ms is not None:
latencies.append(latency_ms)
timestamps.append(datetime.fromisoformat(timestamp))
if not predictions:
return {'error': 'No predictions found for time range'}
# Calculate metrics
metrics = {
'total_predictions': len(predictions),
'time_range': {
'start': start_time.isoformat(),
'end': end_time.isoformat()
},
'prediction_rate': len(predictions) / ((end_time - start_time).total_seconds() / 3600) # per hour
}
if confidences:
metrics['confidence'] = {
'mean': np.mean(confidences),
'std': np.std(confidences),
'min': np.min(confidences),
'max': np.max(confidences),
'p50': np.percentile(confidences, 50),
'p95': np.percentile(confidences, 95)
}
if latencies:
metrics['latency_ms'] = {
'mean': np.mean(latencies),
'std': np.std(latencies),
'min': np.min(latencies),
'max': np.max(latencies),
'p50': np.percentile(latencies, 50),
'p95': np.percentile(latencies, 95),
'p99': np.percentile(latencies, 99)
}
# Get stored performance metrics
cursor = conn.execute("""
SELECT metric_name, metric_value, timestamp
FROM performance_metrics
WHERE model_id = ? AND timestamp BETWEEN ? AND ?
ORDER BY timestamp
""", (model_id, start_time.isoformat(), end_time.isoformat()))
stored_metrics = defaultdict(list)
for row in cursor.fetchall():
metric_name, metric_value, timestamp = row
stored_metrics[metric_name].append({
'value': metric_value,
'timestamp': timestamp
})
if stored_metrics:
metrics['historical_metrics'] = dict(stored_metrics)
return metrics
# Example usage and testing
async def demonstrate_mlops():
"""Demonstrate MLOps pipeline"""
# Initialize components
model_registry = FileSystemModelRegistry("./demo_registry")
deployer = KubernetesModelDeployer(namespace="mlops-demo")
monitor = DatabaseModelMonitor("./demo_monitoring.db")
# Create MLOps orchestrator
orchestrator = MLOpsOrchestrator(
model_registry=model_registry,
deployer=deployer,
monitor=monitor
)
# Register a model
model_metadata = ModelMetadata(
model_id="sentiment_classifier",
name="Sentiment Classification Model",
version="v1.0.0",
framework="scikit-learn",
algorithm="logistic_regression",
training_data_version="20241101",
performance_metrics={
"accuracy": 0.85,
"precision": 0.83,
"recall": 0.87,
"f1_score": 0.85
},
resource_requirements={
"cpu": "500m",
"memory": "1Gi"
},
created_at=datetime.now(),
created_by="ml_engineer",
tags=["nlp", "classification"],
description="Sentiment classification model for customer reviews"
)
# Mock model artifacts
model_artifacts = {
"model.pkl": "path/to/model.pkl",
"vectorizer.pkl": "path/to/vectorizer.pkl",
"requirements.txt": "scikit-learn==1.0.2\nnumpy==1.21.0"
}
model_version = await model_registry.register_model(model_metadata, model_artifacts)
print(f"Registered model: {model_version}")
# Deploy model
deployment_result = await orchestrator.deploy_model(
model_id="sentiment_classifier",
version="v1.0.0",
target_environment="staging",
strategy=DeploymentStrategy.ROLLING,
resource_allocation={"cpu": "500m", "memory": "1Gi"},
scaling_config={"min_replicas": 1, "max_replicas": 5}
)
print(f"Deployment result: {deployment_result}")
if __name__ == "__main__":
asyncio.run(demonstrate_mlops())
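Predictions can also be fed to the monitor directly, outside the orchestrator. A hedged sketch of logging predictions with confidence and latency metadata and then querying drift and performance, assuming model_monitoring.py is importable (the model ID and synthetic inputs are illustrative):
# monitoring_example.py -- illustrative sketch using DatabaseModelMonitor above
import asyncio
import random
from datetime import datetime, timedelta
from model_monitoring import DatabaseModelMonitor

async def exercise_monitor() -> None:
    monitor = DatabaseModelMonitor("./demo_monitoring.db")

    # Log a batch of predictions with confidence and latency metadata
    for _ in range(200):
        features = {"review_length": random.randint(10, 400)}
        await monitor.track_prediction(
            model_id="sentiment_classifier",
            input_data=features,
            prediction={"class": random.choice(["positive", "negative"])},
            metadata={"confidence": random.uniform(0.6, 0.99),
                      "latency_ms": random.uniform(5, 50)},
        )

    # Drift check over the most recent window
    drift = await monitor.detect_drift("sentiment_classifier", window_size=100)
    print("drift detected:", drift["drift_detected"])

    # Performance summary for the last hour
    now = datetime.now()
    metrics = await monitor.get_performance_metrics(
        "sentiment_classifier", (now - timedelta(hours=1), now)
    )
    print("predictions:", metrics.get("total_predictions"))

if __name__ == "__main__":
    asyncio.run(exercise_monitor())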
CI/CD Pipeline for ML Models
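The MLOpsPipeline class below reads its definition from a YAML file. A minimal sketch of generating such a file, with field names taken from the loader below; the trigger values, stage scripts, and webhook are hypothetical placeholders:
# make_pipeline_config.py -- writes an example pipeline.yaml for the MLOpsPipeline below;
# trigger values, stage scripts, and the webhook URL are hypothetical placeholders
import yaml

pipeline_definition = {
    "name": "sentiment-classifier-train-deploy",
    "trigger": {"on": "push", "branch": "main"},  # free-form; the loader stores it as-is
    "environment_variables": {"MODEL_ID": "sentiment_classifier"},
    "stages": [
        {"name": "data_validation", "script": "python validate_data.py", "timeout_minutes": 15},
        {"name": "train", "script": "python train.py", "dependencies": ["data_validation"]},
        {"name": "evaluate", "script": "python evaluate.py", "retry_count": 2},
        {"name": "register_and_deploy", "script": "python register_and_deploy.py"},
    ],
    "notifications": {"slack_webhook": "https://hooks.slack.com/..."},
}

with open("pipeline.yaml", "w") as f:
    yaml.safe_dump(pipeline_definition, f, sort_keys=False)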
# mlops_pipeline.py
import asyncio
import yaml
import json
import logging
import os
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import subprocess
import tempfile
import shutil
@dataclass
class PipelineStage:
name: str
script: str
environment: Dict[str, str]
dependencies: List[str] = None
timeout_minutes: int = 30
retry_count: int = 1
@dataclass
class PipelineConfig:
name: str
trigger: Dict[str, Any]
stages: List[PipelineStage]
environment_variables: Dict[str, str] = None
notifications: Dict[str, Any] = None
class MLOpsPipeline:
"""ML-specific CI/CD pipeline"""
def __init__(self,
config_path: str,
workspace_path: str = "./pipeline_workspace"):
self.config_path = Path(config_path)
self.workspace_path = Path(workspace_path)
self.workspace_path.mkdir(exist_ok=True)
# Load pipeline configuration
with open(self.config_path) as f:
config_data = yaml.safe_load(f)
self.config = PipelineConfig(
name=config_data['name'],
trigger=config_data['trigger'],
stages=[
PipelineStage(
name=stage['name'],
script=stage['script'],
environment=stage.get('environment', {}),
dependencies=stage.get('dependencies', []),
timeout_minutes=stage.get('timeout_minutes', 30),
retry_count=stage.get('retry_count', 1)
)
for stage in config_data['stages']
],
environment_variables=config_data.get('environment_variables', {}),
notifications=config_data.get('notifications', {})
)
# Pipeline state
self.execution_history = []
self.current_execution = None
self.logger = logging.getLogger(__name__)
async def execute_pipeline(self,
trigger_data: Dict[str, Any] = None) -> Dict[str, Any]:
"""Execute the complete pipeline"""
execution_id = f"{self.config.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
execution = {
'execution_id': execution_id,
'pipeline_name': self.config.name,
'start_time': datetime.now(),
'trigger_data': trigger_data or {},
'stages': [],
'status': 'running'
}
self.current_execution = execution
try:
self.logger.info(f"Starting pipeline execution: {execution_id}")
# Create execution workspace
execution_workspace = self.workspace_path / execution_id
execution_workspace.mkdir(exist_ok=True)
# Set up environment
environment = self._prepare_environment(execution_workspace)
# Execute stages sequentially in the order listed (declared dependencies are informational here)
for stage in self.config.stages:
stage_result = await self._execute_stage(
stage, execution_workspace, environment
)
execution['stages'].append(stage_result)
if not stage_result['success']:
execution['status'] = 'failed'
execution['failed_stage'] = stage.name
break
if execution['status'] == 'running':
execution['status'] = 'success'
execution['end_time'] = datetime.now()
execution['duration'] = (execution['end_time'] - execution['start_time']).total_seconds()
# Send notifications
await self._send_notifications(execution)
self.logger.info(f"Pipeline execution completed: {execution_id} - {execution['status']}")
except Exception as e:
execution['status'] = 'error'
execution['error'] = str(e)
execution['end_time'] = datetime.now()
self.logger.error(f"Pipeline execution failed: {execution_id} - {e}")
finally:
# Clean up workspace (optional)
# shutil.rmtree(execution_workspace, ignore_errors=True)
self.execution_history.append(execution)
self.current_execution = None
return execution
def _prepare_environment(self, workspace: Path) -> Dict[str, str]:
"""Prepare execution environment"""
environment = dict(os.environ)
# Add pipeline environment variables
if self.config.environment_variables:
environment.update(self.config.environment_variables)
# Add workspace paths
environment['PIPELINE_WORKSPACE'] = str(workspace)
environment['PIPELINE_ROOT'] = str(self.workspace_path)
return environment
async def _execute_stage(self,
stage: PipelineStage,
workspace: Path,
base_environment: Dict[str, str]) -> Dict[str, Any]:
"""Execute a single pipeline stage"""
stage_result = {
'name': stage.name,
'start_time': datetime.now(),
'success': False,
'attempts': 0,
'logs': []
}
self.logger.info(f"Executing stage: {stage.name}")
# Prepare stage environment
environment = base_environment.copy()
environment.update(stage.environment)
# Create stage workspace
stage_workspace = workspace / stage.name
stage_workspace.mkdir(exist_ok=True)
environment['STAGE_WORKSPACE'] = str(stage_workspace)
# Execute with retries (retry_count is the total number of attempts)
for attempt in range(stage.retry_count):
stage_result['attempts'] = attempt + 1
try:
# Execute stage script
result = await self._run_script(
stage.script,
stage_workspace,
environment,
timeout_minutes=stage.timeout_minutes
)
stage_result['logs'].append(result)
if result['return_code'] == 0:
stage_result['success'] = True
break
else:
self.logger.warning(f"Stage {stage.name} attempt {attempt + 1} failed")
except Exception as e:
stage_result['logs'].append({
'error': str(e),
'attempt': attempt + 1
})
self.logger.error(f"Stage {stage.name} attempt {attempt + 1} error: {e}")
stage_result['end_time'] = datetime.now()
stage_result['duration'] = (stage_result['end_time'] - stage_result['start_time']).total_seconds()
if stage_result['success']:
self.logger.info(f"Stage {stage.name} completed successfully")
else:
self.logger.error(f"Stage {stage.name} failed after {stage.retry_count} attempts")
return stage_result
async def _run_script(self,
script: str,
workspace: Path,
environment: Dict[str, str],
timeout_minutes: int) -> Dict[str, Any]:
"""Run a script in the given environment"""
# Create script file
script_file = workspace / "stage_script.sh"
with open(script_file, 'w') as f:
f.write("#!/bin/bash\n")
f.write("set -e\n") # Exit on error
f.write(script)
script_file.chmod(0o755)
# Execute script
process = await asyncio.create_subprocess_exec(
str(script_file),
cwd=workspace,
env=environment,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout_minutes * 60
)
return {
'return_code': process.returncode,
'stdout': stdout.decode('utf-8'),
'stderr': stderr.decode('utf-8'),
'timeout': False
}
except asyncio.TimeoutError:
process.kill()
await process.wait()
return {
'return_code': -1,
'stdout': '',
'stderr': f'Script timed out after {timeout_minutes} minutes',
'timeout': True
}
async def _send_notifications(self, execution: Dict[str, Any]):
"""Send pipeline execution notifications"""
if not self.config.notifications:
return
notification_config = self.config.notifications
# Email notifications
if 'email' in notification_config:
await self._send_email_notification(execution, notification_config['email'])
# Slack notifications
if 'slack' in notification_config:
await self._send_slack_notification(execution, notification_config['slack'])
# Webhook notifications
if 'webhook' in notification_config:
await self._send_webhook_notification(execution, notification_config['webhook'])
async def _send_email_notification(self, execution: Dict[str, Any], email_config: Dict[str, Any]):
"""Send email notification"""
# This would integrate with actual email service
self.logger.info(f"Email notification sent for execution {execution['execution_id']}")
async def _send_slack_notification(self, execution: Dict[str, Any], slack_config: Dict[str, Any]):
"""Send Slack notification"""
# This would integrate with Slack API
self.logger.info(f"Slack notification sent for execution {execution['execution_id']}")
async def _send_webhook_notification(self, execution: Dict[str, Any], webhook_config: Dict[str, Any]):
"""Send webhook notification"""
import aiohttp
webhook_url = webhook_config.get('url')
if not webhook_url:
return
payload = {
'execution_id': execution['execution_id'],
'pipeline_name': execution['pipeline_name'],
'status': execution['status'],
'duration': execution.get('duration'),
'timestamp': execution['start_time'].isoformat()
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as response:
if response.status == 200:
self.logger.info("Webhook notification sent successfully")
else:
self.logger.warning(f"Webhook notification failed: {response.status}")
except Exception as e:
self.logger.error(f"Webhook notification error: {e}")
def get_execution_status(self, execution_id: str) -> Optional[Dict[str, Any]]:
"""Get execution status"""
if self.current_execution and self.current_execution['execution_id'] == execution_id:
return self.current_execution
for execution in self.execution_history:
if execution['execution_id'] == execution_id:
return execution
return None
def list_executions(self, limit: int = 10) -> List[Dict[str, Any]]:
"""List recent pipeline executions"""
executions = []
if self.current_execution:
executions.append(self.current_execution)
executions.extend(self.execution_history[-limit:])
return executions
# Example pipeline configuration
def create_example_pipeline_config():
"""Create example ML pipeline configuration"""
config = {
'name': 'ml_model_pipeline',
'trigger': {
'branch': 'main',
'paths': ['models/**', 'data/**', 'requirements.txt']
},
'environment_variables': {
'PYTHONPATH': '/app',
'MODEL_REGISTRY_URL': 'http://model-registry:8080'
},
'stages': [
{
'name': 'data_validation',
'script': '''
echo "Validating training data..."
python scripts/validate_data.py --data-path data/train.csv
echo "Data validation completed"
''',
'environment': {
'DATA_VERSION': 'v1.0'
},
'timeout_minutes': 10
},
{
'name': 'model_training',
'script': '''
echo "Training model..."
python scripts/train_model.py --config config/training.yaml
echo "Model training completed"
''',
'dependencies': ['data_validation'],
'timeout_minutes': 60,
'retry_count': 2
},
{
'name': 'model_evaluation',
'script': '''
echo "Evaluating model..."
python scripts/evaluate_model.py --model-path models/latest
echo "Model evaluation completed"
''',
'dependencies': ['model_training'],
'timeout_minutes': 20
},
{
'name': 'model_registration',
'script': '''
echo "Registering model..."
python scripts/register_model.py --model-path models/latest
echo "Model registration completed"
''',
'dependencies': ['model_evaluation'],
'timeout_minutes': 10
},
{
'name': 'deploy_staging',
'script': '''
echo "Deploying to staging..."
python scripts/deploy_model.py --environment staging --model-id $MODEL_ID --version $MODEL_VERSION
echo "Staging deployment completed"
''',
'dependencies': ['model_registration'],
'environment': {
'DEPLOYMENT_ENV': 'staging'
},
'timeout_minutes': 15
}
],
'notifications': {
'slack': {
'webhook_url': 'https://hooks.slack.com/services/...',
'channel': '#ml-ops'
},
'email': {
'recipients': ['ml-team@company.com'],
'smtp_server': 'smtp.company.com'
}
}
}
return config
# Example usage
async def run_example_pipeline():
"""Run example ML pipeline"""
# Create pipeline configuration
config = create_example_pipeline_config()
# Save configuration to file
config_path = "example_pipeline.yaml"
with open(config_path, 'w') as f:
yaml.dump(config, f, default_flow_style=False)
# Create and run pipeline
pipeline = MLOpsPipeline(config_path)
execution_result = await pipeline.execute_pipeline({
'commit_id': 'abc123',
'branch': 'main',
'author': 'ml_engineer'
})
print("Pipeline execution result:")
print(json.dumps(execution_result, indent=2, default=str))
if __name__ == "__main__":
asyncio.run(run_example_pipeline())
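The pipeline above accepts trigger_data but leaves trigger evaluation to the surrounding CI system. As a rough illustration of how the trigger block (branch and path globs) could be matched against an incoming commit event, the sketch below uses fnmatch from the standard library; the should_trigger helper and the event payload shape are assumptions, not part of the MLOpsPipeline class.
# trigger_matching.py - illustrative sketch; event payload shape is an assumption
from fnmatch import fnmatch
from typing import Any, Dict, List

def should_trigger(trigger: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Return True if a commit event matches the pipeline's trigger block."""
    # Branch filter: ignore events from other branches
    if trigger.get('branch') and event.get('branch') != trigger['branch']:
        return False
    # Path filter: at least one changed file must match a configured glob
    patterns: List[str] = trigger.get('paths', [])
    if not patterns:
        return True
    changed: List[str] = event.get('changed_files', [])
    return any(fnmatch(path, pattern) for path in changed for pattern in patterns)

# Example: a push to main that touches a model file should trigger the pipeline
event = {'branch': 'main', 'changed_files': ['models/train.py', 'README.md']}
print(should_trigger({'branch': 'main', 'paths': ['models/**', 'data/**']}, event))  # True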
Production Best Practices
Model Versioning and Governance
- Semantic Versioning: Use semantic versioning for models (major.minor.patch); a version-gating sketch follows this list
- Lineage Tracking: Track data and model lineage for reproducibility
- Approval Workflows: Implement approval processes for production deployments
- Compliance: Ensure regulatory compliance and audit trails
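A minimal sketch of version-gated promotion, assuming a simple rule that major and minor bumps require human approval while patch bumps can auto-promote once evaluation passes; the helper names and the rule itself are illustrative, not part of the registry implementation above.
# version_gating.py - illustrative only; the promotion rule and helper names are assumptions
import re
from typing import Tuple

SEMVER = re.compile(r"^v?(\d+)\.(\d+)\.(\d+)$")

def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse a model version such as 'v1.2.3' into (major, minor, patch)."""
    match = SEMVER.match(version)
    if not match:
        raise ValueError(f"Not a semantic version: {version}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch

def requires_approval(current: str, candidate: str) -> bool:
    """Major and minor bumps need human sign-off; patch bumps can auto-promote after evaluation."""
    cur, cand = parse_version(current), parse_version(candidate)
    return (cand[0], cand[1]) > (cur[0], cur[1])

print(requires_approval("v1.0.0", "v1.0.1"))  # False - patch release
print(requires_approval("v1.0.0", "v1.1.0"))  # True  - minor release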
Deployment Strategies
- Blue-Green Deployments: Zero-downtime deployments with instant rollback
- Canary Releases: Gradual rollout with traffic splitting, as sketched after this list
- A/B Testing: Compare model variants with controlled experiments
- Feature Flags: Control model features and experiments
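A minimal sketch of a stepped canary rollout, assuming the caller supplies an apply_split callback (for example, updating service weights) and an is_healthy check tied to the deployment's rollback threshold; the step sizes and soak time are illustrative defaults, not a recommendation.
# canary_rollout.py - illustrative stepped canary schedule; callbacks and step sizes are assumptions
import asyncio
from typing import Callable, Dict, Sequence

async def run_canary(apply_split: Callable[[Dict[str, float]], None],
                     is_healthy: Callable[[], bool],
                     steps: Sequence[float] = (0.05, 0.25, 0.5, 1.0),
                     soak_seconds: float = 300) -> bool:
    """Shift traffic to the candidate in steps, rolling back on degradation."""
    for step in steps:
        apply_split({'candidate': step, 'stable': round(1.0 - step, 2)})  # e.g. update service weights
        await asyncio.sleep(soak_seconds)       # let metrics accumulate before judging this step
        if not is_healthy():                    # e.g. error rate or latency vs. rollback threshold
            apply_split({'candidate': 0.0, 'stable': 1.0})  # full rollback to the stable version
            return False
    return True

# Example wiring with stub callbacks
if __name__ == "__main__":
    asyncio.run(run_canary(apply_split=print, is_healthy=lambda: True, soak_seconds=0))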
Monitoring and Alerting
- Model Performance: Track accuracy, latency, and throughput
- Data Drift: Monitor for changes in input data distribution (a PSI-based check is sketched after this list)
- Concept Drift: Detect changes in the relationship between inputs and outcomes, often surfaced as shifts in prediction distributions or declining accuracy
- Resource Usage: Monitor CPU, memory, and GPU utilization
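A minimal data drift check using the Population Stability Index, comparing a stored reference sample of a feature against a recent prediction window; the 0.2 alert threshold is a common rule of thumb rather than a universal standard, and the synthetic arrays below stand in for real feature data.
# drift_psi.py - illustrative drift check; the 0.2 alert threshold is a rule of thumb
import numpy as np

def population_stability_index(reference: np.ndarray,
                               current: np.ndarray,
                               bins: int = 10) -> float:
    """PSI between a reference (training) distribution and a live window of a feature."""
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Avoid division by zero / log(0) for empty bins
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

reference = np.random.normal(0.0, 1.0, 10_000)   # stand-in for the training sample
live = np.random.normal(0.4, 1.0, 1_000)         # shifted distribution simulating drift
psi = population_stability_index(reference, live)
print(f"PSI={psi:.3f} -> {'drift alert' if psi > 0.2 else 'ok'}")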
Security and Compliance
- Model Security: Protect against adversarial attacks
- Data Privacy: Implement privacy-preserving techniques
- Access Control: Role-based access to models and data, as sketched together with an audit record after this list
- Audit Trails: Comprehensive logging for compliance
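A minimal sketch of role-based access checks that writes an audit record for every attempt; the role names, permission sets, and JSON-lines log format are assumptions used to illustrate the pattern, not a prescribed policy.
# access_audit.py - illustrative role check plus audit record; roles and permissions are assumptions
import json
import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
audit_logger = logging.getLogger('model_audit')

ROLE_PERMISSIONS = {
    'ml_engineer': {'register_model', 'promote_to_staging'},
    'ml_lead': {'register_model', 'promote_to_staging', 'promote_to_production', 'archive_model'},
}

def authorize(user: str, role: str, action: str, model_id: str) -> bool:
    """Check permission and append an audit record for every attempt, allowed or not."""
    allowed = action in ROLE_PERMISSIONS.get(role, set())
    audit_logger.info(json.dumps({
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'user': user,
        'role': role,
        'action': action,
        'model_id': model_id,
        'allowed': allowed,
    }))
    return allowed

print(authorize('alice', 'ml_engineer', 'promote_to_production', 'sentiment_classifier'))  # False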
Conclusion
MLOps is essential for scaling AI applications in production environments. The practices, patterns, and implementations covered in this guide provide a comprehensive foundation for building robust, scalable, and maintainable ML systems.
Key success factors:
- Automation: Automate as much of the ML lifecycle as possible
- Monitoring: Implement comprehensive monitoring at all levels
- Governance: Establish clear processes for model lifecycle management
- Collaboration: Foster collaboration between ML and operations teams
- Continuous Improvement: Iterate and improve based on production experience
As MLOps practices continue to evolve, staying current with new tools, techniques, and best practices will be crucial for maintaining competitive AI systems.