Measure AI Model Performance Like a Pro

David Childs

Master AI model evaluation with comprehensive metrics, testing strategies, and validation frameworks for building reliable and robust machine learning systems.

Reliable machine learning systems depend on careful evaluation. Accuracy alone is not enough: a model also has to be checked for fairness, robustness, interpretability, and behavior on real-world data. This guide covers evaluation frameworks, metrics, testing strategies, and production monitoring approaches.

Comprehensive Evaluation Framework

AI model evaluation should be systematic, multi-dimensional, and aligned with business objectives. A comprehensive evaluation framework considers technical performance, business impact, ethical considerations, and operational requirements.

Core Evaluation Components

# evaluation_framework.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Union, Callable, Tuple
from enum import Enum
from datetime import datetime
import numpy as np
import pandas as pd
import json
import logging
from collections import defaultdict

class EvaluationDimension(Enum):
    PERFORMANCE = "performance"
    ROBUSTNESS = "robustness"
    FAIRNESS = "fairness"
    INTERPRETABILITY = "interpretability"
    EFFICIENCY = "efficiency"
    SAFETY = "safety"
    BUSINESS_IMPACT = "business_impact"

class ModelType(Enum):
    CLASSIFICATION = "classification"
    REGRESSION = "regression"
    RANKING = "ranking"
    GENERATION = "generation"
    RECOMMENDATION = "recommendation"
    TIME_SERIES = "time_series"

@dataclass
class EvaluationMetric:
    name: str
    description: str
    higher_is_better: bool
    score_range: Tuple[float, float]
    threshold: Optional[float] = None
    weight: float = 1.0

@dataclass
class EvaluationResult:
    metric_name: str
    score: float
    confidence_interval: Optional[Tuple[float, float]] = None
    p_value: Optional[float] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class EvaluationReport:
    model_id: str
    model_version: str
    dataset_info: Dict[str, Any]
    evaluation_config: Dict[str, Any]
    results: Dict[str, EvaluationResult]
    overall_score: float
    recommendations: List[str]
    timestamp: datetime = field(default_factory=datetime.now)

class MetricCalculator(ABC):
    """Abstract base class for metric calculations"""
    
    @abstractmethod
    def calculate(self, 
                 y_true: np.ndarray, 
                 y_pred: np.ndarray, 
                 **kwargs) -> EvaluationResult:
        """Calculate metric value"""
        pass
    
    @abstractmethod
    def get_metric_info(self) -> EvaluationMetric:
        """Get metric information"""
        pass

class ModelEvaluator:
    """Comprehensive model evaluator"""
    
    def __init__(self, 
                 model_type: ModelType,
                 evaluation_config: Optional[Dict[str, Any]] = None):
        
        self.model_type = model_type
        self.config = evaluation_config or {}
        
        # Initialize metric calculators
        self.metric_calculators: Dict[str, MetricCalculator] = {}
        self._initialize_default_metrics()
        
        # Evaluation history
        self.evaluation_history: List[EvaluationReport] = []
        
        self.logger = logging.getLogger(__name__)
    
    def _initialize_default_metrics(self):
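        """Initialize default metrics based on model type"""
        # Only some of these calculators are implemented in this guide
        # (see advanced_metrics.py). The others are assumed to follow the
        # same MetricCalculator interface and must be defined before use.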
        
        if self.model_type == ModelType.CLASSIFICATION:
            self.add_metric("accuracy", AccuracyCalculator())
            self.add_metric("precision", PrecisionCalculator())
            self.add_metric("recall", RecallCalculator())
            self.add_metric("f1_score", F1ScoreCalculator())
            self.add_metric("roc_auc", ROCAUCCalculator())
            self.add_metric("pr_auc", PRAUCCalculator())
            
        elif self.model_type == ModelType.REGRESSION:
            self.add_metric("mse", MSECalculator())
            self.add_metric("rmse", RMSECalculator())
            self.add_metric("mae", MAECalculator())
            self.add_metric("r2_score", R2ScoreCalculator())
            self.add_metric("mape", MAPECalculator())
            
        elif self.model_type == ModelType.RANKING:
            self.add_metric("ndcg", NDCGCalculator())
            self.add_metric("map", MAPCalculator())
            self.add_metric("mrr", MRRCalculator())
            
        elif self.model_type == ModelType.GENERATION:
            self.add_metric("bleu", BLEUCalculator())
            self.add_metric("rouge", ROUGECalculator())
            self.add_metric("meteor", METEORCalculator())
            
        # Add dimension-specific metrics
        self.add_metric("calibration", CalibrationCalculator())
        self.add_metric("robustness", RobustnessCalculator())
    
    def add_metric(self, name: str, calculator: MetricCalculator):
        """Add custom metric calculator"""
        self.metric_calculators[name] = calculator
    
    def evaluate(self, 
                y_true: np.ndarray,
                y_pred: np.ndarray,
                model_id: str,
                model_version: str,
                dataset_info: Optional[Dict[str, Any]] = None,
                additional_data: Optional[Dict[str, Any]] = None) -> EvaluationReport:
        """Comprehensive model evaluation"""
        
        self.logger.info(f"Starting evaluation for model {model_id}:{model_version}")
        
        # Calculate all metrics
        results = {}
        
        for metric_name, calculator in self.metric_calculators.items():
            try:
                if additional_data and metric_name in ["calibration", "robustness"]:
                    # Some metrics need additional data
                    result = calculator.calculate(
                        y_true, y_pred, 
                        **additional_data.get(metric_name, {})
                    )
                else:
                    result = calculator.calculate(y_true, y_pred)
                
                results[metric_name] = result
                self.logger.debug(f"Calculated {metric_name}: {result.score:.4f}")
                
            except Exception as e:
                self.logger.error(f"Failed to calculate {metric_name}: {e}")
                results[metric_name] = EvaluationResult(
                    metric_name=metric_name,
                    score=0.0,
                    metadata={"error": str(e)}
                )
        
        # Calculate overall score
        overall_score = self._calculate_overall_score(results)
        
        # Generate recommendations
        recommendations = self._generate_recommendations(results)
        
        # Create evaluation report
        report = EvaluationReport(
            model_id=model_id,
            model_version=model_version,
            dataset_info=dataset_info or {},
            evaluation_config=self.config,
            results=results,
            overall_score=overall_score,
            recommendations=recommendations
        )
        
        self.evaluation_history.append(report)
        
        self.logger.info(f"Evaluation completed. Overall score: {overall_score:.4f}")
        
        return report
    
    def _calculate_overall_score(self, results: Dict[str, EvaluationResult]) -> float:
        """Calculate weighted overall score"""
        
        total_weight = 0.0
        weighted_sum = 0.0
        
        for metric_name, result in results.items():
            # Skip metrics that failed: their placeholder 0.0 is not a real score
            if "error" in result.metadata:
                continue
            
            if metric_name in self.metric_calculators:
                metric_info = self.metric_calculators[metric_name].get_metric_info()
                
                # Normalize score to 0-1 range, clipping values that fall
                # outside the declared range (e.g. a negative R^2)
                score_min, score_max = metric_info.score_range
                normalized_score = (result.score - score_min) / (score_max - score_min)
                normalized_score = float(np.clip(normalized_score, 0.0, 1.0))
                
                # Adjust for higher_is_better
                if not metric_info.higher_is_better:
                    normalized_score = 1.0 - normalized_score
                
                weighted_sum += normalized_score * metric_info.weight
                total_weight += metric_info.weight
        
        return weighted_sum / total_weight if total_weight > 0 else 0.0
    
    def _generate_recommendations(self, results: Dict[str, EvaluationResult]) -> List[str]:
        """Generate actionable recommendations based on results"""
        
        recommendations = []
        
        for metric_name, result in results.items():
            if metric_name in self.metric_calculators:
                metric_info = self.metric_calculators[metric_name].get_metric_info()
                
                # Check if metric is below threshold
                if metric_info.threshold is not None:
                    if metric_info.higher_is_better and result.score < metric_info.threshold:
                        recommendations.append(
                            f"Improve {metric_name}: current {result.score:.3f} < threshold {metric_info.threshold:.3f}"
                        )
                    elif not metric_info.higher_is_better and result.score > metric_info.threshold:
                        recommendations.append(
                            f"Reduce {metric_name}: current {result.score:.3f} > threshold {metric_info.threshold:.3f}"
                        )
        
        # Add general recommendations based on model type
        if self.model_type == ModelType.CLASSIFICATION:
            if "f1_score" in results and results["f1_score"].score < 0.7:
                recommendations.append("Consider feature engineering or model architecture changes to improve F1 score")
            
            if "roc_auc" in results and results["roc_auc"].score < 0.8:
                recommendations.append("Model discriminative power is low. Consider more complex models or additional features")
        
        elif self.model_type == ModelType.REGRESSION:
            if "r2_score" in results and results["r2_score"].score < 0.7:
                recommendations.append("Model explains low variance. Consider polynomial features or ensemble methods")
        
        return recommendations
    
    def compare_models(self, 
                      reports: List[EvaluationReport],
                      comparison_metrics: Optional[List[str]] = None) -> Dict[str, Any]:
        """Compare multiple model evaluation reports"""
        
        if len(reports) < 2:
            raise ValueError("Need at least 2 reports for comparison")
        
        comparison_metrics = comparison_metrics or list(reports[0].results.keys())
        
        comparison = {
            "models": [f"{r.model_id}:{r.model_version}" for r in reports],
            "metrics_comparison": {},
            "best_model_per_metric": {},
            "overall_ranking": []
        }
        
        # Compare each metric
        for metric_name in comparison_metrics:
            metric_scores = []
            
            for report in reports:
                if metric_name in report.results:
                    metric_scores.append(report.results[metric_name].score)
                else:
                    metric_scores.append(None)
            
            comparison["metrics_comparison"][metric_name] = metric_scores
            
            # Find best model for this metric
            if all(score is not None for score in metric_scores):
                metric_info = self.metric_calculators[metric_name].get_metric_info()
                
                if metric_info.higher_is_better:
                    best_idx = np.argmax(metric_scores)
                else:
                    best_idx = np.argmin(metric_scores)
                
                comparison["best_model_per_metric"][metric_name] = {
                    "model": comparison["models"][best_idx],
                    "score": metric_scores[best_idx]
                }
        
        # Overall ranking by overall score
        overall_scores = [report.overall_score for report in reports]
        ranking_indices = np.argsort(overall_scores)[::-1]  # Descending order
        
        comparison["overall_ranking"] = [
            {
                "rank": i + 1,
                "model": comparison["models"][idx],
                "overall_score": overall_scores[idx]
            }
            for i, idx in enumerate(ranking_indices)
        ]
        
        return comparison
    
    def statistical_significance_test(self, 
                                    results1: EvaluationResult,
                                    results2: EvaluationResult,
                                    test_type: str = "paired_t_test") -> Dict[str, Any]:
        """Test statistical significance between two results"""
        
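        # Placeholder only: the values below are hard-coded, not computed.
        # A real test needs per-sample or per-fold scores for both models,
        # which a single EvaluationResult does not carry.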
        
        return {
            "test_type": test_type,
            "p_value": 0.05,
            "significant": True,
            "effect_size": 0.2
        }
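
The placeholder above returns fixed values, and a single EvaluationResult only holds one aggregate score, so it cannot support a real test on its own. One possible way to compare two models is a paired sign-flip permutation test on per-sample scores (for example, 1 for a correct prediction and 0 for an incorrect one). The sketch below is an illustration under that assumption, not the framework's method; the function name `paired_permutation_test` and its parameters are hypothetical, and it uses only the standard library.

```python
import random
from typing import Dict, Sequence


def paired_permutation_test(
    scores_a: Sequence[float],
    scores_b: Sequence[float],
    n_permutations: int = 10_000,
    seed: int = 0,
) -> Dict[str, float]:
    """Two-sided sign-flip permutation test on paired per-sample scores."""
    if len(scores_a) != len(scores_b) or len(scores_a) == 0:
        raise ValueError("Need two non-empty score sequences of equal length")

    n = len(scores_a)
    diffs = [a - b for a, b in zip(scores_a, scores_b)]
    observed = sum(diffs) / n

    rng = random.Random(seed)  # fixed seed so the result is reproducible
    extreme = 0
    for _ in range(n_permutations):
        # Under H0 the two models are exchangeable, so each paired
        # difference is equally likely to carry either sign.
        permuted = sum(d if rng.random() < 0.5 else -d for d in diffs) / n
        if abs(permuted) >= abs(observed) - 1e-12:
            extreme += 1

    # Add-one smoothing keeps the estimated p-value strictly above zero.
    p_value = (extreme + 1) / (n_permutations + 1)
    return {"mean_difference": observed, "p_value": p_value}


if __name__ == "__main__":
    # Hypothetical per-sample correctness for two models on the same test set
    model_a = [1, 1, 1, 0, 1, 1, 0, 1, 1, 1]
    model_b = [1, 0, 1, 0, 0, 1, 0, 1, 0, 1]
    print(paired_permutation_test(model_a, model_b))
```

The test is paired because both models are scored on the same samples. With only a handful of samples the p-value will be coarse, so per-sample scores from the full test set are preferable to a few per-fold averages.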

Advanced Metric Implementations

# advanced_metrics.py
import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, mean_squared_error,
    mean_absolute_error, r2_score, log_loss, confusion_matrix
)
from sklearn.calibration import calibration_curve
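import scipy.stats as stats
from typing import Callable, List, Tuple, Optional, Dict, Any

# Assumes the base classes from evaluation_framework.py above are importable
from evaluation_framework import MetricCalculator, EvaluationMetric, EvaluationResult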

class AccuracyCalculator(MetricCalculator):
    """Calculate accuracy with confidence intervals"""
    
    def calculate(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> EvaluationResult:
        score = accuracy_score(y_true, y_pred)
        
        # Bootstrap confidence interval
        ci = self._bootstrap_confidence_interval(y_true, y_pred, accuracy_score)
        
        return EvaluationResult(
            metric_name="accuracy",
            score=score,
            confidence_interval=ci,
            metadata={"n_samples": len(y_true)}
        )
    
    def get_metric_info(self) -> EvaluationMetric:
        return EvaluationMetric(
            name="accuracy",
            description="Fraction of predictions that match binary or multiclass labels",
            higher_is_better=True,
            score_range=(0.0, 1.0),
            threshold=0.8,
            weight=1.0
        )
    
    def _bootstrap_confidence_interval(self, 
                                     y_true: np.ndarray, 
                                     y_pred: np.ndarray,
                                     metric_func: Callable,
                                     n_bootstrap: int = 1000,
                                     confidence_level: float = 0.95) -> Tuple[float, float]:
        """Calculate bootstrap confidence interval"""
        
        bootstrap_scores = []
        n_samples = len(y_true)
        
        for _ in range(n_bootstrap):
            # Bootstrap sample
            indices = np.random.choice(n_samples, size=n_samples, replace=True)
            y_true_boot = y_true[indices]
            y_pred_boot = y_pred[indices]
            
            # Calculate metric
            score = metric_func(y_true_boot, y_pred_boot)
            bootstrap_scores.append(score)
        
        # Calculate confidence interval
        alpha = 1 - confidence_level
        lower_percentile = (alpha / 2) * 100
        upper_percentile = (1 - alpha / 2) * 100
        
        ci_lower = np.percentile(bootstrap_scores, lower_percentile)
        ci_upper = np.percentile(bootstrap_scores, upper_percentile)
        
        return (ci_lower, ci_upper)

class ROCAUCCalculator(MetricCalculator):
    """Calculate ROC-AUC with additional analysis"""
    
    def calculate(self, y_true: np.ndarray, y_pred: np.ndarray, **kwargs) -> EvaluationResult:
        # Handle multiclass case
        if len(np.unique(y_true)) > 2:
            score = roc_auc_score(y_true, y_pred, multi_class='ovr')
            multiclass = True
        else:
            score = roc_auc_score(y_true, y_pred)
            multiclass = False
        
        # Approximate confidence interval and p-value for the binary case
        ci = None
        p_value = None
        
        if not multiclass:
            ci, p_value = self._delong_confidence_interval(y_true, y_pred)
        
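        metadata = {
            "multiclass": multiclass,
            # Class counts are only meaningful for binary 0/1 labels
            "n_positive": None if multiclass else int(np.sum(y_true == 1)),
            "n_negative": None if multiclass else int(np.sum(y_true == 0))
        }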
        
        return EvaluationResult(
            metric_name="roc_auc",
            score=score,
            confidence_interval=ci,
            p_value=p_value,
            metadata=metadata
        )
    
    def get_metric_info(self) -> EvaluationMetric:
        return EvaluationMetric(
            name="roc_auc",
            description="Area under the Receiver Operating Characteristic curve",
            higher_is_better=True,
            score_range=(0.0, 1.0),
            threshold=0.8,
            weight=1.5
        )
    
    def _delong_confidence_interval(self, 
                                  y_true: np.ndarray, 
                                  y_pred: np.ndarray,
                                  confidence_level: float = 0.95) -> Tuple[Tuple[float, float], float]:
        """Approximate confidence interval for AUC.
        
        Despite the method name, this uses the Hanley-McNeil (1982)
        standard error approximation, not the full DeLong estimator.
        """
        
        auc = roc_auc_score(y_true, y_pred)
        
        n_pos = np.sum(y_true == 1)
        n_neg = np.sum(y_true == 0)
        
        # Hanley-McNeil standard error, with Q1 = AUC / (2 - AUC)
        # and Q2 = 2 * AUC^2 / (1 + AUC)
        se = np.sqrt((auc * (1 - auc) + (n_pos - 1) * (auc / (2 - auc) - auc**2) + 
                     (n_neg - 1) * (2 * auc**2 / (1 + auc) - auc**2)) / (n_pos * n_neg))
        
        # Confidence interval
        z = stats.norm.ppf(1 - (1 - confidence_level) / 2)
        ci_lower = max(0, auc - z * se)
        ci_upper = min(1, auc + z * se)
        
        # P-value for H0: AUC = 0.5. The standard error is 0 when AUC is
        # exactly 0 or 1, so guard against division by zero.
        if se == 0:
            p_value = 0.0
        else:
            z_score = (auc - 0.5) / se
            p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))
        
        return (ci_lower, ci_upper), p_value

class CalibrationCalculator(MetricCalculator):
    """Calculate model calibration metrics"""
    
    def calculate(self, y_true: np.ndarray, y_pred_proba: np.ndarray, **kwargs) -> EvaluationResult:
        
        # Brier score
        brier_score = np.mean((y_pred_proba - y_true) ** 2)
        
        # Calibration curve analysis
        fraction_of_positives, mean_predicted_value = calibration_curve(
            y_true, y_pred_proba, n_bins=10
        )
        
        # Expected Calibration Error (ECE)
        ece = self._calculate_ece(y_true, y_pred_proba)
        
        # Maximum Calibration Error (MCE)
        mce = self._calculate_mce(y_true, y_pred_proba)
        
        # Reliability (how close predicted probabilities are to actual frequencies)
        reliability = np.mean(np.abs(fraction_of_positives - mean_predicted_value))
        
        metadata = {
            "brier_score": brier_score,
            "ece": ece,
            "mce": mce,
            "reliability": reliability,
            "calibration_curve": {
                "fraction_of_positives": fraction_of_positives.tolist(),
                "mean_predicted_value": mean_predicted_value.tolist()
            }
        }
        
        # Overall calibration score: ECE is lower-is-better, so invert it
        # to get a higher-is-better score
        calibration_score = 1.0 - ece
        
        return EvaluationResult(
            metric_name="calibration",
            score=calibration_score,
            metadata=metadata
        )
    
    def get_metric_info(self) -> EvaluationMetric:
        return EvaluationMetric(
            name="calibration",
            description="How well predicted probabilities match actual frequencies",
            higher_is_better=True,
            score_range=(0.0, 1.0),
            threshold=0.9,
            weight=1.2
        )
    
    def _calculate_ece(self, y_true: np.ndarray, y_pred_proba: np.ndarray, n_bins: int = 10) -> float:
        """Calculate Expected Calibration Error"""
        
        bin_boundaries = np.linspace(0, 1, n_bins + 1)
        bin_lowers = bin_boundaries[:-1]
        bin_uppers = bin_boundaries[1:]
        
        ece = 0
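        for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
            # Find predictions in this bin: (lower, upper], except that the
            # first bin also includes 0.0 so zero probabilities are not dropped
            in_bin = (y_pred_proba > bin_lower) & (y_pred_proba <= bin_upper)
            if bin_lower == 0:
                in_bin |= (y_pred_proba == 0)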
            prop_in_bin = in_bin.mean()
            
            if prop_in_bin > 0:
                accuracy_in_bin = y_true[in_bin].mean()
                avg_confidence_in_bin = y_pred_proba[in_bin].mean()
                ece += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prop_in_bin
        
        return ece
    
    def _calculate_mce(self, y_true: np.ndarray, y_pred_proba: np.ndarray, n_bins: int = 10) -> float:
        """Calculate Maximum Calibration Error"""
        
        bin_boundaries = np.linspace(0, 1, n_bins + 1)
        bin_lowers = bin_boundaries[:-1]
        bin_uppers = bin_boundaries[1:]
        
        calibration_errors = []
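        for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
            # Bins are (lower, upper]; the first bin also includes 0.0
            in_bin = (y_pred_proba > bin_lower) & (y_pred_proba <= bin_upper)
            if bin_lower == 0:
                in_bin |= (y_pred_proba == 0)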
            
            if in_bin.sum() > 0:
                accuracy_in_bin = y_true[in_bin].mean()
                avg_confidence_in_bin = y_pred_proba[in_bin].mean()
                calibration_errors.append(abs(avg_confidence_in_bin - accuracy_in_bin))
        
        return max(calibration_errors) if calibration_errors else 0.0

class RobustnessCalculator(MetricCalculator):
    """Calculate model robustness metrics"""
    
    def calculate(self, 
                 y_true: np.ndarray, 
                 y_pred: np.ndarray,
                 model=None,
                 X_test: np.ndarray = None,
                 **kwargs) -> EvaluationResult:
        
        robustness_scores = {}
        
        # Prediction stability (if multiple predictions available)
        if "prediction_variance" in kwargs:
            robustness_scores["prediction_stability"] = 1.0 - kwargs["prediction_variance"]
        
        # Adversarial robustness (simplified)
        if model is not None and X_test is not None:
            adv_robustness = self._estimate_adversarial_robustness(model, X_test, y_true)
            robustness_scores["adversarial_robustness"] = adv_robustness
        
        # Input perturbation robustness
        if "perturbation_scores" in kwargs:
            robustness_scores["perturbation_robustness"] = np.mean(kwargs["perturbation_scores"])
        
        # Overall robustness score
        overall_robustness = np.mean(list(robustness_scores.values())) if robustness_scores else 0.5
        
        return EvaluationResult(
            metric_name="robustness",
            score=overall_robustness,
            metadata=robustness_scores
        )
    
    def get_metric_info(self) -> EvaluationMetric:
        return EvaluationMetric(
            name="robustness",
            description="Model resistance to input perturbations and adversarial attacks",
            higher_is_better=True,
            score_range=(0.0, 1.0),
            threshold=0.8,
            weight=1.3
        )
    
    def _estimate_adversarial_robustness(self, 
                                       model, 
                                       X_test: np.ndarray, 
                                       y_true: np.ndarray,
                                       epsilon: float = 0.01) -> float:
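        """Estimate stability under small random input noise.
        
        Gaussian noise is only a weak proxy: true adversarial robustness
        requires worst-case perturbations (e.g. gradient-based attacks).
        """
        
        # Add small random perturbations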
        noise = np.random.normal(0, epsilon, X_test.shape)
        X_perturbed = X_test + noise
        
        # Get predictions on perturbed data
        try:
            if hasattr(model, 'predict'):
                y_pred_perturbed = model.predict(X_perturbed)
                y_pred_original = model.predict(X_test)
                
                # Calculate prediction consistency
                if len(np.unique(y_true)) == 2:  # Binary classification
                    consistency = np.mean(y_pred_perturbed == y_pred_original)
                else:
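                    # For regression, use correlation between original and
                    # perturbed predictions. Multi-class labels also land here,
                    # where correlating class ids is not meaningful; the
                    # agreement rate from the binary branch suits them better.
                    consistency = np.corrcoef(y_pred_original, y_pred_perturbed)[0, 1]
                    # corrcoef returns NaN when either prediction vector is constant
                    consistency = 0.0 if np.isnan(consistency) else max(0.0, consistency)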
                
                return consistency
            else:
                return 0.5  # Default if can't evaluate
        
        except Exception:
            return 0.5  # Default on error

class FairnessCalculator(MetricCalculator):
    """Calculate fairness metrics"""
    
    def calculate(self, 
                 y_true: np.ndarray, 
                 y_pred: np.ndarray,
                 sensitive_features: np.ndarray = None,
                 **kwargs) -> EvaluationResult:
        
        if sensitive_features is None:
            return EvaluationResult(
                metric_name="fairness",
                score=1.0,  # Not evaluated: 1.0 is a placeholder, not evidence of fairness
                metadata={"note": "No sensitive features provided"}
            )
        
        fairness_metrics = {}
        
        # Demographic parity
        demographic_parity = self._calculate_demographic_parity(y_pred, sensitive_features)
        fairness_metrics["demographic_parity"] = demographic_parity
        
        # Equalized odds
        equalized_odds = self._calculate_equalized_odds(y_true, y_pred, sensitive_features)
        fairness_metrics["equalized_odds"] = equalized_odds
        
        # Equal opportunity
        equal_opportunity = self._calculate_equal_opportunity(y_true, y_pred, sensitive_features)
        fairness_metrics["equal_opportunity"] = equal_opportunity
        
        # Overall fairness score (higher is more fair)
        overall_fairness = np.mean(list(fairness_metrics.values()))
        
        return EvaluationResult(
            metric_name="fairness",
            score=overall_fairness,
            metadata=fairness_metrics
        )
    
    def get_metric_info(self) -> EvaluationMetric:
        return EvaluationMetric(
            name="fairness",
            description="Model fairness across different demographic groups",
            higher_is_better=True,
            score_range=(0.0, 1.0),
            threshold=0.8,
            weight=1.5
        )
    
    def _calculate_demographic_parity(self, y_pred: np.ndarray, sensitive_features: np.ndarray) -> float:
        """Calculate demographic parity violation"""
        
        groups = np.unique(sensitive_features)
        positive_rates = []
        
        for group in groups:
            group_mask = sensitive_features == group
            positive_rate = np.mean(y_pred[group_mask])
            positive_rates.append(positive_rate)
        
        # Calculate maximum difference in positive rates
        max_diff = np.max(positive_rates) - np.min(positive_rates)
        
        # Convert to fairness score (1 = perfectly fair, 0 = maximally unfair)
        return 1.0 - max_diff
    
    def _calculate_equalized_odds(self, 
                                y_true: np.ndarray, 
                                y_pred: np.ndarray, 
                                sensitive_features: np.ndarray) -> float:
        """Calculate equalized odds violation"""
        
        groups = np.unique(sensitive_features)
        
        tprs = []
        fprs = []
        
        for group in groups:
            group_mask = sensitive_features == group
            y_true_group = y_true[group_mask]
            y_pred_group = y_pred[group_mask]
            
            # True Positive Rate
            tp = np.sum((y_true_group == 1) & (y_pred_group == 1))
            fn = np.sum((y_true_group == 1) & (y_pred_group == 0))
            tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
            tprs.append(tpr)
            
            # False Positive Rate
            fp = np.sum((y_true_group == 0) & (y_pred_group == 1))
            tn = np.sum((y_true_group == 0) & (y_pred_group == 0))
            fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
            fprs.append(fpr)
        
        # Maximum differences
        max_tpr_diff = np.max(tprs) - np.min(tprs)
        max_fpr_diff = np.max(fprs) - np.min(fprs)
        
        # Average fairness across both rates
        return 1.0 - (max_tpr_diff + max_fpr_diff) / 2
    
    def _calculate_equal_opportunity(self, 
                                   y_true: np.ndarray, 
                                   y_pred: np.ndarray, 
                                   sensitive_features: np.ndarray) -> float:
        """Calculate equal opportunity violation"""
        
        groups = np.unique(sensitive_features)
        tprs = []
        
        for group in groups:
            group_mask = sensitive_features == group
            y_true_group = y_true[group_mask]
            y_pred_group = y_pred[group_mask]
            
            # True Positive Rate for positive class only
            tp = np.sum((y_true_group == 1) & (y_pred_group == 1))
            fn = np.sum((y_true_group == 1) & (y_pred_group == 0))
            tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
            tprs.append(tpr)
        
        # Maximum difference in TPRs
        max_tpr_diff = np.max(tprs) - np.min(tprs)
        
        return 1.0 - max_tpr_diff

# Additional specialized metrics for different model types
class BLEUCalculator(MetricCalculator):
    """BLEU score for text generation"""
    
    def calculate(self, y_true: List[str], y_pred: List[str], **kwargs) -> EvaluationResult:
        # Not real BLEU: this is set-based unigram precision, with no
        # higher-order n-grams, count clipping, or brevity penalty.
        # In practice, use nltk.translate.bleu_score or similar
        
        total_score = 0.0
        n_samples = len(y_true)
        
        for true_text, pred_text in zip(y_true, y_pred):
            # Set-based unigram precision (stand-in for BLEU)
            true_words = set(true_text.lower().split())
            pred_words = set(pred_text.lower().split())
            
            if len(pred_words) > 0:
                precision = len(true_words & pred_words) / len(pred_words)
                total_score += precision
        
        average_bleu = total_score / n_samples if n_samples > 0 else 0.0
        
        return EvaluationResult(
            metric_name="bleu",
            score=average_bleu,
            metadata={"n_samples": n_samples}
        )
    
    def get_metric_info(self) -> EvaluationMetric:
        return EvaluationMetric(
            name="bleu",
            description="BLEU score for text generation quality",
            higher_is_better=True,
            score_range=(0.0, 1.0),
            threshold=0.3,
            weight=1.0
        )

class NDCGCalculator(MetricCalculator):
    """Normalized Discounted Cumulative Gain for ranking"""
    
    def calculate(self, 
                 y_true: np.ndarray, 
                 y_scores: np.ndarray, 
                 k: int = 10,
                 **kwargs) -> EvaluationResult:
        
        # Calculate NDCG@k
        ndcg_score = self._calculate_ndcg(y_true, y_scores, k)
        
        return EvaluationResult(
            metric_name="ndcg",
            score=ndcg_score,
            metadata={"k": k}
        )
    
    def get_metric_info(self) -> EvaluationMetric:
        return EvaluationMetric(
            name="ndcg",
            description="Normalized Discounted Cumulative Gain for ranking quality",
            higher_is_better=True,
            score_range=(0.0, 1.0),
            threshold=0.7,
            weight=1.0
        )
    
    def _calculate_ndcg(self, y_true: np.ndarray, y_scores: np.ndarray, k: int) -> float:
        """Calculate NDCG@k"""
        
        # Sort by predicted scores
        order = np.argsort(y_scores)[::-1]
        y_true_sorted = y_true[order]
        
        # Calculate DCG@k
        dcg = 0.0
        for i in range(min(k, len(y_true_sorted))):
            rel = y_true_sorted[i]
            dcg += (2**rel - 1) / np.log2(i + 2)
        
        # Calculate IDCG@k (ideal DCG)
        y_true_ideal = np.sort(y_true)[::-1]
        idcg = 0.0
        for i in range(min(k, len(y_true_ideal))):
            rel = y_true_ideal[i]
            idcg += (2**rel - 1) / np.log2(i + 2)
        
        return dcg / idcg if idcg > 0 else 0.0
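
The BLEUCalculator above compares sets of unique words, so a candidate that repeats one correct word scores as well as a perfect translation. As a rough illustration of what the simplification leaves out, here is a minimal sketch of sentence-level BLEU with clipped n-gram counts and a brevity penalty. It assumes a single reference, uniform weights, and no smoothing; the names `ngram_counts` and `sentence_bleu_sketch` are made up for this example and are not part of the framework or of NLTK.

```python
import math
from collections import Counter
from typing import List


def ngram_counts(tokens: List[str], n: int) -> Counter:
    """Count all n-grams of order n in a token list."""
    return Counter(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1))


def sentence_bleu_sketch(reference: str, candidate: str, max_n: int = 2) -> float:
    """Single-reference BLEU with uniform weights and no smoothing (illustrative only)."""
    ref = reference.lower().split()
    cand = candidate.lower().split()
    if not cand:
        return 0.0

    log_precisions = []
    for n in range(1, max_n + 1):
        cand_counts = ngram_counts(cand, n)
        ref_counts = ngram_counts(ref, n)
        total = sum(cand_counts.values())
        # Clip each candidate n-gram count by its count in the reference
        clipped = sum(min(count, ref_counts[gram]) for gram, count in cand_counts.items())
        if total == 0 or clipped == 0:
            return 0.0  # without smoothing, one empty order zeroes the whole score
        log_precisions.append(math.log(clipped / total))

    # Brevity penalty: candidates shorter than the reference are scaled down
    bp = 1.0 if len(cand) >= len(ref) else math.exp(1 - len(ref) / len(cand))
    # Geometric mean of the n-gram precisions, scaled by the brevity penalty
    return bp * math.exp(sum(log_precisions) / max_n)


# Repeating one correct word no longer earns full credit once counts are clipped
print(sentence_bleu_sketch("the cat sat", "the the the the", max_n=1))
```

For reported results, an established implementation such as the NLTK module mentioned in the code comment is still the safer choice, since it handles multiple references and smoothing.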

Automated Testing Framework

# automated_testing.py
import asyncio
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Callable, Tuple
from dataclasses import dataclass
from datetime import datetime
import json
import logging
from abc import ABC, abstractmethod

class TestCase(ABC):
    """Abstract base class for test cases"""
    
    @abstractmethod
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the test case"""
        pass
    
    @abstractmethod
    def get_test_info(self) -> Dict[str, Any]:
        """Get test case information"""
        pass

class ModelTestSuite:
    """Comprehensive model testing suite"""
    
    def __init__(self, model_type: str):
        self.model_type = model_type
        self.test_cases: List[TestCase] = []
        self.test_results: List[Dict[str, Any]] = []
        
        # Initialize default test cases
        self._initialize_default_tests()
        
        self.logger = logging.getLogger(__name__)
    
    def _initialize_default_tests(self):
        """Initialize default test cases"""
        
        # Core functionality tests
        self.add_test_case(DataShapeTest())
        self.add_test_case(PredictionRangeTest())
        self.add_test_case(DeterminismTest())
        
        # Robustness tests
        self.add_test_case(NoiseRobustnessTest())
        self.add_test_case(OutlierRobustnessTest())
        self.add_test_case(MissingValueTest())
        
        # Performance tests
        self.add_test_case(LatencyTest())
        self.add_test_case(MemoryUsageTest())
        self.add_test_case(ThroughputTest())
        
        # Fairness tests
        self.add_test_case(BiasDetectionTest())
        
        # Edge case tests
        self.add_test_case(EmptyInputTest())
        self.add_test_case(ExtremeValueTest())
    
    def add_test_case(self, test_case: TestCase):
        """Add a test case to the suite"""
        self.test_cases.append(test_case)
    
    async def run_all_tests(self, 
                          model,
                          test_data: Dict[str, Any],
                          parallel: bool = True) -> Dict[str, Any]:
        """Run all test cases"""
        
        self.logger.info(f"Running {len(self.test_cases)} test cases")
        
        start_time = datetime.now()
        
        if parallel:
            # Run tests in parallel
            tasks = [
                self._run_single_test_async(test_case, model, test_data)
                for test_case in self.test_cases
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        else:
            # Run tests sequentially
            results = []
            for test_case in self.test_cases:
                result = await self._run_single_test_async(test_case, model, test_data)
                results.append(result)
        
        end_time = datetime.now()
        
        # Process results
        test_results = []
        passed_tests = 0
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                test_result = {
                    "test_name": self.test_cases[i].__class__.__name__,
                    "status": "error",
                    "error": str(result),
                    "timestamp": datetime.now()
                }
            else:
                test_result = result
                if result.get("passed", False):
                    passed_tests += 1
            
            test_results.append(test_result)
        
        # Generate summary
        summary = {
            "total_tests": len(self.test_cases),
            "passed_tests": passed_tests,
            "failed_tests": len(self.test_cases) - passed_tests,
            # Guard against an empty suite to avoid division by zero
            "pass_rate": passed_tests / len(self.test_cases) if self.test_cases else 0.0,
            "execution_time": (end_time - start_time).total_seconds(),
            "timestamp": start_time,
            "test_results": test_results
        }
        
        self.test_results.append(summary)
        
        self.logger.info(
            f"Test suite completed: {passed_tests}/{len(self.test_cases)} tests passed "
            f"({summary['pass_rate']:.1%} pass rate)"
        )
        
        return summary
    
    async def _run_single_test_async(self, 
                                   test_case: TestCase, 
                                   model, 
                                   test_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run a single test case asynchronously"""
        
        # Inside a coroutine, get_running_loop() is the preferred way to reach the loop
        loop = asyncio.get_running_loop()
        
        try:
            result = await loop.run_in_executor(
                None, test_case.run_test, model, test_data
            )
            return result
        
        except Exception as e:
            return {
                "test_name": test_case.__class__.__name__,
                "status": "error",
                "error": str(e),
                "timestamp": datetime.now()
            }

# Specific test case implementations
class DataShapeTest(TestCase):
    """Test that model handles input data shapes correctly"""
    
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        X_test = test_data['X_test']
        
        try:
            # Test with original data
            predictions = np.asarray(model.predict(X_test))
            
            # Verify the model returns one prediction per input sample
            expected_shape = (len(X_test),)
            actual_shape = predictions.shape
            
            shape_correct = (
                len(actual_shape) > 0 and
                actual_shape[0] == expected_shape[0]
            )
            
            return {
                "test_name": "data_shape_test",
                "passed": shape_correct,
                "details": {
                    "input_shape": X_test.shape,
                    "output_shape": actual_shape,
                    "expected_output_samples": expected_shape[0] if expected_shape else None
                },
                "timestamp": datetime.now()
            }
        
        except Exception as e:
            return {
                "test_name": "data_shape_test",
                "passed": False,
                "error": str(e),
                "timestamp": datetime.now()
            }
    
    def get_test_info(self) -> Dict[str, Any]:
        return {
            "name": "Data Shape Test",
            "description": "Verify model handles input data shapes correctly",
            "category": "functionality"
        }

class DeterminismTest(TestCase):
    """Test model determinism"""
    
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        X_test = test_data['X_test']
        
        try:
            # Make predictions twice
            predictions1 = model.predict(X_test[:100])  # Use subset for speed
            predictions2 = model.predict(X_test[:100])
            
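            # Check if predictions are identical (within floating-point tolerance)
            if hasattr(predictions1, 'shape'):
                # bool() converts numpy.bool_ to a plain, JSON-serializable bool
                deterministic = bool(np.allclose(predictions1, predictions2, rtol=1e-10))
            else:
                deterministic = predictions1 == predictions2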
            
            return {
                "test_name": "determinism_test",
                "passed": deterministic,
                "details": {
                    "max_difference": float(np.max(np.abs(predictions1 - predictions2))) if hasattr(predictions1, 'shape') else None,
                    "samples_tested": len(predictions1)
                },
                "timestamp": datetime.now()
            }
        
        except Exception as e:
            return {
                "test_name": "determinism_test",
                "passed": False,
                "error": str(e),
                "timestamp": datetime.now()
            }
    
    def get_test_info(self) -> Dict[str, Any]:
        return {
            "name": "Determinism Test",
            "description": "Verify model produces consistent predictions",
            "category": "functionality"
        }

class NoiseRobustnessTest(TestCase):
    """Test model robustness to input noise"""
    
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        X_test = test_data['X_test']
        
        try:
            # Original predictions
            original_predictions = model.predict(X_test[:100])
            
            # Add noise and predict (a seeded generator keeps the test reproducible)
            rng = np.random.default_rng(42)
            noise_levels = [0.01, 0.05, 0.1]
            robustness_scores = []
            
            for noise_level in noise_levels:
                noise = rng.normal(0, noise_level, X_test[:100].shape)
                X_noisy = X_test[:100] + noise
                
                noisy_predictions = model.predict(X_noisy)
                
                # Calculate similarity
                if len(np.unique(original_predictions)) <= 10:  # Classification
                    similarity = np.mean(original_predictions == noisy_predictions)
                else:  # Regression
                    similarity = np.corrcoef(original_predictions, noisy_predictions)[0, 1]
                    similarity = max(0, similarity)  # Ensure non-negative
                
                robustness_scores.append(similarity)
            
            # Overall robustness score (cast to float so the result is a plain Python value)
            avg_robustness = float(np.mean(robustness_scores))
            passed = avg_robustness > 0.8  # 80% threshold
            
            return {
                "test_name": "noise_robustness_test",
                "passed": passed,
                "details": {
                    "noise_levels": noise_levels,
                    "robustness_scores": robustness_scores,
                    "average_robustness": avg_robustness
                },
                "timestamp": datetime.now()
            }
        
        except Exception as e:
            return {
                "test_name": "noise_robustness_test",
                "passed": False,
                "error": str(e),
                "timestamp": datetime.now()
            }
    
    def get_test_info(self) -> Dict[str, Any]:
        return {
            "name": "Noise Robustness Test",
            "description": "Test model resistance to input noise",
            "category": "robustness"
        }

class LatencyTest(TestCase):
    """Test model prediction latency"""
    
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        X_test = test_data['X_test']
        
        try:
            import time
            
            # Warm up
            model.predict(X_test[:10])
            
            # Test single prediction latency
            # perf_counter is a monotonic, high-resolution clock suited to timing
            n_single = min(10, len(X_test))
            single_latencies = []
            for i in range(n_single):
                start_time = time.perf_counter()
                model.predict(X_test[i:i+1])
                latency = (time.perf_counter() - start_time) * 1000  # ms
                single_latencies.append(latency)
            
            # Test batch prediction latency
            batch = X_test[:100]
            start_time = time.perf_counter()
            model.predict(batch)
            batch_latency = (time.perf_counter() - start_time) * 1000  # ms
            
            avg_single_latency = float(np.mean(single_latencies))
            # Divide by the actual batch size, which may be smaller than 100
            avg_batch_per_sample = batch_latency / len(batch)
            
            # Pass if latencies are reasonable (< 100ms per sample)
            passed = avg_single_latency < 100 and avg_batch_per_sample < 100
            
            return {
                "test_name": "latency_test",
                "passed": passed,
                "details": {
                    "avg_single_latency_ms": avg_single_latency,
                    "batch_latency_ms": batch_latency,
                    "avg_batch_per_sample_ms": avg_batch_per_sample,
                    "single_latency_std": np.std(single_latencies)
                },
                "timestamp": datetime.now()
            }
        
        except Exception as e:
            return {
                "test_name": "latency_test",
                "passed": False,
                "error": str(e),
                "timestamp": datetime.now()
            }
    
    def get_test_info(self) -> Dict[str, Any]:
        return {
            "name": "Latency Test",
            "description": "Measure model prediction latency",
            "category": "performance"
        }

class BiasDetectionTest(TestCase):
    """Test for potential bias in model predictions"""
    
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        X_test = test_data['X_test']
        y_test = test_data.get('y_test')
        sensitive_features = test_data.get('sensitive_features')
        
        if sensitive_features is None:
            return {
                "test_name": "bias_detection_test",
                "passed": True,  # Pass if no sensitive features provided
                "details": {"note": "No sensitive features provided for bias testing"},
                "timestamp": datetime.now()
            }
        
        try:
            predictions = model.predict(X_test)
            
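            # Calculate fairness metrics
            # FairnessCalculator is not defined in this file; import it from the
            # module where the metric calculators are defined before running this test
            fairness_calculator = FairnessCalculator()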
            fairness_result = fairness_calculator.calculate(
                y_test, predictions, sensitive_features
            )
            
            # Pass if fairness score is above threshold
            passed = fairness_result.score > 0.8
            
            return {
                "test_name": "bias_detection_test",
                "passed": passed,
                "details": {
                    "fairness_score": fairness_result.score,
                    "fairness_breakdown": fairness_result.metadata
                },
                "timestamp": datetime.now()
            }
        
        except Exception as e:
            return {
                "test_name": "bias_detection_test",
                "passed": False,
                "error": str(e),
                "timestamp": datetime.now()
            }
    
    def get_test_info(self) -> Dict[str, Any]:
        return {
            "name": "Bias Detection Test",
            "description": "Test for potential bias in model predictions",
            "category": "fairness"
        }

class ExtremeValueTest(TestCase):
    """Test model behavior with extreme input values"""
    
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        X_test = test_data['X_test']
        
        try:
            # Create extreme value test cases
            X_extreme = X_test[:10].copy()
            
            # Test with very large values
            X_large = X_extreme * 1000
            
            # Test with very small values
            X_small = X_extreme * 0.001
            
            # Test with zeros
            X_zeros = np.zeros_like(X_extreme)
            
            test_cases = [
                ("large_values", X_large),
                ("small_values", X_small),
                ("zero_values", X_zeros)
            ]
            
            results = {}
            all_passed = True
            
            for test_name, X_test_case in test_cases:
                try:
                    predictions = model.predict(X_test_case)
                    
                    # Check if predictions are valid (not NaN, not infinite)
                    valid_predictions = (
                        not np.any(np.isnan(predictions)) and
                        not np.any(np.isinf(predictions))
                    )
                    
                    results[test_name] = {
                        "passed": valid_predictions,
                        "prediction_stats": {
                            "mean": float(np.mean(predictions)),
                            "std": float(np.std(predictions)),
                            "min": float(np.min(predictions)),
                            "max": float(np.max(predictions))
                        }
                    }
                    
                    if not valid_predictions:
                        all_passed = False
                
                except Exception as e:
                    results[test_name] = {
                        "passed": False,
                        "error": str(e)
                    }
                    all_passed = False
            
            return {
                "test_name": "extreme_value_test",
                "passed": all_passed,
                "details": results,
                "timestamp": datetime.now()
            }
        
        except Exception as e:
            return {
                "test_name": "extreme_value_test",
                "passed": False,
                "error": str(e),
                "timestamp": datetime.now()
            }
    
    def get_test_info(self) -> Dict[str, Any]:
        return {
            "name": "Extreme Value Test",
            "description": "Test model behavior with extreme input values",
            "category": "robustness"
        }

# Placeholder test classes: each one always reports a pass until a real check
# is implemented, so they inflate the suite's pass rate in the meantime
class PredictionRangeTest(TestCase):
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implementation for testing prediction ranges
        return {"test_name": "prediction_range_test", "passed": True, "timestamp": datetime.now()}
    
    def get_test_info(self) -> Dict[str, Any]:
        return {"name": "Prediction Range Test", "description": "Test prediction value ranges", "category": "functionality"}

class OutlierRobustnessTest(TestCase):
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implementation for testing outlier robustness
        return {"test_name": "outlier_robustness_test", "passed": True, "timestamp": datetime.now()}
    
    def get_test_info(self) -> Dict[str, Any]:
        return {"name": "Outlier Robustness Test", "description": "Test robustness to outliers", "category": "robustness"}

class MissingValueTest(TestCase):
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implementation for testing missing value handling
        return {"test_name": "missing_value_test", "passed": True, "timestamp": datetime.now()}
    
    def get_test_info(self) -> Dict[str, Any]:
        return {"name": "Missing Value Test", "description": "Test handling of missing values", "category": "robustness"}

class MemoryUsageTest(TestCase):
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implementation for testing memory usage
        return {"test_name": "memory_usage_test", "passed": True, "timestamp": datetime.now()}
    
    def get_test_info(self) -> Dict[str, Any]:
        return {"name": "Memory Usage Test", "description": "Test memory consumption", "category": "performance"}

class ThroughputTest(TestCase):
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implementation for testing throughput
        return {"test_name": "throughput_test", "passed": True, "timestamp": datetime.now()}
    
    def get_test_info(self) -> Dict[str, Any]:
        return {"name": "Throughput Test", "description": "Test prediction throughput", "category": "performance"}

class EmptyInputTest(TestCase):
    def run_test(self, model, test_data: Dict[str, Any]) -> Dict[str, Any]:
        # Implementation for testing empty input handling
        return {"test_name": "empty_input_test", "passed": True, "timestamp": datetime.now()}
    
    def get_test_info(self) -> Dict[str, Any]:
        return {"name": "Empty Input Test", "description": "Test handling of empty inputs", "category": "edge_cases"}
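
The placeholder classes above return a pass without inspecting the model. As one possible way to fill in the first of them, the sketch below checks that every prediction is finite and falls inside an expected interval. It is an illustration under stated assumptions, not the guide's implementation: `ClippedLinearModel`, `run_prediction_range_check`, and the `valid_range` parameter are hypothetical names, and the default range of 0 to 1 only suits models that output probabilities.

```python
from datetime import datetime
from typing import Any, Dict, Tuple

import numpy as np


class ClippedLinearModel:
    """Hypothetical stand-in model: sums the features and applies a sigmoid."""

    def predict(self, X: np.ndarray) -> np.ndarray:
        scores = np.asarray(X, dtype=float).sum(axis=1)
        return 1.0 / (1.0 + np.exp(-scores))


def run_prediction_range_check(model,
                               test_data: Dict[str, Any],
                               valid_range: Tuple[float, float] = (0.0, 1.0)) -> Dict[str, Any]:
    """Pass only if every prediction is finite and lies inside valid_range."""
    low, high = valid_range
    predictions = np.asarray(model.predict(test_data["X_test"]), dtype=float)

    # NaN compares as False, so non-finite outputs also fail the range check
    finite = bool(np.all(np.isfinite(predictions)))
    in_range = bool(np.all((predictions >= low) & (predictions <= high)))

    return {
        "test_name": "prediction_range_test",
        "passed": finite and in_range,
        "details": {
            "min": float(np.min(predictions)),
            "max": float(np.max(predictions)),
            "valid_range": valid_range,
        },
        "timestamp": datetime.now(),
    }


rng = np.random.default_rng(0)
result = run_prediction_range_check(
    ClippedLinearModel(), {"X_test": rng.normal(size=(50, 3))}
)
print(result["passed"], result["details"])
```

The function returns the same dictionary shape as the other test cases, so its body could be moved into PredictionRangeTest.run_test with the range supplied through the constructor.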

Production Monitoring and Validation

# production_monitoring.py
import asyncio
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
import json
import logging
from collections import deque, defaultdict
import sqlite3

@dataclass
class ProductionMetrics:
    timestamp: datetime
    model_id: str
    model_version: str
    prediction_count: int
    avg_latency_ms: float
    error_rate: float
    accuracy: Optional[float] = None
    drift_score: Optional[float] = None
    fairness_score: Optional[float] = None

class ProductionValidator:
    """Production model validation and monitoring"""
    
    def __init__(self, 
                 db_path: str = "./production_monitoring.db",
                 alert_thresholds: Optional[Dict[str, float]] = None):
        
        self.db_path = db_path
        self.alert_thresholds = alert_thresholds or {
            "error_rate": 0.05,
            "latency_ms": 1000,
            "accuracy_drop": 0.1,
            "drift_score": 0.1,
            "fairness_score": 0.8
        }
        
        # Initialize database
        self._init_database()
        
        # Real-time monitoring buffers
        self.recent_predictions = defaultdict(lambda: deque(maxlen=1000))
        self.performance_history = defaultdict(list)
        
        # Alert system
        self.active_alerts = []
        self.alert_callbacks = []
        
        self.logger = logging.getLogger(__name__)
    
    def _init_database(self):
        """Initialize monitoring database"""
        
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS production_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    model_id TEXT NOT NULL,
                    model_version TEXT NOT NULL,
                    prediction_count INTEGER,
                    avg_latency_ms REAL,
                    error_rate REAL,
                    accuracy REAL,
                    drift_score REAL,
                    fairness_score REAL
                )
            """)
            
            conn.execute("""
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    model_id TEXT NOT NULL,
                    alert_type TEXT NOT NULL,
                    severity TEXT NOT NULL,
                    message TEXT NOT NULL,
                    resolved BOOLEAN DEFAULT FALSE,
                    resolved_at TEXT
                )
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_metrics_model_time 
                ON production_metrics(model_id, timestamp)
            """)
            
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_alerts_model_time 
                ON alerts(model_id, timestamp)
            """)
    
    async def log_prediction(self, 
                           model_id: str, 
                           model_version: str,
                           prediction_data: Dict[str, Any]):
        """Log a single prediction for monitoring"""
        
        prediction_log = {
            "timestamp": datetime.now(),
            "model_id": model_id,
            "model_version": model_version,
            "latency_ms": prediction_data.get("latency_ms", 0),
            "success": prediction_data.get("success", True),
            "input_features": prediction_data.get("input_features"),
            "prediction": prediction_data.get("prediction"),
            "confidence": prediction_data.get("confidence")
        }
        
        self.recent_predictions[model_id].append(prediction_log)
        
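        # Trigger periodic validation every 100 predictions. A running counter is
        # needed because the deque is capped at 1000 entries: once it is full, its
        # length stays at 1000 and a length-based check would fire on every call.
        if not hasattr(self, "_prediction_counts"):
            self._prediction_counts = defaultdict(int)
        self._prediction_counts[model_id] += 1
        if self._prediction_counts[model_id] % 100 == 0:
            asyncio.create_task(self._periodic_validation(model_id, model_version))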
    
    async def _periodic_validation(self, model_id: str, model_version: str):
        """Perform periodic validation checks"""
        
        recent_logs = list(self.recent_predictions[model_id])
        
        if len(recent_logs) < 10:
            return
        
        # Calculate metrics
        metrics = await self._calculate_current_metrics(model_id, model_version, recent_logs)
        
        # Store metrics
        await self._store_metrics(metrics)
        
        # Check for alerts
        await self._check_alerts(metrics)
    
    async def _calculate_current_metrics(self, 
                                       model_id: str, 
                                       model_version: str,
                                       recent_logs: List[Dict[str, Any]]) -> ProductionMetrics:
        """Calculate current performance metrics"""
        
        if not recent_logs:
            return ProductionMetrics(
                timestamp=datetime.now(),
                model_id=model_id,
                model_version=model_version,
                prediction_count=0,
                avg_latency_ms=0.0,
                error_rate=0.0
            )
        
        # Basic metrics
        prediction_count = len(recent_logs)
        latencies = [log["latency_ms"] for log in recent_logs if log["latency_ms"] > 0]
        avg_latency_ms = np.mean(latencies) if latencies else 0.0
        
        # Error rate
        errors = [log for log in recent_logs if not log["success"]]
        error_rate = len(errors) / prediction_count
        
        # Accuracy (if ground truth available)
        accuracy = await self._calculate_accuracy(recent_logs)
        
        # Drift detection
        drift_score = await self._calculate_drift_score(model_id, recent_logs)
        
        # Fairness (if sensitive features available)
        fairness_score = await self._calculate_fairness_score(recent_logs)
        
        return ProductionMetrics(
            timestamp=datetime.now(),
            model_id=model_id,
            model_version=model_version,
            prediction_count=prediction_count,
            avg_latency_ms=avg_latency_ms,
            error_rate=error_rate,
            accuracy=accuracy,
            drift_score=drift_score,
            fairness_score=fairness_score
        )
    
    async def _calculate_accuracy(self, recent_logs: List[Dict[str, Any]]) -> Optional[float]:
        """Calculate accuracy if ground truth is available"""
        
        # This would require a mechanism to obtain ground truth labels
        # For now, return None indicating accuracy is not available
        return None
    
    async def _calculate_drift_score(self, 
                                   model_id: str, 
                                   recent_logs: List[Dict[str, Any]]) -> Optional[float]:
        """Calculate data drift score"""
        
        try:
            # Extract input features from recent predictions
            recent_features = []
            for log in recent_logs:
                if log["input_features"] is not None:
                    recent_features.append(log["input_features"])
            
            if len(recent_features) < 50:
                return None
            
            # Get reference features from historical data
            reference_features = await self._get_reference_features(model_id)
            
            if not reference_features:
                return None
            
            # Simple drift detection using feature means
            recent_means = np.mean(recent_features, axis=0)
            reference_means = np.mean(reference_features, axis=0)
            
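            # Calculate relative change; taking the absolute value in the denominator
            # keeps features with negative means from producing a near-zero divisor
            drift_score = np.mean(np.abs(recent_means - reference_means) / (np.abs(reference_means) + 1e-8))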
            
            return float(drift_score)
        
        except Exception as e:
            self.logger.error(f"Drift calculation failed: {e}")
            return None
    
    async def _calculate_fairness_score(self, recent_logs: List[Dict[str, Any]]) -> Optional[float]:
        """Calculate fairness score if sensitive features available"""
        
        # This would require sensitive feature information in the logs
        # For now, return None
        return None
    
    async def _get_reference_features(self, model_id: str) -> Optional[List[List[float]]]:
        """Get reference features for drift comparison"""
        
        # This would retrieve historical feature data
        # For now, return None
        return None
    
    async def _store_metrics(self, metrics: ProductionMetrics):
        """Store metrics in database"""
        
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO production_metrics 
                (timestamp, model_id, model_version, prediction_count, 
                 avg_latency_ms, error_rate, accuracy, drift_score, fairness_score)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                metrics.timestamp.isoformat(),
                metrics.model_id,
                metrics.model_version,
                metrics.prediction_count,
                metrics.avg_latency_ms,
                metrics.error_rate,
                metrics.accuracy,
                metrics.drift_score,
                metrics.fairness_score
            ))
    
    async def _check_alerts(self, metrics: ProductionMetrics):
        """Check for alert conditions"""
        
        alerts = []
        
        # Error rate alert
        if metrics.error_rate > self.alert_thresholds["error_rate"]:
            alerts.append({
                "type": "high_error_rate",
                "severity": "high",
                "message": f"Error rate {metrics.error_rate:.2%} exceeds threshold {self.alert_thresholds['error_rate']:.2%}",
                "value": metrics.error_rate,
                "threshold": self.alert_thresholds["error_rate"]
            })
        
        # Latency alert
        if metrics.avg_latency_ms > self.alert_thresholds["latency_ms"]:
            alerts.append({
                "type": "high_latency",
                "severity": "medium",
                "message": f"Average latency {metrics.avg_latency_ms:.1f}ms exceeds threshold {self.alert_thresholds['latency_ms']:.1f}ms",
                "value": metrics.avg_latency_ms,
                "threshold": self.alert_thresholds["latency_ms"]
            })
        
        # Drift alert
        if metrics.drift_score is not None and metrics.drift_score > self.alert_thresholds["drift_score"]:
            alerts.append({
                "type": "data_drift",
                "severity": "medium",
                "message": f"Data drift score {metrics.drift_score:.3f} exceeds threshold {self.alert_thresholds['drift_score']:.3f}",
                "value": metrics.drift_score,
                "threshold": self.alert_thresholds["drift_score"]
            })
        
        # Fairness alert
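        # Compare against None explicitly: a fairness score of 0.0 is falsy but is the worst case
        if metrics.fairness_score is not None and metrics.fairness_score < self.alert_thresholds["fairness_score"]: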
            alerts.append({
                "type": "fairness_violation",
                "severity": "high",
                "message": f"Fairness score {metrics.fairness_score:.3f} below threshold {self.alert_thresholds['fairness_score']:.3f}",
                "value": metrics.fairness_score,
                "threshold": self.alert_thresholds["fairness_score"]
            })
        
        # Store and notify alerts
        for alert in alerts:
            await self._store_alert(metrics.model_id, alert)
            await self._notify_alert(metrics.model_id, alert)
    
    async def _store_alert(self, model_id: str, alert: Dict[str, Any]):
        """Store alert in database"""
        
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO alerts 
                (timestamp, model_id, alert_type, severity, message)
                VALUES (?, ?, ?, ?, ?)
            """, (
                datetime.now().isoformat(),
                model_id,
                alert["type"],
                alert["severity"],
                alert["message"]
            ))
    
    async def _notify_alert(self, model_id: str, alert: Dict[str, Any]):
        """Notify alert to callbacks"""
        
        self.logger.warning(f"Alert for model {model_id}: {alert['message']}")
        
        # Notify registered callbacks
        for callback in self.alert_callbacks:
            try:
                await callback(model_id, alert)
            except Exception as e:
                self.logger.error(f"Alert callback failed: {e}")
    
    def add_alert_callback(self, callback: Callable):
        """Add alert notification callback"""
        self.alert_callbacks.append(callback)
    
    async def get_model_health(self, 
                             model_id: str, 
                             time_window_hours: int = 24) -> Dict[str, Any]:
        """Get comprehensive model health report"""
        
        start_time = datetime.now() - timedelta(hours=time_window_hours)
        
        with sqlite3.connect(self.db_path) as conn:
            # Get metrics
            cursor = conn.execute("""
                SELECT * FROM production_metrics 
                WHERE model_id = ? AND timestamp >= ?
                ORDER BY timestamp DESC
            """, (model_id, start_time.isoformat()))
            
            metrics_data = cursor.fetchall()
            
            # Get alerts
            cursor = conn.execute("""
                SELECT * FROM alerts 
                WHERE model_id = ? AND timestamp >= ?
                ORDER BY timestamp DESC
            """, (model_id, start_time.isoformat()))
            
            alerts_data = cursor.fetchall()
        
        if not metrics_data:
            return {
                "model_id": model_id,
                "status": "no_data",
                "message": f"No metrics found for the last {time_window_hours} hours"
            }
        
        # Calculate health metrics
        latest_metrics = metrics_data[0]
        
        # Health score calculation
        health_factors = []
        
        # Error rate factor
        error_rate = latest_metrics[6]  # error_rate column
        error_factor = max(0, 1 - error_rate / self.alert_thresholds["error_rate"])
        health_factors.append(error_factor)
        
        # Latency factor
        latency = latest_metrics[5]  # avg_latency_ms column
        latency_factor = max(0, 1 - latency / self.alert_thresholds["latency_ms"])
        health_factors.append(latency_factor)
        
        # Alert factor
        active_alerts = [alert for alert in alerts_data if not alert[7]]  # not resolved
        alert_factor = max(0, 1 - len(active_alerts) / 10)  # Penalize multiple alerts
        health_factors.append(alert_factor)
        
        overall_health = np.mean(health_factors)
        
        # Determine status
        if overall_health > 0.8:
            status = "healthy"
        elif overall_health > 0.6:
            status = "warning"
        else:
            status = "critical"
        
        return {
            "model_id": model_id,
            "status": status,
            "health_score": overall_health,
            "time_window_hours": time_window_hours,
            "latest_metrics": {
                "timestamp": latest_metrics[1],
                "prediction_count": latest_metrics[4],
                "avg_latency_ms": latest_metrics[5],
                "error_rate": latest_metrics[6],
                "accuracy": latest_metrics[7],
                "drift_score": latest_metrics[8],
                "fairness_score": latest_metrics[9]
            },
            "active_alerts": len(active_alerts),
            "total_predictions": sum(row[4] for row in metrics_data),
            "recommendations": self._generate_health_recommendations(overall_health, latest_metrics, active_alerts)
        }
    
    def _generate_health_recommendations(self, 
                                       health_score: float,
                                       latest_metrics: tuple,
                                       active_alerts: List[tuple]) -> List[str]:
        """Generate health improvement recommendations"""
        
        recommendations = []
        
        if health_score < 0.6:
            recommendations.append("Immediate attention required - multiple performance issues detected")
        
        error_rate = latest_metrics[6]
        if error_rate > self.alert_thresholds["error_rate"]:
            recommendations.append(f"High error rate ({error_rate:.2%}) - investigate model stability")
        
        latency = latest_metrics[5]
        if latency > self.alert_thresholds["latency_ms"]:
            recommendations.append(f"High latency ({latency:.1f}ms) - consider model optimization")
        
        if len(active_alerts) > 5:
            recommendations.append("Multiple active alerts - prioritize resolution")
        
        if not recommendations:
            recommendations.append("Model performing well - continue monitoring")
        
        return recommendations

# Example usage
async def demonstrate_production_monitoring():
    """Demonstrate production monitoring system"""
    
    # Initialize production validator
    validator = ProductionValidator(alert_thresholds={
        "error_rate": 0.05,
        "latency_ms": 500,
        "drift_score": 0.1,
        "fairness_score": 0.8  # illustrative value; _check_alerts reads this key
    })
    
    # Add alert callback
    async def alert_handler(model_id: str, alert: Dict[str, Any]):
        print(f"ALERT for {model_id}: {alert['message']}")
    
    validator.add_alert_callback(alert_handler)
    
    # Simulate predictions
    model_id = "sentiment_classifier_v1"
    model_version = "1.0.0"
    
    for i in range(200):
        # Simulate prediction data
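        prediction_data = {
            "latency_ms": float(np.random.normal(100, 20)),
            "success": bool(np.random.random() > 0.02),  # ~2% simulated failure rate
            "input_features": np.random.random(10).tolist(),
            # np.random.choice returns a NumPy integer; cast so the payload stays JSON-serializable
            "prediction": int(np.random.choice([0, 1])),
            "confidence": float(np.random.random())
        }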
        
        await validator.log_prediction(model_id, model_version, prediction_data)
        
        # Add some delay
        await asyncio.sleep(0.01)
    
    # Get model health report
    health_report = await validator.get_model_health(model_id)
    print("Model Health Report:")
    print(json.dumps(health_report, indent=2, default=str))

if __name__ == "__main__":
    asyncio.run(demonstrate_production_monitoring())
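
The health score in get_model_health is easier to reason about when the arithmetic is isolated from the database access. The sketch below restates the same three factors as pure functions so they can be checked by hand or unit-tested. The names health_score and health_status are hypothetical helpers introduced for illustration and are not part of ProductionValidator.

```python
from statistics import mean
from typing import Dict


def health_score(error_rate: float, latency_ms: float, active_alerts: int,
                 thresholds: Dict[str, float]) -> float:
    """Average the three factors that get_model_health combines."""
    # Each factor is 1.0 at zero and falls linearly to 0.0 at its threshold.
    error_factor = max(0.0, 1 - error_rate / thresholds["error_rate"])
    latency_factor = max(0.0, 1 - latency_ms / thresholds["latency_ms"])
    # Ten or more unresolved alerts push this factor to 0.0.
    alert_factor = max(0.0, 1 - active_alerts / 10)
    return mean([error_factor, latency_factor, alert_factor])


def health_status(score: float) -> str:
    """Map a score to the same three status labels used above."""
    if score > 0.8:
        return "healthy"
    if score > 0.6:
        return "warning"
    return "critical"


thresholds = {"error_rate": 0.05, "latency_ms": 500}

# 1% errors, 100 ms latency, one open alert: factors 0.8, 0.8, 0.9
good = health_score(0.01, 100, 1, thresholds)
# 4% errors, 400 ms latency, two open alerts: factors 0.2, 0.2, 0.8
bad = health_score(0.04, 400, 2, thresholds)

print(health_status(good), health_status(bad))
```

Because every factor reaches zero at its alert threshold, a model that is comfortably inside its limits can still score well below 1.0. An error rate at 40% of its threshold, for example, contributes a factor of only 0.6.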

Best Practices for AI Evaluation

Evaluation Strategy Design

  1. Multi-Dimensional Assessment: Evaluate across performance, robustness, fairness, and interpretability
  2. Business Alignment: Ensure metrics align with business objectives and user needs
  3. Continuous Evaluation: Implement ongoing evaluation in production environments
  4. Comparative Analysis: Always evaluate against baselines and alternative approaches
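
As a minimal sketch of the comparative analysis point, the snippet below scores a model against a majority-class baseline, which is often the simplest reference to beat for classification. The helper names and the toy labels are assumptions made for illustration.

```python
from collections import Counter
from typing import Hashable, Sequence


def accuracy(y_true: Sequence[Hashable], y_pred: Sequence[Hashable]) -> float:
    """Fraction of predictions that match the true labels."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("y_true and y_pred must be non-empty and equal in length")
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def majority_class_baseline(y_train: Sequence[Hashable],
                            y_test: Sequence[Hashable]) -> float:
    """Accuracy of always predicting the most frequent training label."""
    majority_label = Counter(y_train).most_common(1)[0][0]
    return accuracy(y_test, [majority_label] * len(y_test))


# Toy data, for illustration only
y_train = [0, 0, 0, 1]
y_test = [0, 0, 1, 1]
model_predictions = [0, 0, 1, 0]

baseline_acc = majority_class_baseline(y_train, y_test)
model_acc = accuracy(y_test, model_predictions)
lift = model_acc - baseline_acc

print(f"baseline={baseline_acc:.2f} model={model_acc:.2f} lift={lift:+.2f}")
```

Reporting the lift alongside the raw score makes it harder to mistake a high accuracy on an imbalanced dataset for a useful model.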

Metric Selection and Interpretation

  1. Domain-Appropriate Metrics: Choose metrics that are meaningful for your specific domain
  2. Multiple Metrics: Use multiple complementary metrics to get a complete picture
  3. Statistical Significance: Include confidence intervals and significance testing
  4. Practical Significance: Consider whether differences are practically meaningful
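
One common way to attach a confidence interval to a metric is the percentile bootstrap. The sketch below applies it to accuracy using only the standard library. The function name bootstrap_accuracy_ci, the default of 2000 resamples, and the toy labels are assumptions for illustration, not a prescribed method.

```python
import random
from typing import Hashable, Sequence, Tuple


def bootstrap_accuracy_ci(y_true: Sequence[Hashable],
                          y_pred: Sequence[Hashable],
                          n_resamples: int = 2000,
                          confidence: float = 0.95,
                          seed: int = 0) -> Tuple[float, float, float]:
    """Return (accuracy, lower, upper) using a percentile bootstrap."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("y_true and y_pred must be non-empty and equal in length")

    rng = random.Random(seed)
    n = len(y_true)
    # 1 where the prediction is correct, 0 otherwise
    correct = [int(t == p) for t, p in zip(y_true, y_pred)]
    point_estimate = sum(correct) / n

    # Resample the per-example outcomes with replacement and re-score each sample
    resampled_scores = sorted(
        sum(rng.choices(correct, k=n)) / n for _ in range(n_resamples)
    )

    alpha = 1 - confidence
    lower_idx = int((alpha / 2) * n_resamples)
    upper_idx = min(n_resamples - 1, int((1 - alpha / 2) * n_resamples))
    return point_estimate, resampled_scores[lower_idx], resampled_scores[upper_idx]


# Toy example: 80 correct predictions out of 100
y_true = [1] * 80 + [0] * 20
y_pred = [1] * 100
acc, lower, upper = bootstrap_accuracy_ci(y_true, y_pred)
print(f"accuracy={acc:.2f}, 95% CI=({lower:.2f}, {upper:.2f})")
```

If the intervals of two models overlap heavily, the observed difference may not be statistically meaningful, and it is worth checking practical significance before switching models.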

Production Monitoring

  1. Real-Time Monitoring: Track key metrics such as latency, error rate, and prediction volume as requests are served
  2. Automated Alerting: Trigger alerts automatically when a metric crosses a predefined threshold
  3. Drift Detection: Continuously monitor for data and concept drift
  4. Human-in-the-Loop: Include human validation in critical decisions
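
A drift score can be computed in several ways. One widely used option for a single numeric feature is the Population Stability Index (PSI), sketched below with the standard library. This is an assumption about how a drift score could be produced, not the method used by the monitoring code above, and the function name, bin count, and sample data are illustrative.

```python
import math
from typing import List, Sequence


def population_stability_index(reference: Sequence[float],
                               current: Sequence[float],
                               n_bins: int = 10,
                               eps: float = 1e-6) -> float:
    """PSI between a reference sample and a current sample of one feature."""
    if not reference or not current:
        raise ValueError("both samples must be non-empty")
    lo, hi = min(reference), max(reference)
    if hi == lo:
        raise ValueError("reference sample has no spread to bin on")
    width = (hi - lo) / n_bins

    def bin_fractions(values: Sequence[float]) -> List[float]:
        counts = [0] * n_bins
        for v in values:
            # Values outside the reference range are clipped into the edge bins
            idx = min(max(int((v - lo) / width), 0), n_bins - 1)
            counts[idx] += 1
        # Floor empty bins at eps so the logarithm stays defined
        return [max(c / len(values), eps) for c in counts]

    ref_frac = bin_fractions(reference)
    cur_frac = bin_fractions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_frac, cur_frac))


# Illustrative data: training-time values versus a shifted production sample
reference_sample = [i / 100 for i in range(100)]
shifted_sample = [0.5 + i / 200 for i in range(100)]

psi_same = population_stability_index(reference_sample, reference_sample)
psi_shifted = population_stability_index(reference_sample, shifted_sample)
print(f"no shift: {psi_same:.3f}, shifted: {psi_shifted:.3f}")
```

A commonly quoted rule of thumb treats a PSI below 0.1 as stable and above 0.25 as a significant shift, but these cutoffs should be calibrated against your own data before being used as alert thresholds.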

Testing and Validation

  1. Comprehensive Test Suites: Implement automated testing for multiple scenarios
  2. Edge Case Testing: Test thoroughly on edge cases and adversarial examples
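  3. Cross-Validation: Use a cross-validation scheme that matches the data, such as stratified folds for imbalanced classes or time-ordered splits for time series
  4. Hold-Out Testing: Keep a test set that is never used for training, tuning, or model selection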
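
To make the cross-validation point concrete, here is a minimal sketch of plain shuffled k-fold splitting with the standard library. The generator name k_fold_indices is hypothetical, and the sketch does not stratify or respect time order, so it suits only data where examples are independent and classes are reasonably balanced.

```python
import random
from typing import Iterator, List, Tuple


def k_fold_indices(n_samples: int, k: int = 5,
                   seed: int = 0) -> Iterator[Tuple[List[int], List[int]]]:
    """Yield (train_indices, test_indices) pairs for shuffled k-fold CV."""
    if k < 2 or k > n_samples:
        raise ValueError("k must be between 2 and n_samples")

    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)

    # Deal the shuffled indices into k folds of near-equal size
    folds = [indices[i::k] for i in range(k)]

    for i in range(k):
        test_idx = folds[i]
        train_idx = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train_idx, test_idx


splits = list(k_fold_indices(n_samples=10, k=5))
for fold_number, (train_idx, test_idx) in enumerate(splits, start=1):
    print(f"fold {fold_number}: train={len(train_idx)} test={len(test_idx)}")
```

Every example lands in exactly one test fold, so averaging the per-fold scores uses all of the data for evaluation without ever scoring a model on examples it was trained on. The final hold-out set should be split off before this procedure starts.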

Conclusion

Comprehensive AI model evaluation is essential for building reliable, trustworthy, and effective machine learning systems. The frameworks, metrics, and practices covered in this guide provide a solid foundation for evaluating models across multiple dimensions and ensuring they meet both technical and business requirements.

Key principles for effective evaluation:

  1. Systematic Approach: Use structured evaluation frameworks
  2. Multiple Perspectives: Evaluate from technical, business, and ethical viewpoints
  3. Continuous Monitoring: Implement ongoing evaluation in production
  4. Actionable Insights: Focus on generating actionable recommendations
  5. Stakeholder Communication: Present results clearly to different audiences

As AI systems become more complex and critical to business operations, robust evaluation practices become increasingly important for maintaining trust, compliance, and competitive advantage.

David Childs

Consulting Systems Engineer with over 10 years of experience building scalable infrastructure and helping organizations optimize their technology stack.
