Master MLOps practices including model versioning, deployment pipelines, monitoring systems, and automated retraining for production machine learning systems.
MLOps transforms machine learning from experimental prototypes to production systems that reliably deliver business value. After building MLOps platforms serving thousands of models, I've learned that successful ML in production requires treating models as products with complete lifecycle management. Here's your comprehensive guide to production MLOps.
MLOps Architecture and Foundation
Complete MLOps Platform
# mlops_platform.py
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import uuid
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from enum import Enum
import numpy as np
import pandas as pd
class ModelStatus(Enum):
TRAINING = "training"
EVALUATING = "evaluating"
STAGING = "staging"
PRODUCTION = "production"
DEPRECATED = "deprecated"
FAILED = "failed"
class DeploymentStage(Enum):
DEV = "development"
STAGING = "staging"
CANARY = "canary"
PRODUCTION = "production"
@dataclass
class ModelMetadata:
model_id: str
name: str
version: str
status: ModelStatus
created_at: datetime
updated_at: datetime
author: str
description: str
tags: List[str]
metrics: Dict[str, float]
hyperparameters: Dict[str, Any]
training_data_hash: str
model_size_mb: float
framework: str
dependencies: Dict[str, str]
@dataclass
class DeploymentConfig:
deployment_id: str
model_id: str
model_version: str
stage: DeploymentStage
resources: Dict[str, Any]
scaling_config: Dict[str, Any]
traffic_allocation: float
health_check_config: Dict[str, Any]
rollback_config: Dict[str, Any]
class MLOpsOrchestrator:
def __init__(self,
model_registry,
deployment_manager,
monitoring_system,
experiment_tracker):
self.model_registry = model_registry
self.deployment_manager = deployment_manager
self.monitoring = monitoring_system
self.experiment_tracker = experiment_tracker
# Pipeline management
self.training_pipelines = {}
self.deployment_pipelines = {}
self.monitoring_pipelines = {}
# State tracking
self.active_deployments = {}
self.training_jobs = {}
async def register_model(self,
model_artifact_path: str,
metadata: ModelMetadata) -> str:
"""Register new model version"""
# Validate model
validation_results = await self._validate_model(
model_artifact_path,
metadata
)
if not validation_results['is_valid']:
raise ValueError(f"Model validation failed: {validation_results['errors']}")
# Store model artifact
stored_path = await self.model_registry.store_model(
model_artifact_path,
metadata
)
# Update metadata with storage info
metadata.model_size_mb = validation_results['size_mb']
metadata.status = ModelStatus.STAGING
# Register in model registry
registration_id = await self.model_registry.register_model(
metadata,
stored_path
)
# Log registration
await self.experiment_tracker.log_model_registration(
registration_id,
metadata,
validation_results
)
return registration_id
async def deploy_model(self,
model_id: str,
model_version: str,
stage: DeploymentStage,
config: DeploymentConfig) -> str:
"""Deploy model to specified stage"""
# Validate deployment readiness
readiness_check = await self._check_deployment_readiness(
model_id,
model_version,
stage
)
if not readiness_check['ready']:
raise ValueError(f"Deployment not ready: {readiness_check['reasons']}")
# Create deployment
deployment_id = await self.deployment_manager.create_deployment(
config
)
# Configure monitoring
await self.monitoring.setup_model_monitoring(
deployment_id,
model_id,
model_version
)
# Track deployment
self.active_deployments[deployment_id] = {
'model_id': model_id,
'model_version': model_version,
'stage': stage,
'created_at': datetime.now(),
'config': config
}
return deployment_id
async def promote_model(self,
model_id: str,
from_stage: DeploymentStage,
to_stage: DeploymentStage,
validation_criteria: Dict[str, float]) -> bool:
"""Promote model between stages"""
# Get current deployment
current_deployment = await self._get_deployment_by_stage(
model_id,
from_stage
)
if not current_deployment:
raise ValueError(f"No deployment found in {from_stage}")
# Validate promotion criteria
validation_results = await self._validate_promotion_criteria(
current_deployment['deployment_id'],
validation_criteria
)
if not validation_results['passed']:
return False
# Create new deployment config for target stage
promotion_config = await self._create_promotion_config(
current_deployment,
to_stage
)
# Deploy to target stage
new_deployment_id = await self.deploy_model(
model_id,
current_deployment['model_version'],
to_stage,
promotion_config
)
# Update model status in registry
await self.model_registry.update_model_status(
model_id,
current_deployment['model_version'],
ModelStatus.PRODUCTION if to_stage == DeploymentStage.PRODUCTION else ModelStatus.STAGING
)
return True
async def rollback_deployment(self,
deployment_id: str,
rollback_reason: str) -> str:
"""Rollback deployment to previous version"""
deployment = self.active_deployments.get(deployment_id)
if not deployment:
raise ValueError(f"Deployment {deployment_id} not found")
# Get previous version
previous_deployment = await self._get_previous_deployment(
deployment['model_id'],
deployment['stage']
)
if not previous_deployment:
raise ValueError("No previous deployment found for rollback")
# Create rollback deployment
rollback_config = previous_deployment['config']
rollback_config.deployment_id = str(uuid.uuid4())
rollback_deployment_id = await self.deploy_model(
deployment['model_id'],
previous_deployment['model_version'],
deployment['stage'],
rollback_config
)
# Stop current deployment
await self.deployment_manager.stop_deployment(deployment_id)
# Log rollback
await self.experiment_tracker.log_rollback(
deployment_id,
rollback_deployment_id,
rollback_reason
)
return rollback_deployment_id
class ModelRegistry:
def __init__(self, storage_backend, metadata_store):
self.storage = storage_backend
self.metadata_store = metadata_store
async def store_model(self,
model_path: str,
metadata: ModelMetadata) -> str:
"""Store model artifact"""
# Generate storage path
storage_path = f"models/{metadata.name}/{metadata.version}/{metadata.model_id}"
# Store model files
stored_path = await self.storage.upload_directory(
model_path,
storage_path
)
return stored_path
async def register_model(self,
metadata: ModelMetadata,
storage_path: str) -> str:
"""Register model in metadata store"""
# Add storage information
registration_data = asdict(metadata)
registration_data['storage_path'] = storage_path
registration_data['registered_at'] = datetime.now().isoformat()
# Store metadata
        # Key metadata by model_id and version so get_model/update_model_status can find it
        registration_id = await self.metadata_store.store(
            f"models/{metadata.model_id}/{metadata.version}",
            registration_data
        )
return registration_id
async def get_model(self,
model_id: str,
version: str = None) -> Dict:
"""Retrieve model metadata and artifact"""
# Get metadata
if version:
metadata = await self.metadata_store.get(
f"models/{model_id}/{version}"
)
else:
# Get latest version
metadata = await self._get_latest_version(model_id)
if not metadata:
raise ValueError(f"Model {model_id} not found")
return metadata
async def list_models(self,
tags: List[str] = None,
status: ModelStatus = None) -> List[Dict]:
"""List models with filtering"""
# Get all models
all_models = await self.metadata_store.list("models/")
# Apply filters
filtered_models = []
for model in all_models:
if tags and not any(tag in model.get('tags', []) for tag in tags):
continue
if status and model.get('status') != status.value:
continue
filtered_models.append(model)
return filtered_models
async def update_model_status(self,
model_id: str,
version: str,
new_status: ModelStatus):
"""Update model status"""
# Get current metadata
metadata = await self.get_model(model_id, version)
# Update status
metadata['status'] = new_status.value
metadata['updated_at'] = datetime.now().isoformat()
# Store updated metadata
await self.metadata_store.update(
f"models/{model_id}/{version}",
metadata
)
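A minimal usage sketch shows how these pieces fit together. It assumes the classes above are in scope and that concrete storage, metadata, deployment, monitoring, and experiment-tracking backends are supplied by your platform; the backend variables below are placeholders, not part of the platform code.
# Illustrative usage sketch; backend objects are assumed to be provided elsewhere.
import asyncio
import uuid
from datetime import datetime

async def main():
    registry = ModelRegistry(storage_backend, metadata_store)   # assumed backends
    orchestrator = MLOpsOrchestrator(
        model_registry=registry,
        deployment_manager=deployment_manager,                  # assumed backend
        monitoring_system=monitoring_system,                     # assumed backend
        experiment_tracker=experiment_tracker                    # assumed backend
    )

    metadata = ModelMetadata(
        model_id=str(uuid.uuid4()),
        name="churn-classifier",
        version="1.2.0",
        status=ModelStatus.TRAINING,
        created_at=datetime.now(),
        updated_at=datetime.now(),
        author="ml-team",
        description="Gradient-boosted churn classifier",
        tags=["churn", "tabular"],
        metrics={"roc_auc": 0.91},
        hyperparameters={"n_estimators": 300, "max_depth": 6},
        training_data_hash="<training-snapshot-hash>",
        model_size_mb=0.0,  # filled in during registration validation
        framework="xgboost",
        dependencies={"xgboost": "2.0.3"}
    )
    await orchestrator.register_model("/tmp/churn_model", metadata)

    staging_config = DeploymentConfig(
        deployment_id=str(uuid.uuid4()),
        model_id=metadata.model_id,
        model_version=metadata.version,
        stage=DeploymentStage.STAGING,
        resources={"cpu": "2", "memory": "4Gi"},
        scaling_config={"min_replicas": 1, "max_replicas": 4},
        traffic_allocation=1.0,
        health_check_config={"path": "/healthz", "interval_seconds": 30},
        rollback_config={"automatic": True}
    )
    await orchestrator.deploy_model(
        metadata.model_id, metadata.version, DeploymentStage.STAGING, staging_config
    )

asyncio.run(main())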
Training Pipeline Automation
# training_pipeline.py
class AutomatedTrainingPipeline:
def __init__(self,
data_pipeline,
feature_store,
model_trainer,
model_evaluator,
model_registry):
self.data_pipeline = data_pipeline
self.feature_store = feature_store
self.model_trainer = model_trainer
self.model_evaluator = model_evaluator
self.model_registry = model_registry
# Pipeline configuration
self.training_config = {}
self.retraining_triggers = []
async def setup_training_pipeline(self,
pipeline_config: Dict) -> str:
"""Setup automated training pipeline"""
pipeline_id = str(uuid.uuid4())
self.training_config[pipeline_id] = {
'data_sources': pipeline_config['data_sources'],
'feature_config': pipeline_config['feature_config'],
'training_config': pipeline_config['training_config'],
'evaluation_config': pipeline_config['evaluation_config'],
'triggers': pipeline_config.get('triggers', {}),
'schedule': pipeline_config.get('schedule'),
'created_at': datetime.now()
}
# Setup triggers
if 'data_drift' in pipeline_config.get('triggers', {}):
await self._setup_data_drift_trigger(
pipeline_id,
pipeline_config['triggers']['data_drift']
)
if 'performance_degradation' in pipeline_config.get('triggers', {}):
await self._setup_performance_trigger(
pipeline_id,
pipeline_config['triggers']['performance_degradation']
)
if 'schedule' in pipeline_config:
await self._setup_scheduled_trigger(
pipeline_id,
pipeline_config['schedule']
)
return pipeline_id
async def trigger_training(self,
pipeline_id: str,
trigger_reason: str = "manual") -> str:
"""Trigger training pipeline execution"""
if pipeline_id not in self.training_config:
raise ValueError(f"Pipeline {pipeline_id} not found")
config = self.training_config[pipeline_id]
# Generate training job ID
job_id = f"{pipeline_id}_{int(datetime.now().timestamp())}"
# Start training job
training_task = asyncio.create_task(
self._execute_training_pipeline(job_id, config, trigger_reason)
)
return job_id
async def _execute_training_pipeline(self,
job_id: str,
config: Dict,
trigger_reason: str):
"""Execute complete training pipeline"""
try:
# Step 1: Data preparation
training_data = await self._prepare_training_data(
config['data_sources'],
config['feature_config']
)
# Step 2: Feature engineering
features = await self._engineer_features(
training_data,
config['feature_config']
)
            # Step 3: Model training (returns the artifact path and a held-out test split)
            model_artifact, test_data = await self._train_model(
                features,
                config['training_config']
            )
            # Step 4: Model evaluation on the held-out test split
            evaluation_results = await self._evaluate_model(
                model_artifact,
                test_data,
                config['evaluation_config']
            )
# Step 5: Model validation
validation_passed = await self._validate_model_quality(
evaluation_results,
config['evaluation_config'].get('thresholds', {})
)
if validation_passed:
# Step 6: Model registration
model_metadata = await self._create_model_metadata(
model_artifact,
evaluation_results,
config,
trigger_reason
)
                # Store the artifact first, then register its metadata (matches the ModelRegistry API)
                stored_path = await self.model_registry.store_model(
                    model_artifact,
                    model_metadata
                )
                registration_id = await self.model_registry.register_model(
                    model_metadata,
                    stored_path
                )
# Log success
await self._log_training_success(
job_id,
registration_id,
evaluation_results
)
else:
# Log validation failure
await self._log_training_failure(
job_id,
"Model validation failed",
evaluation_results
)
except Exception as e:
await self._log_training_failure(
job_id,
str(e),
{}
)
raise
async def _prepare_training_data(self,
data_sources: List[Dict],
feature_config: Dict) -> pd.DataFrame:
"""Prepare training data from sources"""
# Load data from sources
datasets = []
for source in data_sources:
if source['type'] == 'database':
data = await self.data_pipeline.load_from_database(
source['connection'],
source['query']
)
elif source['type'] == 'file':
data = await self.data_pipeline.load_from_file(
source['path']
)
elif source['type'] == 'api':
data = await self.data_pipeline.load_from_api(
source['endpoint'],
source['params']
)
else:
raise ValueError(f"Unsupported data source type: {source['type']}")
datasets.append(data)
# Combine datasets
if len(datasets) == 1:
combined_data = datasets[0]
else:
combined_data = await self._combine_datasets(
datasets,
feature_config.get('join_config', {})
)
# Data quality validation
quality_report = await self._validate_data_quality(combined_data)
if not quality_report['passed']:
raise ValueError(f"Data quality validation failed: {quality_report['issues']}")
return combined_data
async def _engineer_features(self,
data: pd.DataFrame,
feature_config: Dict) -> pd.DataFrame:
"""Apply feature engineering pipeline"""
# Load existing feature definitions
feature_definitions = await self.feature_store.get_feature_definitions(
feature_config.get('feature_set_id')
)
# Apply transformations
features = data.copy()
for transformation in feature_definitions.get('transformations', []):
if transformation['type'] == 'scaling':
features = await self._apply_scaling(
features,
transformation['config']
)
elif transformation['type'] == 'encoding':
features = await self._apply_encoding(
features,
transformation['config']
)
elif transformation['type'] == 'feature_selection':
features = await self._apply_feature_selection(
features,
transformation['config']
)
# Store feature statistics
await self.feature_store.store_feature_statistics(
feature_config['feature_set_id'],
self._calculate_feature_stats(features)
)
return features
    async def _train_model(self,
                           features: pd.DataFrame,
                           training_config: Dict) -> tuple:
        """Train model and return (artifact_path, held-out test data)"""
# Split data
train_data, val_data, test_data = await self._split_data(
features,
training_config.get('split_config', {})
)
# Initialize model
model = await self.model_trainer.create_model(
training_config['model_config']
)
# Train model
trained_model = await self.model_trainer.train(
model,
train_data,
val_data,
training_config['hyperparameters']
)
        # Save model artifact
        artifact_path = await self.model_trainer.save_model(
            trained_model,
            f"/tmp/model_{uuid.uuid4()}"
        )
        # Return the held-out test split so evaluation runs on unseen data
        return artifact_path, test_data
async def _evaluate_model(self,
model_path: str,
test_data: pd.DataFrame,
evaluation_config: Dict) -> Dict:
"""Evaluate trained model"""
# Load model
model = await self.model_evaluator.load_model(model_path)
# Generate predictions
predictions = await self.model_evaluator.predict(model, test_data)
# Calculate metrics
metrics = {}
for metric_name in evaluation_config.get('metrics', []):
if metric_name == 'accuracy':
metrics['accuracy'] = await self._calculate_accuracy(
test_data['target'],
predictions
)
elif metric_name == 'precision':
metrics['precision'] = await self._calculate_precision(
test_data['target'],
predictions
)
elif metric_name == 'recall':
metrics['recall'] = await self._calculate_recall(
test_data['target'],
predictions
)
elif metric_name == 'f1_score':
metrics['f1_score'] = await self._calculate_f1_score(
test_data['target'],
predictions
)
elif metric_name == 'roc_auc':
metrics['roc_auc'] = await self._calculate_roc_auc(
test_data['target'],
predictions
)
# Generate evaluation report
evaluation_report = {
'metrics': metrics,
'confusion_matrix': await self._generate_confusion_matrix(
test_data['target'],
predictions
),
'feature_importance': await self._calculate_feature_importance(model),
'evaluation_timestamp': datetime.now().isoformat()
}
return evaluation_report
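The training pipeline above is driven entirely by its configuration dictionary. Below is a sketch of what such a configuration might look like, together with a manual trigger; the concrete keys inside model_config, hyperparameters, thresholds, and the cron-style schedule string are illustrative assumptions rather than a fixed schema.
# Illustrative pipeline configuration; nested key names are assumptions, not a fixed schema.
pipeline_config = {
    "data_sources": [
        {"type": "database", "connection": "postgresql://ml_features", "query": "SELECT * FROM churn_training"},
        {"type": "file", "path": "s3://ml-data/churn/labels.parquet"},
    ],
    "feature_config": {
        "feature_set_id": "churn_v3",
        "join_config": {"on": "customer_id", "how": "inner"},
    },
    "training_config": {
        "split_config": {"train": 0.7, "validation": 0.15, "test": 0.15},
        "model_config": {"type": "gradient_boosting"},
        "hyperparameters": {"n_estimators": 300, "learning_rate": 0.05},
    },
    "evaluation_config": {
        "metrics": ["accuracy", "precision", "recall", "f1_score", "roc_auc"],
        "thresholds": {"roc_auc": 0.85, "f1_score": 0.7},
    },
    "triggers": {
        "data_drift": {"threshold": 0.2},
        "performance_degradation": {"metric": "roc_auc", "min_value": 0.8},
    },
    "schedule": "0 2 * * 0",  # weekly retraining (cron-style string; format assumed)
}

async def run_pipeline(training_pipeline: AutomatedTrainingPipeline):
    pipeline_id = await training_pipeline.setup_training_pipeline(pipeline_config)
    job_id = await training_pipeline.trigger_training(pipeline_id, trigger_reason="manual")
    return job_id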
Model Monitoring and Observability
Comprehensive Model Monitoring
# model_monitoring.py
class ModelMonitoringSystem:
def __init__(self, metrics_store, alerting_system):
self.metrics_store = metrics_store
self.alerting = alerting_system
# Monitoring configuration
self.monitoring_configs = {}
self.drift_detectors = {}
self.performance_trackers = {}
async def setup_model_monitoring(self,
deployment_id: str,
model_id: str,
model_version: str,
monitoring_config: Dict = None):
"""Setup comprehensive model monitoring"""
config = monitoring_config or self._get_default_monitoring_config()
self.monitoring_configs[deployment_id] = {
'model_id': model_id,
'model_version': model_version,
'config': config,
'started_at': datetime.now()
}
# Initialize drift detection
if config.get('drift_detection', {}).get('enabled', True):
await self._setup_drift_detection(
deployment_id,
config['drift_detection']
)
# Initialize performance monitoring
if config.get('performance_monitoring', {}).get('enabled', True):
await self._setup_performance_monitoring(
deployment_id,
config['performance_monitoring']
)
# Initialize data quality monitoring
if config.get('data_quality', {}).get('enabled', True):
await self._setup_data_quality_monitoring(
deployment_id,
config['data_quality']
)
async def log_prediction(self,
deployment_id: str,
input_data: Dict,
prediction: Any,
ground_truth: Any = None,
metadata: Dict = None):
"""Log prediction for monitoring"""
timestamp = datetime.now()
# Store prediction data
prediction_record = {
'deployment_id': deployment_id,
'timestamp': timestamp.isoformat(),
'input_data': input_data,
'prediction': prediction,
'ground_truth': ground_truth,
'metadata': metadata or {}
}
await self.metrics_store.store(
f"predictions/{deployment_id}/{timestamp.strftime('%Y/%m/%d')}",
prediction_record
)
# Update monitoring metrics
await self._update_monitoring_metrics(
deployment_id,
prediction_record
)
# Check for anomalies
await self._check_anomalies(
deployment_id,
prediction_record
)
async def _setup_drift_detection(self,
deployment_id: str,
drift_config: Dict):
"""Setup data and concept drift detection"""
# Get reference data (training data statistics)
reference_data = await self._get_reference_statistics(
self.monitoring_configs[deployment_id]['model_id']
)
# Initialize drift detector
detector_config = {
'method': drift_config.get('method', 'ks_test'),
'threshold': drift_config.get('threshold', 0.05),
'window_size': drift_config.get('window_size', 1000),
'reference_data': reference_data
}
self.drift_detectors[deployment_id] = DriftDetector(detector_config)
async def _update_monitoring_metrics(self,
deployment_id: str,
prediction_record: Dict):
"""Update real-time monitoring metrics"""
timestamp = datetime.now()
# Update prediction count
await self.metrics_store.increment(
f"metrics/{deployment_id}/prediction_count",
1,
timestamp
)
# Update performance metrics (if ground truth available)
if prediction_record.get('ground_truth') is not None:
accuracy = self._calculate_prediction_accuracy(
prediction_record['prediction'],
prediction_record['ground_truth']
)
await self.metrics_store.store_metric(
f"metrics/{deployment_id}/accuracy",
accuracy,
timestamp
)
# Update drift metrics
if deployment_id in self.drift_detectors:
drift_score = await self.drift_detectors[deployment_id].detect_drift(
prediction_record['input_data']
)
await self.metrics_store.store_metric(
f"metrics/{deployment_id}/drift_score",
drift_score,
timestamp
)
# Check drift threshold
            drift_threshold = self.drift_detectors[deployment_id].threshold
if drift_score > drift_threshold:
await self.alerting.send_alert(
f"Data drift detected for deployment {deployment_id}",
{
'deployment_id': deployment_id,
'drift_score': drift_score,
'threshold': drift_threshold,
'timestamp': timestamp.isoformat()
}
)
async def generate_monitoring_report(self,
deployment_id: str,
time_range: timedelta = timedelta(days=7)) -> Dict:
"""Generate comprehensive monitoring report"""
if deployment_id not in self.monitoring_configs:
raise ValueError(f"No monitoring setup for deployment {deployment_id}")
end_time = datetime.now()
start_time = end_time - time_range
# Get metrics data
metrics_data = await self.metrics_store.get_metrics(
f"metrics/{deployment_id}",
start_time,
end_time
)
# Get prediction data
prediction_data = await self.metrics_store.get_predictions(
f"predictions/{deployment_id}",
start_time,
end_time
)
# Generate report
report = {
'deployment_id': deployment_id,
'model_info': self.monitoring_configs[deployment_id],
'time_range': {
'start': start_time.isoformat(),
'end': end_time.isoformat()
},
'summary': {
'total_predictions': len(prediction_data),
                'avg_predictions_per_day': len(prediction_data) / max(time_range.days, 1),
'error_rate': await self._calculate_error_rate(prediction_data),
'avg_latency_ms': await self._calculate_avg_latency(metrics_data),
},
'performance_metrics': await self._analyze_performance_metrics(metrics_data),
'drift_analysis': await self._analyze_drift_metrics(metrics_data),
'data_quality_report': await self._analyze_data_quality(prediction_data),
'recommendations': await self._generate_recommendations(
deployment_id,
metrics_data,
prediction_data
)
}
return report
class DriftDetector:
def __init__(self, config: Dict):
self.config = config
self.method = config['method']
self.threshold = config['threshold']
self.window_size = config['window_size']
self.reference_data = config['reference_data']
# Current window of data
self.current_window = []
async def detect_drift(self, input_data: Dict) -> float:
"""Detect drift in input data"""
# Add to current window
self.current_window.append(input_data)
# Maintain window size
if len(self.current_window) > self.window_size:
self.current_window.pop(0)
# Need minimum samples for drift detection
if len(self.current_window) < 100:
return 0.0
# Calculate drift score based on method
if self.method == 'ks_test':
return await self._ks_test_drift(input_data)
elif self.method == 'psi':
return await self._psi_drift(input_data)
elif self.method == 'js_distance':
return await self._js_distance_drift(input_data)
else:
raise ValueError(f"Unknown drift detection method: {self.method}")
async def _ks_test_drift(self, input_data: Dict) -> float:
"""Kolmogorov-Smirnov test for drift detection"""
from scipy import stats
drift_scores = []
        # Compare the rolling window of recent inputs against the reference data, per feature
        for feature_name, current_values in self._extract_feature_values(self.current_window).items():
if feature_name in self.reference_data:
reference_values = self.reference_data[feature_name]
# Perform KS test
ks_statistic, p_value = stats.ks_2samp(
reference_values,
current_values
)
# Convert p-value to drift score (lower p-value = higher drift)
drift_score = 1.0 - p_value
drift_scores.append(drift_score)
# Return maximum drift score across features
return max(drift_scores) if drift_scores else 0.0
async def _psi_drift(self, input_data: Dict) -> float:
"""Population Stability Index for drift detection"""
drift_scores = []
        # Compare the rolling window of recent inputs against the reference data, per feature
        for feature_name, current_values in self._extract_feature_values(self.current_window).items():
if feature_name in self.reference_data:
reference_values = self.reference_data[feature_name]
# Calculate PSI
psi = self._calculate_psi(reference_values, current_values)
drift_scores.append(psi)
return max(drift_scores) if drift_scores else 0.0
def _calculate_psi(self, reference: List, current: List) -> float:
"""Calculate Population Stability Index"""
import numpy as np
# Create bins
all_values = reference + current
bins = np.histogram_bin_edges(all_values, bins=10)
# Calculate distributions
ref_dist, _ = np.histogram(reference, bins=bins)
cur_dist, _ = np.histogram(current, bins=bins)
# Normalize
ref_dist = ref_dist / len(reference)
cur_dist = cur_dist / len(current)
# Add small epsilon to avoid division by zero
epsilon = 1e-10
ref_dist = ref_dist + epsilon
cur_dist = cur_dist + epsilon
# Calculate PSI
psi = np.sum((cur_dist - ref_dist) * np.log(cur_dist / ref_dist))
return psi
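To build intuition for the PSI score computed above, the following self-contained snippet applies the same binning and formula to synthetic data, comparing a reference distribution against a stable window and a shifted one. As a common rule of thumb, PSI below roughly 0.1 is usually read as stable and above roughly 0.25 as significant drift, but the threshold you act on should come from your monitoring configuration.
# psi_sanity_check.py: standalone check of the PSI calculation on synthetic data
import numpy as np

def calculate_psi(reference, current, bins=10):
    """Same formula as DriftDetector._calculate_psi, applied to plain arrays."""
    edges = np.histogram_bin_edges(np.concatenate([reference, current]), bins=bins)
    ref_dist, _ = np.histogram(reference, bins=edges)
    cur_dist, _ = np.histogram(current, bins=edges)
    ref_dist = ref_dist / len(reference) + 1e-10
    cur_dist = cur_dist / len(current) + 1e-10
    return float(np.sum((cur_dist - ref_dist) * np.log(cur_dist / ref_dist)))

rng = np.random.default_rng(42)
reference = rng.normal(loc=0.0, scale=1.0, size=5000)   # training-time distribution
stable = rng.normal(loc=0.0, scale=1.0, size=1000)      # same distribution
shifted = rng.normal(loc=0.8, scale=1.3, size=1000)     # drifted distribution

print(f"PSI (stable window):  {calculate_psi(reference, stable):.3f}")   # typically well below 0.1
print(f"PSI (shifted window): {calculate_psi(reference, shifted):.3f}")  # typically well above 0.25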
Automated Model Lifecycle
Lifecycle Automation
# lifecycle_automation.py
class ModelLifecycleAutomation:
def __init__(self,
orchestrator: MLOpsOrchestrator,
monitoring_system: ModelMonitoringSystem,
training_pipeline: AutomatedTrainingPipeline):
self.orchestrator = orchestrator
self.monitoring = monitoring_system
self.training_pipeline = training_pipeline
# Automation rules
self.automation_rules = {}
self.lifecycle_policies = {}
async def setup_automation_rules(self, rules_config: Dict):
"""Setup automated lifecycle rules"""
for rule_name, rule_config in rules_config.items():
self.automation_rules[rule_name] = {
'triggers': rule_config['triggers'],
'conditions': rule_config['conditions'],
'actions': rule_config['actions'],
'enabled': rule_config.get('enabled', True)
}
# Start monitoring for triggers
await self._start_automation_monitoring()
async def _start_automation_monitoring(self):
"""Start monitoring for automation triggers"""
async def monitor_loop():
while True:
try:
# Check each rule
for rule_name, rule in self.automation_rules.items():
if not rule['enabled']:
continue
# Check triggers
triggered = await self._check_rule_triggers(
rule_name,
rule['triggers']
)
if triggered:
# Validate conditions
conditions_met = await self._validate_conditions(
rule_name,
rule['conditions']
)
if conditions_met:
# Execute actions
await self._execute_actions(
rule_name,
rule['actions']
)
# Sleep before next check
await asyncio.sleep(300) # Check every 5 minutes
except Exception as e:
logging.error(f"Automation monitoring error: {e}")
await asyncio.sleep(60) # Wait before retry
# Start monitoring task
asyncio.create_task(monitor_loop())
async def _check_rule_triggers(self, rule_name: str, triggers: Dict) -> bool:
"""Check if rule triggers are met"""
for trigger_type, trigger_config in triggers.items():
if trigger_type == 'performance_degradation':
if await self._check_performance_degradation(trigger_config):
return True
elif trigger_type == 'data_drift':
if await self._check_data_drift(trigger_config):
return True
elif trigger_type == 'error_rate_increase':
if await self._check_error_rate_increase(trigger_config):
return True
elif trigger_type == 'schedule':
if await self._check_schedule_trigger(trigger_config):
return True
return False
async def _execute_actions(self, rule_name: str, actions: List[Dict]):
"""Execute automation actions"""
for action in actions:
action_type = action['type']
try:
if action_type == 'retrain_model':
await self._action_retrain_model(action['config'])
elif action_type == 'rollback_deployment':
await self._action_rollback_deployment(action['config'])
elif action_type == 'scale_deployment':
await self._action_scale_deployment(action['config'])
elif action_type == 'send_alert':
await self._action_send_alert(action['config'])
elif action_type == 'update_traffic_allocation':
await self._action_update_traffic(action['config'])
# Log action execution
await self._log_action_execution(rule_name, action)
except Exception as e:
logging.error(f"Action execution failed: {e}")
await self._log_action_failure(rule_name, action, str(e))
async def _action_retrain_model(self, config: Dict):
"""Action: Trigger model retraining"""
pipeline_id = config['pipeline_id']
trigger_reason = config.get('reason', 'automated_retraining')
# Trigger training pipeline
job_id = await self.training_pipeline.trigger_training(
pipeline_id,
trigger_reason
)
logging.info(f"Automated retraining triggered: job_id={job_id}")
async def _action_rollback_deployment(self, config: Dict):
"""Action: Rollback deployment"""
deployment_id = config['deployment_id']
reason = config.get('reason', 'automated_rollback')
# Execute rollback
rollback_id = await self.orchestrator.rollback_deployment(
deployment_id,
reason
)
logging.info(f"Automated rollback executed: {rollback_id}")
async def _action_scale_deployment(self, config: Dict):
"""Action: Scale deployment resources"""
deployment_id = config['deployment_id']
scale_config = config['scaling']
# Update deployment scaling
await self.orchestrator.deployment_manager.scale_deployment(
deployment_id,
scale_config
)
logging.info(f"Deployment scaled: {deployment_id}")
async def create_model_retirement_plan(self,
model_id: str,
retirement_date: datetime,
migration_plan: Dict) -> str:
"""Create automated model retirement plan"""
retirement_plan_id = str(uuid.uuid4())
plan = {
'plan_id': retirement_plan_id,
'model_id': model_id,
'retirement_date': retirement_date,
'migration_plan': migration_plan,
'phases': [
{
'phase': 'deprecation_warning',
'start_date': retirement_date - timedelta(days=90),
'actions': ['send_deprecation_notices', 'update_documentation']
},
{
'phase': 'traffic_reduction',
'start_date': retirement_date - timedelta(days=30),
'actions': ['reduce_traffic_allocation', 'monitor_performance']
},
{
'phase': 'final_migration',
'start_date': retirement_date - timedelta(days=7),
'actions': ['complete_traffic_migration', 'verify_new_model']
},
{
'phase': 'retirement',
'start_date': retirement_date,
'actions': ['stop_deployment', 'archive_model', 'cleanup_resources']
}
],
'created_at': datetime.now()
}
# Store retirement plan
self.lifecycle_policies[retirement_plan_id] = plan
# Schedule retirement phases
await self._schedule_retirement_phases(plan)
return retirement_plan_id
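A sketch of how automation rules might be declared for this class is shown below. The payloads inside triggers, conditions, and actions are assumptions about what the private _check_* and _action_* helpers expect, so adapt the keys to your own implementations.
# Illustrative automation rules; nested config keys are assumptions about what the
# private trigger/condition/action handlers expect.
rules_config = {
    "retrain_on_drift": {
        "triggers": {"data_drift": {"deployment_id": "churn-prod", "threshold": 0.25}},
        "conditions": {"min_hours_since_last_training": 24},
        "actions": [
            {"type": "retrain_model", "config": {"pipeline_id": "churn-pipeline", "reason": "data_drift"}},
            {"type": "send_alert", "config": {"channel": "#ml-alerts", "severity": "warning"}},
        ],
        "enabled": True,
    },
    "rollback_on_errors": {
        "triggers": {"error_rate_increase": {"deployment_id": "churn-prod", "max_error_rate": 0.05}},
        "conditions": {"previous_version_available": True},
        "actions": [
            {"type": "rollback_deployment", "config": {"deployment_id": "churn-prod", "reason": "error_rate_spike"}},
        ],
    },
}

async def enable_automation(automation: ModelLifecycleAutomation):
    await automation.setup_automation_rules(rules_config)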
Best Practices Checklist
- Version every model in a central registry with full metadata: metrics, hyperparameters, training data hash, framework, and dependencies.
- Promote models through explicit stages (development, staging, canary, production) and gate each promotion on validation criteria.
- Automate training pipelines with data quality checks and evaluation thresholds so only validated models reach the registry.
- Monitor every deployment for data drift, performance degradation, and data quality, and alert when thresholds are crossed.
- Log predictions (and ground truth when available) to drive drift detection and performance tracking.
- Keep rollback paths ready and automate them for error-rate spikes and failed promotions.
- Encode lifecycle policies as automation rules for retraining, scaling, traffic shifts, and alerts.
- Plan model retirement with phased deprecation, traffic reduction, migration, and cleanup.
Conclusion
MLOps transforms machine learning from experimental code to reliable production systems. By implementing comprehensive lifecycle management, automated training pipelines, continuous monitoring, and intelligent automation, you can build ML systems that deliver consistent business value. Remember that MLOps is not just about tools—it's about creating processes that enable reliable, scalable, and maintainable machine learning in production. Start with the basics, measure everything, and continuously evolve your MLOps practices based on real-world operational experience.