Deploy Production-Ready AI with MLOps

David Childs

Master MLOps practices for deploying AI models at scale, including CI/CD pipelines, monitoring, versioning, and production infrastructure patterns.

MLOps (Machine Learning Operations) bridges the gap between machine learning development and production deployment, ensuring AI models remain reliable, scalable, and maintainable once they are serving real traffic. This guide covers the complete MLOps lifecycle, from model development through production monitoring and maintenance.

MLOps Fundamentals and Architecture

MLOps encompasses the practices, tools, and cultural philosophies that combine machine learning development with IT operations. The goal is to shorten the ML development lifecycle while delivering high-quality models continuously.

Core MLOps Components

# mlops_architecture.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Union, Callable
from enum import Enum
from datetime import datetime, timedelta
import json
import asyncio
import logging
from pathlib import Path

class ModelStage(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    ARCHIVED = "archived"

class DeploymentStrategy(Enum):
    BLUE_GREEN = "blue_green"
    CANARY = "canary"
    ROLLING = "rolling"
    A_B_TEST = "a_b_test"

@dataclass
class ModelMetadata:
    model_id: str
    name: str
    version: str
    framework: str
    algorithm: str
    training_data_version: str
    performance_metrics: Dict[str, float]
    resource_requirements: Dict[str, Any]
    created_at: datetime
    created_by: str
    tags: List[str] = field(default_factory=list)
    description: str = ""

@dataclass
class DeploymentConfig:
    model_metadata: ModelMetadata
    target_environment: str
    strategy: DeploymentStrategy
    resource_allocation: Dict[str, Any]
    scaling_config: Dict[str, Any]
    monitoring_config: Dict[str, Any]
    rollback_threshold: float = 0.95
    traffic_split: Dict[str, float] = field(default_factory=dict)

class ModelRegistry(ABC):
    """Abstract model registry interface"""
    
    @abstractmethod
    async def register_model(self, model_metadata: ModelMetadata, model_artifacts: Dict[str, Any]) -> str:
        """Register a new model version"""
        pass
    
    @abstractmethod
    async def get_model(self, model_id: str, version: Optional[str] = None) -> Optional[ModelMetadata]:
        """Get model metadata"""
        pass
    
    @abstractmethod
    async def list_models(self, stage: Optional[ModelStage] = None) -> List[ModelMetadata]:
        """List available models"""
        pass
    
    @abstractmethod
    async def promote_model(self, model_id: str, version: str, target_stage: ModelStage) -> bool:
        """Promote model to different stage"""
        pass
    
    @abstractmethod
    async def archive_model(self, model_id: str, version: str) -> bool:
        """Archive a model version"""
        pass

class ModelDeployer(ABC):
    """Abstract model deployer interface"""
    
    @abstractmethod
    async def deploy(self, config: DeploymentConfig) -> Dict[str, Any]:
        """Deploy model to target environment"""
        pass
    
    @abstractmethod
    async def rollback(self, deployment_id: str) -> Dict[str, Any]:
        """Rollback deployment"""
        pass
    
    @abstractmethod
    async def scale(self, deployment_id: str, replica_count: int) -> Dict[str, Any]:
        """Scale deployment"""
        pass
    
    @abstractmethod
    async def get_deployment_status(self, deployment_id: str) -> Dict[str, Any]:
        """Get deployment status"""
        pass

class ModelMonitor(ABC):
    """Abstract model monitor interface"""
    
    @abstractmethod
    async def track_prediction(self, model_id: str, 
                             input_data: Any, 
                             prediction: Any,
                             metadata: Optional[Dict[str, Any]] = None):
        """Track a model prediction"""
        pass
    
    @abstractmethod
    async def detect_drift(self, model_id: str, 
                         window_size: int = 1000) -> Dict[str, Any]:
        """Detect data/concept drift"""
        pass
    
    @abstractmethod
    async def get_performance_metrics(self, model_id: str,
                                    time_range: tuple) -> Dict[str, Any]:
        """Get model performance metrics"""
        pass

class MLOpsOrchestrator:
    """Main MLOps orchestrator"""
    
    def __init__(self,
                 model_registry: ModelRegistry,
                 deployer: ModelDeployer,
                 monitor: ModelMonitor,
                 config: Dict[str, Any] = None):
        
        self.model_registry = model_registry
        self.deployer = deployer
        self.monitor = monitor
        self.config = config or {}
        
        self.logger = logging.getLogger(__name__)
        
        # Pipeline state
        self.active_deployments: Dict[str, DeploymentConfig] = {}
        self.deployment_history: List[Dict[str, Any]] = []
        
        # Event callbacks
        self.deployment_callbacks: List[Callable] = []
        self.monitoring_callbacks: List[Callable] = []
    
    async def deploy_model(self, 
                          model_id: str,
                          version: str,
                          target_environment: str,
                          strategy: DeploymentStrategy = DeploymentStrategy.ROLLING,
                          **kwargs) -> Dict[str, Any]:
        """Deploy model with specified strategy"""
        
        deployment_id = f"{model_id}_{version}_{target_environment}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        try:
            # Get model metadata
            model_metadata = await self.model_registry.get_model(model_id, version)
            if not model_metadata:
                raise ValueError(f"Model {model_id}:{version} not found in registry")
            
            # Create deployment configuration
            deployment_config = DeploymentConfig(
                model_metadata=model_metadata,
                target_environment=target_environment,
                strategy=strategy,
                resource_allocation=kwargs.get('resource_allocation', self._get_default_resources()),
                scaling_config=kwargs.get('scaling_config', self._get_default_scaling()),
                monitoring_config=kwargs.get('monitoring_config', self._get_default_monitoring()),
                rollback_threshold=kwargs.get('rollback_threshold', 0.95),
                traffic_split=kwargs.get('traffic_split', {})
            )
            
            # Validate deployment
            validation_result = await self._validate_deployment(deployment_config)
            if not validation_result['valid']:
                raise ValueError(f"Deployment validation failed: {validation_result['errors']}")
            
            # Execute deployment
            self.logger.info(f"Starting deployment {deployment_id}")
            deployment_result = await self.deployer.deploy(deployment_config)
            
            # Track deployment
            self.active_deployments[deployment_id] = deployment_config
            self.deployment_history.append({
                'deployment_id': deployment_id,
                'model_id': model_id,
                'version': version,
                'environment': target_environment,
                'strategy': strategy.value,
                'timestamp': datetime.now(),
                'status': 'deployed',
                'result': deployment_result
            })
            
            # Set up monitoring
            await self._setup_monitoring(deployment_id, deployment_config)
            
            # Notify callbacks
            for callback in self.deployment_callbacks:
                await callback('deployed', deployment_id, deployment_config)
            
            self.logger.info(f"Deployment {deployment_id} completed successfully")
            
            return {
                'deployment_id': deployment_id,
                'status': 'success',
                'endpoint': deployment_result.get('endpoint'),
                'metrics': deployment_result.get('metrics', {})
            }
            
        except Exception as e:
            self.logger.error(f"Deployment {deployment_id} failed: {e}")
            
            # Record failure
            self.deployment_history.append({
                'deployment_id': deployment_id,
                'model_id': model_id,
                'version': version,
                'environment': target_environment,
                'timestamp': datetime.now(),
                'status': 'failed',
                'error': str(e)
            })
            
            return {
                'deployment_id': deployment_id,
                'status': 'failed',
                'error': str(e)
            }
    
    async def _validate_deployment(self, config: DeploymentConfig) -> Dict[str, Any]:
        """Validate deployment configuration"""
        
        errors = []
        warnings = []
        
        # Check resource requirements
        if not config.resource_allocation.get('cpu'):
            errors.append("CPU allocation not specified")
        
        if not config.resource_allocation.get('memory'):
            errors.append("Memory allocation not specified")
        
        # Check model performance thresholds
        performance_metrics = config.model_metadata.performance_metrics
        min_accuracy = self.config.get('min_accuracy_threshold', 0.8)
        
        if 'accuracy' in performance_metrics:
            if performance_metrics['accuracy'] < min_accuracy:
                warnings.append(f"Model accuracy {performance_metrics['accuracy']:.3f} below threshold {min_accuracy}")
        
        # Check environment compatibility
        target_env = config.target_environment
        supported_envs = self.config.get('supported_environments', ['development', 'staging', 'production'])
        
        if target_env not in supported_envs:
            errors.append(f"Unsupported target environment: {target_env}")
        
        return {
            'valid': len(errors) == 0,
            'errors': errors,
            'warnings': warnings
        }
    
    async def _setup_monitoring(self, deployment_id: str, config: DeploymentConfig):
        """Set up monitoring for deployment"""
        
        monitoring_config = config.monitoring_config
        
        # Set up drift detection
        if monitoring_config.get('drift_detection', True):
            asyncio.create_task(
                self._monitor_drift(deployment_id, config.model_metadata.model_id)
            )
        
        # Set up performance monitoring
        if monitoring_config.get('performance_monitoring', True):
            asyncio.create_task(
                self._monitor_performance(deployment_id, config.model_metadata.model_id)
            )
    
    async def _monitor_drift(self, deployment_id: str, model_id: str):
        """Monitor for data drift"""
        
        while deployment_id in self.active_deployments:
            try:
                drift_result = await self.monitor.detect_drift(model_id)
                
                if drift_result.get('drift_detected'):
                    self.logger.warning(f"Drift detected for deployment {deployment_id}")
                    
                    # Notify callbacks
                    for callback in self.monitoring_callbacks:
                        await callback('drift_detected', deployment_id, drift_result)
                
                # Wait before next check
                await asyncio.sleep(3600)  # Check hourly
                
            except Exception as e:
                self.logger.error(f"Drift monitoring error for {deployment_id}: {e}")
                await asyncio.sleep(3600)
    
    async def _monitor_performance(self, deployment_id: str, model_id: str):
        """Monitor model performance"""
        
        while deployment_id in self.active_deployments:
            try:
                # Get recent performance metrics
                end_time = datetime.now()
                start_time = end_time - timedelta(hours=1)  # Last hour
                
                performance = await self.monitor.get_performance_metrics(
                    model_id, (start_time, end_time)
                )
                
                # Check against thresholds
                config = self.active_deployments[deployment_id]
                threshold = config.rollback_threshold
                
                current_performance = performance.get('accuracy', 1.0)
                
                if current_performance < threshold:
                    self.logger.warning(
                        f"Performance degradation detected for {deployment_id}: "
                        f"{current_performance:.3f} < {threshold:.3f}"
                    )
                    
                    # Trigger rollback
                    await self._trigger_automatic_rollback(deployment_id, performance)
                
                await asyncio.sleep(1800)  # Check every 30 minutes
                
            except Exception as e:
                self.logger.error(f"Performance monitoring error for {deployment_id}: {e}")
                await asyncio.sleep(1800)
    
    async def _trigger_automatic_rollback(self, deployment_id: str, performance_data: Dict[str, Any]):
        """Trigger automatic rollback due to performance issues"""
        
        self.logger.info(f"Triggering automatic rollback for {deployment_id}")
        
        try:
            rollback_result = await self.deployer.rollback(deployment_id)
            
            # Update deployment status
            if deployment_id in self.active_deployments:
                del self.active_deployments[deployment_id]
            
            # Record rollback
            self.deployment_history.append({
                'deployment_id': deployment_id,
                'action': 'rollback',
                'reason': 'performance_degradation',
                'performance_data': performance_data,
                'timestamp': datetime.now(),
                'result': rollback_result
            })
            
            # Notify callbacks
            for callback in self.deployment_callbacks:
                await callback('rollback', deployment_id, {'reason': 'performance_degradation'})
            
        except Exception as e:
            self.logger.error(f"Automatic rollback failed for {deployment_id}: {e}")
    
    def _get_default_resources(self) -> Dict[str, Any]:
        """Get default resource allocation"""
        return {
            'cpu': '1000m',
            'memory': '2Gi',
            'gpu': 0
        }
    
    def _get_default_scaling(self) -> Dict[str, Any]:
        """Get default scaling configuration"""
        return {
            'min_replicas': 1,
            'max_replicas': 10,
            'target_cpu_utilization': 70,
            'scale_up_threshold': 80,
            'scale_down_threshold': 30
        }
    
    def _get_default_monitoring(self) -> Dict[str, Any]:
        """Get default monitoring configuration"""
        return {
            'drift_detection': True,
            'performance_monitoring': True,
            'logging_enabled': True,
            'metrics_collection': True
        }
    
    def add_deployment_callback(self, callback: Callable):
        """Add deployment event callback"""
        self.deployment_callbacks.append(callback)
    
    def add_monitoring_callback(self, callback: Callable):
        """Add monitoring event callback"""
        self.monitoring_callbacks.append(callback)
    
    async def get_deployment_status(self, deployment_id: str) -> Dict[str, Any]:
        """Get comprehensive deployment status"""
        
        if deployment_id not in self.active_deployments:
            return {'status': 'not_found'}
        
        config = self.active_deployments[deployment_id]
        
        # Get status from deployer
        deployer_status = await self.deployer.get_deployment_status(deployment_id)
        
        # Get performance metrics
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=24)  # Last 24 hours
        
        performance = await self.monitor.get_performance_metrics(
            config.model_metadata.model_id, (start_time, end_time)
        )
        
        return {
            'deployment_id': deployment_id,
            'model_id': config.model_metadata.model_id,
            'version': config.model_metadata.version,
            'environment': config.target_environment,
            'strategy': config.strategy.value,
            'deployer_status': deployer_status,
            'performance_metrics': performance,
            'created_at': next(
                (h['timestamp'] for h in self.deployment_history
                 if h['deployment_id'] == deployment_id),
                None
            )
        }
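
Before moving on to the concrete implementations, here is a minimal sketch of how these pieces wire together. It assumes the FileSystemModelRegistry, KubernetesModelDeployer, and DatabaseModelMonitor classes defined later in this guide; the model ID, version, and namespace are placeholders.

# orchestrator_usage.py (illustrative sketch)
import asyncio

from mlops_architecture import MLOpsOrchestrator, DeploymentStrategy
from model_registry import FileSystemModelRegistry
from kubernetes_deployer import KubernetesModelDeployer
from model_monitoring import DatabaseModelMonitor

async def main():
    orchestrator = MLOpsOrchestrator(
        model_registry=FileSystemModelRegistry("./model_registry"),
        deployer=KubernetesModelDeployer(namespace="ml-serving"),
        monitor=DatabaseModelMonitor("./model_monitoring.db"),
        config={"min_accuracy_threshold": 0.85}
    )

    # Deploy a previously registered model version with a canary rollout
    result = await orchestrator.deploy_model(
        model_id="churn-classifier",
        version="1.0.0",
        target_environment="production",
        strategy=DeploymentStrategy.CANARY,
        traffic_split={"canary": 10}
    )
    print(result)

if __name__ == "__main__":
    asyncio.run(main())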

Model Registry Implementation

# model_registry.py
import asyncio
import json
import logging
import sqlite3
import pickle
from decimal import Decimal
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
import hashlib
import shutil
import boto3
from botocore.exceptions import ClientError

from mlops_architecture import ModelRegistry, ModelMetadata, ModelStage

class FileSystemModelRegistry(ModelRegistry):
    """File system-based model registry"""
    
    def __init__(self, registry_path: str = "./model_registry"):
        self.registry_path = Path(registry_path)
        self.registry_path.mkdir(exist_ok=True)
        
        # Initialize metadata database
        self.db_path = self.registry_path / "registry.db"
        self._init_database()
        
        self.logger = logging.getLogger(__name__)
    
    def _init_database(self):
        """Initialize SQLite database for metadata"""
        
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS models (
                    model_id TEXT,
                    version TEXT,
                    name TEXT,
                    framework TEXT,
                    algorithm TEXT,
                    training_data_version TEXT,
                    performance_metrics TEXT,
                    resource_requirements TEXT,
                    created_at TEXT,
                    created_by TEXT,
                    tags TEXT,
                    description TEXT,
                    stage TEXT,
                    artifacts_path TEXT,
                    PRIMARY KEY (model_id, version)
                )
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_model_stage ON models(model_id, stage)
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_created_at ON models(created_at)
            """)
    
    async def register_model(self, model_metadata: ModelMetadata, model_artifacts: Dict[str, Any]) -> str:
        """Register a new model version"""
        
        # Create version-specific directory
        model_dir = self.registry_path / model_metadata.model_id / model_metadata.version
        model_dir.mkdir(parents=True, exist_ok=True)
        
        # Save artifacts
        artifacts_path = model_dir / "artifacts"
        artifacts_path.mkdir(exist_ok=True)
        
        for artifact_name, artifact_data in model_artifacts.items():
            artifact_file = artifacts_path / artifact_name
            
            if isinstance(artifact_data, (str, Path)):
                # Copy file
                shutil.copy2(artifact_data, artifact_file)
            elif hasattr(artifact_data, 'save'):
                # Model object with save method
                artifact_data.save(str(artifact_file))
            else:
                # Pickle other objects
                with open(artifact_file, 'wb') as f:
                    pickle.dump(artifact_data, f)
        
        # Save metadata
        metadata_file = model_dir / "metadata.json"
        with open(metadata_file, 'w') as f:
            json.dump({
                'model_id': model_metadata.model_id,
                'name': model_metadata.name,
                'version': model_metadata.version,
                'framework': model_metadata.framework,
                'algorithm': model_metadata.algorithm,
                'training_data_version': model_metadata.training_data_version,
                'performance_metrics': model_metadata.performance_metrics,
                'resource_requirements': model_metadata.resource_requirements,
                'created_at': model_metadata.created_at.isoformat(),
                'created_by': model_metadata.created_by,
                'tags': model_metadata.tags,
                'description': model_metadata.description
            }, f, indent=2)
        
        # Store in database
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO models 
                (model_id, version, name, framework, algorithm, training_data_version,
                 performance_metrics, resource_requirements, created_at, created_by,
                 tags, description, stage, artifacts_path)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                model_metadata.model_id,
                model_metadata.version,
                model_metadata.name,
                model_metadata.framework,
                model_metadata.algorithm,
                model_metadata.training_data_version,
                json.dumps(model_metadata.performance_metrics),
                json.dumps(model_metadata.resource_requirements),
                model_metadata.created_at.isoformat(),
                model_metadata.created_by,
                json.dumps(model_metadata.tags),
                model_metadata.description,
                ModelStage.DEVELOPMENT.value,
                str(artifacts_path)
            ))
        
        self.logger.info(f"Registered model {model_metadata.model_id}:{model_metadata.version}")
        
        return f"{model_metadata.model_id}:{model_metadata.version}"
    
    async def get_model(self, model_id: str, version: Optional[str] = None) -> Optional[ModelMetadata]:
        """Get model metadata"""
        
        with sqlite3.connect(self.db_path) as conn:
            if version:
                cursor = conn.execute("""
                    SELECT * FROM models WHERE model_id = ? AND version = ?
                """, (model_id, version))
            else:
                # Get latest version
                cursor = conn.execute("""
                    SELECT * FROM models WHERE model_id = ? 
                    ORDER BY created_at DESC LIMIT 1
                """, (model_id,))
            
            row = cursor.fetchone()
            if not row:
                return None
            
            (model_id, version, name, framework, algorithm, training_data_version,
             performance_metrics, resource_requirements, created_at, created_by,
             tags, description, stage, artifacts_path) = row
            
            return ModelMetadata(
                model_id=model_id,
                name=name,
                version=version,
                framework=framework,
                algorithm=algorithm,
                training_data_version=training_data_version,
                performance_metrics=json.loads(performance_metrics),
                resource_requirements=json.loads(resource_requirements),
                created_at=datetime.fromisoformat(created_at),
                created_by=created_by,
                tags=json.loads(tags),
                description=description
            )
    
    async def list_models(self, stage: Optional[ModelStage] = None) -> List[ModelMetadata]:
        """List available models"""
        
        with sqlite3.connect(self.db_path) as conn:
            if stage:
                cursor = conn.execute("""
                    SELECT * FROM models WHERE stage = ? ORDER BY created_at DESC
                """, (stage.value,))
            else:
                cursor = conn.execute("""
                    SELECT * FROM models ORDER BY created_at DESC
                """)
            
            models = []
            for row in cursor.fetchall():
                (model_id, version, name, framework, algorithm, training_data_version,
                 performance_metrics, resource_requirements, created_at, created_by,
                 tags, description, stage, artifacts_path) = row
                
                models.append(ModelMetadata(
                    model_id=model_id,
                    name=name,
                    version=version,
                    framework=framework,
                    algorithm=algorithm,
                    training_data_version=training_data_version,
                    performance_metrics=json.loads(performance_metrics),
                    resource_requirements=json.loads(resource_requirements),
                    created_at=datetime.fromisoformat(created_at),
                    created_by=created_by,
                    tags=json.loads(tags),
                    description=description
                ))
            
            return models
    
    async def promote_model(self, model_id: str, version: str, target_stage: ModelStage) -> bool:
        """Promote model to different stage"""
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                UPDATE models SET stage = ? WHERE model_id = ? AND version = ?
            """, (target_stage.value, model_id, version))
            
            if cursor.rowcount > 0:
                self.logger.info(f"Promoted model {model_id}:{version} to {target_stage.value}")
                return True
            
            return False
    
    async def archive_model(self, model_id: str, version: str) -> bool:
        """Archive a model version"""
        
        return await self.promote_model(model_id, version, ModelStage.ARCHIVED)
    
    def get_model_artifacts_path(self, model_id: str, version: str) -> Optional[Path]:
        """Get path to model artifacts"""
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT artifacts_path FROM models WHERE model_id = ? AND version = ?
            """, (model_id, version))
            
            row = cursor.fetchone()
            if row:
                return Path(row[0])
            
            return None

class S3ModelRegistry(ModelRegistry):
    """S3-based model registry with DynamoDB metadata"""
    
    def __init__(self, 
                 s3_bucket: str,
                 dynamodb_table: str,
                 aws_region: str = "us-east-1"):
        
        self.s3_bucket = s3_bucket
        self.dynamodb_table = dynamodb_table
        self.aws_region = aws_region
        
        # Initialize AWS clients
        self.s3_client = boto3.client('s3', region_name=aws_region)
        self.dynamodb = boto3.resource('dynamodb', region_name=aws_region)
        self.table = self.dynamodb.Table(dynamodb_table)
        
        self.logger = logging.getLogger(__name__)
    
    async def register_model(self, model_metadata: ModelMetadata, model_artifacts: Dict[str, Any]) -> str:
        """Register model in S3 and DynamoDB"""
        
        model_key = f"{model_metadata.model_id}/{model_metadata.version}"
        
        # Upload artifacts to S3
        artifact_keys = {}
        for artifact_name, artifact_data in model_artifacts.items():
            s3_key = f"{model_key}/artifacts/{artifact_name}"
            
            if isinstance(artifact_data, (str, Path)):
                # Upload file
                self.s3_client.upload_file(str(artifact_data), self.s3_bucket, s3_key)
            else:
                # Serialize and upload object
                import tempfile
                with tempfile.NamedTemporaryFile() as tmp_file:
                    if hasattr(artifact_data, 'save'):
                        artifact_data.save(tmp_file.name)
                    else:
                        with open(tmp_file.name, 'wb') as f:
                            pickle.dump(artifact_data, f)
                    
                    self.s3_client.upload_file(tmp_file.name, self.s3_bucket, s3_key)
            
            artifact_keys[artifact_name] = s3_key
        
        # Store metadata in DynamoDB
        item = {
            'ModelId': model_metadata.model_id,
            'Version': model_metadata.version,
            'Name': model_metadata.name,
            'Framework': model_metadata.framework,
            'Algorithm': model_metadata.algorithm,
            'TrainingDataVersion': model_metadata.training_data_version,
            # DynamoDB does not accept Python floats, so store metric values as Decimal
            'PerformanceMetrics': {k: Decimal(str(v)) for k, v in model_metadata.performance_metrics.items()},
            'ResourceRequirements': model_metadata.resource_requirements,
            'CreatedAt': model_metadata.created_at.isoformat(),
            'CreatedBy': model_metadata.created_by,
            'Tags': model_metadata.tags,
            'Description': model_metadata.description,
            'Stage': ModelStage.DEVELOPMENT.value,
            'ArtifactKeys': artifact_keys,
            'S3Bucket': self.s3_bucket
        }
        
        self.table.put_item(Item=item)
        
        self.logger.info(f"Registered model {model_metadata.model_id}:{model_metadata.version} in S3")
        
        return f"{model_metadata.model_id}:{model_metadata.version}"
    
    async def get_model(self, model_id: str, version: Optional[str] = None) -> Optional[ModelMetadata]:
        """Get model metadata from DynamoDB"""
        
        try:
            if version:
                response = self.table.get_item(
                    Key={'ModelId': model_id, 'Version': version}
                )
                item = response.get('Item')
            else:
                # Query for the latest version (Version is the table's sort key)
                response = self.table.query(
                    KeyConditionExpression='ModelId = :model_id',
                    ExpressionAttributeValues={':model_id': model_id},
                    ScanIndexForward=False,
                    Limit=1
                )
                items = response.get('Items', [])
                item = items[0] if items else None
            
            if not item:
                return None
            
            return ModelMetadata(
                model_id=item['ModelId'],
                name=item['Name'],
                version=item['Version'],
                framework=item['Framework'],
                algorithm=item['Algorithm'],
                training_data_version=item['TrainingDataVersion'],
                performance_metrics={k: float(v) for k, v in item['PerformanceMetrics'].items()},
                resource_requirements=item['ResourceRequirements'],
                created_at=datetime.fromisoformat(item['CreatedAt']),
                created_by=item['CreatedBy'],
                tags=item['Tags'],
                description=item['Description']
            )
        
        except ClientError as e:
            self.logger.error(f"Error retrieving model {model_id}:{version}: {e}")
            return None
    
    async def list_models(self, stage: Optional[ModelStage] = None) -> List[ModelMetadata]:
        """List models from DynamoDB"""
        
        try:
            if stage:
                response = self.table.scan(
                    FilterExpression='Stage = :stage',
                    ExpressionAttributeValues={':stage': stage.value}
                )
            else:
                response = self.table.scan()
            
            models = []
            for item in response.get('Items', []):
                models.append(ModelMetadata(
                    model_id=item['ModelId'],
                    name=item['Name'],
                    version=item['Version'],
                    framework=item['Framework'],
                    algorithm=item['Algorithm'],
                    training_data_version=item['TrainingDataVersion'],
                    performance_metrics={k: float(v) for k, v in item['PerformanceMetrics'].items()},
                    resource_requirements=item['ResourceRequirements'],
                    created_at=datetime.fromisoformat(item['CreatedAt']),
                    created_by=item['CreatedBy'],
                    tags=item['Tags'],
                    description=item['Description']
                ))
            
            return models
        
        except ClientError as e:
            self.logger.error(f"Error listing models: {e}")
            return []
    
    async def promote_model(self, model_id: str, version: str, target_stage: ModelStage) -> bool:
        """Promote model stage in DynamoDB"""
        
        try:
            self.table.update_item(
                Key={'ModelId': model_id, 'Version': version},
                UpdateExpression='SET Stage = :stage',
                ExpressionAttributeValues={':stage': target_stage.value}
            )
            
            self.logger.info(f"Promoted model {model_id}:{version} to {target_stage.value}")
            return True
        
        except ClientError as e:
            self.logger.error(f"Error promoting model {model_id}:{version}: {e}")
            return False
    
    async def archive_model(self, model_id: str, version: str) -> bool:
        """Archive model version"""
        
        return await self.promote_model(model_id, version, ModelStage.ARCHIVED)
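
A short sketch of registering and promoting a model with the file-system registry follows; the model ID, metrics, and artifact object are placeholders, and the S3 variant additionally assumes a DynamoDB table with ModelId as its partition key and Version as its sort key.

# registry_usage.py (illustrative sketch)
import asyncio
from datetime import datetime

from mlops_architecture import ModelMetadata, ModelStage
from model_registry import FileSystemModelRegistry

async def main():
    registry = FileSystemModelRegistry("./model_registry")

    metadata = ModelMetadata(
        model_id="churn-classifier",
        name="Customer Churn Classifier",
        version="1.0.0",
        framework="sklearn",
        algorithm="logistic_regression",
        training_data_version="2024-01-15",
        performance_metrics={"accuracy": 0.91, "f1": 0.88},
        resource_requirements={"cpu": "500m", "memory": "1Gi"},
        created_at=datetime.now(),
        created_by="data-science-team",
        tags=["churn", "tabular"]
    )

    # Any picklable object works as an artifact; a trained model would be used in practice
    trained_model = {"weights": [0.42, -0.17], "intercept": 0.03}
    await registry.register_model(metadata, {"model.pkl": trained_model})

    # Promote to staging once offline validation passes
    await registry.promote_model("churn-classifier", "1.0.0", ModelStage.STAGING)

if __name__ == "__main__":
    asyncio.run(main())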

Kubernetes Model Deployment

# kubernetes_deployer.py
import asyncio
import logging
import yaml
import json
from typing import Dict, List, Any, Optional
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import base64
import tempfile

from mlops_architecture import ModelDeployer, DeploymentConfig, DeploymentStrategy

class KubernetesModelDeployer(ModelDeployer):
    """Kubernetes-based model deployer"""
    
    def __init__(self, 
                 namespace: str = "default",
                 kubeconfig_path: Optional[str] = None,
                 registry_url: str = "localhost:5000"):
        
        self.namespace = namespace
        self.registry_url = registry_url
        
        # Load Kubernetes configuration
        if kubeconfig_path:
            config.load_kube_config(config_file=kubeconfig_path)
        else:
            try:
                config.load_incluster_config()
            except config.ConfigException:
                config.load_kube_config()
        
        # Initialize Kubernetes clients
        self.apps_v1 = client.AppsV1Api()
        self.core_v1 = client.CoreV1Api()
        self.autoscaling_v1 = client.AutoscalingV1Api()
        self.networking_v1 = client.NetworkingV1Api()
        
        self.logger = logging.getLogger(__name__)
    
    async def deploy(self, config: DeploymentConfig) -> Dict[str, Any]:
        """Deploy model to Kubernetes"""
        
        deployment_name = f"{config.model_metadata.model_id}-{config.model_metadata.version}".lower()
        
        try:
            # Create Docker image (simplified - in practice, use CI/CD pipeline)
            image_name = await self._build_model_image(config)
            
            # Create Kubernetes resources
            deployment = await self._create_deployment(config, deployment_name, image_name)
            service = await self._create_service(config, deployment_name)
            
            if config.strategy == DeploymentStrategy.CANARY:
                await self._setup_canary_deployment(config, deployment_name)
            elif config.strategy == DeploymentStrategy.A_B_TEST:
                await self._setup_ab_test(config, deployment_name)
            
            # Create HPA if autoscaling is enabled
            if config.scaling_config.get('enabled', True):
                await self._create_hpa(config, deployment_name)
            
            # Wait for deployment to be ready
            await self._wait_for_deployment_ready(deployment_name)
            
            endpoint = f"http://{service.status.load_balancer.ingress[0].ip}" if service.status.load_balancer.ingress else f"http://{deployment_name}.{self.namespace}.svc.cluster.local"
            
            return {
                'deployment_name': deployment_name,
                'endpoint': endpoint,
                'image': image_name,
                'replicas': deployment.spec.replicas,
                'status': 'deployed'
            }
        
        except Exception as e:
            self.logger.error(f"Deployment failed: {e}")
            raise
    
    async def _build_model_image(self, config: DeploymentConfig) -> str:
        """Build Docker image for model (simplified)"""
        
        # In production, this would integrate with CI/CD pipeline
        model_metadata = config.model_metadata
        image_name = f"{self.registry_url}/{model_metadata.model_id}:{model_metadata.version}"
        
        # Create Dockerfile
        dockerfile_content = self._generate_dockerfile(config)
        
        # This would typically use Docker API or buildah
        # For now, return the image name
        self.logger.info(f"Built image: {image_name}")
        
        return image_name
    
    def _generate_dockerfile(self, config: DeploymentConfig) -> str:
        """Generate Dockerfile for model"""
        
        model_metadata = config.model_metadata
        
        base_images = {
            'tensorflow': 'tensorflow/serving:latest',
            'pytorch': 'pytorch/torchserve:latest',
            'sklearn': 'python:3.9-slim',
            'xgboost': 'python:3.9-slim'
        }
        
        base_image = base_images.get(model_metadata.framework.lower(), 'python:3.9-slim')
        
        dockerfile = f"""
FROM {base_image}

WORKDIR /app

# Copy model artifacts
COPY model/ /app/model/

# Copy serving code
COPY serve.py /app/
COPY requirements.txt /app/

# Install dependencies
RUN pip install -r requirements.txt

# Expose port
EXPOSE 8080

# Health check (assumes curl is available in the chosen base image)
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \\
    CMD curl -f http://localhost:8080/health || exit 1

# Run server
CMD ["python", "serve.py"]
"""
        
        return dockerfile
    
    async def _create_deployment(self, config: DeploymentConfig, 
                               deployment_name: str, image_name: str) -> client.V1Deployment:
        """Create Kubernetes deployment"""
        
        model_metadata = config.model_metadata
        resource_allocation = config.resource_allocation
        
        # Container specification
        container = client.V1Container(
            name="model-server",
            image=image_name,
            ports=[client.V1ContainerPort(container_port=8080)],
            resources=client.V1ResourceRequirements(
                requests={
                    "cpu": resource_allocation.get('cpu', '500m'),
                    "memory": resource_allocation.get('memory', '1Gi')
                },
                limits={
                    "cpu": resource_allocation.get('cpu_limit', resource_allocation.get('cpu', '1000m')),
                    "memory": resource_allocation.get('memory_limit', resource_allocation.get('memory', '2Gi'))
                }
            ),
            env=[
                client.V1EnvVar(name="MODEL_ID", value=model_metadata.model_id),
                client.V1EnvVar(name="MODEL_VERSION", value=model_metadata.version),
                client.V1EnvVar(name="FRAMEWORK", value=model_metadata.framework)
            ],
            liveness_probe=client.V1Probe(
                http_get=client.V1HTTPGetAction(path="/health", port=8080),
                initial_delay_seconds=30,
                period_seconds=10
            ),
            readiness_probe=client.V1Probe(
                http_get=client.V1HTTPGetAction(path="/ready", port=8080),
                initial_delay_seconds=5,
                period_seconds=5
            )
        )
        
        # Add GPU resources if needed
        if resource_allocation.get('gpu', 0) > 0:
            if not container.resources.requests:
                container.resources.requests = {}
            if not container.resources.limits:
                container.resources.limits = {}
            
            gpu_count = str(resource_allocation['gpu'])
            container.resources.requests['nvidia.com/gpu'] = gpu_count
            container.resources.limits['nvidia.com/gpu'] = gpu_count
        
        # Pod template
        pod_template = client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(
                labels={
                    "app": deployment_name,
                    "model-id": model_metadata.model_id,
                    "model-version": model_metadata.version,
                    "framework": model_metadata.framework
                }
            ),
            spec=client.V1PodSpec(containers=[container])
        )
        
        # Deployment specification
        deployment_spec = client.V1DeploymentSpec(
            replicas=config.scaling_config.get('min_replicas', 1),
            selector=client.V1LabelSelector(
                match_labels={"app": deployment_name}
            ),
            template=pod_template,
            strategy=client.V1DeploymentStrategy(
                type="RollingUpdate",
                rolling_update=client.V1RollingUpdateDeployment(
                    max_surge=1,
                    max_unavailable=0
                )
            )
        )
        
        # Create deployment
        deployment = client.V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=client.V1ObjectMeta(
                name=deployment_name,
                namespace=self.namespace,
                labels={
                    "app": deployment_name,
                    "model-id": model_metadata.model_id,
                    "model-version": model_metadata.version
                }
            ),
            spec=deployment_spec
        )
        
        try:
            result = self.apps_v1.create_namespaced_deployment(
                namespace=self.namespace,
                body=deployment
            )
            self.logger.info(f"Created deployment: {deployment_name}")
            return result
        
        except ApiException as e:
            if e.status == 409:  # Already exists
                result = self.apps_v1.patch_namespaced_deployment(
                    name=deployment_name,
                    namespace=self.namespace,
                    body=deployment
                )
                self.logger.info(f"Updated deployment: {deployment_name}")
                return result
            else:
                raise
    
    async def _create_service(self, config: DeploymentConfig, 
                            deployment_name: str) -> client.V1Service:
        """Create Kubernetes service"""
        
        service_spec = client.V1ServiceSpec(
            selector={"app": deployment_name},
            ports=[
                client.V1ServicePort(
                    port=80,
                    target_port=8080,
                    protocol="TCP"
                )
            ],
            type="LoadBalancer"
        )
        
        service = client.V1Service(
            api_version="v1",
            kind="Service",
            metadata=client.V1ObjectMeta(
                name=f"{deployment_name}-service",
                namespace=self.namespace
            ),
            spec=service_spec
        )
        
        try:
            result = self.core_v1.create_namespaced_service(
                namespace=self.namespace,
                body=service
            )
            self.logger.info(f"Created service: {deployment_name}-service")
            return result
        
        except ApiException as e:
            if e.status == 409:  # Already exists
                result = self.core_v1.patch_namespaced_service(
                    name=f"{deployment_name}-service",
                    namespace=self.namespace,
                    body=service
                )
                self.logger.info(f"Updated service: {deployment_name}-service")
                return result
            else:
                raise
    
    async def _create_hpa(self, config: DeploymentConfig, deployment_name: str):
        """Create Horizontal Pod Autoscaler"""
        
        scaling_config = config.scaling_config
        
        hpa_spec = client.V1HorizontalPodAutoscalerSpec(
            scale_target_ref=client.V1CrossVersionObjectReference(
                api_version="apps/v1",
                kind="Deployment",
                name=deployment_name
            ),
            min_replicas=scaling_config.get('min_replicas', 1),
            max_replicas=scaling_config.get('max_replicas', 10),
            target_cpu_utilization_percentage=scaling_config.get('target_cpu_utilization', 70)
        )
        
        hpa = client.V1HorizontalPodAutoscaler(
            api_version="autoscaling/v1",
            kind="HorizontalPodAutoscaler",
            metadata=client.V1ObjectMeta(
                name=f"{deployment_name}-hpa",
                namespace=self.namespace
            ),
            spec=hpa_spec
        )
        
        try:
            self.autoscaling_v1.create_namespaced_horizontal_pod_autoscaler(
                namespace=self.namespace,
                body=hpa
            )
            self.logger.info(f"Created HPA: {deployment_name}-hpa")
        
        except ApiException as e:
            if e.status == 409:  # Already exists
                self.autoscaling_v1.patch_namespaced_horizontal_pod_autoscaler(
                    name=f"{deployment_name}-hpa",
                    namespace=self.namespace,
                    body=hpa
                )
                self.logger.info(f"Updated HPA: {deployment_name}-hpa")
            else:
                self.logger.error(f"Failed to create HPA: {e}")
    
    async def _setup_canary_deployment(self, config: DeploymentConfig, deployment_name: str):
        """Set up canary deployment with traffic splitting"""
        
        traffic_split = config.traffic_split
        canary_percentage = traffic_split.get('canary', 10)  # Default 10%
        
        # Create Istio VirtualService for traffic splitting
        virtual_service = {
            "apiVersion": "networking.istio.io/v1alpha3",
            "kind": "VirtualService",
            "metadata": {
                "name": f"{deployment_name}-vs",
                "namespace": self.namespace
            },
            "spec": {
                "http": [
                    {
                        "match": [{"headers": {"canary": {"exact": "true"}}}],
                        "route": [{"destination": {"host": f"{deployment_name}-canary"}}]
                    },
                    {
                        "route": [
                            {
                                "destination": {"host": f"{deployment_name}-stable"},
                                "weight": 100 - canary_percentage
                            },
                            {
                                "destination": {"host": f"{deployment_name}-canary"},
                                "weight": canary_percentage
                            }
                        ]
                    }
                ]
            }
        }
        
        # Apply using kubectl (simplified)
        self.logger.info(f"Set up canary deployment with {canary_percentage}% traffic")
    
    async def _setup_ab_test(self, config: DeploymentConfig, deployment_name: str):
        """Set up A/B test deployment"""
        
        traffic_split = config.traffic_split
        
        # Create separate deployments for A and B variants
        # This would involve creating multiple deployments with different configurations
        self.logger.info("Set up A/B test deployment")
    
    async def _wait_for_deployment_ready(self, deployment_name: str, timeout: int = 300):
        """Wait for deployment to be ready"""
        
        import time
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            try:
                deployment = self.apps_v1.read_namespaced_deployment(
                    name=deployment_name,
                    namespace=self.namespace
                )
                
                if (deployment.status.ready_replicas and 
                    deployment.status.ready_replicas == deployment.spec.replicas):
                    self.logger.info(f"Deployment {deployment_name} is ready")
                    return
                
                await asyncio.sleep(10)
            
            except ApiException as e:
                self.logger.error(f"Error checking deployment status: {e}")
                await asyncio.sleep(10)
        
        raise TimeoutError(f"Deployment {deployment_name} not ready within {timeout} seconds")
    
    async def rollback(self, deployment_id: str) -> Dict[str, Any]:
        """Rollback deployment to the previous revision"""
        
        try:
            # Get deployment name from deployment_id ({model_id}_{version}_{env}_{timestamp})
            deployment_name = f"{deployment_id.split('_')[0]}-{deployment_id.split('_')[1]}".lower()
            
            # apps/v1 has no rollback subresource (V1DeploymentRollback was removed from the API),
            # so roll back by re-applying the previous ReplicaSet's pod template,
            # which is essentially what `kubectl rollout undo` does.
            replica_sets = self.apps_v1.list_namespaced_replica_set(
                namespace=self.namespace,
                label_selector=f"app={deployment_name}"
            ).items
            
            replica_sets.sort(
                key=lambda rs: int((rs.metadata.annotations or {}).get('deployment.kubernetes.io/revision', '0')),
                reverse=True
            )
            
            if len(replica_sets) < 2:
                raise ValueError(f"No previous revision found for {deployment_name}")
            
            previous_template = self.apps_v1.api_client.sanitize_for_serialization(
                replica_sets[1].spec.template
            )
            # Drop the controller-managed hash label before re-applying the template
            previous_template.get('metadata', {}).get('labels', {}).pop('pod-template-hash', None)
            
            self.apps_v1.patch_namespaced_deployment(
                name=deployment_name,
                namespace=self.namespace,
                body={'spec': {'template': previous_template}}
            )
            
            self.logger.info(f"Rolled back deployment: {deployment_name}")
            
            return {
                'status': 'success',
                'deployment_name': deployment_name,
                'action': 'rollback'
            }
        
        except (ApiException, ValueError) as e:
            self.logger.error(f"Rollback failed: {e}")
            return {
                'status': 'failed',
                'error': str(e)
            }
    
    async def scale(self, deployment_id: str, replica_count: int) -> Dict[str, Any]:
        """Scale deployment"""
        
        try:
            deployment_name = f"{deployment_id.split('_')[0]}-{deployment_id.split('_')[1]}".lower()
            
            # Scale deployment
            self.apps_v1.patch_namespaced_deployment_scale(
                name=deployment_name,
                namespace=self.namespace,
                body=client.V1Scale(
                    spec=client.V1ScaleSpec(replicas=replica_count)
                )
            )
            
            self.logger.info(f"Scaled deployment {deployment_name} to {replica_count} replicas")
            
            return {
                'status': 'success',
                'deployment_name': deployment_name,
                'replicas': replica_count
            }
        
        except ApiException as e:
            self.logger.error(f"Scaling failed: {e}")
            return {
                'status': 'failed',
                'error': str(e)
            }
    
    async def get_deployment_status(self, deployment_id: str) -> Dict[str, Any]:
        """Get deployment status"""
        
        try:
            deployment_name = f"{deployment_id.split('_')[0]}-{deployment_id.split('_')[1]}".lower()
            
            deployment = self.apps_v1.read_namespaced_deployment(
                name=deployment_name,
                namespace=self.namespace
            )
            
            return {
                'deployment_name': deployment_name,
                'replicas': deployment.spec.replicas,
                'ready_replicas': deployment.status.ready_replicas or 0,
                'available_replicas': deployment.status.available_replicas or 0,
                'conditions': [
                    {
                        'type': condition.type,
                        'status': condition.status,
                        'reason': condition.reason,
                        'message': condition.message
                    }
                    for condition in (deployment.status.conditions or [])
                ],
                'status': 'ready' if deployment.status.ready_replicas == deployment.spec.replicas else 'pending'
            }
        
        except ApiException as e:
            return {
                'status': 'error',
                'error': str(e)
            }
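
The generated Dockerfile copies a serve.py onto the image and exposes port 8080, and the probes above expect /health and /ready endpoints. A minimal sketch of such a server, assuming FastAPI and uvicorn as the serving stack and a pickled scikit-learn-style model as the artifact (the model path and request payload shape are placeholders):

# serve.py (illustrative sketch of the server the generated Dockerfile expects)
import os
import pickle

import uvicorn
from fastapi import FastAPI

app = FastAPI()
model = None

@app.on_event("startup")
def load_model():
    # Placeholder loading logic; the real path and format depend on the packaged artifacts
    global model
    with open("/app/model/model.pkl", "rb") as f:
        model = pickle.load(f)

@app.get("/health")
def health():
    return {"status": "ok"}

@app.get("/ready")
def ready():
    return {"ready": model is not None}

@app.post("/predict")
def predict(payload: dict):
    # Assumes a scikit-learn-style predict() and a {"features": [...]} request body
    prediction = model.predict([payload["features"]])
    return {
        "model_id": os.environ.get("MODEL_ID"),
        "model_version": os.environ.get("MODEL_VERSION"),
        "prediction": prediction.tolist()
    }

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)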

Model Monitoring and Observability

# model_monitoring.py
import asyncio
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
from collections import deque, defaultdict
import json
import logging
from abc import ABC, abstractmethod
import sqlite3

from mlops_architecture import ModelMonitor

@dataclass
class PredictionLog:
    model_id: str
    prediction_id: str
    timestamp: datetime
    input_data: Any
    prediction: Any
    confidence: Optional[float] = None
    latency_ms: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None

@dataclass
class DriftDetectionResult:
    model_id: str
    drift_detected: bool
    drift_score: float
    drift_type: str  # "data", "concept", "prior"
    affected_features: List[str]
    recommendation: str
    timestamp: datetime

class DatabaseModelMonitor(ModelMonitor):
    """Database-based model monitoring implementation"""
    
    def __init__(self, db_path: str = "./model_monitoring.db"):
        self.db_path = db_path
        self._init_database()
        
        # In-memory buffers for real-time monitoring
        self.prediction_buffer = defaultdict(lambda: deque(maxlen=10000))
        self.performance_cache = defaultdict(dict)
        
        # Drift detection parameters
        self.drift_detection_window = 1000
        self.drift_threshold = 0.05
        
        self.logger = logging.getLogger(__name__)
    
    def _init_database(self):
        """Initialize monitoring database"""
        
        with sqlite3.connect(self.db_path) as conn:
            # Predictions table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS predictions (
                    prediction_id TEXT PRIMARY KEY,
                    model_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    input_data TEXT NOT NULL,
                    prediction TEXT NOT NULL,
                    confidence REAL,
                    latency_ms REAL,
                    metadata TEXT
                )
            """)
            
            # Performance metrics table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS performance_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    model_id TEXT NOT NULL,
                    metric_name TEXT NOT NULL,
                    metric_value REAL NOT NULL,
                    timestamp TEXT NOT NULL,
                    time_window_start TEXT,
                    time_window_end TEXT
                )
            """)
            
            # Drift detection results table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS drift_events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    model_id TEXT NOT NULL,
                    drift_type TEXT NOT NULL,
                    drift_score REAL NOT NULL,
                    affected_features TEXT,
                    recommendation TEXT,
                    timestamp TEXT NOT NULL
                )
            """)
            
            # Create indices
            conn.execute("CREATE INDEX IF NOT EXISTS idx_predictions_model_timestamp ON predictions(model_id, timestamp)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_performance_model_timestamp ON performance_metrics(model_id, timestamp)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_drift_model_timestamp ON drift_events(model_id, timestamp)")
    
    async def track_prediction(self, model_id: str, 
                             input_data: Any, 
                             prediction: Any,
                             metadata: Optional[Dict[str, Any]] = None):
        """Track a model prediction"""
        
        import uuid
        prediction_id = str(uuid.uuid4())
        timestamp = datetime.now()
        
        # Create prediction log
        prediction_log = PredictionLog(
            model_id=model_id,
            prediction_id=prediction_id,
            timestamp=timestamp,
            input_data=input_data,
            prediction=prediction,
            confidence=metadata.get('confidence') if metadata else None,
            latency_ms=metadata.get('latency_ms') if metadata else None,
            metadata=metadata or {}
        )
        
        # Add to buffer for real-time monitoring
        self.prediction_buffer[model_id].append(prediction_log)
        
        # Store in database
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO predictions 
                (prediction_id, model_id, timestamp, input_data, prediction, confidence, latency_ms, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                prediction_id,
                model_id,
                timestamp.isoformat(),
                json.dumps(input_data, default=str),
                json.dumps(prediction, default=str),
                prediction_log.confidence,
                prediction_log.latency_ms,
                json.dumps(metadata or {})
            ))
        
        # Trigger drift detection if buffer is full
        if len(self.prediction_buffer[model_id]) >= self.drift_detection_window:
            asyncio.create_task(self._background_drift_detection(model_id))
    
    async def detect_drift(self, model_id: str, window_size: int = 1000) -> Dict[str, Any]:
        """Detect data/concept drift"""
        
        # Get recent predictions
        recent_predictions = await self._get_recent_predictions(model_id, window_size)
        
        if len(recent_predictions) < window_size // 2:
            return {
                'drift_detected': False,
                'reason': 'insufficient_data',
                'predictions_count': len(recent_predictions)
            }
        
        # Get reference data (older predictions for comparison)
        reference_predictions = await self._get_reference_predictions(model_id, window_size)
        
        if len(reference_predictions) < window_size // 2:
            return {
                'drift_detected': False,
                'reason': 'insufficient_reference_data',
                'reference_count': len(reference_predictions)
            }
        
        # Perform drift detection
        drift_results = []
        
        # Data drift detection (input distribution changes)
        data_drift = await self._detect_data_drift(recent_predictions, reference_predictions)
        if data_drift['drift_detected']:
            drift_results.append(data_drift)
        
        # Concept drift detection (prediction distribution changes)
        concept_drift = await self._detect_concept_drift(recent_predictions, reference_predictions)
        if concept_drift['drift_detected']:
            drift_results.append(concept_drift)
        
        # Prior drift detection (class distribution changes for classification)
        prior_drift = await self._detect_prior_drift(recent_predictions, reference_predictions)
        if prior_drift['drift_detected']:
            drift_results.append(prior_drift)
        
        # Overall drift assessment
        overall_drift_detected = len(drift_results) > 0
        
        result = {
            'model_id': model_id,
            'drift_detected': overall_drift_detected,
            'drift_types': [d['drift_type'] for d in drift_results],
            'drift_details': drift_results,
            'timestamp': datetime.now().isoformat(),
            'analysis_window': window_size
        }
        
        # Store drift events
        if overall_drift_detected:
            await self._store_drift_events(model_id, drift_results)
        
        return result
    
    async def _detect_data_drift(self, recent_data: List[PredictionLog], 
                               reference_data: List[PredictionLog]) -> Dict[str, Any]:
        """Detect data drift using statistical tests"""
        
        try:
            # Extract features from input data
            recent_features = self._extract_numerical_features(recent_data)
            reference_features = self._extract_numerical_features(reference_data)
            
            if not recent_features or not reference_features:
                return {'drift_detected': False, 'reason': 'no_numerical_features'}
            
            # Perform KS test for each feature
            from scipy import stats
            
            drift_scores = {}
            p_values = {}
            
            for feature_name in recent_features.keys():
                if feature_name in reference_features:
                    recent_values = recent_features[feature_name]
                    reference_values = reference_features[feature_name]
                    
                    # Kolmogorov-Smirnov test
                    ks_stat, p_value = stats.ks_2samp(recent_values, reference_values)
                    
                    drift_scores[feature_name] = ks_stat
                    p_values[feature_name] = p_value
            
            # Determine if drift is detected
            significant_drifts = [
                feature for feature, p_val in p_values.items() 
                if p_val < self.drift_threshold
            ]
            
            drift_detected = len(significant_drifts) > 0
            
            result = {
                'drift_detected': drift_detected,
                'drift_type': 'data',
                'drift_score': max(drift_scores.values()) if drift_scores else 0,
                'affected_features': significant_drifts,
                'p_values': p_values,
                'recommendation': 'Consider retraining model with recent data' if drift_detected else 'No action needed'
            }
            
            return result
        
        except Exception as e:
            self.logger.error(f"Data drift detection failed: {e}")
            return {'drift_detected': False, 'error': str(e)}
    
    async def _detect_concept_drift(self, recent_data: List[PredictionLog], 
                                  reference_data: List[PredictionLog]) -> Dict[str, Any]:
        """Detect concept drift (prediction distribution changes)"""
        
        try:
            # Extract predictions
            recent_predictions = [log.prediction for log in recent_data]
            reference_predictions = [log.prediction for log in reference_data]
            
            # Handle different prediction types
            if all(isinstance(p, (int, float)) for p in recent_predictions):
                # Regression: compare distributions
                from scipy import stats
                
                ks_stat, p_value = stats.ks_2samp(recent_predictions, reference_predictions)
                drift_detected = p_value < self.drift_threshold
                
                result = {
                    'drift_detected': drift_detected,
                    'drift_type': 'concept',
                    'drift_score': ks_stat,
                    'p_value': p_value,
                    'recommendation': 'Model predictions have shifted significantly' if drift_detected else 'Predictions stable'
                }
            
            elif all(isinstance(p, (list, dict)) for p in recent_predictions):
                # Classification: compare class distributions
                recent_classes = self._extract_predicted_classes(recent_predictions)
                reference_classes = self._extract_predicted_classes(reference_predictions)
                
                # Chi-square test for class distribution
                from collections import Counter
                recent_counts = Counter(recent_classes)
                reference_counts = Counter(reference_classes)
                
                # Align class counts
                all_classes = set(recent_counts.keys()) | set(reference_counts.keys())
                recent_freq = [recent_counts.get(cls, 0) for cls in all_classes]
                reference_freq = [reference_counts.get(cls, 0) for cls in all_classes]
                
                if sum(reference_freq) > 0:
                    from scipy.stats import chisquare
                    # chisquare needs matching totals and non-zero expected counts:
                    # apply add-one smoothing, then rescale the reference counts
                    observed = [freq + 1 for freq in recent_freq]
                    smoothed_ref = [freq + 1 for freq in reference_freq]
                    chi2_stat, p_value = chisquare(observed, f_exp=[f * sum(observed) / sum(smoothed_ref) for f in smoothed_ref])
                    drift_detected = p_value < self.drift_threshold
                    
                    result = {
                        'drift_detected': drift_detected,
                        'drift_type': 'concept',
                        'drift_score': chi2_stat,
                        'p_value': p_value,
                        'class_distributions': {
                            'recent': dict(recent_counts),
                            'reference': dict(reference_counts)
                        },
                        'recommendation': 'Class prediction patterns have changed' if drift_detected else 'Prediction patterns stable'
                    }
                else:
                    result = {'drift_detected': False, 'reason': 'insufficient_reference_data'}
            
            else:
                result = {'drift_detected': False, 'reason': 'unsupported_prediction_type'}
            
            return result
        
        except Exception as e:
            self.logger.error(f"Concept drift detection failed: {e}")
            return {'drift_detected': False, 'error': str(e)}
    
    async def _detect_prior_drift(self, recent_data: List[PredictionLog], 
                                reference_data: List[PredictionLog]) -> Dict[str, Any]:
        """Detect prior drift (changes in input label distribution)"""
        
        # This would require ground truth labels, which we typically don't have for predictions
        # For now, we'll use confidence scores as a proxy
        
        try:
            recent_confidences = [log.confidence for log in recent_data if log.confidence is not None]
            reference_confidences = [log.confidence for log in reference_data if log.confidence is not None]
            
            if len(recent_confidences) < 10 or len(reference_confidences) < 10:
                return {'drift_detected': False, 'reason': 'insufficient_confidence_data'}
            
            # Compare confidence distributions
            from scipy import stats
            ks_stat, p_value = stats.ks_2samp(recent_confidences, reference_confidences)
            
            drift_detected = p_value < self.drift_threshold
            
            return {
                'drift_detected': drift_detected,
                'drift_type': 'prior',
                'drift_score': ks_stat,
                'p_value': p_value,
                'recent_avg_confidence': np.mean(recent_confidences),
                'reference_avg_confidence': np.mean(reference_confidences),
                'recommendation': 'Model confidence patterns have changed' if drift_detected else 'Confidence patterns stable'
            }
        
        except Exception as e:
            self.logger.error(f"Prior drift detection failed: {e}")
            return {'drift_detected': False, 'error': str(e)}
    
    def _extract_numerical_features(self, predictions: List[PredictionLog]) -> Dict[str, List[float]]:
        """Extract numerical features from input data"""
        
        features = defaultdict(list)
        
        for pred_log in predictions:
            input_data = pred_log.input_data
            
            if isinstance(input_data, dict):
                for key, value in input_data.items():
                    if isinstance(value, (int, float)):
                        features[key].append(float(value))
            elif isinstance(input_data, (list, tuple)):
                for i, value in enumerate(input_data):
                    if isinstance(value, (int, float)):
                        features[f'feature_{i}'].append(float(value))
            elif isinstance(input_data, (int, float)):
                features['value'].append(float(input_data))
        
        return dict(features)
    
    def _extract_predicted_classes(self, predictions: List[Any]) -> List[str]:
        """Extract predicted classes from predictions"""
        
        classes = []
        
        for prediction in predictions:
            if isinstance(prediction, dict):
                # Assume class with highest probability
                if 'class' in prediction:
                    classes.append(str(prediction['class']))
                elif 'label' in prediction:
                    classes.append(str(prediction['label']))
                else:
                    # Find key with max value
                    max_key = max(prediction.keys(), key=lambda k: prediction[k] if isinstance(prediction[k], (int, float)) else 0)
                    classes.append(str(max_key))
            elif isinstance(prediction, list):
                # Assume index of max value is the class
                max_idx = np.argmax(prediction)
                classes.append(str(max_idx))
            else:
                classes.append(str(prediction))
        
        return classes
    
    async def _get_recent_predictions(self, model_id: str, window_size: int) -> List[PredictionLog]:
        """Get recent predictions from database"""
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT prediction_id, timestamp, input_data, prediction, confidence, latency_ms, metadata
                FROM predictions 
                WHERE model_id = ? 
                ORDER BY timestamp DESC 
                LIMIT ?
            """, (model_id, window_size))
            
            predictions = []
            for row in cursor.fetchall():
                pred_id, timestamp, input_data, prediction, confidence, latency_ms, metadata = row
                
                predictions.append(PredictionLog(
                    model_id=model_id,
                    prediction_id=pred_id,
                    timestamp=datetime.fromisoformat(timestamp),
                    input_data=json.loads(input_data),
                    prediction=json.loads(prediction),
                    confidence=confidence,
                    latency_ms=latency_ms,
                    metadata=json.loads(metadata) if metadata else {}
                ))
            
            return predictions
    
    async def _get_reference_predictions(self, model_id: str, window_size: int) -> List[PredictionLog]:
        """Get reference predictions (older data) from database"""
        
        # Skip the two most recent windows and use the window before them as reference
        offset = window_size * 2
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT prediction_id, timestamp, input_data, prediction, confidence, latency_ms, metadata
                FROM predictions 
                WHERE model_id = ? 
                ORDER BY timestamp DESC 
                LIMIT ? OFFSET ?
            """, (model_id, window_size, offset))
            
            predictions = []
            for row in cursor.fetchall():
                pred_id, timestamp, input_data, prediction, confidence, latency_ms, metadata = row
                
                predictions.append(PredictionLog(
                    model_id=model_id,
                    prediction_id=pred_id,
                    timestamp=datetime.fromisoformat(timestamp),
                    input_data=json.loads(input_data),
                    prediction=json.loads(prediction),
                    confidence=confidence,
                    latency_ms=latency_ms,
                    metadata=json.loads(metadata) if metadata else {}
                ))
            
            return predictions
    
    async def _store_drift_events(self, model_id: str, drift_results: List[Dict[str, Any]]):
        """Store drift detection events"""
        
        with sqlite3.connect(self.db_path) as conn:
            for drift_result in drift_results:
                conn.execute("""
                    INSERT INTO drift_events 
                    (model_id, drift_type, drift_score, affected_features, recommendation, timestamp)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    model_id,
                    drift_result['drift_type'],
                    drift_result['drift_score'],
                    json.dumps(drift_result.get('affected_features', [])),
                    drift_result.get('recommendation', ''),
                    datetime.now().isoformat()
                ))
    
    async def _background_drift_detection(self, model_id: str):
        """Background drift detection task"""
        
        try:
            drift_result = await self.detect_drift(model_id)
            
            if drift_result['drift_detected']:
                self.logger.warning(f"Drift detected for model {model_id}: {drift_result}")
            
        except Exception as e:
            self.logger.error(f"Background drift detection failed for {model_id}: {e}")
    
    async def get_performance_metrics(self, model_id: str,
                                    time_range: tuple) -> Dict[str, Any]:
        """Get model performance metrics for time range"""
        
        start_time, end_time = time_range
        
        with sqlite3.connect(self.db_path) as conn:
            # Get predictions in time range
            cursor = conn.execute("""
                SELECT prediction, confidence, latency_ms, timestamp
                FROM predictions 
                WHERE model_id = ? AND timestamp BETWEEN ? AND ?
                ORDER BY timestamp
            """, (model_id, start_time.isoformat(), end_time.isoformat()))
            
            predictions = []
            confidences = []
            latencies = []
            timestamps = []
            
            for row in cursor.fetchall():
                prediction, confidence, latency_ms, timestamp = row
                predictions.append(json.loads(prediction))
                if confidence is not None:
                    confidences.append(confidence)
                if latency_ms is not None:
                    latencies.append(latency_ms)
                timestamps.append(datetime.fromisoformat(timestamp))
            
            if not predictions:
                return {'error': 'No predictions found for time range'}
            
            # Calculate metrics
            metrics = {
                'total_predictions': len(predictions),
                'time_range': {
                    'start': start_time.isoformat(),
                    'end': end_time.isoformat()
                },
                'prediction_rate': len(predictions) / max((end_time - start_time).total_seconds() / 3600, 1e-9)  # per hour
            }
            
            if confidences:
                metrics['confidence'] = {
                    'mean': np.mean(confidences),
                    'std': np.std(confidences),
                    'min': np.min(confidences),
                    'max': np.max(confidences),
                    'p50': np.percentile(confidences, 50),
                    'p95': np.percentile(confidences, 95)
                }
            
            if latencies:
                metrics['latency_ms'] = {
                    'mean': np.mean(latencies),
                    'std': np.std(latencies),
                    'min': np.min(latencies),
                    'max': np.max(latencies),
                    'p50': np.percentile(latencies, 50),
                    'p95': np.percentile(latencies, 95),
                    'p99': np.percentile(latencies, 99)
                }
            
            # Get stored performance metrics
            cursor = conn.execute("""
                SELECT metric_name, metric_value, timestamp
                FROM performance_metrics 
                WHERE model_id = ? AND timestamp BETWEEN ? AND ?
                ORDER BY timestamp
            """, (model_id, start_time.isoformat(), end_time.isoformat()))
            
            stored_metrics = defaultdict(list)
            for row in cursor.fetchall():
                metric_name, metric_value, timestamp = row
                stored_metrics[metric_name].append({
                    'value': metric_value,
                    'timestamp': timestamp
                })
            
            if stored_metrics:
                metrics['historical_metrics'] = dict(stored_metrics)
            
            return metrics

# Example usage and testing
async def demonstrate_mlops():
    """Demonstrate MLOps pipeline"""
    
    # Initialize components
    model_registry = FileSystemModelRegistry("./demo_registry")
    deployer = KubernetesModelDeployer(namespace="mlops-demo")
    monitor = DatabaseModelMonitor("./demo_monitoring.db")
    
    # Create MLOps orchestrator
    orchestrator = MLOpsOrchestrator(
        model_registry=model_registry,
        deployer=deployer,
        monitor=monitor
    )
    
    # Register a model
    model_metadata = ModelMetadata(
        model_id="sentiment_classifier",
        name="Sentiment Classification Model",
        version="v1.0.0",
        framework="scikit-learn",
        algorithm="logistic_regression",
        training_data_version="20241101",
        performance_metrics={
            "accuracy": 0.85,
            "precision": 0.83,
            "recall": 0.87,
            "f1_score": 0.85
        },
        resource_requirements={
            "cpu": "500m",
            "memory": "1Gi"
        },
        created_at=datetime.now(),
        created_by="ml_engineer",
        tags=["nlp", "classification"],
        description="Sentiment classification model for customer reviews"
    )
    
    # Mock model artifacts
    model_artifacts = {
        "model.pkl": "path/to/model.pkl",
        "vectorizer.pkl": "path/to/vectorizer.pkl",
        "requirements.txt": "scikit-learn==1.0.2\nnumpy==1.21.0"
    }
    
    model_version = await model_registry.register_model(model_metadata, model_artifacts)
    print(f"Registered model: {model_version}")
    
    # Deploy model
    deployment_result = await orchestrator.deploy_model(
        model_id="sentiment_classifier",
        version="v1.0.0",
        target_environment="staging",
        strategy=DeploymentStrategy.ROLLING,
        resource_allocation={"cpu": "500m", "memory": "1Gi"},
        scaling_config={"min_replicas": 1, "max_replicas": 5}
    )
    
    print(f"Deployment result: {deployment_result}")

if __name__ == "__main__":
    asyncio.run(demonstrate_mlops())

CI/CD Pipeline for ML Models

# mlops_pipeline.py
import asyncio
import yaml
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import os
import logging
import subprocess
import tempfile
import shutil

@dataclass
class PipelineStage:
    name: str
    script: str
    environment: Dict[str, str]
    dependencies: Optional[List[str]] = None
    timeout_minutes: int = 30
    retry_count: int = 1

@dataclass
class PipelineConfig:
    name: str
    trigger: Dict[str, Any]
    stages: List[PipelineStage]
    environment_variables: Optional[Dict[str, str]] = None
    notifications: Optional[Dict[str, Any]] = None

class MLOpsPipeline:
    """ML-specific CI/CD pipeline"""
    
    def __init__(self, 
                 config_path: str,
                 workspace_path: str = "./pipeline_workspace"):
        
        self.config_path = Path(config_path)
        self.workspace_path = Path(workspace_path)
        self.workspace_path.mkdir(exist_ok=True)
        
        # Load pipeline configuration
        with open(self.config_path) as f:
            config_data = yaml.safe_load(f)
        
        self.config = PipelineConfig(
            name=config_data['name'],
            trigger=config_data['trigger'],
            stages=[
                PipelineStage(
                    name=stage['name'],
                    script=stage['script'],
                    environment=stage.get('environment', {}),
                    dependencies=stage.get('dependencies', []),
                    timeout_minutes=stage.get('timeout_minutes', 30),
                    retry_count=stage.get('retry_count', 1)
                )
                for stage in config_data['stages']
            ],
            environment_variables=config_data.get('environment_variables', {}),
            notifications=config_data.get('notifications', {})
        )
        
        # Pipeline state
        self.execution_history = []
        self.current_execution = None
        
        self.logger = logging.getLogger(__name__)
    
    async def execute_pipeline(self, 
                             trigger_data: Dict[str, Any] = None) -> Dict[str, Any]:
        """Execute the complete pipeline"""
        
        execution_id = f"{self.config.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        execution = {
            'execution_id': execution_id,
            'pipeline_name': self.config.name,
            'start_time': datetime.now(),
            'trigger_data': trigger_data or {},
            'stages': [],
            'status': 'running'
        }
        
        self.current_execution = execution
        
        try:
            self.logger.info(f"Starting pipeline execution: {execution_id}")
            
            # Create execution workspace
            execution_workspace = self.workspace_path / execution_id
            execution_workspace.mkdir(exist_ok=True)
            
            # Set up environment
            environment = self._prepare_environment(execution_workspace)
            
            # Execute stages in order
            for stage in self.config.stages:
                stage_result = await self._execute_stage(
                    stage, execution_workspace, environment
                )
                
                execution['stages'].append(stage_result)
                
                if not stage_result['success']:
                    execution['status'] = 'failed'
                    execution['failed_stage'] = stage.name
                    break
            
            if execution['status'] == 'running':
                execution['status'] = 'success'
            
            execution['end_time'] = datetime.now()
            execution['duration'] = (execution['end_time'] - execution['start_time']).total_seconds()
            
            # Send notifications
            await self._send_notifications(execution)
            
            self.logger.info(f"Pipeline execution completed: {execution_id} - {execution['status']}")
            
        except Exception as e:
            execution['status'] = 'error'
            execution['error'] = str(e)
            execution['end_time'] = datetime.now()
            
            self.logger.error(f"Pipeline execution failed: {execution_id} - {e}")
        
        finally:
            # Clean up workspace (optional)
            # shutil.rmtree(execution_workspace, ignore_errors=True)
            
            self.execution_history.append(execution)
            self.current_execution = None
        
        return execution
    
    def _prepare_environment(self, workspace: Path) -> Dict[str, str]:
        """Prepare execution environment"""
        
        environment = dict(os.environ)
        
        # Add pipeline environment variables
        if self.config.environment_variables:
            environment.update(self.config.environment_variables)
        
        # Add workspace paths
        environment['PIPELINE_WORKSPACE'] = str(workspace)
        environment['PIPELINE_ROOT'] = str(self.workspace_path)
        
        return environment
    
    async def _execute_stage(self, 
                           stage: PipelineStage,
                           workspace: Path,
                           base_environment: Dict[str, str]) -> Dict[str, Any]:
        """Execute a single pipeline stage"""
        
        stage_result = {
            'name': stage.name,
            'start_time': datetime.now(),
            'success': False,
            'attempts': 0,
            'logs': []
        }
        
        self.logger.info(f"Executing stage: {stage.name}")
        
        # Prepare stage environment
        environment = base_environment.copy()
        environment.update(stage.environment)
        
        # Create stage workspace
        stage_workspace = workspace / stage.name
        stage_workspace.mkdir(exist_ok=True)
        environment['STAGE_WORKSPACE'] = str(stage_workspace)
        
        # Execute with retries
        for attempt in range(stage.retry_count):
            stage_result['attempts'] = attempt + 1
            
            try:
                # Execute stage script
                result = await self._run_script(
                    stage.script,
                    stage_workspace,
                    environment,
                    timeout_minutes=stage.timeout_minutes
                )
                
                stage_result['logs'].append(result)
                
                if result['return_code'] == 0:
                    stage_result['success'] = True
                    break
                else:
                    self.logger.warning(f"Stage {stage.name} attempt {attempt + 1} failed")
            
            except Exception as e:
                stage_result['logs'].append({
                    'error': str(e),
                    'attempt': attempt + 1
                })
                self.logger.error(f"Stage {stage.name} attempt {attempt + 1} error: {e}")
        
        stage_result['end_time'] = datetime.now()
        stage_result['duration'] = (stage_result['end_time'] - stage_result['start_time']).total_seconds()
        
        if stage_result['success']:
            self.logger.info(f"Stage {stage.name} completed successfully")
        else:
            self.logger.error(f"Stage {stage.name} failed after {stage.retry_count} attempts")
        
        return stage_result
    
    async def _run_script(self, 
                        script: str,
                        workspace: Path,
                        environment: Dict[str, str],
                        timeout_minutes: int) -> Dict[str, Any]:
        """Run a script in the given environment"""
        
        # Create script file
        script_file = workspace / "stage_script.sh"
        with open(script_file, 'w') as f:
            f.write("#!/bin/bash\n")
            f.write("set -e\n")  # Exit on error
            f.write(script)
        
        script_file.chmod(0o755)
        
        # Execute script
        process = await asyncio.create_subprocess_exec(
            str(script_file),
            cwd=workspace,
            env=environment,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout_minutes * 60
            )
            
            return {
                'return_code': process.returncode,
                'stdout': stdout.decode('utf-8'),
                'stderr': stderr.decode('utf-8'),
                'timeout': False
            }
        
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()
            
            return {
                'return_code': -1,
                'stdout': '',
                'stderr': f'Script timed out after {timeout_minutes} minutes',
                'timeout': True
            }
    
    async def _send_notifications(self, execution: Dict[str, Any]):
        """Send pipeline execution notifications"""
        
        if not self.config.notifications:
            return
        
        notification_config = self.config.notifications
        
        # Email notifications
        if 'email' in notification_config:
            await self._send_email_notification(execution, notification_config['email'])
        
        # Slack notifications
        if 'slack' in notification_config:
            await self._send_slack_notification(execution, notification_config['slack'])
        
        # Webhook notifications
        if 'webhook' in notification_config:
            await self._send_webhook_notification(execution, notification_config['webhook'])
    
    async def _send_email_notification(self, execution: Dict[str, Any], email_config: Dict[str, Any]):
        """Send email notification"""
        
        # This would integrate with actual email service
        self.logger.info(f"Email notification sent for execution {execution['execution_id']}")
    
    async def _send_slack_notification(self, execution: Dict[str, Any], slack_config: Dict[str, Any]):
        """Send Slack notification"""
        
        # This would integrate with Slack API
        self.logger.info(f"Slack notification sent for execution {execution['execution_id']}")
    
    async def _send_webhook_notification(self, execution: Dict[str, Any], webhook_config: Dict[str, Any]):
        """Send webhook notification"""
        
        import aiohttp
        
        webhook_url = webhook_config.get('url')
        if not webhook_url:
            return
        
        payload = {
            'execution_id': execution['execution_id'],
            'pipeline_name': execution['pipeline_name'],
            'status': execution['status'],
            'duration': execution.get('duration'),
            'timestamp': execution['start_time'].isoformat()
        }
        
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(webhook_url, json=payload) as response:
                    if response.status == 200:
                        self.logger.info(f"Webhook notification sent successfully")
                    else:
                        self.logger.warning(f"Webhook notification failed: {response.status}")
        
        except Exception as e:
            self.logger.error(f"Webhook notification error: {e}")
    
    def get_execution_status(self, execution_id: str) -> Optional[Dict[str, Any]]:
        """Get execution status"""
        
        if self.current_execution and self.current_execution['execution_id'] == execution_id:
            return self.current_execution
        
        for execution in self.execution_history:
            if execution['execution_id'] == execution_id:
                return execution
        
        return None
    
    def list_executions(self, limit: int = 10) -> List[Dict[str, Any]]:
        """List recent pipeline executions"""
        
        executions = []
        
        if self.current_execution:
            executions.append(self.current_execution)
        
        executions.extend(self.execution_history[-limit:])
        
        return executions

# Example pipeline configuration
def create_example_pipeline_config():
    """Create example ML pipeline configuration"""
    
    config = {
        'name': 'ml_model_pipeline',
        'trigger': {
            'branch': 'main',
            'paths': ['models/**', 'data/**', 'requirements.txt']
        },
        'environment_variables': {
            'PYTHONPATH': '/app',
            'MODEL_REGISTRY_URL': 'http://model-registry:8080'
        },
        'stages': [
            {
                'name': 'data_validation',
                'script': '''
                    echo "Validating training data..."
                    python scripts/validate_data.py --data-path data/train.csv
                    echo "Data validation completed"
                ''',
                'environment': {
                    'DATA_VERSION': 'v1.0'
                },
                'timeout_minutes': 10
            },
            {
                'name': 'model_training',
                'script': '''
                    echo "Training model..."
                    python scripts/train_model.py --config config/training.yaml
                    echo "Model training completed"
                ''',
                'dependencies': ['data_validation'],
                'timeout_minutes': 60,
                'retry_count': 2
            },
            {
                'name': 'model_evaluation',
                'script': '''
                    echo "Evaluating model..."
                    python scripts/evaluate_model.py --model-path models/latest
                    echo "Model evaluation completed"
                ''',
                'dependencies': ['model_training'],
                'timeout_minutes': 20
            },
            {
                'name': 'model_registration',
                'script': '''
                    echo "Registering model..."
                    python scripts/register_model.py --model-path models/latest
                    echo "Model registration completed"
                ''',
                'dependencies': ['model_evaluation'],
                'timeout_minutes': 10
            },
            {
                'name': 'deploy_staging',
                'script': '''
                    echo "Deploying to staging..."
                    python scripts/deploy_model.py --environment staging --model-id $MODEL_ID --version $MODEL_VERSION
                    echo "Staging deployment completed"
                ''',
                'dependencies': ['model_registration'],
                'environment': {
                    'DEPLOYMENT_ENV': 'staging'
                },
                'timeout_minutes': 15
            }
        ],
        'notifications': {
            'slack': {
                'webhook_url': 'https://hooks.slack.com/services/...',
                'channel': '#ml-ops'
            },
            'email': {
                'recipients': ['ml-team@company.com'],
                'smtp_server': 'smtp.company.com'
            }
        }
    }
    
    return config

# Example usage
async def run_example_pipeline():
    """Run example ML pipeline"""
    
    # Create pipeline configuration
    config = create_example_pipeline_config()
    
    # Save configuration to file
    config_path = "example_pipeline.yaml"
    with open(config_path, 'w') as f:
        yaml.dump(config, f, default_flow_style=False)
    
    # Create and run pipeline
    pipeline = MLOpsPipeline(config_path)
    
    execution_result = await pipeline.execute_pipeline({
        'commit_id': 'abc123',
        'branch': 'main',
        'author': 'ml_engineer'
    })
    
    print(f"Pipeline execution result:")
    print(json.dumps(execution_result, indent=2, default=str))

if __name__ == "__main__":
    asyncio.run(run_example_pipeline())

Production Best Practices

Model Versioning and Governance

  1. Semantic Versioning: Use semantic versioning for models (major.minor.patch)
  2. Lineage Tracking: Track data and model lineage for reproducibility
  3. Approval Workflows: Implement approval processes for production deployments (see the sketch after this list)
  4. Compliance: Ensure regulatory compliance and audit trails
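
A minimal sketch of an approval gate built on the ModelRegistry interface defined earlier; the 80% accuracy threshold and the tag format are assumptions for illustration, not a prescribed policy.

# governance_example.py (illustrative)
async def promote_with_approval(registry: ModelRegistry,
                                model_id: str,
                                version: str,
                                approved_by: str) -> bool:
    """Promote a model to production only if it exists and meets a quality bar."""
    metadata = await registry.get_model(model_id, version)
    if metadata is None:
        return False
    # Assumed quality gate: require at least 80% accuracy before promotion
    if metadata.performance_metrics.get("accuracy", 0.0) < 0.80:
        return False
    # Record the approver alongside the model's tags for auditability
    metadata.tags.append(f"approved_by:{approved_by}")
    return await registry.promote_model(model_id, version, ModelStage.PRODUCTION)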

Deployment Strategies

  1. Blue-Green Deployments: Zero-downtime deployments with instant rollback
  2. Canary Releases: Gradual rollout with traffic splitting (illustrated in the sketch after this list)
  3. A/B Testing: Compare model variants with controlled experiments
  4. Feature Flags: Control model features and experiments
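
A hedged illustration of a canary rollout expressed with the DeploymentConfig and DeploymentStrategy types defined earlier; the resource figures and traffic split below are placeholders, not recommendations.

# canary_example.py (illustrative)
canary_config = DeploymentConfig(
    model_metadata=model_metadata,                  # metadata registered earlier
    target_environment="production",
    strategy=DeploymentStrategy.CANARY,
    resource_allocation={"cpu": "500m", "memory": "1Gi"},
    scaling_config={"min_replicas": 2, "max_replicas": 10},
    monitoring_config={"metrics": ["latency_ms", "error_rate"]},
    rollback_threshold=0.95,                        # roll back if quality drops below 95%
    traffic_split={"v1.0.0": 0.9, "v1.1.0": 0.1}    # send 10% of traffic to the canary
)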

Monitoring and Alerting

  1. Model Performance: Track accuracy, latency, and throughput (see the sketch after this list)
  2. Data Drift: Monitor for changes in input data distribution
  3. Concept Drift: Detect changes in model predictions
  4. Resource Usage: Monitor CPU, memory, and GPU utilization
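
A brief sketch of pulling recent latency and confidence statistics from the DatabaseModelMonitor shown above; the model ID and the 24-hour window are illustrative choices.

# monitoring_example.py (illustrative)
from datetime import datetime, timedelta

async def report_recent_metrics(monitor: DatabaseModelMonitor,
                                model_id: str = "sentiment_classifier"):
    """Print p95 latency and mean confidence for the last 24 hours."""
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=24)
    metrics = await monitor.get_performance_metrics(model_id, (start_time, end_time))
    if 'error' in metrics:
        print(metrics['error'])
        return
    print("p95 latency (ms):", metrics.get('latency_ms', {}).get('p95'))
    print("mean confidence:", metrics.get('confidence', {}).get('mean'))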

Security and Compliance

  1. Model Security: Protect against adversarial attacks
  2. Data Privacy: Implement privacy-preserving techniques
  3. Access Control: Role-based access to models and data
  4. Audit Trails: Comprehensive logging for compliance (see the sketch after this list)
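
A small sketch of a structured audit-trail helper; the logger name, field names, and action values are assumptions for illustration.

# audit_example.py (illustrative)
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

audit_logger = logging.getLogger("mlops.audit")

def audit_event(actor: str, action: str, model_id: str,
                details: Optional[Dict[str, Any]] = None):
    """Append a structured audit record for compliance review."""
    audit_logger.info(json.dumps({
        "timestamp": datetime.now().isoformat(),
        "actor": actor,
        "action": action,            # e.g. "promote", "deploy", "rollback"
        "model_id": model_id,
        "details": details or {}
    }))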

Conclusion

MLOps is essential for scaling AI applications in production environments. The practices, patterns, and implementations covered in this guide provide a comprehensive foundation for building robust, scalable, and maintainable ML systems.

Key success factors:

  1. Automation: Automate as much of the ML lifecycle as possible
  2. Monitoring: Implement comprehensive monitoring at all levels
  3. Governance: Establish clear processes for model lifecycle management
  4. Collaboration: Foster collaboration between ML and operations teams
  5. Continuous Improvement: Iterate and improve based on production experience

As MLOps practices continue to evolve, staying current with new tools, techniques, and best practices will be crucial for maintaining competitive AI systems.
