Master AI model evaluation with comprehensive metrics, testing strategies, and validation frameworks for building reliable and robust machine learning systems.
Evaluating AI model performance is essential for building reliable and trustworthy machine learning systems. Proper evaluation goes beyond headline accuracy to cover robustness, fairness, interpretability, and real-world behavior. This guide walks through evaluation frameworks, metrics, testing strategies, and production monitoring approaches.
Comprehensive Evaluation Framework
AI model evaluation should be systematic, multi-dimensional, and aligned with business objectives. A comprehensive evaluation framework considers technical performance, business impact, ethical considerations, and operational requirements.
Core Evaluation Components
# evaluation_framework.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Union, Callable, Tuple
from enum import Enum
from datetime import datetime
import numpy as np
import pandas as pd
import json
import logging
from collections import defaultdict
class EvaluationDimension(Enum):
PERFORMANCE = "performance"
ROBUSTNESS = "robustness"
FAIRNESS = "fairness"
INTERPRETABILITY = "interpretability"
EFFICIENCY = "efficiency"
SAFETY = "safety"
BUSINESS_IMPACT = "business_impact"
class ModelType(Enum):
CLASSIFICATION = "classification"
REGRESSION = "regression"
RANKING = "ranking"
GENERATION = "generation"
RECOMMENDATION = "recommendation"
TIME_SERIES = "time_series"
@dataclass
class EvaluationMetric:
name: str
description: str
higher_is_better: bool
score_range: Tuple[float, float]
threshold: Optional[float] = None
weight: float = 1.0
@dataclass
class EvaluationResult:
metric_name: str
score: float
confidence_interval: Optional[Tuple[float, float]] = None
p_value: Optional[float] = None
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class EvaluationReport:
model_id: str
model_version: str
dataset_info: Dict[str, Any]
evaluation_config: Dict[str, Any]
results: Dict[str, EvaluationResult]
overall_score: float
recommendations: List[str]
timestamp: datetime = field(default_factory=datetime.now)
class MetricCalculator(ABC):
"""Abstract base class for metric calculations"""
@abstractmethod
def calculate(self,
y_true: np.ndarray,
y_pred: np.ndarray,
**kwargs) -> EvaluationResult:
"""Calculate metric value"""
pass
@abstractmethod
def get_metric_info(self) -> EvaluationMetric:
"""Get metric information"""
pass
class ModelEvaluator:
"""Comprehensive model evaluator"""
def __init__(self,
model_type: ModelType,
evaluation_config: Dict[str, Any] = None):
self.model_type = model_type
self.config = evaluation_config or {}
# Initialize metric calculators
self.metric_calculators: Dict[str, MetricCalculator] = {}
self._initialize_default_metrics()
# Evaluation history
self.evaluation_history: List[EvaluationReport] = []
self.logger = logging.getLogger(__name__)
def _initialize_default_metrics(self):
"""Initialize default metrics based on model type"""
if self.model_type == ModelType.CLASSIFICATION:
self.add_metric("accuracy", AccuracyCalculator())
self.add_metric("precision", PrecisionCalculator())
self.add_metric("recall", RecallCalculator())
self.add_metric("f1_score", F1ScoreCalculator())
self.add_metric("roc_auc", ROCAUCCalculator())
self.add_metric("pr_auc", PRAUCCalculator())
elif self.model_type == ModelType.REGRESSION:
self.add_metric("mse", MSECalculator())
self.add_metric("rmse", RMSECalculator())
self.add_metric("mae", MAECalculator())
self.add_metric("r2_score", R2ScoreCalculator())
self.add_metric("mape", MAPECalculator())
elif self.model_type == ModelType.RANKING:
self.add_metric("ndcg", NDCGCalculator())
self.add_metric("map", MAPCalculator())
self.add_metric("mrr", MRRCalculator())
elif self.model_type == ModelType.GENERATION:
self.add_metric("bleu", BLEUCalculator())
self.add_metric("rouge", ROUGECalculator())
self.add_metric("meteor", METEORCalculator())
# Add dimension-specific metrics
self.add_metric("calibration", CalibrationCalculator())
self.add_metric("robustness", RobustnessCalculator())
def add_metric(self, name: str, calculator: MetricCalculator):
"""Add custom metric calculator"""
self.metric_calculators[name] = calculator
def evaluate(self,
y_true: np.ndarray,
y_pred: np.ndarray,
model_id: str,
model_version: str,
dataset_info: Dict[str, Any] = None,
additional_data: Dict[str, Any] = None) -> EvaluationReport:
"""Comprehensive model evaluation"""
self.logger.info(f"Starting evaluation for model {model_id}:{model_version}")
# Calculate all metrics
results = {}
for metric_name, calculator in self.metric_calculators.items():
try:
if additional_data and metric_name in ["calibration", "robustness"]:
# Some metrics need additional data
result = calculator.calculate(
y_true, y_pred,
**additional_data.get(metric_name, {})
)
else:
result = calculator.calculate(y_true, y_pred)
results[metric_name] = result
self.logger.debug(f"Calculated {metric_name}: {result.score:.4f}")
except Exception as e:
self.logger.error(f"Failed to calculate {metric_name}: {e}")
results[metric_name] = EvaluationResult(
metric_name=metric_name,
score=0.0,
metadata={"error": str(e)}
)
# Calculate overall score
overall_score = self._calculate_overall_score(results)
# Generate recommendations
recommendations = self._generate_recommendations(results)
# Create evaluation report
report = EvaluationReport(
model_id=model_id,
model_version=model_version,
dataset_info=dataset_info or {},
evaluation_config=self.config,
results=results,
overall_score=overall_score,
recommendations=recommendations
)
self.evaluation_history.append(report)
self.logger.info(f"Evaluation completed. Overall score: {overall_score:.4f}")
return report
def _calculate_overall_score(self, results: Dict[str, EvaluationResult]) -> float:
"""Calculate weighted overall score"""
total_weight = 0.0
weighted_sum = 0.0
for metric_name, result in results.items():
if metric_name in self.metric_calculators:
metric_info = self.metric_calculators[metric_name].get_metric_info()
# Normalize score to 0-1 range
score_min, score_max = metric_info.score_range
normalized_score = (result.score - score_min) / (score_max - score_min)
# Adjust for higher_is_better
if not metric_info.higher_is_better:
normalized_score = 1.0 - normalized_score
weighted_sum += normalized_score * metric_info.weight
total_weight += metric_info.weight
return weighted_sum / total_weight if total_weight > 0 else 0.0
def _generate_recommendations(self, results: Dict[str, EvaluationResult]) -> List[str]:
"""Generate actionable recommendations based on results"""
recommendations = []
for metric_name, result in results.items():
if metric_name in self.metric_calculators:
metric_info = self.metric_calculators[metric_name].get_metric_info()
# Check if metric is below threshold
if metric_info.threshold is not None:
if metric_info.higher_is_better and result.score < metric_info.threshold:
recommendations.append(
f"Improve {metric_name}: current {result.score:.3f} < threshold {metric_info.threshold:.3f}"
)
elif not metric_info.higher_is_better and result.score > metric_info.threshold:
recommendations.append(
f"Reduce {metric_name}: current {result.score:.3f} > threshold {metric_info.threshold:.3f}"
)
# Add general recommendations based on model type
if self.model_type == ModelType.CLASSIFICATION:
if "f1_score" in results and results["f1_score"].score < 0.7:
recommendations.append("Consider feature engineering or model architecture changes to improve F1 score")
if "roc_auc" in results and results["roc_auc"].score < 0.8:
recommendations.append("Model discriminative power is low. Consider more complex models or additional features")
elif self.model_type == ModelType.REGRESSION:
if "r2_score" in results and results["r2_score"].score < 0.7:
recommendations.append("Model explains low variance. Consider polynomial features or ensemble methods")
return recommendations
def compare_models(self,
reports: List[EvaluationReport],
comparison_metrics: List[str] = None) -> Dict[str, Any]:
"""Compare multiple model evaluation reports"""
if len(reports) < 2:
raise ValueError("Need at least 2 reports for comparison")
comparison_metrics = comparison_metrics or list(reports[0].results.keys())
comparison = {
"models": [f"{r.model_id}:{r.model_version}" for r in reports],
"metrics_comparison": {},
"best_model_per_metric": {},
"overall_ranking": []
}
# Compare each metric
for metric_name in comparison_metrics:
metric_scores = []
for report in reports:
if metric_name in report.results:
metric_scores.append(report.results[metric_name].score)
else:
metric_scores.append(None)
comparison["metrics_comparison"][metric_name] = metric_scores
# Find best model for this metric
if all(score is not None for score in metric_scores):
metric_info = self.metric_calculators[metric_name].get_metric_info()
if metric_info.higher_is_better:
best_idx = np.argmax(metric_scores)
else:
best_idx = np.argmin(metric_scores)
comparison["best_model_per_metric"][metric_name] = {
"model": comparison["models"][best_idx],
"score": metric_scores[best_idx]
}
# Overall ranking by overall score
overall_scores = [report.overall_score for report in reports]
ranking_indices = np.argsort(overall_scores)[::-1] # Descending order
comparison["overall_ranking"] = [
{
"rank": i + 1,
"model": comparison["models"][idx],
"overall_score": overall_scores[idx]
}
for i, idx in enumerate(ranking_indices)
]
return comparison
    def statistical_significance_test(self,
                                      results1: EvaluationResult,
                                      results2: EvaluationResult,
                                      test_type: str = "ci_overlap") -> Dict[str, Any]:
        """Rough significance check between two evaluation results.

        A full implementation would rerun the metric on paired bootstrap
        resamples of the test set; here we fall back to comparing bootstrap
        confidence intervals when both results carry them.
        """
        difference = results2.score - results1.score
        if results1.confidence_interval and results2.confidence_interval:
            lo1, hi1 = results1.confidence_interval
            lo2, hi2 = results2.confidence_interval
            overlap = not (hi1 < lo2 or hi2 < lo1)
            return {
                "test_type": "ci_overlap",
                "significant": not overlap,
                "difference": difference
            }
        return {
            "test_type": test_type,
            "significant": None,
            "difference": difference,
            "note": "No confidence intervals available; run a paired bootstrap on per-sample scores"
        }
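The framework above can be exercised end to end once the metric calculators it references are defined. Below is a minimal usage sketch, assuming every calculator named in _initialize_default_metrics (including ones not shown in this guide, such as PrecisionCalculator) is implemented alongside those in the next section; the file name and synthetic labels are illustrative only.

# example_usage.py (illustrative)
import numpy as np
from evaluation_framework import ModelEvaluator, ModelType

# Synthetic binary labels and deliberately imperfect predictions (~85% accuracy)
rng = np.random.default_rng(42)
y_true = rng.integers(0, 2, size=500)
y_pred = np.where(rng.random(500) < 0.85, y_true, 1 - y_true)

evaluator = ModelEvaluator(model_type=ModelType.CLASSIFICATION)
report = evaluator.evaluate(
    y_true=y_true,
    y_pred=y_pred,
    model_id="churn_classifier",
    model_version="0.1.0",
    dataset_info={"split": "holdout", "n_samples": int(len(y_true))}
)

print(f"Overall score: {report.overall_score:.3f}")
for name, result in report.results.items():
    print(f"  {name}: {result.score:.3f}")
for recommendation in report.recommendations:
    print(f"  -> {recommendation}")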
Advanced Metric Implementations
# advanced_metrics.py
import numpy as np
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, average_precision_score, mean_squared_error,
mean_absolute_error, r2_score, log_loss, confusion_matrix
)
from sklearn.calibration import calibration_curve
import scipy.stats as stats
from typing import List, Tuple, Optional, Dict, Any, Callable

from evaluation_framework import MetricCalculator, EvaluationMetric, EvaluationResult
class AccuracyCalculator(MetricCalculator):
"""Calculate accuracy with confidence intervals"""
def calculate(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> EvaluationResult:
score = accuracy_score(y_true, y_pred)
# Bootstrap confidence interval
ci = self._bootstrap_confidence_interval(y_true, y_pred, accuracy_score)
return EvaluationResult(
metric_name="accuracy",
score=score,
confidence_interval=ci,
metadata={"n_samples": len(y_true)}
)
def get_metric_info(self) -> EvaluationMetric:
return EvaluationMetric(
name="accuracy",
description="Fraction of predictions that match binary or multiclass labels",
higher_is_better=True,
score_range=(0.0, 1.0),
threshold=0.8,
weight=1.0
)
def _bootstrap_confidence_interval(self,
y_true: np.ndarray,
y_pred: np.ndarray,
metric_func: Callable,
n_bootstrap: int = 1000,
confidence_level: float = 0.95) -> Tuple[float, float]:
"""Calculate bootstrap confidence interval"""
bootstrap_scores = []
n_samples = len(y_true)
for _ in range(n_bootstrap):
# Bootstrap sample
indices = np.random.choice(n_samples, size=n_samples, replace=True)
y_true_boot = y_true[indices]
y_pred_boot = y_pred[indices]
# Calculate metric
score = metric_func(y_true_boot, y_pred_boot)
bootstrap_scores.append(score)
# Calculate confidence interval
alpha = 1 - confidence_level
lower_percentile = (alpha / 2) * 100
upper_percentile = (1 - alpha / 2) * 100
ci_lower = np.percentile(bootstrap_scores, lower_percentile)
ci_upper = np.percentile(bootstrap_scores, upper_percentile)
return (ci_lower, ci_upper)
class ROCAUCCalculator(MetricCalculator):
"""Calculate ROC-AUC with additional analysis"""
def calculate(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> EvaluationResult:
# Handle multiclass case
if len(np.unique(y_true)) > 2:
score = roc_auc_score(y_true, y_pred, multi_class='ovr')
multiclass = True
else:
score = roc_auc_score(y_true, y_pred)
multiclass = False
# Calculate DeLong confidence interval for binary case
ci = None
p_value = None
if not multiclass:
ci, p_value = self._delong_confidence_interval(y_true, y_pred)
metadata = {
"multiclass": multiclass,
"n_positive": int(np.sum(y_true)),
"n_negative": int(len(y_true) - np.sum(y_true))
}
return EvaluationResult(
metric_name="roc_auc",
score=score,
confidence_interval=ci,
p_value=p_value,
metadata=metadata
)
def get_metric_info(self) -> EvaluationMetric:
return EvaluationMetric(
name="roc_auc",
description="Area under the Receiver Operating Characteristic curve",
higher_is_better=True,
score_range=(0.0, 1.0),
threshold=0.8,
weight=1.5
)
def _delong_confidence_interval(self,
y_true: np.ndarray,
y_pred: np.ndarray,
confidence_level: float = 0.95) -> Tuple[Tuple[float, float], float]:
"""Calculate DeLong confidence interval for AUC"""
# Simplified DeLong method
auc = roc_auc_score(y_true, y_pred)
# Calculate variance using DeLong method (simplified)
n_pos = np.sum(y_true == 1)
n_neg = np.sum(y_true == 0)
# This is a simplified version - full DeLong requires more complex calculations
se = np.sqrt((auc * (1 - auc) + (n_pos - 1) * (auc / (2 - auc) - auc**2) +
(n_neg - 1) * (2 * auc**2 / (1 + auc) - auc**2)) / (n_pos * n_neg))
# Confidence interval
z = stats.norm.ppf(1 - (1 - confidence_level) / 2)
ci_lower = max(0, auc - z * se)
ci_upper = min(1, auc + z * se)
# P-value for H0: AUC = 0.5
z_score = (auc - 0.5) / se
p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))
return (ci_lower, ci_upper), p_value
class CalibrationCalculator(MetricCalculator):
"""Calculate model calibration metrics"""
def calculate(self, y_true: np.ndarray, y_pred_proba: np.ndarray, **kwargs) -> EvaluationResult:
# Brier score
brier_score = np.mean((y_pred_proba - y_true) ** 2)
# Calibration curve analysis
fraction_of_positives, mean_predicted_value = calibration_curve(
y_true, y_pred_proba, n_bins=10
)
# Expected Calibration Error (ECE)
ece = self._calculate_ece(y_true, y_pred_proba)
# Maximum Calibration Error (MCE)
mce = self._calculate_mce(y_true, y_pred_proba)
# Reliability (how close predicted probabilities are to actual frequencies)
reliability = np.mean(np.abs(fraction_of_positives - mean_predicted_value))
metadata = {
"brier_score": brier_score,
"ece": ece,
"mce": mce,
"reliability": reliability,
"calibration_curve": {
"fraction_of_positives": fraction_of_positives.tolist(),
"mean_predicted_value": mean_predicted_value.tolist()
}
}
# Overall calibration score (lower is better, so we invert)
calibration_score = 1.0 - ece
return EvaluationResult(
metric_name="calibration",
score=calibration_score,
metadata=metadata
)
def get_metric_info(self) -> EvaluationMetric:
return EvaluationMetric(
name="calibration",
description="How well predicted probabilities match actual frequencies",
higher_is_better=True,
score_range=(0.0, 1.0),
threshold=0.9,
weight=1.2
)
def _calculate_ece(self, y_true: np.ndarray, y_pred_proba: np.ndarray, n_bins: int = 10) -> float:
"""Calculate Expected Calibration Error"""
bin_boundaries = np.linspace(0, 1, n_bins + 1)
bin_lowers = bin_boundaries[:-1]
bin_uppers = bin_boundaries[1:]
ece = 0
for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
# Find predictions in this bin
in_bin = (y_pred_proba > bin_lower) & (y_pred_proba <= bin_upper)
prop_in_bin = in_bin.mean()
if prop_in_bin > 0:
accuracy_in_bin = y_true[in_bin].mean()
avg_confidence_in_bin = y_pred_proba[in_bin].mean()
ece += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prop_in_bin
return ece
def _calculate_mce(self, y_true: np.ndarray, y_pred_proba: np.ndarray, n_bins: int = 10) -> float:
"""Calculate Maximum Calibration Error"""
bin_boundaries = np.linspace(0, 1, n_bins + 1)
bin_lowers = bin_boundaries[:-1]
bin_uppers = bin_boundaries[1:]
calibration_errors = []
for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
in_bin = (y_pred_proba > bin_lower) & (y_pred_proba <= bin_upper)
if in_bin.sum() > 0:
accuracy_in_bin = y_true[in_bin].mean()
avg_confidence_in_bin = y_pred_proba[in_bin].mean()
calibration_errors.append(abs(avg_confidence_in_bin - accuracy_in_bin))
return max(calibration_errors) if calibration_errors else 0.0
class RobustnessCalculator(MetricCalculator):
"""Calculate model robustness metrics"""
def calculate(self,
y_true: np.ndarray,
y_pred: np.ndarray,
model=None,
X_test: np.ndarray = None,
**kwargs) -> EvaluationResult:
robustness_scores = {}
# Prediction stability (if multiple predictions available)
if "prediction_variance" in kwargs:
robustness_scores["prediction_stability"] = 1.0 - kwargs["prediction_variance"]
# Adversarial robustness (simplified)
if model is not None and X_test is not None:
adv_robustness = self._estimate_adversarial_robustness(model, X_test, y_true)
robustness_scores["adversarial_robustness"] = adv_robustness
# Input perturbation robustness
if "perturbation_scores" in kwargs:
robustness_scores["perturbation_robustness"] = np.mean(kwargs["perturbation_scores"])
# Overall robustness score
overall_robustness = np.mean(list(robustness_scores.values())) if robustness_scores else 0.5
return EvaluationResult(
metric_name="robustness",
score=overall_robustness,
metadata=robustness_scores
)
def get_metric_info(self) -> EvaluationMetric:
return EvaluationMetric(
name="robustness",
description="Model resistance to input perturbations and adversarial attacks",
higher_is_better=True,
score_range=(0.0, 1.0),
threshold=0.8,
weight=1.3
)
def _estimate_adversarial_robustness(self,
model,
X_test: np.ndarray,
y_true: np.ndarray,
epsilon: float = 0.01) -> float:
"""Estimate adversarial robustness using simple perturbations"""
# Add small random perturbations
noise = np.random.normal(0, epsilon, X_test.shape)
X_perturbed = X_test + noise
# Get predictions on perturbed data
try:
if hasattr(model, 'predict'):
y_pred_perturbed = model.predict(X_perturbed)
y_pred_original = model.predict(X_test)
# Calculate prediction consistency
if len(np.unique(y_true)) == 2: # Binary classification
consistency = np.mean(y_pred_perturbed == y_pred_original)
else:
# For regression or multi-class, use correlation
consistency = np.corrcoef(y_pred_original, y_pred_perturbed)[0, 1]
consistency = max(0, consistency) # Ensure non-negative
return consistency
else:
return 0.5 # Default if can't evaluate
except Exception:
return 0.5 # Default on error
class FairnessCalculator(MetricCalculator):
"""Calculate fairness metrics"""
def calculate(self,
y_true: np.ndarray,
y_pred: np.ndarray,
sensitive_features: np.ndarray = None,
**kwargs) -> EvaluationResult:
if sensitive_features is None:
return EvaluationResult(
metric_name="fairness",
score=1.0, # Assume fair if no sensitive features provided
metadata={"note": "No sensitive features provided"}
)
fairness_metrics = {}
# Demographic parity
demographic_parity = self._calculate_demographic_parity(y_pred, sensitive_features)
fairness_metrics["demographic_parity"] = demographic_parity
# Equalized odds
equalized_odds = self._calculate_equalized_odds(y_true, y_pred, sensitive_features)
fairness_metrics["equalized_odds"] = equalized_odds
# Equal opportunity
equal_opportunity = self._calculate_equal_opportunity(y_true, y_pred, sensitive_features)
fairness_metrics["equal_opportunity"] = equal_opportunity
# Overall fairness score (higher is more fair)
overall_fairness = np.mean(list(fairness_metrics.values()))
return EvaluationResult(
metric_name="fairness",
score=overall_fairness,
metadata=fairness_metrics
)
def get_metric_info(self) -> EvaluationMetric:
return EvaluationMetric(
name="fairness",
description="Model fairness across different demographic groups",
higher_is_better=True,
score_range=(0.0, 1.0),
threshold=0.8,
weight=1.5
)
def _calculate_demographic_parity(self, y_pred: np.ndarray, sensitive_features: np.ndarray) -> float:
"""Calculate demographic parity violation"""
groups = np.unique(sensitive_features)
positive_rates = []
for group in groups:
group_mask = sensitive_features == group
positive_rate = np.mean(y_pred[group_mask])
positive_rates.append(positive_rate)
# Calculate maximum difference in positive rates
max_diff = np.max(positive_rates) - np.min(positive_rates)
# Convert to fairness score (1 = perfectly fair, 0 = maximally unfair)
return 1.0 - max_diff
def _calculate_equalized_odds(self,
y_true: np.ndarray,
y_pred: np.ndarray,
sensitive_features: np.ndarray) -> float:
"""Calculate equalized odds violation"""
groups = np.unique(sensitive_features)
tpr_differences = []
fpr_differences = []
tprs = []
fprs = []
for group in groups:
group_mask = sensitive_features == group
y_true_group = y_true[group_mask]
y_pred_group = y_pred[group_mask]
# True Positive Rate
tp = np.sum((y_true_group == 1) & (y_pred_group == 1))
fn = np.sum((y_true_group == 1) & (y_pred_group == 0))
tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
tprs.append(tpr)
# False Positive Rate
fp = np.sum((y_true_group == 0) & (y_pred_group == 1))
tn = np.sum((y_true_group == 0) & (y_pred_group == 0))
fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
fprs.append(fpr)
# Maximum differences
max_tpr_diff = np.max(tprs) - np.min(tprs)
max_fpr_diff = np.max(fprs) - np.min(fprs)
# Average fairness across both rates
return 1.0 - (max_tpr_diff + max_fpr_diff) / 2
def _calculate_equal_opportunity(self,
y_true: np.ndarray,
y_pred: np.ndarray,
sensitive_features: np.ndarray) -> float:
"""Calculate equal opportunity violation"""
groups = np.unique(sensitive_features)
tprs = []
for group in groups:
group_mask = sensitive_features == group
y_true_group = y_true[group_mask]
y_pred_group = y_pred[group_mask]
# True Positive Rate for positive class only
tp = np.sum((y_true_group == 1) & (y_pred_group == 1))
fn = np.sum((y_true_group == 1) & (y_pred_group == 0))
tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
tprs.append(tpr)
# Maximum difference in TPRs
max_tpr_diff = np.max(tprs) - np.min(tprs)
return 1.0 - max_tpr_diff
# Additional specialized metrics for different model types
class BLEUCalculator(MetricCalculator):
"""BLEU score for text generation"""
def calculate(self, y_true: List[str], y_pred: List[str], **kwargs) -> EvaluationResult:
# Simplified BLEU calculation
# In practice, use nltk.translate.bleu_score or similar
total_score = 0.0
n_samples = len(y_true)
for true_text, pred_text in zip(y_true, y_pred):
# Simplified word-level BLEU
true_words = set(true_text.lower().split())
pred_words = set(pred_text.lower().split())
if len(pred_words) > 0:
precision = len(true_words & pred_words) / len(pred_words)
total_score += precision
average_bleu = total_score / n_samples if n_samples > 0 else 0.0
return EvaluationResult(
metric_name="bleu",
score=average_bleu,
metadata={"n_samples": n_samples}
)
def get_metric_info(self) -> EvaluationMetric:
return EvaluationMetric(
name="bleu",
description="BLEU score for text generation quality",
higher_is_better=True,
score_range=(0.0, 1.0),
threshold=0.3,
weight=1.0
)
class NDCGCalculator(MetricCalculator):
"""Normalized Discounted Cumulative Gain for ranking"""
def calculate(self,
y_true: np.ndarray,
y_scores: np.ndarray,
k: int = 10,
**kwargs) -> EvaluationResult:
# Calculate NDCG@k
ndcg_score = self._calculate_ndcg(y_true, y_scores, k)
return EvaluationResult(
metric_name="ndcg",
score=ndcg_score,
metadata={"k": k}
)
def get_metric_info(self) -> EvaluationMetric:
return EvaluationMetric(
name="ndcg",
description="Normalized Discounted Cumulative Gain for ranking quality",
higher_is_better=True,
score_range=(0.0, 1.0),
threshold=0.7,
weight=1.0
)
def _calculate_ndcg(self, y_true: np.ndarray, y_scores: np.ndarray, k: int) -> float:
"""Calculate NDCG@k"""
# Sort by predicted scores
order = np.argsort(y_scores)[::-1]
y_true_sorted = y_true[order]
# Calculate DCG@k
dcg = 0.0
for i in range(min(k, len(y_true_sorted))):
rel = y_true_sorted[i]
dcg += (2**rel - 1) / np.log2(i + 2)
# Calculate IDCG@k (ideal DCG)
y_true_ideal = np.sort(y_true)[::-1]
idcg = 0.0
for i in range(min(k, len(y_true_ideal))):
rel = y_true_ideal[i]
idcg += (2**rel - 1) / np.log2(i + 2)
return dcg / idcg if idcg > 0 else 0.0
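A short sketch of calling the calibration and fairness calculators directly on synthetic data; the file name, the probability model, and the synthetic sensitive attribute are illustrative assumptions rather than part of the framework.

# metrics_demo.py (illustrative)
import numpy as np
from advanced_metrics import CalibrationCalculator, FairnessCalculator

rng = np.random.default_rng(0)
y_true = rng.integers(0, 2, size=1000)
# Slightly noisy probabilities centered on 0.7 for positives and 0.3 for negatives
y_proba = np.clip(np.where(y_true == 1,
                           rng.normal(0.7, 0.15, 1000),
                           rng.normal(0.3, 0.15, 1000)), 0.0, 1.0)
y_pred = (y_proba >= 0.5).astype(int)
group = rng.integers(0, 2, size=1000)  # synthetic sensitive attribute

calibration = CalibrationCalculator().calculate(y_true, y_proba)
print(f"Calibration: {calibration.score:.3f} "
      f"(ECE={calibration.metadata['ece']:.3f}, Brier={calibration.metadata['brier_score']:.3f})")

fairness = FairnessCalculator().calculate(y_true, y_pred, sensitive_features=group)
print(f"Fairness: {fairness.score:.3f}")
for name, value in fairness.metadata.items():
    print(f"  {name}: {value:.3f}")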
Automated Testing Framework
# automated_testing.py
import asyncio
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Callable, Tuple
from dataclasses import dataclass
from datetime import datetime
import json
import logging
from abc import ABC, abstractmethod

from advanced_metrics import FairnessCalculator  # reused by BiasDetectionTest
class TestCase(ABC):
"""Abstract base class for test cases"""
@abstractmethod
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
"""Run the test case"""
pass
@abstractmethod
def get_test_info(self) -> Dict[str, Any]:
"""Get test case information"""
pass
class ModelTestSuite:
"""Comprehensive model testing suite"""
def __init__(self, model_type: str):
self.model_type = model_type
self.test_cases: List[TestCase] = []
self.test_results: List[Dict[str, Any]] = []
# Initialize default test cases
self._initialize_default_tests()
self.logger = logging.getLogger(__name__)
def _initialize_default_tests(self):
"""Initialize default test cases"""
# Core functionality tests
self.add_test_case(DataShapeTest())
self.add_test_case(PredictionRangeTest())
self.add_test_case(DeterminismTest())
# Robustness tests
self.add_test_case(NoiseRobustnessTest())
self.add_test_case(OutlierRobustnessTest())
self.add_test_case(MissingValueTest())
# Performance tests
self.add_test_case(LatencyTest())
self.add_test_case(MemoryUsageTest())
self.add_test_case(ThroughputTest())
# Fairness tests
self.add_test_case(BiasDetectionTest())
# Edge case tests
self.add_test_case(EmptyInputTest())
self.add_test_case(ExtremeValueTest())
def add_test_case(self, test_case: TestCase):
"""Add a test case to the suite"""
self.test_cases.append(test_case)
async def run_all_tests(self,
model,
test_data: Dict[str, Any],
parallel: bool = True) -> Dict[str, Any]:
"""Run all test cases"""
self.logger.info(f"Running {len(self.test_cases)} test cases")
start_time = datetime.now()
if parallel:
# Run tests in parallel
tasks = [
self._run_single_test_async(test_case, model, test_data)
for test_case in self.test_cases
]
results = await asyncio.gather(*tasks, return_exceptions=True)
else:
# Run tests sequentially
results = []
for test_case in self.test_cases:
result = await self._run_single_test_async(test_case, model, test_data)
results.append(result)
end_time = datetime.now()
# Process results
test_results = []
passed_tests = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
test_result = {
"test_name": self.test_cases[i].__class__.__name__,
"status": "error",
"error": str(result),
"timestamp": datetime.now()
}
else:
test_result = result
if result.get("passed", False):
passed_tests += 1
test_results.append(test_result)
# Generate summary
summary = {
"total_tests": len(self.test_cases),
"passed_tests": passed_tests,
"failed_tests": len(self.test_cases) - passed_tests,
"pass_rate": passed_tests / len(self.test_cases),
"execution_time": (end_time - start_time).total_seconds(),
"timestamp": start_time,
"test_results": test_results
}
self.test_results.append(summary)
self.logger.info(
f"Test suite completed: {passed_tests}/{len(self.test_cases)} tests passed "
f"({summary['pass_rate']:.1%} pass rate)"
)
return summary
async def _run_single_test_async(self,
test_case: TestCase,
model,
test_data: Dict[str, Any]) -> Dict[str, Any]:
"""Run a single test case asynchronously"""
loop = asyncio.get_event_loop()
try:
result = await loop.run_in_executor(
None, test_case.run_test, model, test_data
)
return result
except Exception as e:
return {
"test_name": test_case.__class__.__name__,
"status": "error",
"error": str(e),
"timestamp": datetime.now()
}
# Specific test case implementations
class DataShapeTest(TestCase):
"""Test that model handles input data shapes correctly"""
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
X_test = test_data['X_test']
try:
# Test with original data
predictions = model.predict(X_test)
# Verify output shape
expected_shape = (len(X_test),) if hasattr(model, 'predict') else None
actual_shape = predictions.shape
shape_correct = (
expected_shape is None or
actual_shape[0] == expected_shape[0]
)
return {
"test_name": "data_shape_test",
"passed": shape_correct,
"details": {
"input_shape": X_test.shape,
"output_shape": actual_shape,
"expected_output_samples": expected_shape[0] if expected_shape else None
},
"timestamp": datetime.now()
}
except Exception as e:
return {
"test_name": "data_shape_test",
"passed": False,
"error": str(e),
"timestamp": datetime.now()
}
def get_test_info(self) -> Dict[str, Any]:
return {
"name": "Data Shape Test",
"description": "Verify model handles input data shapes correctly",
"category": "functionality"
}
class DeterminismTest(TestCase):
"""Test model determinism"""
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
X_test = test_data['X_test']
try:
# Make predictions twice
predictions1 = model.predict(X_test[:100]) # Use subset for speed
predictions2 = model.predict(X_test[:100])
# Check if predictions are identical
if hasattr(predictions1, 'shape'):
deterministic = np.allclose(predictions1, predictions2, rtol=1e-10)
else:
deterministic = predictions1 == predictions2
return {
"test_name": "determinism_test",
"passed": deterministic,
"details": {
"max_difference": float(np.max(np.abs(predictions1 - predictions2))) if hasattr(predictions1, 'shape') else None,
"samples_tested": len(predictions1)
},
"timestamp": datetime.now()
}
except Exception as e:
return {
"test_name": "determinism_test",
"passed": False,
"error": str(e),
"timestamp": datetime.now()
}
def get_test_info(self) -> Dict[str, Any]:
return {
"name": "Determinism Test",
"description": "Verify model produces consistent predictions",
"category": "functionality"
}
class NoiseRobustnessTest(TestCase):
"""Test model robustness to input noise"""
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
X_test = test_data['X_test']
try:
# Original predictions
original_predictions = model.predict(X_test[:100])
# Add noise and predict
noise_levels = [0.01, 0.05, 0.1]
robustness_scores = []
for noise_level in noise_levels:
noise = np.random.normal(0, noise_level, X_test[:100].shape)
X_noisy = X_test[:100] + noise
noisy_predictions = model.predict(X_noisy)
# Calculate similarity
if len(np.unique(original_predictions)) <= 10: # Classification
similarity = np.mean(original_predictions == noisy_predictions)
else: # Regression
similarity = np.corrcoef(original_predictions, noisy_predictions)[0, 1]
similarity = max(0, similarity) # Ensure non-negative
robustness_scores.append(similarity)
# Overall robustness score
avg_robustness = np.mean(robustness_scores)
passed = avg_robustness > 0.8 # 80% threshold
return {
"test_name": "noise_robustness_test",
"passed": passed,
"details": {
"noise_levels": noise_levels,
"robustness_scores": robustness_scores,
"average_robustness": avg_robustness
},
"timestamp": datetime.now()
}
except Exception as e:
return {
"test_name": "noise_robustness_test",
"passed": False,
"error": str(e),
"timestamp": datetime.now()
}
def get_test_info(self) -> Dict[str, Any]:
return {
"name": "Noise Robustness Test",
"description": "Test model resistance to input noise",
"category": "robustness"
}
class LatencyTest(TestCase):
"""Test model prediction latency"""
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
X_test = test_data['X_test']
try:
import time
# Warm up
model.predict(X_test[:10])
# Test single prediction latency
single_latencies = []
for i in range(10):
start_time = time.time()
model.predict(X_test[i:i+1])
latency = (time.time() - start_time) * 1000 # ms
single_latencies.append(latency)
# Test batch prediction latency
start_time = time.time()
model.predict(X_test[:100])
batch_latency = (time.time() - start_time) * 1000 # ms
avg_single_latency = np.mean(single_latencies)
avg_batch_per_sample = batch_latency / 100
# Pass if latencies are reasonable (< 100ms per sample)
passed = avg_single_latency < 100 and avg_batch_per_sample < 100
return {
"test_name": "latency_test",
"passed": passed,
"details": {
"avg_single_latency_ms": avg_single_latency,
"batch_latency_ms": batch_latency,
"avg_batch_per_sample_ms": avg_batch_per_sample,
"single_latency_std": np.std(single_latencies)
},
"timestamp": datetime.now()
}
except Exception as e:
return {
"test_name": "latency_test",
"passed": False,
"error": str(e),
"timestamp": datetime.now()
}
def get_test_info(self) -> Dict[str, Any]:
return {
"name": "Latency Test",
"description": "Measure model prediction latency",
"category": "performance"
}
class BiasDetectionTest(TestCase):
"""Test for potential bias in model predictions"""
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
X_test = test_data['X_test']
y_test = test_data.get('y_test')
sensitive_features = test_data.get('sensitive_features')
if sensitive_features is None:
return {
"test_name": "bias_detection_test",
"passed": True, # Pass if no sensitive features provided
"details": {"note": "No sensitive features provided for bias testing"},
"timestamp": datetime.now()
}
try:
predictions = model.predict(X_test)
# Calculate fairness metrics
fairness_calculator = FairnessCalculator()
fairness_result = fairness_calculator.calculate(
y_test, predictions, sensitive_features
)
# Pass if fairness score is above threshold
passed = fairness_result.score > 0.8
return {
"test_name": "bias_detection_test",
"passed": passed,
"details": {
"fairness_score": fairness_result.score,
"fairness_breakdown": fairness_result.metadata
},
"timestamp": datetime.now()
}
except Exception as e:
return {
"test_name": "bias_detection_test",
"passed": False,
"error": str(e),
"timestamp": datetime.now()
}
def get_test_info(self) -> Dict[str, Any]:
return {
"name": "Bias Detection Test",
"description": "Test for potential bias in model predictions",
"category": "fairness"
}
class ExtremeValueTest(TestCase):
"""Test model behavior with extreme input values"""
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
X_test = test_data['X_test']
try:
# Create extreme value test cases
X_extreme = X_test[:10].copy()
# Test with very large values
X_large = X_extreme * 1000
# Test with very small values
X_small = X_extreme * 0.001
# Test with zeros
X_zeros = np.zeros_like(X_extreme)
test_cases = [
("large_values", X_large),
("small_values", X_small),
("zero_values", X_zeros)
]
results = {}
all_passed = True
for test_name, X_test_case in test_cases:
try:
predictions = model.predict(X_test_case)
# Check if predictions are valid (not NaN, not infinite)
valid_predictions = (
not np.any(np.isnan(predictions)) and
not np.any(np.isinf(predictions))
)
results[test_name] = {
"passed": valid_predictions,
"prediction_stats": {
"mean": float(np.mean(predictions)),
"std": float(np.std(predictions)),
"min": float(np.min(predictions)),
"max": float(np.max(predictions))
}
}
if not valid_predictions:
all_passed = False
except Exception as e:
results[test_name] = {
"passed": False,
"error": str(e)
}
all_passed = False
return {
"test_name": "extreme_value_test",
"passed": all_passed,
"details": results,
"timestamp": datetime.now()
}
except Exception as e:
return {
"test_name": "extreme_value_test",
"passed": False,
"error": str(e),
"timestamp": datetime.now()
}
def get_test_info(self) -> Dict[str, Any]:
return {
"name": "Extreme Value Test",
"description": "Test model behavior with extreme input values",
"category": "robustness"
}
# Additional placeholder test classes for completeness
class PredictionRangeTest(TestCase):
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
# Implementation for testing prediction ranges
return {"test_name": "prediction_range_test", "passed": True, "timestamp": datetime.now()}
def get_test_info(self) -> Dict[str, Any]:
return {"name": "Prediction Range Test", "description": "Test prediction value ranges", "category": "functionality"}
class OutlierRobustnessTest(TestCase):
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
# Implementation for testing outlier robustness
return {"test_name": "outlier_robustness_test", "passed": True, "timestamp": datetime.now()}
def get_test_info(self) -> Dict[str, Any]:
return {"name": "Outlier Robustness Test", "description": "Test robustness to outliers", "category": "robustness"}
class MissingValueTest(TestCase):
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
# Implementation for testing missing value handling
return {"test_name": "missing_value_test", "passed": True, "timestamp": datetime.now()}
def get_test_info(self) -> Dict[str, Any]:
return {"name": "Missing Value Test", "description": "Test handling of missing values", "category": "robustness"}
class MemoryUsageTest(TestCase):
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
# Implementation for testing memory usage
return {"test_name": "memory_usage_test", "passed": True, "timestamp": datetime.now()}
def get_test_info(self) -> Dict[str, Any]:
return {"name": "Memory Usage Test", "description": "Test memory consumption", "category": "performance"}
class ThroughputTest(TestCase):
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
# Implementation for testing throughput
return {"test_name": "throughput_test", "passed": True, "timestamp": datetime.now()}
def get_test_info(self) -> Dict[str, Any]:
return {"name": "Throughput Test", "description": "Test prediction throughput", "category": "performance"}
class EmptyInputTest(TestCase):
def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
# Implementation for testing empty input handling
return {"test_name": "empty_input_test", "passed": True, "timestamp": datetime.now()}
def get_test_info(self) -> Dict[str, Any]:
return {"name": "Empty Input Test", "description": "Test handling of empty inputs", "category": "edge_cases"}
Production Monitoring and Validation
# production_monitoring.py
import asyncio
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
import json
import logging
from collections import deque, defaultdict
import sqlite3
@dataclass
class ProductionMetrics:
timestamp: datetime
model_id: str
model_version: str
prediction_count: int
avg_latency_ms: float
error_rate: float
accuracy: Optional[float] = None
drift_score: Optional[float] = None
fairness_score: Optional[float] = None
class ProductionValidator:
"""Production model validation and monitoring"""
def __init__(self,
db_path: str = "./production_monitoring.db",
alert_thresholds: Dict[str, float] = None):
self.db_path = db_path
self.alert_thresholds = alert_thresholds or {
"error_rate": 0.05,
"latency_ms": 1000,
"accuracy_drop": 0.1,
"drift_score": 0.1,
"fairness_score": 0.8
}
# Initialize database
self._init_database()
# Real-time monitoring buffers
self.recent_predictions = defaultdict(lambda: deque(maxlen=1000))
self.performance_history = defaultdict(list)
# Alert system
self.active_alerts = []
self.alert_callbacks = []
self.logger = logging.getLogger(__name__)
def _init_database(self):
"""Initialize monitoring database"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS production_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
model_id TEXT NOT NULL,
model_version TEXT NOT NULL,
prediction_count INTEGER,
avg_latency_ms REAL,
error_rate REAL,
accuracy REAL,
drift_score REAL,
fairness_score REAL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
model_id TEXT NOT NULL,
alert_type TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
resolved BOOLEAN DEFAULT FALSE,
resolved_at TEXT
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_metrics_model_time
ON production_metrics(model_id, timestamp)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_alerts_model_time
ON alerts(model_id, timestamp)
""")
async def log_prediction(self,
model_id: str,
model_version: str,
prediction_data: Dict[str, Any]):
"""Log a single prediction for monitoring"""
prediction_log = {
"timestamp": datetime.now(),
"model_id": model_id,
"model_version": model_version,
"latency_ms": prediction_data.get("latency_ms", 0),
"success": prediction_data.get("success", True),
"input_features": prediction_data.get("input_features"),
"prediction": prediction_data.get("prediction"),
"confidence": prediction_data.get("confidence")
}
self.recent_predictions[model_id].append(prediction_log)
# Trigger periodic validation
if len(self.recent_predictions[model_id]) % 100 == 0:
asyncio.create_task(self._periodic_validation(model_id, model_version))
async def _periodic_validation(self, model_id: str, model_version: str):
"""Perform periodic validation checks"""
recent_logs = list(self.recent_predictions[model_id])
if len(recent_logs) < 10:
return
# Calculate metrics
metrics = await self._calculate_current_metrics(model_id, model_version, recent_logs)
# Store metrics
await self._store_metrics(metrics)
# Check for alerts
await self._check_alerts(metrics)
async def _calculate_current_metrics(self,
model_id: str,
model_version: str,
recent_logs: List[Dict[str, Any]]) -> ProductionMetrics:
"""Calculate current performance metrics"""
if not recent_logs:
return ProductionMetrics(
timestamp=datetime.now(),
model_id=model_id,
model_version=model_version,
prediction_count=0,
avg_latency_ms=0.0,
error_rate=0.0
)
# Basic metrics
prediction_count = len(recent_logs)
latencies = [log["latency_ms"] for log in recent_logs if log["latency_ms"] > 0]
avg_latency_ms = np.mean(latencies) if latencies else 0.0
# Error rate
errors = [log for log in recent_logs if not log["success"]]
error_rate = len(errors) / prediction_count
# Accuracy (if ground truth available)
accuracy = await self._calculate_accuracy(recent_logs)
# Drift detection
drift_score = await self._calculate_drift_score(model_id, recent_logs)
# Fairness (if sensitive features available)
fairness_score = await self._calculate_fairness_score(recent_logs)
return ProductionMetrics(
timestamp=datetime.now(),
model_id=model_id,
model_version=model_version,
prediction_count=prediction_count,
avg_latency_ms=avg_latency_ms,
error_rate=error_rate,
accuracy=accuracy,
drift_score=drift_score,
fairness_score=fairness_score
)
async def _calculate_accuracy(self, recent_logs: List[Dict[str, Any]]) -> Optional[float]:
"""Calculate accuracy if ground truth is available"""
# This would require a mechanism to obtain ground truth labels
# For now, return None indicating accuracy is not available
return None
async def _calculate_drift_score(self,
model_id: str,
recent_logs: List[Dict[str, Any]]) -> Optional[float]:
"""Calculate data drift score"""
try:
# Extract input features from recent predictions
recent_features = []
for log in recent_logs:
if log["input_features"] is not None:
recent_features.append(log["input_features"])
if len(recent_features) < 50:
return None
# Get reference features from historical data
reference_features = await self._get_reference_features(model_id)
if not reference_features:
return None
# Simple drift detection using feature means
recent_means = np.mean(recent_features, axis=0)
reference_means = np.mean(reference_features, axis=0)
# Calculate relative change
drift_score = np.mean(np.abs((recent_means - reference_means) / (reference_means + 1e-8)))
return float(drift_score)
except Exception as e:
self.logger.error(f"Drift calculation failed: {e}")
return None
async def _calculate_fairness_score(self, recent_logs: List[Dict[str, Any]]) -> Optional[float]:
"""Calculate fairness score if sensitive features available"""
# This would require sensitive feature information in the logs
# For now, return None
return None
async def _get_reference_features(self, model_id: str) -> Optional[List[List[float]]]:
"""Get reference features for drift comparison"""
# This would retrieve historical feature data
# For now, return None
return None
async def _store_metrics(self, metrics: ProductionMetrics):
"""Store metrics in database"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO production_metrics
(timestamp, model_id, model_version, prediction_count,
avg_latency_ms, error_rate, accuracy, drift_score, fairness_score)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
metrics.timestamp.isoformat(),
metrics.model_id,
metrics.model_version,
metrics.prediction_count,
metrics.avg_latency_ms,
metrics.error_rate,
metrics.accuracy,
metrics.drift_score,
metrics.fairness_score
))
async def _check_alerts(self, metrics: ProductionMetrics):
"""Check for alert conditions"""
alerts = []
# Error rate alert
if metrics.error_rate > self.alert_thresholds["error_rate"]:
alerts.append({
"type": "high_error_rate",
"severity": "high",
"message": f"Error rate {metrics.error_rate:.2%} exceeds threshold {self.alert_thresholds['error_rate']:.2%}",
"value": metrics.error_rate,
"threshold": self.alert_thresholds["error_rate"]
})
# Latency alert
if metrics.avg_latency_ms > self.alert_thresholds["latency_ms"]:
alerts.append({
"type": "high_latency",
"severity": "medium",
"message": f"Average latency {metrics.avg_latency_ms:.1f}ms exceeds threshold {self.alert_thresholds['latency_ms']:.1f}ms",
"value": metrics.avg_latency_ms,
"threshold": self.alert_thresholds["latency_ms"]
})
# Drift alert
if metrics.drift_score and metrics.drift_score > self.alert_thresholds["drift_score"]:
alerts.append({
"type": "data_drift",
"severity": "medium",
"message": f"Data drift score {metrics.drift_score:.3f} exceeds threshold {self.alert_thresholds['drift_score']:.3f}",
"value": metrics.drift_score,
"threshold": self.alert_thresholds["drift_score"]
})
# Fairness alert
if metrics.fairness_score and metrics.fairness_score < self.alert_thresholds["fairness_score"]:
alerts.append({
"type": "fairness_violation",
"severity": "high",
"message": f"Fairness score {metrics.fairness_score:.3f} below threshold {self.alert_thresholds['fairness_score']:.3f}",
"value": metrics.fairness_score,
"threshold": self.alert_thresholds["fairness_score"]
})
# Store and notify alerts
for alert in alerts:
await self._store_alert(metrics.model_id, alert)
await self._notify_alert(metrics.model_id, alert)
async def _store_alert(self, model_id: str, alert: Dict[str, Any]):
"""Store alert in database"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO alerts
(timestamp, model_id, alert_type, severity, message)
VALUES (?, ?, ?, ?, ?)
""", (
datetime.now().isoformat(),
model_id,
alert["type"],
alert["severity"],
alert["message"]
))
async def _notify_alert(self, model_id: str, alert: Dict[str, Any]):
"""Notify alert to callbacks"""
self.logger.warning(f"Alert for model {model_id}: {alert['message']}")
# Notify registered callbacks
for callback in self.alert_callbacks:
try:
await callback(model_id, alert)
except Exception as e:
self.logger.error(f"Alert callback failed: {e}")
def add_alert_callback(self, callback: Callable):
"""Add alert notification callback"""
self.alert_callbacks.append(callback)
async def get_model_health(self,
model_id: str,
time_window_hours: int = 24) -> Dict[str, Any]:
"""Get comprehensive model health report"""
start_time = datetime.now() - timedelta(hours=time_window_hours)
with sqlite3.connect(self.db_path) as conn:
# Get metrics
cursor = conn.execute("""
SELECT * FROM production_metrics
WHERE model_id = ? AND timestamp >= ?
ORDER BY timestamp DESC
""", (model_id, start_time.isoformat()))
metrics_data = cursor.fetchall()
# Get alerts
cursor = conn.execute("""
SELECT * FROM alerts
WHERE model_id = ? AND timestamp >= ?
ORDER BY timestamp DESC
""", (model_id, start_time.isoformat()))
alerts_data = cursor.fetchall()
if not metrics_data:
return {
"model_id": model_id,
"status": "no_data",
"message": f"No metrics found for the last {time_window_hours} hours"
}
# Calculate health metrics
latest_metrics = metrics_data[0]
# Health score calculation
health_factors = []
# Error rate factor
error_rate = latest_metrics[6] # error_rate column
error_factor = max(0, 1 - error_rate / self.alert_thresholds["error_rate"])
health_factors.append(error_factor)
# Latency factor
latency = latest_metrics[5] # avg_latency_ms column
latency_factor = max(0, 1 - latency / self.alert_thresholds["latency_ms"])
health_factors.append(latency_factor)
# Alert factor
        active_alerts = [alert for alert in alerts_data if not alert[6]]  # resolved flag is column 6
alert_factor = max(0, 1 - len(active_alerts) / 10) # Penalize multiple alerts
health_factors.append(alert_factor)
overall_health = np.mean(health_factors)
# Determine status
if overall_health > 0.8:
status = "healthy"
elif overall_health > 0.6:
status = "warning"
else:
status = "critical"
return {
"model_id": model_id,
"status": status,
"health_score": overall_health,
"time_window_hours": time_window_hours,
"latest_metrics": {
"timestamp": latest_metrics[1],
"prediction_count": latest_metrics[4],
"avg_latency_ms": latest_metrics[5],
"error_rate": latest_metrics[6],
"accuracy": latest_metrics[7],
"drift_score": latest_metrics[8],
"fairness_score": latest_metrics[9]
},
"active_alerts": len(active_alerts),
"total_predictions": sum(row[4] for row in metrics_data),
"recommendations": self._generate_health_recommendations(overall_health, latest_metrics, active_alerts)
}
def _generate_health_recommendations(self,
health_score: float,
latest_metrics: tuple,
active_alerts: List[tuple]) -> List[str]:
"""Generate health improvement recommendations"""
recommendations = []
if health_score < 0.6:
recommendations.append("Immediate attention required - multiple performance issues detected")
error_rate = latest_metrics[6]
if error_rate > self.alert_thresholds["error_rate"]:
recommendations.append(f"High error rate ({error_rate:.2%}) - investigate model stability")
latency = latest_metrics[5]
if latency > self.alert_thresholds["latency_ms"]:
recommendations.append(f"High latency ({latency:.1f}ms) - consider model optimization")
if len(active_alerts) > 5:
recommendations.append("Multiple active alerts - prioritize resolution")
if not recommendations:
recommendations.append("Model performing well - continue monitoring")
return recommendations
# Example usage
async def demonstrate_production_monitoring():
"""Demonstrate production monitoring system"""
# Initialize production validator
validator = ProductionValidator(alert_thresholds={
"error_rate": 0.05,
"latency_ms": 500,
"drift_score": 0.1
})
# Add alert callback
async def alert_handler(model_id: str, alert: Dict[str, Any]):
print(f"ALERT for {model_id}: {alert['message']}")
validator.add_alert_callback(alert_handler)
# Simulate predictions
model_id = "sentiment_classifier_v1"
model_version = "1.0.0"
for i in range(200):
# Simulate prediction data
prediction_data = {
"latency_ms": np.random.normal(100, 20),
"success": np.random.random() > 0.02, # 2% error rate
"input_features": np.random.random(10).tolist(),
"prediction": np.random.choice([0, 1]),
"confidence": np.random.random()
}
await validator.log_prediction(model_id, model_version, prediction_data)
# Add some delay
await asyncio.sleep(0.01)
# Get model health report
health_report = await validator.get_model_health(model_id)
print(f"Model Health Report:")
print(json.dumps(health_report, indent=2, default=str))
if __name__ == "__main__":
asyncio.run(demonstrate_production_monitoring())
Best Practices for AI Evaluation
Evaluation Strategy Design
- Multi-Dimensional Assessment: Evaluate across performance, robustness, fairness, and interpretability
- Business Alignment: Ensure metrics align with business objectives and user needs
- Continuous Evaluation: Implement ongoing evaluation in production environments
- Comparative Analysis: Always evaluate against baselines and alternative approaches
Metric Selection and Interpretation
- Domain-Appropriate Metrics: Choose metrics that are meaningful for your specific domain
- Multiple Metrics: Use multiple complementary metrics to get a complete picture
- Statistical Significance: Include confidence intervals and significance testing (see the paired bootstrap sketch after this list)
- Practical Significance: Consider whether differences are practically meaningful
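A minimal sketch of the paired bootstrap test referenced above: both models are scored on the same resampled test sets, so the distribution of the score difference, not just the two point estimates, drives the conclusion. The function name, the use of accuracy, and the number of resamples are illustrative; inputs are assumed to be NumPy arrays over the same test set.

# paired_bootstrap.py (illustrative)
import numpy as np
from sklearn.metrics import accuracy_score

def paired_bootstrap_test(y_true, pred_a, pred_b, n_bootstrap=2000, seed=0):
    """Estimate the distribution of the accuracy difference (model B minus model A)."""
    rng = np.random.default_rng(seed)
    n = len(y_true)
    deltas = np.empty(n_bootstrap)
    for i in range(n_bootstrap):
        idx = rng.integers(0, n, size=n)  # resample the test set with replacement
        deltas[i] = (accuracy_score(y_true[idx], pred_b[idx]) -
                     accuracy_score(y_true[idx], pred_a[idx]))
    ci = (float(np.percentile(deltas, 2.5)), float(np.percentile(deltas, 97.5)))
    # Two-sided p-value estimate: how often the sign of the difference flips
    p_value = min(1.0, 2 * min((deltas <= 0).mean(), (deltas >= 0).mean()))
    return {"mean_delta": float(deltas.mean()), "ci_95": ci, "p_value": float(p_value)}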
Production Monitoring
- Real-Time Monitoring: Track latency, error rates, and prediction distributions as traffic arrives
- Automated Alerting: Set up automated alerts for performance degradation
- Drift Detection: Continuously monitor for data and concept drift (a per-feature drift check is sketched after this list)
- Human-in-the-Loop: Include human validation in critical decisions
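The drift heuristic in ProductionValidator compares feature means; a complementary sketch using a per-feature two-sample Kolmogorov-Smirnov test is shown below. The function name and significance level are illustrative assumptions.

# drift_check.py (illustrative)
import numpy as np
from scipy import stats

def feature_drift_report(reference: np.ndarray,
                         current: np.ndarray,
                         alpha: float = 0.01) -> dict:
    """Run a two-sample Kolmogorov-Smirnov test on each feature column."""
    drifted = {}
    for j in range(reference.shape[1]):
        statistic, p_value = stats.ks_2samp(reference[:, j], current[:, j])
        if p_value < alpha:
            drifted[j] = {"ks_statistic": float(statistic), "p_value": float(p_value)}
    return {"n_features": int(reference.shape[1]),
            "n_drifted": len(drifted),
            "drifted_features": drifted}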
Testing and Validation
- Comprehensive Test Suites: Implement automated testing for multiple scenarios
- Edge Case Testing: Test thoroughly on edge cases and adversarial examples
- Cross-Validation: Use proper cross-validation techniques, such as stratified k-fold for imbalanced classes (see the sketch after this list)
- Hold-Out Testing: Maintain truly independent test sets
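A brief sketch of the cross-validation point above, using stratified folds so each split preserves the class balance; the dataset and model are placeholders.

# cv_evaluation.py (illustrative)
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score

X, y = make_classification(n_samples=2000, n_features=20, random_state=0)
model = LogisticRegression(max_iter=1000)

# Stratified folds preserve the class balance in every split
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
scores = cross_val_score(model, X, y, cv=cv, scoring="roc_auc")
print(f"ROC-AUC: {scores.mean():.3f} +/- {scores.std():.3f} across {len(scores)} folds")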
Conclusion
Comprehensive AI model evaluation is essential for building reliable, trustworthy, and effective machine learning systems. The frameworks, metrics, and practices covered in this guide provide a solid foundation for evaluating models across multiple dimensions and ensuring they meet both technical and business requirements.
Key principles for effective evaluation:
- Systematic Approach: Use structured evaluation frameworks
- Multiple Perspectives: Evaluate from technical, business, and ethical viewpoints
- Continuous Monitoring: Implement ongoing evaluation in production
- Actionable Insights: Focus on generating actionable recommendations
- Stakeholder Communication: Present results clearly to different audiences
As AI systems become more complex and critical to business operations, robust evaluation practices become increasingly important for maintaining trust, compliance, and competitive advantage.