Deploy Scalable AI Models with MLOps

David Childs

Master MLOps practices including model versioning, deployment pipelines, monitoring systems, and automated retraining for production machine learning systems.

MLOps transforms machine learning from experimental prototypes to production systems that reliably deliver business value. After building MLOps platforms serving thousands of models, I've learned that successful ML in production requires treating models as products with complete lifecycle management. Here's your comprehensive guide to production MLOps.

MLOps Architecture and Foundation

Complete MLOps Platform

# mlops_platform.py
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import uuid
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from enum import Enum
import numpy as np
import pandas as pd

class ModelStatus(Enum):
    TRAINING = "training"
    EVALUATING = "evaluating"
    STAGING = "staging"
    PRODUCTION = "production"
    DEPRECATED = "deprecated"
    FAILED = "failed"

class DeploymentStage(Enum):
    DEV = "development"
    STAGING = "staging"
    CANARY = "canary"
    PRODUCTION = "production"

@dataclass
class ModelMetadata:
    model_id: str
    name: str
    version: str
    status: ModelStatus
    created_at: datetime
    updated_at: datetime
    author: str
    description: str
    tags: List[str]
    metrics: Dict[str, float]
    hyperparameters: Dict[str, Any]
    training_data_hash: str
    model_size_mb: float
    framework: str
    dependencies: Dict[str, str]

@dataclass
class DeploymentConfig:
    deployment_id: str
    model_id: str
    model_version: str
    stage: DeploymentStage
    resources: Dict[str, Any]
    scaling_config: Dict[str, Any]
    traffic_allocation: float
    health_check_config: Dict[str, Any]
    rollback_config: Dict[str, Any]

class MLOpsOrchestrator:
    def __init__(self, 
                 model_registry,
                 deployment_manager,
                 monitoring_system,
                 experiment_tracker):
        
        self.model_registry = model_registry
        self.deployment_manager = deployment_manager
        self.monitoring = monitoring_system
        self.experiment_tracker = experiment_tracker
        
        # Pipeline management
        self.training_pipelines = {}
        self.deployment_pipelines = {}
        self.monitoring_pipelines = {}
        
        # State tracking
        self.active_deployments = {}
        self.training_jobs = {}
        
    async def register_model(self, 
                           model_artifact_path: str,
                           metadata: ModelMetadata) -> str:
        """Register new model version"""
        
        # Validate model
        validation_results = await self._validate_model(
            model_artifact_path, 
            metadata
        )
        
        if not validation_results['is_valid']:
            raise ValueError(f"Model validation failed: {validation_results['errors']}")
        
        # Store model artifact
        stored_path = await self.model_registry.store_model(
            model_artifact_path,
            metadata
        )
        
        # Update metadata with storage info
        metadata.model_size_mb = validation_results['size_mb']
        metadata.status = ModelStatus.STAGING
        
        # Register in model registry
        registration_id = await self.model_registry.register_model(
            metadata,
            stored_path
        )
        
        # Log registration
        await self.experiment_tracker.log_model_registration(
            registration_id,
            metadata,
            validation_results
        )
        
        return registration_id
    
    async def deploy_model(self,
                         model_id: str,
                         model_version: str,
                         stage: DeploymentStage,
                         config: DeploymentConfig) -> str:
        """Deploy model to specified stage"""
        
        # Validate deployment readiness
        readiness_check = await self._check_deployment_readiness(
            model_id,
            model_version,
            stage
        )
        
        if not readiness_check['ready']:
            raise ValueError(f"Deployment not ready: {readiness_check['reasons']}")
        
        # Create deployment
        deployment_id = await self.deployment_manager.create_deployment(
            config
        )
        
        # Configure monitoring
        await self.monitoring.setup_model_monitoring(
            deployment_id,
            model_id,
            model_version
        )
        
        # Track deployment
        self.active_deployments[deployment_id] = {
            'model_id': model_id,
            'model_version': model_version,
            'stage': stage,
            'created_at': datetime.now(),
            'config': config
        }
        
        return deployment_id
    
    async def promote_model(self,
                          model_id: str,
                          from_stage: DeploymentStage,
                          to_stage: DeploymentStage,
                          validation_criteria: Dict[str, float]) -> bool:
        """Promote model between stages"""
        
        # Get current deployment
        current_deployment = await self._get_deployment_by_stage(
            model_id,
            from_stage
        )
        
        if not current_deployment:
            raise ValueError(f"No deployment found in {from_stage}")
        
        # Validate promotion criteria
        validation_results = await self._validate_promotion_criteria(
            current_deployment['deployment_id'],
            validation_criteria
        )
        
        if not validation_results['passed']:
            return False
        
        # Create new deployment config for target stage
        promotion_config = await self._create_promotion_config(
            current_deployment,
            to_stage
        )
        
        # Deploy to target stage
        new_deployment_id = await self.deploy_model(
            model_id,
            current_deployment['model_version'],
            to_stage,
            promotion_config
        )
        
        # Update model status in registry
        await self.model_registry.update_model_status(
            model_id,
            current_deployment['model_version'],
            ModelStatus.PRODUCTION if to_stage == DeploymentStage.PRODUCTION else ModelStatus.STAGING
        )
        
        return True
    
    async def rollback_deployment(self,
                                deployment_id: str,
                                rollback_reason: str) -> str:
        """Rollback deployment to previous version"""
        
        deployment = self.active_deployments.get(deployment_id)
        if not deployment:
            raise ValueError(f"Deployment {deployment_id} not found")
        
        # Get previous version
        previous_deployment = await self._get_previous_deployment(
            deployment['model_id'],
            deployment['stage']
        )
        
        if not previous_deployment:
            raise ValueError("No previous deployment found for rollback")
        
        # Create rollback deployment
        rollback_config = previous_deployment['config']
        rollback_config.deployment_id = str(uuid.uuid4())
        
        rollback_deployment_id = await self.deploy_model(
            deployment['model_id'],
            previous_deployment['model_version'],
            deployment['stage'],
            rollback_config
        )
        
        # Stop current deployment
        await self.deployment_manager.stop_deployment(deployment_id)
        
        # Log rollback
        await self.experiment_tracker.log_rollback(
            deployment_id,
            rollback_deployment_id,
            rollback_reason
        )
        
        return rollback_deployment_id

class ModelRegistry:
    def __init__(self, storage_backend, metadata_store):
        self.storage = storage_backend
        self.metadata_store = metadata_store
        
    async def store_model(self, 
                        model_path: str,
                        metadata: ModelMetadata) -> str:
        """Store model artifact"""
        
        # Generate storage path
        storage_path = f"models/{metadata.name}/{metadata.version}/{metadata.model_id}"
        
        # Store model files
        stored_path = await self.storage.upload_directory(
            model_path,
            storage_path
        )
        
        return stored_path
    
    async def register_model(self,
                           metadata: ModelMetadata,
                           storage_path: str) -> str:
        """Register model in metadata store"""
        
        # Add storage information
        registration_data = asdict(metadata)
        registration_data['storage_path'] = storage_path
        registration_data['registered_at'] = datetime.now().isoformat()
        
        # Store metadata
        registration_id = await self.metadata_store.store(
            f"models/{metadata.model_id}",
            registration_data
        )
        
        return registration_id
    
    async def get_model(self, 
                       model_id: str,
                       version: str = None) -> Dict:
        """Retrieve model metadata and artifact"""
        
        # Get metadata
        if version:
            metadata = await self.metadata_store.get(
                f"models/{model_id}/{version}"
            )
        else:
            # Get latest version
            metadata = await self._get_latest_version(model_id)
        
        if not metadata:
            raise ValueError(f"Model {model_id} not found")
        
        return metadata
    
    async def list_models(self,
                        tags: List[str] = None,
                        status: ModelStatus = None) -> List[Dict]:
        """List models with filtering"""
        
        # Get all models
        all_models = await self.metadata_store.list("models/")
        
        # Apply filters
        filtered_models = []
        
        for model in all_models:
            if tags and not any(tag in model.get('tags', []) for tag in tags):
                continue
            
            if status and model.get('status') != status.value:
                continue
            
            filtered_models.append(model)
        
        return filtered_models
    
    async def update_model_status(self,
                                model_id: str,
                                version: str,
                                new_status: ModelStatus):
        """Update model status"""
        
        # Get current metadata
        metadata = await self.get_model(model_id, version)
        
        # Update status
        metadata['status'] = new_status.value
        metadata['updated_at'] = datetime.now().isoformat()
        
        # Store updated metadata
        await self.metadata_store.update(
            f"models/{model_id}/{version}",
            metadata
        )
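
To show how these pieces fit together, here's a minimal usage sketch. It assumes you already have concrete storage, metadata, deployment, monitoring, and experiment-tracking backends to inject; the model name, resource requests, and health-check settings are purely illustrative.

# usage_sketch.py
# Minimal sketch of wiring the orchestrator together. The backend arguments
# (storage, metadata store, deployment manager, monitoring, experiment
# tracker) are assumed to be concrete implementations your platform provides.
import uuid
from datetime import datetime

from mlops_platform import (
    DeploymentConfig, DeploymentStage, MLOpsOrchestrator,
    ModelMetadata, ModelRegistry, ModelStatus
)

async def register_and_deploy(storage_backend, metadata_store, deployment_manager,
                              monitoring_system, experiment_tracker) -> str:
    registry = ModelRegistry(storage_backend, metadata_store)
    orchestrator = MLOpsOrchestrator(
        model_registry=registry,
        deployment_manager=deployment_manager,
        monitoring_system=monitoring_system,
        experiment_tracker=experiment_tracker
    )
    
    metadata = ModelMetadata(
        model_id=str(uuid.uuid4()),
        name="churn-classifier",
        version="1.4.0",
        status=ModelStatus.TRAINING,
        created_at=datetime.now(),
        updated_at=datetime.now(),
        author="ml-team",
        description="Gradient-boosted churn model",
        tags=["churn", "tabular"],
        metrics={"roc_auc": 0.91},
        hyperparameters={"max_depth": 6, "n_estimators": 400},
        training_data_hash="<hash of training snapshot>",
        model_size_mb=0.0,  # filled in during validation
        framework="xgboost",
        dependencies={"xgboost": "2.0.3"}
    )
    
    # Validate, store, and register the model artifact
    await orchestrator.register_model("/tmp/churn_model", metadata)
    
    # Route 10% of traffic to a canary deployment
    config = DeploymentConfig(
        deployment_id=str(uuid.uuid4()),
        model_id=metadata.model_id,
        model_version=metadata.version,
        stage=DeploymentStage.CANARY,
        resources={"cpu": "2", "memory": "4Gi"},
        scaling_config={"min_replicas": 2, "max_replicas": 10},
        traffic_allocation=0.1,
        health_check_config={"path": "/healthz", "interval_seconds": 30},
        rollback_config={"automatic": True}
    )
    return await orchestrator.deploy_model(
        metadata.model_id, metadata.version, DeploymentStage.CANARY, config
    )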

Training Pipeline Automation

# training_pipeline.py
import asyncio
import uuid
from datetime import datetime
from typing import Dict, List

import pandas as pd
class AutomatedTrainingPipeline:
    def __init__(self, 
                 data_pipeline,
                 feature_store,
                 model_trainer,
                 model_evaluator,
                 model_registry):
        
        self.data_pipeline = data_pipeline
        self.feature_store = feature_store
        self.model_trainer = model_trainer
        self.model_evaluator = model_evaluator
        self.model_registry = model_registry
        
        # Pipeline configuration
        self.training_config = {}
        self.retraining_triggers = []
        
        # Keep references to running training tasks so they are not garbage-collected
        self.training_jobs = {}
        
    async def setup_training_pipeline(self,
                                    pipeline_config: Dict) -> str:
        """Setup automated training pipeline"""
        
        pipeline_id = str(uuid.uuid4())
        
        self.training_config[pipeline_id] = {
            'data_sources': pipeline_config['data_sources'],
            'feature_config': pipeline_config['feature_config'],
            'training_config': pipeline_config['training_config'],
            'evaluation_config': pipeline_config['evaluation_config'],
            'triggers': pipeline_config.get('triggers', {}),
            'schedule': pipeline_config.get('schedule'),
            'created_at': datetime.now()
        }
        
        # Setup triggers
        if 'data_drift' in pipeline_config.get('triggers', {}):
            await self._setup_data_drift_trigger(
                pipeline_id,
                pipeline_config['triggers']['data_drift']
            )
        
        if 'performance_degradation' in pipeline_config.get('triggers', {}):
            await self._setup_performance_trigger(
                pipeline_id,
                pipeline_config['triggers']['performance_degradation']
            )
        
        if 'schedule' in pipeline_config:
            await self._setup_scheduled_trigger(
                pipeline_id,
                pipeline_config['schedule']
            )
        
        return pipeline_id
    
    async def trigger_training(self,
                             pipeline_id: str,
                             trigger_reason: str = "manual") -> str:
        """Trigger training pipeline execution"""
        
        if pipeline_id not in self.training_config:
            raise ValueError(f"Pipeline {pipeline_id} not found")
        
        config = self.training_config[pipeline_id]
        
        # Generate training job ID
        job_id = f"{pipeline_id}_{int(datetime.now().timestamp())}"
        
        # Start training job in the background and retain a handle to the task
        training_task = asyncio.create_task(
            self._execute_training_pipeline(job_id, config, trigger_reason)
        )
        self.training_jobs[job_id] = training_task
        
        return job_id
    
    async def _execute_training_pipeline(self,
                                       job_id: str,
                                       config: Dict,
                                       trigger_reason: str):
        """Execute complete training pipeline"""
        
        try:
            # Step 1: Data preparation
            training_data = await self._prepare_training_data(
                config['data_sources'],
                config['feature_config']
            )
            
            # Step 2: Feature engineering
            features = await self._engineer_features(
                training_data,
                config['feature_config']
            )
            
            # Step 3: Model training (returns the artifact path and the held-out test split)
            model_artifact, test_data = await self._train_model(
                features,
                config['training_config']
            )
            
            # Step 4: Model evaluation on the held-out test split
            evaluation_results = await self._evaluate_model(
                model_artifact,
                test_data,
                config['evaluation_config']
            )
            
            # Step 5: Model validation
            validation_passed = await self._validate_model_quality(
                evaluation_results,
                config['evaluation_config'].get('thresholds', {})
            )
            
            if validation_passed:
                # Step 6: Model registration
                model_metadata = await self._create_model_metadata(
                    model_artifact,
                    evaluation_results,
                    config,
                    trigger_reason
                )
                
                # Store the artifact, then register its metadata and storage path
                stored_path = await self.model_registry.store_model(
                    model_artifact,
                    model_metadata
                )
                registration_id = await self.model_registry.register_model(
                    model_metadata,
                    stored_path
                )
                
                # Log success
                await self._log_training_success(
                    job_id,
                    registration_id,
                    evaluation_results
                )
            
            else:
                # Log validation failure
                await self._log_training_failure(
                    job_id,
                    "Model validation failed",
                    evaluation_results
                )
        
        except Exception as e:
            await self._log_training_failure(
                job_id,
                str(e),
                {}
            )
            raise
    
    async def _prepare_training_data(self,
                                   data_sources: List[Dict],
                                   feature_config: Dict) -> pd.DataFrame:
        """Prepare training data from sources"""
        
        # Load data from sources
        datasets = []
        
        for source in data_sources:
            if source['type'] == 'database':
                data = await self.data_pipeline.load_from_database(
                    source['connection'],
                    source['query']
                )
            elif source['type'] == 'file':
                data = await self.data_pipeline.load_from_file(
                    source['path']
                )
            elif source['type'] == 'api':
                data = await self.data_pipeline.load_from_api(
                    source['endpoint'],
                    source['params']
                )
            else:
                raise ValueError(f"Unsupported data source type: {source['type']}")
            
            datasets.append(data)
        
        # Combine datasets
        if len(datasets) == 1:
            combined_data = datasets[0]
        else:
            combined_data = await self._combine_datasets(
                datasets,
                feature_config.get('join_config', {})
            )
        
        # Data quality validation
        quality_report = await self._validate_data_quality(combined_data)
        
        if not quality_report['passed']:
            raise ValueError(f"Data quality validation failed: {quality_report['issues']}")
        
        return combined_data
    
    async def _engineer_features(self,
                               data: pd.DataFrame,
                               feature_config: Dict) -> pd.DataFrame:
        """Apply feature engineering pipeline"""
        
        # Load existing feature definitions
        feature_definitions = await self.feature_store.get_feature_definitions(
            feature_config.get('feature_set_id')
        )
        
        # Apply transformations
        features = data.copy()
        
        for transformation in feature_definitions.get('transformations', []):
            if transformation['type'] == 'scaling':
                features = await self._apply_scaling(
                    features,
                    transformation['config']
                )
            elif transformation['type'] == 'encoding':
                features = await self._apply_encoding(
                    features,
                    transformation['config']
                )
            elif transformation['type'] == 'feature_selection':
                features = await self._apply_feature_selection(
                    features,
                    transformation['config']
                )
        
        # Store feature statistics
        await self.feature_store.store_feature_statistics(
            feature_config['feature_set_id'],
            self._calculate_feature_stats(features)
        )
        
        return features
    
    async def _train_model(self,
                         features: pd.DataFrame,
                         training_config: Dict) -> tuple:
        """Train model and return the artifact path plus the held-out test split"""
        
        # Split data
        train_data, val_data, test_data = await self._split_data(
            features,
            training_config.get('split_config', {})
        )
        
        # Initialize model
        model = await self.model_trainer.create_model(
            training_config['model_config']
        )
        
        # Train model
        trained_model = await self.model_trainer.train(
            model,
            train_data,
            val_data,
            training_config['hyperparameters']
        )
        
        # Save model artifact
        artifact_path = await self.model_trainer.save_model(
            trained_model,
            f"/tmp/model_{uuid.uuid4()}"
        )
        
        # Return the held-out test split alongside the artifact for evaluation
        return artifact_path, test_data
    
    async def _evaluate_model(self,
                            model_path: str,
                            test_data: pd.DataFrame,
                            evaluation_config: Dict) -> Dict:
        """Evaluate trained model"""
        
        # Load model
        model = await self.model_evaluator.load_model(model_path)
        
        # Generate predictions
        predictions = await self.model_evaluator.predict(model, test_data)
        
        # Calculate metrics
        metrics = {}
        
        for metric_name in evaluation_config.get('metrics', []):
            if metric_name == 'accuracy':
                metrics['accuracy'] = await self._calculate_accuracy(
                    test_data['target'],
                    predictions
                )
            elif metric_name == 'precision':
                metrics['precision'] = await self._calculate_precision(
                    test_data['target'],
                    predictions
                )
            elif metric_name == 'recall':
                metrics['recall'] = await self._calculate_recall(
                    test_data['target'],
                    predictions
                )
            elif metric_name == 'f1_score':
                metrics['f1_score'] = await self._calculate_f1_score(
                    test_data['target'],
                    predictions
                )
            elif metric_name == 'roc_auc':
                metrics['roc_auc'] = await self._calculate_roc_auc(
                    test_data['target'],
                    predictions
                )
        
        # Generate evaluation report
        evaluation_report = {
            'metrics': metrics,
            'confusion_matrix': await self._generate_confusion_matrix(
                test_data['target'],
                predictions
            ),
            'feature_importance': await self._calculate_feature_importance(model),
            'evaluation_timestamp': datetime.now().isoformat()
        }
        
        return evaluation_report
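
To make the shape of `pipeline_config` concrete, here is an illustrative configuration whose keys mirror what `setup_training_pipeline` and the steps above actually read. The data source, feature set, hyperparameters, and thresholds are assumptions, not recommendations.

# pipeline_config_sketch.py
# Illustrative pipeline_config; the backend objects passed to the pipeline
# constructor are assumed to exist in your platform.
from training_pipeline import AutomatedTrainingPipeline

pipeline_config = {
    "data_sources": [
        {
            "type": "database",
            "connection": "postgresql://warehouse:5432/analytics",  # hypothetical DSN
            "query": "SELECT * FROM churn_training_set"
        }
    ],
    "feature_config": {
        "feature_set_id": "churn_features_v3",
        "join_config": {}
    },
    "training_config": {
        "model_config": {"type": "gradient_boosting_classifier"},
        "hyperparameters": {"max_depth": 6, "learning_rate": 0.1},
        "split_config": {"train": 0.7, "validation": 0.15, "test": 0.15}
    },
    "evaluation_config": {
        "metrics": ["accuracy", "f1_score", "roc_auc"],
        "thresholds": {"roc_auc": 0.85}  # quality gate enforced in step 5
    },
    "triggers": {
        "data_drift": {"threshold": 0.2},
        "performance_degradation": {"metric": "roc_auc", "min_value": 0.80}
    },
    "schedule": "0 2 * * 0"  # weekly retrain (cron syntax)
}

async def run_pipeline(data_pipeline, feature_store, model_trainer,
                       model_evaluator, model_registry) -> str:
    """Set up the pipeline and kick off an initial training run."""
    pipeline = AutomatedTrainingPipeline(
        data_pipeline, feature_store, model_trainer, model_evaluator, model_registry
    )
    pipeline_id = await pipeline.setup_training_pipeline(pipeline_config)
    return await pipeline.trigger_training(pipeline_id, trigger_reason="initial_run")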

Model Monitoring and Observability

Comprehensive Model Monitoring

# model_monitoring.py
from datetime import datetime, timedelta
from typing import Any, Dict, List
class ModelMonitoringSystem:
    def __init__(self, metrics_store, alerting_system):
        self.metrics_store = metrics_store
        self.alerting = alerting_system
        
        # Monitoring configuration
        self.monitoring_configs = {}
        self.drift_detectors = {}
        self.performance_trackers = {}
        
    async def setup_model_monitoring(self,
                                   deployment_id: str,
                                   model_id: str,
                                   model_version: str,
                                   monitoring_config: Dict = None):
        """Setup comprehensive model monitoring"""
        
        config = monitoring_config or self._get_default_monitoring_config()
        
        self.monitoring_configs[deployment_id] = {
            'model_id': model_id,
            'model_version': model_version,
            'config': config,
            'started_at': datetime.now()
        }
        
        # Initialize drift detection
        if config.get('drift_detection', {}).get('enabled', True):
            await self._setup_drift_detection(
                deployment_id,
                config['drift_detection']
            )
        
        # Initialize performance monitoring
        if config.get('performance_monitoring', {}).get('enabled', True):
            await self._setup_performance_monitoring(
                deployment_id,
                config['performance_monitoring']
            )
        
        # Initialize data quality monitoring
        if config.get('data_quality', {}).get('enabled', True):
            await self._setup_data_quality_monitoring(
                deployment_id,
                config['data_quality']
            )
    
    async def log_prediction(self,
                           deployment_id: str,
                           input_data: Dict,
                           prediction: Any,
                           ground_truth: Any = None,
                           metadata: Dict = None):
        """Log prediction for monitoring"""
        
        timestamp = datetime.now()
        
        # Store prediction data
        prediction_record = {
            'deployment_id': deployment_id,
            'timestamp': timestamp.isoformat(),
            'input_data': input_data,
            'prediction': prediction,
            'ground_truth': ground_truth,
            'metadata': metadata or {}
        }
        
        await self.metrics_store.store(
            f"predictions/{deployment_id}/{timestamp.strftime('%Y/%m/%d')}",
            prediction_record
        )
        
        # Update monitoring metrics
        await self._update_monitoring_metrics(
            deployment_id,
            prediction_record
        )
        
        # Check for anomalies
        await self._check_anomalies(
            deployment_id,
            prediction_record
        )
    
    async def _setup_drift_detection(self,
                                   deployment_id: str,
                                   drift_config: Dict):
        """Setup data and concept drift detection"""
        
        # Get reference data (training data statistics)
        reference_data = await self._get_reference_statistics(
            self.monitoring_configs[deployment_id]['model_id']
        )
        
        # Initialize drift detector
        detector_config = {
            'method': drift_config.get('method', 'ks_test'),
            'threshold': drift_config.get('threshold', 0.05),
            'window_size': drift_config.get('window_size', 1000),
            'reference_data': reference_data
        }
        
        self.drift_detectors[deployment_id] = DriftDetector(detector_config)
    
    async def _update_monitoring_metrics(self,
                                       deployment_id: str,
                                       prediction_record: Dict):
        """Update real-time monitoring metrics"""
        
        timestamp = datetime.now()
        
        # Update prediction count
        await self.metrics_store.increment(
            f"metrics/{deployment_id}/prediction_count",
            1,
            timestamp
        )
        
        # Update performance metrics (if ground truth available)
        if prediction_record.get('ground_truth') is not None:
            accuracy = self._calculate_prediction_accuracy(
                prediction_record['prediction'],
                prediction_record['ground_truth']
            )
            
            await self.metrics_store.store_metric(
                f"metrics/{deployment_id}/accuracy",
                accuracy,
                timestamp
            )
        
        # Update drift metrics
        if deployment_id in self.drift_detectors:
            drift_score = await self.drift_detectors[deployment_id].detect_drift(
                prediction_record['input_data']
            )
            
            await self.metrics_store.store_metric(
                f"metrics/{deployment_id}/drift_score",
                drift_score,
                timestamp
            )
            
            # Check drift threshold
            drift_threshold = self.monitoring_configs[deployment_id]['config']['drift_detection']['threshold']
            
            if drift_score > drift_threshold:
                await self.alerting.send_alert(
                    f"Data drift detected for deployment {deployment_id}",
                    {
                        'deployment_id': deployment_id,
                        'drift_score': drift_score,
                        'threshold': drift_threshold,
                        'timestamp': timestamp.isoformat()
                    }
                )
    
    async def generate_monitoring_report(self,
                                       deployment_id: str,
                                       time_range: timedelta = timedelta(days=7)) -> Dict:
        """Generate comprehensive monitoring report"""
        
        if deployment_id not in self.monitoring_configs:
            raise ValueError(f"No monitoring setup for deployment {deployment_id}")
        
        end_time = datetime.now()
        start_time = end_time - time_range
        
        # Get metrics data
        metrics_data = await self.metrics_store.get_metrics(
            f"metrics/{deployment_id}",
            start_time,
            end_time
        )
        
        # Get prediction data
        prediction_data = await self.metrics_store.get_predictions(
            f"predictions/{deployment_id}",
            start_time,
            end_time
        )
        
        # Generate report
        report = {
            'deployment_id': deployment_id,
            'model_info': self.monitoring_configs[deployment_id],
            'time_range': {
                'start': start_time.isoformat(),
                'end': end_time.isoformat()
            },
            'summary': {
                'total_predictions': len(prediction_data),
                'avg_predictions_per_day': len(prediction_data) / max(time_range.days, 1),
                'error_rate': await self._calculate_error_rate(prediction_data),
                'avg_latency_ms': await self._calculate_avg_latency(metrics_data),
            },
            'performance_metrics': await self._analyze_performance_metrics(metrics_data),
            'drift_analysis': await self._analyze_drift_metrics(metrics_data),
            'data_quality_report': await self._analyze_data_quality(prediction_data),
            'recommendations': await self._generate_recommendations(
                deployment_id,
                metrics_data,
                prediction_data
            )
        }
        
        return report

class DriftDetector:
    def __init__(self, config: Dict):
        self.config = config
        self.method = config['method']
        self.threshold = config['threshold']
        self.window_size = config['window_size']
        self.reference_data = config['reference_data']
        
        # Current window of data
        self.current_window = []
        
    async def detect_drift(self, input_data: Dict) -> float:
        """Detect drift in input data"""
        
        # Add to current window
        self.current_window.append(input_data)
        
        # Maintain window size
        if len(self.current_window) > self.window_size:
            self.current_window.pop(0)
        
        # Need minimum samples for drift detection
        if len(self.current_window) < 100:
            return 0.0
        
        # Calculate drift score by comparing the accumulated window to the reference data
        if self.method == 'ks_test':
            return await self._ks_test_drift(self.current_window)
        elif self.method == 'psi':
            return await self._psi_drift(self.current_window)
        elif self.method == 'js_distance':
            return await self._js_distance_drift(self.current_window)
        else:
            raise ValueError(f"Unknown drift detection method: {self.method}")
    
    async def _ks_test_drift(self, window: List[Dict]) -> float:
        """Kolmogorov-Smirnov test for drift detection over the current window"""
        
        from scipy import stats
        
        drift_scores = []
        
        # Check each feature's distribution in the window
        for feature_name, current_values in self._extract_feature_values(window).items():
            if feature_name in self.reference_data:
                reference_values = self.reference_data[feature_name]
                
                # Perform KS test
                ks_statistic, p_value = stats.ks_2samp(
                    reference_values,
                    current_values
                )
                
                # Convert p-value to drift score (lower p-value = higher drift)
                drift_score = 1.0 - p_value
                drift_scores.append(drift_score)
        
        # Return maximum drift score across features
        return max(drift_scores) if drift_scores else 0.0
    
    async def _psi_drift(self, window: List[Dict]) -> float:
        """Population Stability Index for drift detection over the current window"""
        
        drift_scores = []
        
        for feature_name, current_values in self._extract_feature_values(window).items():
            if feature_name in self.reference_data:
                reference_values = self.reference_data[feature_name]
                
                # Calculate PSI
                psi = self._calculate_psi(reference_values, current_values)
                drift_scores.append(psi)
        
        return max(drift_scores) if drift_scores else 0.0
    
    def _calculate_psi(self, reference: List, current: List) -> float:
        """Calculate Population Stability Index"""
        
        import numpy as np
        
        # Create bins
        all_values = reference + current
        bins = np.histogram_bin_edges(all_values, bins=10)
        
        # Calculate distributions
        ref_dist, _ = np.histogram(reference, bins=bins)
        cur_dist, _ = np.histogram(current, bins=bins)
        
        # Normalize
        ref_dist = ref_dist / len(reference)
        cur_dist = cur_dist / len(current)
        
        # Add small epsilon to avoid division by zero
        epsilon = 1e-10
        ref_dist = ref_dist + epsilon
        cur_dist = cur_dist + epsilon
        
        # Calculate PSI
        psi = np.sum((cur_dist - ref_dist) * np.log(cur_dist / ref_dist))
        
        return psi
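
The PSI calculation above is easy to sanity-check in isolation. The short sketch below runs it against synthetic data; the 0.1 and 0.25 interpretation bands are common rules of thumb rather than values mandated by the implementation.

# psi_sketch.py
# Self-contained check of the PSI calculation above using synthetic data.
import numpy as np

from model_monitoring import DriftDetector

detector = DriftDetector({
    "method": "psi",
    "threshold": 0.2,
    "window_size": 1000,
    "reference_data": {}  # not needed when calling _calculate_psi directly
})

rng = np.random.default_rng(42)
reference = rng.normal(loc=0.0, scale=1.0, size=5000).tolist()
stable = rng.normal(loc=0.0, scale=1.0, size=1000).tolist()
shifted = rng.normal(loc=0.8, scale=1.2, size=1000).tolist()

print(f"PSI, same distribution:    {detector._calculate_psi(reference, stable):.3f}")
print(f"PSI, shifted distribution: {detector._calculate_psi(reference, shifted):.3f}")
# Rule of thumb: PSI < 0.1 is stable, 0.1-0.25 moderate shift, > 0.25 significant drift.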

Automated Model Lifecycle

Lifecycle Automation

# lifecycle_automation.py
import asyncio
import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, List

from mlops_platform import MLOpsOrchestrator
from model_monitoring import ModelMonitoringSystem
from training_pipeline import AutomatedTrainingPipeline
class ModelLifecycleAutomation:
    def __init__(self, 
                 orchestrator: MLOpsOrchestrator,
                 monitoring_system: ModelMonitoringSystem,
                 training_pipeline: AutomatedTrainingPipeline):
        
        self.orchestrator = orchestrator
        self.monitoring = monitoring_system
        self.training_pipeline = training_pipeline
        
        # Automation rules
        self.automation_rules = {}
        self.lifecycle_policies = {}
        
    async def setup_automation_rules(self, rules_config: Dict):
        """Setup automated lifecycle rules"""
        
        for rule_name, rule_config in rules_config.items():
            self.automation_rules[rule_name] = {
                'triggers': rule_config['triggers'],
                'conditions': rule_config['conditions'],
                'actions': rule_config['actions'],
                'enabled': rule_config.get('enabled', True)
            }
        
        # Start monitoring for triggers
        await self._start_automation_monitoring()
    
    async def _start_automation_monitoring(self):
        """Start monitoring for automation triggers"""
        
        async def monitor_loop():
            while True:
                try:
                    # Check each rule
                    for rule_name, rule in self.automation_rules.items():
                        if not rule['enabled']:
                            continue
                        
                        # Check triggers
                        triggered = await self._check_rule_triggers(
                            rule_name, 
                            rule['triggers']
                        )
                        
                        if triggered:
                            # Validate conditions
                            conditions_met = await self._validate_conditions(
                                rule_name,
                                rule['conditions']
                            )
                            
                            if conditions_met:
                                # Execute actions
                                await self._execute_actions(
                                    rule_name,
                                    rule['actions']
                                )
                    
                    # Sleep before next check
                    await asyncio.sleep(300)  # Check every 5 minutes
                    
                except Exception as e:
                    logging.error(f"Automation monitoring error: {e}")
                    await asyncio.sleep(60)  # Wait before retry
        
        # Start monitoring task
        asyncio.create_task(monitor_loop())
    
    async def _check_rule_triggers(self, rule_name: str, triggers: Dict) -> bool:
        """Check if rule triggers are met"""
        
        for trigger_type, trigger_config in triggers.items():
            if trigger_type == 'performance_degradation':
                if await self._check_performance_degradation(trigger_config):
                    return True
            
            elif trigger_type == 'data_drift':
                if await self._check_data_drift(trigger_config):
                    return True
            
            elif trigger_type == 'error_rate_increase':
                if await self._check_error_rate_increase(trigger_config):
                    return True
            
            elif trigger_type == 'schedule':
                if await self._check_schedule_trigger(trigger_config):
                    return True
        
        return False
    
    async def _execute_actions(self, rule_name: str, actions: List[Dict]):
        """Execute automation actions"""
        
        for action in actions:
            action_type = action['type']
            
            try:
                if action_type == 'retrain_model':
                    await self._action_retrain_model(action['config'])
                
                elif action_type == 'rollback_deployment':
                    await self._action_rollback_deployment(action['config'])
                
                elif action_type == 'scale_deployment':
                    await self._action_scale_deployment(action['config'])
                
                elif action_type == 'send_alert':
                    await self._action_send_alert(action['config'])
                
                elif action_type == 'update_traffic_allocation':
                    await self._action_update_traffic(action['config'])
                
                # Log action execution
                await self._log_action_execution(rule_name, action)
                
            except Exception as e:
                logging.error(f"Action execution failed: {e}")
                await self._log_action_failure(rule_name, action, str(e))
    
    async def _action_retrain_model(self, config: Dict):
        """Action: Trigger model retraining"""
        
        pipeline_id = config['pipeline_id']
        trigger_reason = config.get('reason', 'automated_retraining')
        
        # Trigger training pipeline
        job_id = await self.training_pipeline.trigger_training(
            pipeline_id,
            trigger_reason
        )
        
        logging.info(f"Automated retraining triggered: job_id={job_id}")
    
    async def _action_rollback_deployment(self, config: Dict):
        """Action: Rollback deployment"""
        
        deployment_id = config['deployment_id']
        reason = config.get('reason', 'automated_rollback')
        
        # Execute rollback
        rollback_id = await self.orchestrator.rollback_deployment(
            deployment_id,
            reason
        )
        
        logging.info(f"Automated rollback executed: {rollback_id}")
    
    async def _action_scale_deployment(self, config: Dict):
        """Action: Scale deployment resources"""
        
        deployment_id = config['deployment_id']
        scale_config = config['scaling']
        
        # Update deployment scaling
        await self.orchestrator.deployment_manager.scale_deployment(
            deployment_id,
            scale_config
        )
        
        logging.info(f"Deployment scaled: {deployment_id}")
    
    async def create_model_retirement_plan(self,
                                         model_id: str,
                                         retirement_date: datetime,
                                         migration_plan: Dict) -> str:
        """Create automated model retirement plan"""
        
        retirement_plan_id = str(uuid.uuid4())
        
        plan = {
            'plan_id': retirement_plan_id,
            'model_id': model_id,
            'retirement_date': retirement_date,
            'migration_plan': migration_plan,
            'phases': [
                {
                    'phase': 'deprecation_warning',
                    'start_date': retirement_date - timedelta(days=90),
                    'actions': ['send_deprecation_notices', 'update_documentation']
                },
                {
                    'phase': 'traffic_reduction',
                    'start_date': retirement_date - timedelta(days=30),
                    'actions': ['reduce_traffic_allocation', 'monitor_performance']
                },
                {
                    'phase': 'final_migration',
                    'start_date': retirement_date - timedelta(days=7),
                    'actions': ['complete_traffic_migration', 'verify_new_model']
                },
                {
                    'phase': 'retirement',
                    'start_date': retirement_date,
                    'actions': ['stop_deployment', 'archive_model', 'cleanup_resources']
                }
            ],
            'created_at': datetime.now()
        }
        
        # Store retirement plan
        self.lifecycle_policies[retirement_plan_id] = plan
        
        # Schedule retirement phases
        await self._schedule_retirement_phases(plan)
        
        return retirement_plan_id
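
For reference, here is one possible `rules_config` to pass to `setup_automation_rules`. The trigger and action type names match the branches handled above; the deployment and pipeline IDs, thresholds, and alert channel are hypothetical placeholders.

# automation_rules_sketch.py
# Illustrative rules_config; IDs, thresholds, and the alert channel are
# hypothetical placeholders.
rules_config = {
    "retrain_on_drift": {
        "triggers": {
            "data_drift": {"deployment_id": "<deployment-id>", "threshold": 0.25}
        },
        "conditions": {
            "min_hours_since_last_retrain": 24
        },
        "actions": [
            {
                "type": "send_alert",
                "config": {"channel": "#ml-alerts", "message": "Drift detected, retraining"}
            },
            {
                "type": "retrain_model",
                "config": {"pipeline_id": "<pipeline-id>", "reason": "data_drift"}
            }
        ],
        "enabled": True
    },
    "rollback_on_error_spike": {
        "triggers": {
            "error_rate_increase": {"deployment_id": "<deployment-id>", "max_error_rate": 0.05}
        },
        "conditions": {},
        "actions": [
            {
                "type": "rollback_deployment",
                "config": {"deployment_id": "<deployment-id>", "reason": "error_rate_spike"}
            }
        ],
        "enabled": True
    }
}

# automation = ModelLifecycleAutomation(orchestrator, monitoring_system, training_pipeline)
# await automation.setup_automation_rules(rules_config)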

Best Practices Checklist

  • Implement comprehensive model versioning
  • Set up automated training pipelines
  • Monitor for data and concept drift
  • Implement gradual deployment strategies (see the canary rollout sketch after this list)
  • Track model performance metrics continuously
  • Set up automated rollback mechanisms
  • Document model lineage and dependencies
  • Implement model governance policies
  • Set up A/B testing for model comparisons
  • Monitor resource utilization and costs
  • Implement security scanning for models
  • Set up automated model retirement workflows
  • Create disaster recovery procedures
  • Implement compliance and audit logging
  • Review model performance on a regular cadence
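
As referenced in the checklist, the sketch below shows one way a gradual (canary) rollout could be driven with the orchestrator and monitoring system defined earlier. The traffic steps, soak time, error-rate budget, and the `update_traffic_allocation` method on the deployment manager are assumptions.

# canary_rollout_sketch.py
# Gradual rollout sketch; traffic steps, soak time, error budget, and the
# deployment manager's update_traffic_allocation method are assumptions.
import asyncio
from datetime import timedelta

from mlops_platform import DeploymentStage

async def canary_rollout(orchestrator, monitoring, model_id, canary_deployment_id,
                         steps=(0.05, 0.25, 0.50),
                         soak_seconds=3600,
                         max_error_rate=0.02) -> bool:
    """Ramp canary traffic in steps; promote to production only if it stays healthy."""
    
    for traffic in steps:
        # Shift more traffic to the canary (assumed deployment-manager method)
        await orchestrator.deployment_manager.update_traffic_allocation(
            canary_deployment_id, traffic
        )
        
        # Let metrics accumulate at this traffic level
        await asyncio.sleep(soak_seconds)
        
        report = await monitoring.generate_monitoring_report(
            canary_deployment_id,
            time_range=timedelta(seconds=soak_seconds)
        )
        if report['summary']['error_rate'] > max_error_rate:
            await orchestrator.rollback_deployment(
                canary_deployment_id,
                f"canary error rate exceeded budget at {traffic:.0%} traffic"
            )
            return False
    
    # Canary stayed healthy at every step: promote to full production traffic
    return await orchestrator.promote_model(
        model_id,
        DeploymentStage.CANARY,
        DeploymentStage.PRODUCTION,
        validation_criteria={"error_rate": max_error_rate}
    )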

Conclusion

MLOps transforms machine learning from experimental code to reliable production systems. By implementing comprehensive lifecycle management, automated training pipelines, continuous monitoring, and intelligent automation, you can build ML systems that deliver consistent business value. Remember that MLOps is not just about tools—it's about creating processes that enable reliable, scalable, and maintainable machine learning in production. Start with the basics, measure everything, and continuously evolve your MLOps practices based on real-world operational experience.
