Build ethical AI systems with bias detection, fairness metrics, explainability techniques, and responsible governance frameworks for production use.
AI ethics isn't just philosophy—it's a practical necessity for building trustworthy systems that serve everyone fairly. After implementing responsible AI frameworks across healthcare, finance, and hiring systems, I've learned that ethical AI requires systematic approaches, measurable metrics, and continuous vigilance. Here's your comprehensive guide to building responsible AI systems.
Ethical AI Framework and Principles
Comprehensive Ethics Framework
# ethical_ai_framework.py
from typing import Dict, List, Optional, Any, Tuple, Callable
from dataclasses import dataclass
from enum import Enum
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
import logging
from datetime import datetime
import json
class EthicalPrinciple(Enum):
FAIRNESS = "fairness"
TRANSPARENCY = "transparency"
ACCOUNTABILITY = "accountability"
PRIVACY = "privacy"
BENEFICENCE = "beneficence"
NON_MALEFICENCE = "non_maleficence"
AUTONOMY = "autonomy"
JUSTICE = "justice"
class BiasType(Enum):
STATISTICAL_PARITY = "statistical_parity"
EQUALIZED_ODDS = "equalized_odds"
DEMOGRAPHIC_PARITY = "demographic_parity"
INDIVIDUAL_FAIRNESS = "individual_fairness"
COUNTERFACTUAL_FAIRNESS = "counterfactual_fairness"
@dataclass
class EthicalAssessment:
principle: EthicalPrinciple
score: float # 0.0 to 1.0
details: Dict[str, Any]
recommendations: List[str]
timestamp: datetime
@dataclass
class FairnessMetrics:
demographic_parity: float
equalized_odds: float
calibration: float
individual_fairness: float
counterfactual_fairness: Optional[float] = None
class ResponsibleAIFramework:
def __init__(self, domain: str = "general"):
self.domain = domain
self.ethical_assessors = {}
self.bias_detectors = {}
self.fairness_enhancers = {}
self.audit_trail = []
# Initialize assessors
self._initialize_ethical_assessors()
# Domain-specific configurations
self.domain_configs = {
'healthcare': {
'critical_attributes': ['race', 'gender', 'age', 'socioeconomic_status'],
'fairness_threshold': 0.9,
'transparency_requirement': 'high',
'privacy_level': 'strict'
},
'finance': {
'critical_attributes': ['race', 'gender', 'age', 'zip_code'],
'fairness_threshold': 0.85,
'transparency_requirement': 'medium',
'privacy_level': 'strict'
},
'hiring': {
'critical_attributes': ['race', 'gender', 'age', 'education'],
'fairness_threshold': 0.9,
'transparency_requirement': 'high',
'privacy_level': 'medium'
}
}
def _initialize_ethical_assessors(self):
"""Initialize ethical assessment components"""
self.ethical_assessors[EthicalPrinciple.FAIRNESS] = FairnessAssessor()
self.ethical_assessors[EthicalPrinciple.TRANSPARENCY] = TransparencyAssessor()
self.ethical_assessors[EthicalPrinciple.ACCOUNTABILITY] = AccountabilityAssessor()
self.ethical_assessors[EthicalPrinciple.PRIVACY] = PrivacyAssessor()
self.bias_detectors[BiasType.STATISTICAL_PARITY] = StatisticalParityDetector()
self.bias_detectors[BiasType.EQUALIZED_ODDS] = EqualizedOddsDetector()
self.bias_detectors[BiasType.DEMOGRAPHIC_PARITY] = DemographicParityDetector()
self.bias_detectors[BiasType.INDIVIDUAL_FAIRNESS] = IndividualFairnessDetector()
async def assess_model_ethics(self,
model: Any,
dataset: pd.DataFrame,
protected_attributes: List[str],
target_column: str) -> Dict[str, EthicalAssessment]:
"""Comprehensive ethical assessment of AI model"""
assessments = {}
# Run assessments for each principle
for principle, assessor in self.ethical_assessors.items():
try:
assessment = await assessor.assess(
model=model,
dataset=dataset,
protected_attributes=protected_attributes,
target_column=target_column,
domain_config=self.domain_configs.get(self.domain, {})
)
assessments[principle.value] = assessment
# Log assessment
self.audit_trail.append({
'timestamp': datetime.now(),
'action': 'ethical_assessment',
'principle': principle.value,
'score': assessment.score,
'details': assessment.details
})
except Exception as e:
logging.error(f"Assessment failed for {principle.value}: {e}")
assessments[principle.value] = EthicalAssessment(
principle=principle,
score=0.0,
details={'error': str(e)},
recommendations=['Fix assessment error'],
timestamp=datetime.now()
)
return assessments
async def detect_bias(self,
model: Any,
dataset: pd.DataFrame,
protected_attributes: List[str],
target_column: str) -> Dict[str, float]:
"""Detect various types of bias in model"""
bias_scores = {}
# Generate predictions
predictions = model.predict(dataset.drop(columns=[target_column]))
true_labels = dataset[target_column].values
# Calculate bias metrics
for bias_type, detector in self.bias_detectors.items():
try:
bias_score = await detector.detect(
dataset=dataset,
predictions=predictions,
true_labels=true_labels,
protected_attributes=protected_attributes
)
bias_scores[bias_type.value] = bias_score
except Exception as e:
logging.error(f"Bias detection failed for {bias_type.value}: {e}")
bias_scores[bias_type.value] = 1.0 # Assume maximum bias on error
return bias_scores
async def generate_ethics_report(self,
model: Any,
dataset: pd.DataFrame,
protected_attributes: List[str],
target_column: str) -> Dict:
"""Generate comprehensive ethics report"""
# Run assessments
ethical_assessments = await self.assess_model_ethics(
model, dataset, protected_attributes, target_column
)
# Detect biases
bias_scores = await self.detect_bias(
model, dataset, protected_attributes, target_column
)
# Calculate overall ethics score
ethics_scores = [assessment.score for assessment in ethical_assessments.values()]
overall_score = np.mean(ethics_scores)
# Generate recommendations
recommendations = []
for assessment in ethical_assessments.values():
recommendations.extend(assessment.recommendations)
# Risk assessment
risk_level = self._assess_risk_level(overall_score, bias_scores)
report = {
'timestamp': datetime.now().isoformat(),
'model_info': {
'domain': self.domain,
'protected_attributes': protected_attributes,
'target_column': target_column
},
'overall_ethics_score': overall_score,
'principle_scores': {
principle: assessment.score
for principle, assessment in ethical_assessments.items()
},
'bias_scores': bias_scores,
'risk_level': risk_level,
'recommendations': list(set(recommendations)), # Remove duplicates
'detailed_assessments': {
principle: assessment.details
for principle, assessment in ethical_assessments.items()
}
}
return report
def _assess_risk_level(self,
overall_score: float,
bias_scores: Dict[str, float]) -> str:
"""Assess overall risk level"""
domain_config = self.domain_configs.get(self.domain, {})
fairness_threshold = domain_config.get('fairness_threshold', 0.8)
# High risk conditions
if overall_score < 0.5:
return "HIGH"
if any(score > 0.3 for score in bias_scores.values()):
return "HIGH"
if overall_score < fairness_threshold:
return "MEDIUM"
# Check for any concerning bias
if any(score > 0.1 for score in bias_scores.values()):
return "MEDIUM"
return "LOW"
class FairnessAssessor:
async def assess(self, **kwargs) -> EthicalAssessment:
"""Assess fairness of model"""
model = kwargs['model']
dataset = kwargs['dataset']
protected_attributes = kwargs['protected_attributes']
target_column = kwargs['target_column']
# Calculate fairness metrics
fairness_metrics = await self._calculate_fairness_metrics(
model, dataset, protected_attributes, target_column
)
# Overall fairness score (average of metrics)
fairness_score = np.mean([
fairness_metrics.demographic_parity,
fairness_metrics.equalized_odds,
fairness_metrics.calibration,
fairness_metrics.individual_fairness
])
recommendations = []
if fairness_score < 0.8:
recommendations.extend([
"Apply bias mitigation techniques",
"Rebalance training dataset",
"Use fairness-aware learning algorithms",
"Implement post-processing fairness corrections"
])
return EthicalAssessment(
principle=EthicalPrinciple.FAIRNESS,
score=fairness_score,
details={
'demographic_parity': fairness_metrics.demographic_parity,
'equalized_odds': fairness_metrics.equalized_odds,
'calibration': fairness_metrics.calibration,
'individual_fairness': fairness_metrics.individual_fairness,
'protected_attributes_analyzed': protected_attributes
},
recommendations=recommendations,
timestamp=datetime.now()
)
async def _calculate_fairness_metrics(self,
model: Any,
dataset: pd.DataFrame,
protected_attributes: List[str],
target_column: str) -> FairnessMetrics:
"""Calculate comprehensive fairness metrics"""
predictions = model.predict(dataset.drop(columns=[target_column]))
true_labels = dataset[target_column].values
# Demographic parity
demographic_parity = self._calculate_demographic_parity(
dataset, predictions, protected_attributes
)
# Equalized odds
equalized_odds = self._calculate_equalized_odds(
dataset, predictions, true_labels, protected_attributes
)
# Calibration
calibration = self._calculate_calibration(
dataset, predictions, true_labels, protected_attributes
)
# Individual fairness
        individual_fairness = self._calculate_individual_fairness(
            model, dataset, protected_attributes, target_column
        )
return FairnessMetrics(
demographic_parity=demographic_parity,
equalized_odds=equalized_odds,
calibration=calibration,
individual_fairness=individual_fairness
)
def _calculate_demographic_parity(self,
dataset: pd.DataFrame,
predictions: np.ndarray,
protected_attributes: List[str]) -> float:
"""Calculate demographic parity metric"""
parity_scores = []
for attr in protected_attributes:
if attr not in dataset.columns:
continue
# Get unique values for this attribute
unique_values = dataset[attr].unique()
if len(unique_values) < 2:
continue
# Calculate positive prediction rates for each group
group_rates = []
for value in unique_values:
group_mask = dataset[attr] == value
group_predictions = predictions[group_mask]
if len(group_predictions) > 0:
positive_rate = np.mean(group_predictions)
group_rates.append(positive_rate)
if len(group_rates) > 1:
# Calculate parity as 1 - max difference between groups
max_diff = max(group_rates) - min(group_rates)
parity_score = 1.0 - max_diff
parity_scores.append(max(0.0, parity_score))
return np.mean(parity_scores) if parity_scores else 1.0
def _calculate_equalized_odds(self,
dataset: pd.DataFrame,
predictions: np.ndarray,
true_labels: np.ndarray,
protected_attributes: List[str]) -> float:
"""Calculate equalized odds metric"""
odds_scores = []
for attr in protected_attributes:
if attr not in dataset.columns:
continue
unique_values = dataset[attr].unique()
if len(unique_values) < 2:
continue
# Calculate TPR and FPR for each group
group_tprs = []
group_fprs = []
for value in unique_values:
group_mask = dataset[attr] == value
group_preds = predictions[group_mask]
group_true = true_labels[group_mask]
if len(group_true) > 0:
# True Positive Rate
true_positives = np.sum((group_preds == 1) & (group_true == 1))
actual_positives = np.sum(group_true == 1)
tpr = true_positives / actual_positives if actual_positives > 0 else 0
# False Positive Rate
false_positives = np.sum((group_preds == 1) & (group_true == 0))
actual_negatives = np.sum(group_true == 0)
fpr = false_positives / actual_negatives if actual_negatives > 0 else 0
group_tprs.append(tpr)
group_fprs.append(fpr)
# Calculate equalized odds score
if len(group_tprs) > 1 and len(group_fprs) > 1:
tpr_diff = max(group_tprs) - min(group_tprs)
fpr_diff = max(group_fprs) - min(group_fprs)
# Average difference (lower is better)
avg_diff = (tpr_diff + fpr_diff) / 2
odds_score = 1.0 - avg_diff
odds_scores.append(max(0.0, odds_score))
return np.mean(odds_scores) if odds_scores else 1.0
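    # The two helpers below are referenced above but not defined in the
    # original listing; they are minimal sketches of one reasonable way to
    # score calibration and individual fairness, not a definitive method.
    def _calculate_calibration(self,
                               dataset: pd.DataFrame,
                               predictions: np.ndarray,
                               true_labels: np.ndarray,
                               protected_attributes: List[str]) -> float:
        """Sketch: compare precision of positive predictions across groups"""
        calibration_scores = []
        for attr in protected_attributes:
            if attr not in dataset.columns:
                continue
            group_rates = []
            for value in dataset[attr].unique():
                group_mask = (dataset[attr] == value).values
                group_preds = predictions[group_mask]
                group_true = true_labels[group_mask]
                predicted_positives = np.sum(group_preds == 1)
                if predicted_positives > 0:
                    group_rates.append(
                        np.sum((group_preds == 1) & (group_true == 1)) / predicted_positives
                    )
            if len(group_rates) > 1:
                calibration_scores.append(
                    max(0.0, 1.0 - (max(group_rates) - min(group_rates)))
                )
        return float(np.mean(calibration_scores)) if calibration_scores else 1.0
    def _calculate_individual_fairness(self,
                                       model: Any,
                                       dataset: pd.DataFrame,
                                       protected_attributes: List[str],
                                       target_column: str) -> float:
        """Sketch: similar individuals should receive similar predictions;
        approximated as prediction agreement between nearest neighbours on
        the non-protected numeric features (unscaled, for simplicity)"""
        from sklearn.neighbors import NearestNeighbors
        features = dataset.drop(columns=[target_column])
        predictions = model.predict(features)
        numeric = (features.drop(columns=protected_attributes, errors='ignore')
                   .select_dtypes(include=[np.number]))
        if numeric.shape[1] == 0 or len(numeric) < 2:
            return 1.0
        rng = np.random.RandomState(42)
        sample_idx = rng.choice(len(numeric), size=min(500, len(numeric)), replace=False)
        nn = NearestNeighbors(n_neighbors=2).fit(numeric.values)
        _, neighbour_idx = nn.kneighbors(numeric.values[sample_idx])
        # Column 0 is the point itself; column 1 is its nearest neighbour
        agreement = np.mean(predictions[sample_idx] == predictions[neighbour_idx[:, 1]])
        return float(agreement)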
class BiasDetector(ABC):
@abstractmethod
async def detect(self, **kwargs) -> float:
"""Detect bias and return score (0.0 = no bias, 1.0 = maximum bias)"""
pass
class StatisticalParityDetector(BiasDetector):
async def detect(self, **kwargs) -> float:
"""Detect statistical parity violations"""
dataset = kwargs['dataset']
predictions = kwargs['predictions']
protected_attributes = kwargs['protected_attributes']
violations = []
for attr in protected_attributes:
if attr not in dataset.columns:
continue
unique_values = dataset[attr].unique()
if len(unique_values) < 2:
continue
# Calculate selection rates for each group
selection_rates = []
for value in unique_values:
group_mask = dataset[attr] == value
group_predictions = predictions[group_mask]
if len(group_predictions) > 0:
selection_rate = np.mean(group_predictions)
selection_rates.append(selection_rate)
if len(selection_rates) > 1:
# Calculate violation as maximum difference
max_rate = max(selection_rates)
min_rate = min(selection_rates)
# 80% rule: no group should have selection rate < 80% of highest
if min_rate < 0.8 * max_rate:
violation = 1.0 - (min_rate / max_rate)
violations.append(violation)
else:
violations.append(0.0)
return np.mean(violations) if violations else 0.0
class EqualizedOddsDetector(BiasDetector):
async def detect(self, **kwargs) -> float:
"""Detect equalized odds violations"""
dataset = kwargs['dataset']
predictions = kwargs['predictions']
true_labels = kwargs['true_labels']
protected_attributes = kwargs['protected_attributes']
violations = []
for attr in protected_attributes:
if attr not in dataset.columns:
continue
unique_values = dataset[attr].unique()
if len(unique_values) < 2:
continue
# Calculate TPR and FPR differences
tprs = []
fprs = []
for value in unique_values:
group_mask = dataset[attr] == value
group_preds = predictions[group_mask]
group_true = true_labels[group_mask]
if len(group_true) > 0:
# Calculate TPR and FPR
tp = np.sum((group_preds == 1) & (group_true == 1))
fp = np.sum((group_preds == 1) & (group_true == 0))
tn = np.sum((group_preds == 0) & (group_true == 0))
fn = np.sum((group_preds == 0) & (group_true == 1))
tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
tprs.append(tpr)
fprs.append(fpr)
# Calculate violations
if len(tprs) > 1 and len(fprs) > 1:
tpr_violation = max(tprs) - min(tprs)
fpr_violation = max(fprs) - min(fprs)
avg_violation = (tpr_violation + fpr_violation) / 2
violations.append(avg_violation)
return np.mean(violations) if violations else 0.0
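The framework above also references a few components that are not shown: DemographicParityDetector, IndividualFairnessDetector, and the transparency, accountability, and privacy assessors. The sketch below suggests one minimal way to fill those gaps; the detectors reuse the group-rate logic already used above, while the assessors are deliberately simple placeholders you would replace with real checks (documentation coverage, audit logging, PII handling, and so on).
class DemographicParityDetector(BiasDetector):
    async def detect(self, **kwargs) -> float:
        """Sketch: gap in positive prediction rates across groups (0 = no gap)"""
        dataset = kwargs['dataset']
        predictions = kwargs['predictions']
        protected_attributes = kwargs['protected_attributes']
        gaps = []
        for attr in protected_attributes:
            if attr not in dataset.columns:
                continue
            rates = []
            for value in dataset[attr].unique():
                group_mask = (dataset[attr] == value).values
                if group_mask.sum() > 0:
                    rates.append(float(np.mean(predictions[group_mask])))
            if len(rates) > 1:
                gaps.append(max(rates) - min(rates))
        return float(np.mean(gaps)) if gaps else 0.0
class IndividualFairnessDetector(BiasDetector):
    async def detect(self, **kwargs) -> float:
        """Sketch: disagreement between predictions of near-identical rows
        (note: the target column, if present in `dataset`, is not excluded here)"""
        from sklearn.neighbors import NearestNeighbors
        dataset = kwargs['dataset']
        predictions = kwargs['predictions']
        protected_attributes = kwargs['protected_attributes']
        numeric = (dataset.drop(columns=protected_attributes, errors='ignore')
                   .select_dtypes(include=[np.number]))
        if numeric.shape[1] == 0 or len(numeric) < 2:
            return 0.0
        nn = NearestNeighbors(n_neighbors=2).fit(numeric.values)
        _, neighbour_idx = nn.kneighbors(numeric.values)
        # Column 0 is the row itself; column 1 is its nearest neighbour
        return float(np.mean(predictions != predictions[neighbour_idx[:, 1]]))
# Placeholder assessors: these only establish the interface and should be
# backed by real checks (documentation coverage, audit trails, PII review).
class _StaticAssessor:
    def __init__(self, principle: EthicalPrinciple, recommendation: str):
        self.principle = principle
        self.recommendation = recommendation
    async def assess(self, **kwargs) -> EthicalAssessment:
        return EthicalAssessment(
            principle=self.principle,
            score=0.5,  # neutral placeholder score
            details={'note': 'placeholder heuristic, replace with real checks'},
            recommendations=[self.recommendation],
            timestamp=datetime.now()
        )
class TransparencyAssessor(_StaticAssessor):
    def __init__(self):
        super().__init__(EthicalPrinciple.TRANSPARENCY,
                         'Document model behaviour, data lineage and limitations')
class AccountabilityAssessor(_StaticAssessor):
    def __init__(self):
        super().__init__(EthicalPrinciple.ACCOUNTABILITY,
                         'Define owners, escalation paths and audit logging')
class PrivacyAssessor(_StaticAssessor):
    def __init__(self):
        super().__init__(EthicalPrinciple.PRIVACY,
                         'Review PII handling, retention and consent')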
Explainability and Interpretability
Model Interpretability Framework
# explainability_framework.py
from typing import Dict, List, Optional, Any, Tuple
import shap
import lime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.inspection import permutation_importance
from sklearn.tree import export_text
from datetime import datetime
class ModelExplainabilityFramework:
def __init__(self, model: Any, model_type: str = "black_box"):
self.model = model
self.model_type = model_type
self.explainers = {}
self.global_explanations = {}
self.local_explanations = {}
# Initialize explainers based on model type
self._initialize_explainers()
def _initialize_explainers(self):
"""Initialize appropriate explainers based on model type"""
        # SHAP explainer: shap.Explainer dispatches to a suitable algorithm;
        # model-agnostic variants may need a background dataset, so this can
        # fail here and be retried later once data is available
        try:
            self.explainers['shap'] = shap.Explainer(self.model)
        except Exception as e:
            print(f"SHAP initialization failed: {e}")
# LIME explainers
self.explainers['lime_tabular'] = None # Will be initialized when needed
self.explainers['lime_image'] = None
self.explainers['lime_text'] = None
# Model-specific explainers
if self.model_type == "tree":
self.explainers['tree_interpretation'] = TreeInterpreter(self.model)
elif self.model_type == "linear":
self.explainers['linear_interpretation'] = LinearInterpreter(self.model)
elif self.model_type == "neural_network":
self.explainers['gradient_analysis'] = GradientAnalyzer(self.model)
async def generate_global_explanations(self,
X_train: pd.DataFrame,
feature_names: List[str] = None) -> Dict[str, Any]:
"""Generate global model explanations"""
explanations = {}
# Feature importance (if available)
if hasattr(self.model, 'feature_importances_'):
explanations['feature_importance'] = self._get_feature_importance(
feature_names or X_train.columns.tolist()
)
# Permutation importance
explanations['permutation_importance'] = await self._get_permutation_importance(
X_train, feature_names
)
# SHAP global explanations
if 'shap' in self.explainers:
explanations['shap_global'] = await self._get_shap_global(X_train)
# Model-specific global explanations
if self.model_type == "tree":
explanations['tree_rules'] = await self._get_tree_rules()
elif self.model_type == "linear":
explanations['linear_coefficients'] = await self._get_linear_coefficients(
feature_names
)
self.global_explanations = explanations
return explanations
async def generate_local_explanation(self,
instance: np.ndarray,
feature_names: List[str] = None,
explanation_type: str = "shap") -> Dict[str, Any]:
"""Generate explanation for single prediction"""
explanation = {}
if explanation_type == "shap" and 'shap' in self.explainers:
explanation = await self._get_shap_local(instance, feature_names)
elif explanation_type == "lime":
explanation = await self._get_lime_local(instance, feature_names)
elif explanation_type == "gradient" and self.model_type == "neural_network":
explanation = await self._get_gradient_explanation(instance, feature_names)
# Store for future reference
instance_id = hash(instance.tobytes())
self.local_explanations[instance_id] = explanation
return explanation
async def _get_shap_global(self, X_train: pd.DataFrame) -> Dict:
"""Get SHAP global explanations"""
try:
explainer = self.explainers['shap']
# Calculate SHAP values for sample of training data
sample_size = min(1000, len(X_train))
X_sample = X_train.sample(n=sample_size, random_state=42)
shap_values = explainer(X_sample)
# Global feature importance
feature_importance = np.abs(shap_values.values).mean(0)
return {
'feature_importance': feature_importance.tolist(),
'feature_names': X_train.columns.tolist(),
'summary_plot_data': {
'shap_values': shap_values.values.tolist(),
'feature_values': X_sample.values.tolist()
}
}
except Exception as e:
return {'error': f"SHAP global explanation failed: {e}"}
async def _get_shap_local(self,
instance: np.ndarray,
feature_names: List[str]) -> Dict:
"""Get SHAP explanation for single instance"""
try:
explainer = self.explainers['shap']
# Reshape instance if needed
if len(instance.shape) == 1:
instance = instance.reshape(1, -1)
shap_values = explainer(instance)
return {
'shap_values': shap_values.values[0].tolist(),
'base_value': shap_values.base_values[0] if hasattr(shap_values, 'base_values') else 0,
'feature_names': feature_names,
'feature_values': instance[0].tolist(),
'prediction': self.model.predict(instance)[0]
}
except Exception as e:
return {'error': f"SHAP local explanation failed: {e}"}
async def _get_lime_local(self,
instance: np.ndarray,
feature_names: List[str]) -> Dict:
"""Get LIME explanation for single instance"""
try:
# Initialize LIME tabular explainer if not done
if self.explainers['lime_tabular'] is None:
from lime.lime_tabular import LimeTabularExplainer
# This would need training data statistics
# For now, using dummy values
self.explainers['lime_tabular'] = LimeTabularExplainer(
training_data=np.random.randn(100, len(instance)),
feature_names=feature_names,
class_names=['Class 0', 'Class 1'],
mode='classification'
)
explainer = self.explainers['lime_tabular']
# Generate explanation
explanation = explainer.explain_instance(
instance,
self.model.predict_proba,
num_features=len(feature_names)
)
# Extract explanation data
exp_data = explanation.as_map()[1] # Assuming binary classification
return {
'feature_contributions': dict(exp_data),
'feature_names': feature_names,
'prediction': self.model.predict([instance])[0],
'prediction_proba': self.model.predict_proba([instance])[0].tolist()
}
except Exception as e:
return {'error': f"LIME local explanation failed: {e}"}
async def generate_explanation_report(self,
X_test: pd.DataFrame,
y_test: np.ndarray,
sample_size: int = 10) -> Dict:
"""Generate comprehensive explanation report"""
report = {
'timestamp': datetime.now().isoformat(),
'model_type': self.model_type,
'global_explanations': {},
'local_explanations': [],
'explanation_quality_metrics': {}
}
# Generate global explanations
try:
global_explanations = await self.generate_global_explanations(X_test)
report['global_explanations'] = global_explanations
except Exception as e:
report['global_explanations'] = {'error': str(e)}
# Generate sample local explanations
sample_indices = np.random.choice(
len(X_test),
size=min(sample_size, len(X_test)),
replace=False
)
for idx in sample_indices:
try:
instance = X_test.iloc[idx].values
local_exp = await self.generate_local_explanation(
instance,
X_test.columns.tolist()
)
local_exp['true_label'] = y_test[idx]
local_exp['instance_index'] = int(idx)
report['local_explanations'].append(local_exp)
except Exception as e:
report['local_explanations'].append({
'instance_index': int(idx),
'error': str(e)
})
# Calculate explanation quality metrics
report['explanation_quality_metrics'] = await self._calculate_explanation_quality()
return report
async def _calculate_explanation_quality(self) -> Dict:
"""Calculate metrics for explanation quality"""
metrics = {}
# Consistency: How consistent are explanations for similar instances
# Completeness: Do explanations cover all important features
# Accuracy: Do explanations reflect actual model behavior
# For now, return placeholder metrics
metrics['consistency_score'] = 0.85
metrics['completeness_score'] = 0.90
metrics['accuracy_score'] = 0.88
return metrics
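    # The helper methods below are referenced by the report generation code
    # above but are not part of the original listing; they are minimal
    # sketches under stated assumptions, not a definitive implementation.
    def _get_feature_importance(self, feature_names: List[str]) -> Dict[str, float]:
        """Built-in importances for models exposing feature_importances_"""
        importances = getattr(self.model, 'feature_importances_', None)
        if importances is None:
            return {}
        return dict(zip(feature_names, [float(v) for v in importances]))
    async def _get_permutation_importance(self,
                                          X_train: pd.DataFrame,
                                          feature_names: List[str] = None) -> Dict[str, float]:
        """Label-free proxy: permute each column and measure how much the
        model output moves (true permutation importance also needs labels)"""
        feature_names = feature_names or X_train.columns.tolist()
        predict = getattr(self.model, 'predict_proba', self.model.predict)
        baseline = np.asarray(predict(X_train), dtype=float)
        rng = np.random.RandomState(42)
        sensitivity = {}
        for column in feature_names:
            shuffled = X_train.copy()
            shuffled[column] = rng.permutation(shuffled[column].values)
            permuted = np.asarray(predict(shuffled), dtype=float)
            sensitivity[column] = float(np.mean(np.abs(permuted - baseline)))
        return sensitivity
    async def _get_tree_rules(self) -> List[str]:
        """Delegate to the TreeInterpreter defined below"""
        interpreter = self.explainers.get('tree_interpretation') or TreeInterpreter(self.model)
        return await interpreter.get_tree_rules()
    async def _get_linear_coefficients(self, feature_names: List[str]) -> Dict:
        """Delegate to the LinearInterpreter defined below"""
        interpreter = self.explainers.get('linear_interpretation') or LinearInterpreter(self.model)
        return await interpreter.get_coefficients(feature_names)
    async def _get_gradient_explanation(self,
                                        instance: np.ndarray,
                                        feature_names: List[str]) -> Dict:
        """Delegate to the GradientAnalyzer defined below"""
        analyzer = self.explainers.get('gradient_analysis') or GradientAnalyzer(self.model)
        return await analyzer.analyze_gradients(instance, feature_names)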
class TreeInterpreter:
def __init__(self, model):
self.model = model
async def get_tree_rules(self) -> List[str]:
"""Extract interpretable rules from tree model"""
if hasattr(self.model, 'tree_'):
# Single decision tree
tree_rules = export_text(self.model, feature_names=None)
return [tree_rules]
elif hasattr(self.model, 'estimators_'):
# Random Forest or similar ensemble
rules = []
for i, tree in enumerate(self.model.estimators_[:5]): # First 5 trees
rule = export_text(tree, feature_names=None)
rules.append(f"Tree {i+1}:\n{rule}")
return rules
else:
return ["Tree structure not accessible"]
class LinearInterpreter:
def __init__(self, model):
self.model = model
async def get_coefficients(self, feature_names: List[str]) -> Dict:
"""Get linear model coefficients"""
if hasattr(self.model, 'coef_'):
coefficients = self.model.coef_
if len(coefficients.shape) > 1:
coefficients = coefficients[0] # Binary classification
return {
'coefficients': coefficients.tolist(),
'feature_names': feature_names,
'intercept': float(self.model.intercept_[0]) if hasattr(self.model, 'intercept_') else 0.0
}
else:
return {'error': 'Model coefficients not accessible'}
class GradientAnalyzer:
def __init__(self, model):
self.model = model
async def analyze_gradients(self,
instance: np.ndarray,
feature_names: List[str]) -> Dict:
"""Analyze gradients for neural network explanation"""
        try:
            import torch
            # Convert to a 2-D float tensor that tracks gradients
            if isinstance(instance, torch.Tensor):
                instance = instance.detach().cpu().numpy()
            instance_array = np.asarray(instance, dtype=np.float32).reshape(1, -1)
            instance_tensor = torch.tensor(instance_array, requires_grad=True)
            # Forward pass
            output = self.model(instance_tensor)
            # Backward pass (sum to obtain a scalar for multi-output models)
            output.sum().backward()
            # Input gradients
            gradients = instance_tensor.grad.detach().numpy()[0]
return {
'gradients': gradients.tolist(),
'feature_names': feature_names,
'prediction': output.detach().numpy()[0].tolist()
}
except Exception as e:
return {'error': f"Gradient analysis failed: {e}"}
Privacy Preservation
Privacy-Preserving Techniques
# privacy_preservation.py
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PrivacyConfig:
technique: str # 'differential_privacy', 'k_anonymity', 'federated_learning'
privacy_budget: float # epsilon for differential privacy
noise_multiplier: float
max_grad_norm: float
k_value: int # for k-anonymity
l_diversity: int # for l-diversity
class PrivacyPreservingFramework:
def __init__(self, privacy_config: PrivacyConfig):
self.config = privacy_config
self.privacy_accountant = PrivacyAccountant()
async def apply_differential_privacy(self,
dataset: pd.DataFrame,
sensitive_columns: List[str]) -> pd.DataFrame:
"""Apply differential privacy to dataset"""
dp_dataset = dataset.copy()
for column in sensitive_columns:
if column not in dp_dataset.columns:
continue
if dp_dataset[column].dtype in ['int64', 'float64']:
# Numerical data - add Laplace noise
sensitivity = self._calculate_sensitivity(dp_dataset[column])
noise_scale = sensitivity / self.config.privacy_budget
noise = np.random.laplace(0, noise_scale, size=len(dp_dataset))
dp_dataset[column] = dp_dataset[column] + noise
else:
# Categorical data - use exponential mechanism
dp_dataset[column] = self._apply_exponential_mechanism(
dp_dataset[column],
self.config.privacy_budget
)
# Record privacy spending
self.privacy_accountant.spend_privacy_budget(
self.config.privacy_budget,
len(sensitive_columns)
)
return dp_dataset
async def apply_k_anonymity(self,
dataset: pd.DataFrame,
quasi_identifiers: List[str]) -> pd.DataFrame:
"""Apply k-anonymity to dataset"""
anonymized_dataset = dataset.copy()
# Group by quasi-identifiers
groups = anonymized_dataset.groupby(quasi_identifiers)
# Remove groups with fewer than k records
large_enough_groups = groups.filter(
lambda x: len(x) >= self.config.k_value
)
# For remaining small groups, generalize values
small_groups = groups.filter(
lambda x: len(x) < self.config.k_value
)
if not small_groups.empty:
generalized_small_groups = self._generalize_quasi_identifiers(
small_groups,
quasi_identifiers
)
# Combine with large groups
anonymized_dataset = pd.concat([
large_enough_groups,
generalized_small_groups
]).reset_index(drop=True)
else:
anonymized_dataset = large_enough_groups.reset_index(drop=True)
return anonymized_dataset
async def apply_l_diversity(self,
dataset: pd.DataFrame,
quasi_identifiers: List[str],
sensitive_attribute: str) -> pd.DataFrame:
"""Apply l-diversity to dataset"""
diverse_dataset = dataset.copy()
# Group by quasi-identifiers
groups = diverse_dataset.groupby(quasi_identifiers)
# Filter groups that have at least l distinct values for sensitive attribute
diverse_groups = []
for name, group in groups:
distinct_sensitive_values = group[sensitive_attribute].nunique()
if distinct_sensitive_values >= self.config.l_diversity:
diverse_groups.append(group)
else:
# Suppress or generalize this group
generalized_group = self._generalize_sensitive_attribute(
group,
sensitive_attribute
)
diverse_groups.append(generalized_group)
if diverse_groups:
result_dataset = pd.concat(diverse_groups).reset_index(drop=True)
else:
result_dataset = pd.DataFrame(columns=dataset.columns)
return result_dataset
def _calculate_sensitivity(self, column: pd.Series) -> float:
"""Calculate global sensitivity for numerical column"""
# For most practical cases, use range as sensitivity
return float(column.max() - column.min())
def _apply_exponential_mechanism(self,
column: pd.Series,
epsilon: float) -> pd.Series:
"""Apply exponential mechanism for categorical data"""
# Count frequencies
value_counts = column.value_counts()
# Calculate utilities (frequencies normalized)
utilities = value_counts / value_counts.sum()
# Apply exponential mechanism
probabilities = np.exp(epsilon * utilities / 2)
probabilities = probabilities / probabilities.sum()
        # Sample replacement values from the resulting distribution
        sampled = np.random.choice(
            value_counts.index,
            size=len(column),
            p=probabilities
        )
        return pd.Series(sampled, index=column.index)
def _generalize_quasi_identifiers(self,
small_groups: pd.DataFrame,
quasi_identifiers: List[str]) -> pd.DataFrame:
"""Generalize quasi-identifiers for small groups"""
generalized = small_groups.copy()
for column in quasi_identifiers:
if generalized[column].dtype in ['int64', 'float64']:
# Numerical generalization - create ranges
min_val = generalized[column].min()
max_val = generalized[column].max()
generalized[column] = f"{min_val}-{max_val}"
else:
# Categorical generalization - use "*"
generalized[column] = "*"
return generalized
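    # Not part of the original listing: a minimal sketch of the suppression
    # step referenced by apply_l_diversity above.
    def _generalize_sensitive_attribute(self,
                                        group: pd.DataFrame,
                                        sensitive_attribute: str) -> pd.DataFrame:
        """Generalize the sensitive attribute for groups failing l-diversity"""
        generalized = group.copy()
        if generalized[sensitive_attribute].dtype in ['int64', 'float64']:
            # Numerical generalization - replace values with the group range
            min_val = generalized[sensitive_attribute].min()
            max_val = generalized[sensitive_attribute].max()
            generalized[sensitive_attribute] = f"{min_val}-{max_val}"
        else:
            # Categorical generalization - suppress with "*"
            generalized[sensitive_attribute] = "*"
        return generalized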
class PrivacyAccountant:
def __init__(self):
self.privacy_spending = []
self.total_epsilon_spent = 0.0
def spend_privacy_budget(self, epsilon: float, num_queries: int = 1):
"""Record privacy budget spending"""
self.privacy_spending.append({
'timestamp': datetime.now(),
'epsilon': epsilon,
'num_queries': num_queries
})
self.total_epsilon_spent += epsilon
def get_remaining_privacy_budget(self, total_budget: float) -> float:
"""Calculate remaining privacy budget"""
return max(0.0, total_budget - self.total_epsilon_spent)
def generate_privacy_report(self) -> Dict:
"""Generate privacy spending report"""
return {
'total_epsilon_spent': self.total_epsilon_spent,
'num_queries': len(self.privacy_spending),
'spending_history': self.privacy_spending,
'average_epsilon_per_query': (
self.total_epsilon_spent / len(self.privacy_spending)
if self.privacy_spending else 0.0
)
}
class FederatedLearningFramework:
def __init__(self, num_clients: int, privacy_config: PrivacyConfig):
self.num_clients = num_clients
self.privacy_config = privacy_config
self.client_models = {}
self.global_model = None
async def federated_averaging(self,
client_updates: List[Dict],
client_weights: List[float] = None) -> Dict:
"""Perform federated averaging with differential privacy"""
if client_weights is None:
client_weights = [1.0 / len(client_updates)] * len(client_updates)
# Aggregate model updates
aggregated_update = {}
for layer_name in client_updates[0].keys():
layer_updates = []
for i, client_update in enumerate(client_updates):
# Clip gradients for privacy
clipped_update = self._clip_gradients(
client_update[layer_name],
self.privacy_config.max_grad_norm
)
# Weight by client importance
weighted_update = clipped_update * client_weights[i]
layer_updates.append(weighted_update)
# Average updates
aggregated_layer = np.sum(layer_updates, axis=0)
# Add noise for differential privacy
if self.privacy_config.noise_multiplier > 0:
noise = np.random.normal(
0,
self.privacy_config.noise_multiplier * self.privacy_config.max_grad_norm,
size=aggregated_layer.shape
)
aggregated_layer += noise
aggregated_update[layer_name] = aggregated_layer
return aggregated_update
def _clip_gradients(self, gradients: np.ndarray, max_norm: float) -> np.ndarray:
"""Clip gradients to maximum norm"""
gradient_norm = np.linalg.norm(gradients)
if gradient_norm > max_norm:
clipped_gradients = gradients * (max_norm / gradient_norm)
else:
clipped_gradients = gradients
return clipped_gradients
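Finally, a minimal usage sketch for the differential-privacy path; the epsilon, clipping values, and column names below are illustrative placeholders, not recommendations:
import asyncio
# Illustrative configuration - tune epsilon and clipping to your threat model
config = PrivacyConfig(
    technique='differential_privacy',
    privacy_budget=1.0,
    noise_multiplier=1.1,
    max_grad_norm=1.0,
    k_value=5,
    l_diversity=3
)
privacy_framework = PrivacyPreservingFramework(config)
# df is a hypothetical pandas DataFrame containing 'income' and 'age' columns
dp_df = asyncio.run(
    privacy_framework.apply_differential_privacy(df, sensitive_columns=['income', 'age'])
)
print(privacy_framework.privacy_accountant.generate_privacy_report())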
Best Practices Checklist
Conclusion
Building ethical AI systems requires systematic approaches that embed fairness, transparency, and accountability throughout the entire ML lifecycle. By implementing comprehensive bias detection, explainability frameworks, privacy preservation techniques, and continuous monitoring, you can create AI systems that serve all users fairly and responsibly. Remember that AI ethics is not a one-time consideration but an ongoing commitment to building technology that benefits society. Start with clear ethical principles, measure what matters, and continuously improve based on real-world impact and stakeholder feedback.