Automate model design with Neural Architecture Search using evolutionary algorithms, reinforcement learning, and efficient search strategies.
Neural Architecture Search has revolutionized how we design deep learning models, automating the discovery of optimal architectures that often surpass human-designed networks. After implementing NAS systems that have discovered state-of-the-art architectures for computer vision, NLP, and mobile deployment, I've learned that successful NAS requires balancing search efficiency, architecture diversity, and practical constraints. Here's your comprehensive guide to automated neural architecture design.
NAS Framework and Foundations
Comprehensive NAS Framework
# nas_framework.py
from typing import Dict, List, Optional, Any, Tuple, Callable, Union
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from enum import Enum
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from copy import deepcopy
import random
import time
import json
import hashlib  # used to build architecture cache keys in the evaluator
import logging
class SearchSpace(Enum):
MACRO = "macro" # Overall architecture topology
MICRO = "micro" # Cell/block internal structure
MIXED = "mixed" # Both macro and micro
class SearchStrategy(Enum):
RANDOM = "random"
EVOLUTIONARY = "evolutionary"
REINFORCEMENT_LEARNING = "reinforcement_learning"
BAYESIAN_OPTIMIZATION = "bayesian_optimization"
DIFFERENTIABLE = "differentiable"
PROGRESSIVE = "progressive"
@dataclass
class ArchitectureSpec:
"""Architecture specification"""
id: str
layers: List[Dict[str, Any]]
connections: List[Tuple[int, int]]
metadata: Dict[str, Any] = field(default_factory=dict)
performance_metrics: Dict[str, float] = field(default_factory=dict)
def to_dict(self) -> Dict:
return {
'id': self.id,
'layers': self.layers,
'connections': self.connections,
'metadata': self.metadata,
'performance_metrics': self.performance_metrics
}
@dataclass
class SearchConstraints:
"""Constraints for architecture search"""
max_layers: int = 50
max_parameters: int = 50_000_000 # 50M parameters
max_latency_ms: float = 100.0
max_memory_mb: float = 1000.0
min_accuracy: float = 0.90
target_platform: str = "gpu" # gpu, cpu, mobile, edge
class NeuralArchitectureSearch:
def __init__(self,
search_space_config: Dict,
search_strategy: SearchStrategy,
constraints: SearchConstraints,
performance_evaluator):
self.search_space = SearchSpaceManager(search_space_config)
self.search_strategy = search_strategy
self.constraints = constraints
self.evaluator = performance_evaluator
# Initialize search algorithm
self.searcher = self._create_searcher()
# Search state
self.search_history = []
self.best_architectures = []
self.current_population = []
# Performance tracking
self.search_metrics = {
'architectures_evaluated': 0,
'search_time_hours': 0,
'best_accuracy': 0.0,
'pareto_frontier': []
}
def _create_searcher(self):
"""Create searcher based on strategy"""
if self.search_strategy == SearchStrategy.EVOLUTIONARY:
return EvolutionarySearcher(self.search_space, self.constraints)
elif self.search_strategy == SearchStrategy.REINFORCEMENT_LEARNING:
return RLSearcher(self.search_space, self.constraints)
elif self.search_strategy == SearchStrategy.BAYESIAN_OPTIMIZATION:
return BayesianSearcher(self.search_space, self.constraints)
elif self.search_strategy == SearchStrategy.DIFFERENTIABLE:
return DifferentiableSearcher(self.search_space, self.constraints)
elif self.search_strategy == SearchStrategy.PROGRESSIVE:
return ProgressiveSearcher(self.search_space, self.constraints)
else:
return RandomSearcher(self.search_space, self.constraints)
async def search(self,
training_data,
validation_data,
search_budget: int = 1000,
time_budget_hours: float = 24.0) -> List[ArchitectureSpec]:
"""Execute neural architecture search"""
start_time = time.time()
# Initialize search
await self.searcher.initialize(training_data, validation_data)
# Main search loop
for iteration in range(search_budget):
# Check time budget
elapsed_hours = (time.time() - start_time) / 3600
if elapsed_hours >= time_budget_hours:
logging.info(f"Time budget exceeded: {elapsed_hours:.2f} hours")
break
# Generate candidate architectures
candidates = await self.searcher.generate_candidates(
iteration,
self.search_history
)
# Evaluate candidates
evaluated_candidates = []
for candidate in candidates:
try:
# Check constraints before evaluation
if not self._check_constraints(candidate):
continue
# Evaluate architecture
performance = await self.evaluator.evaluate(
candidate,
training_data,
validation_data
)
candidate.performance_metrics = performance
evaluated_candidates.append(candidate)
# Update metrics
self.search_metrics['architectures_evaluated'] += 1
if performance.get('accuracy', 0) > self.search_metrics['best_accuracy']:
self.search_metrics['best_accuracy'] = performance['accuracy']
except Exception as e:
logging.error(f"Architecture evaluation failed: {e}")
continue
# Update search state
self.search_history.extend(evaluated_candidates)
# Update searcher with results
await self.searcher.update(evaluated_candidates, iteration)
# Update best architectures
self._update_best_architectures(evaluated_candidates)
# Log progress
if iteration % 10 == 0:
self._log_search_progress(iteration, elapsed_hours)
# Final search metrics
self.search_metrics['search_time_hours'] = (time.time() - start_time) / 3600
return self.best_architectures
def _check_constraints(self, architecture: ArchitectureSpec) -> bool:
"""Check if architecture meets constraints"""
# Parameter count constraint
param_count = self._estimate_parameters(architecture)
if param_count > self.constraints.max_parameters:
return False
# Layer count constraint
if len(architecture.layers) > self.constraints.max_layers:
return False
# Platform-specific constraints
if not self._check_platform_constraints(architecture):
return False
return True
def _estimate_parameters(self, architecture: ArchitectureSpec) -> int:
"""Estimate parameter count for architecture"""
total_params = 0
layer_outputs = {}
for i, layer in enumerate(architecture.layers):
layer_type = layer['type']
layer_config = layer['config']
if layer_type == 'conv2d':
in_channels = layer_config.get('in_channels', 3)
out_channels = layer_config['out_channels']
kernel_size = layer_config.get('kernel_size', 3)
# Conv parameters: (kernel_size^2 * in_channels * out_channels) + bias
conv_params = (kernel_size ** 2) * in_channels * out_channels + out_channels
total_params += conv_params
layer_outputs[i] = out_channels
elif layer_type == 'linear':
in_features = layer_config.get('in_features', 512)
out_features = layer_config['out_features']
# Linear parameters: (in_features * out_features) + bias
linear_params = in_features * out_features + out_features
total_params += linear_params
layer_outputs[i] = out_features
# Add other layer types as needed
return total_params
def _update_best_architectures(self, new_candidates: List[ArchitectureSpec]):
"""Update list of best architectures"""
# Combine with existing best architectures
all_candidates = self.best_architectures + new_candidates
# Sort by performance (multi-objective)
all_candidates.sort(
key=lambda arch: self._calculate_architecture_score(arch),
reverse=True
)
# Keep top architectures (Pareto frontier)
self.best_architectures = self._select_pareto_optimal(
all_candidates,
max_size=20
)
def _calculate_architecture_score(self, architecture: ArchitectureSpec) -> float:
"""Calculate overall architecture score"""
metrics = architecture.performance_metrics
# Multi-objective scoring
accuracy = metrics.get('accuracy', 0.0)
latency = metrics.get('latency_ms', float('inf'))
parameters = metrics.get('parameters', float('inf'))
# Normalize metrics
accuracy_score = accuracy
latency_score = max(0, 1.0 - (latency / 1000)) # Penalty for high latency
param_score = max(0, 1.0 - (parameters / self.constraints.max_parameters))
# Weighted combination
total_score = (0.6 * accuracy_score +
0.2 * latency_score +
0.2 * param_score)
return total_score
def _select_pareto_optimal(self,
candidates: List[ArchitectureSpec],
max_size: int) -> List[ArchitectureSpec]:
"""Select Pareto optimal architectures"""
if len(candidates) <= max_size:
return candidates
# For simplicity, select top performers across different objectives
accuracy_sorted = sorted(
candidates,
key=lambda x: x.performance_metrics.get('accuracy', 0),
reverse=True
)
latency_sorted = sorted(
candidates,
key=lambda x: x.performance_metrics.get('latency_ms', float('inf'))
)
param_sorted = sorted(
candidates,
key=lambda x: x.performance_metrics.get('parameters', float('inf'))
)
# Select a diverse set, de-duplicated by architecture id
# (ArchitectureSpec is a regular dataclass, so instances are not hashable and cannot go in a set)
selected = {arch.id: arch for arch in accuracy_sorted[:max_size // 3]}
selected.update({arch.id: arch for arch in latency_sorted[:max_size // 3]})
selected.update({arch.id: arch for arch in param_sorted[:max_size // 3]})
# Fill remaining slots with top overall scores
remaining_slots = max_size - len(selected)
if remaining_slots > 0:
overall_sorted = sorted(
[c for c in candidates if c.id not in selected],
key=lambda x: self._calculate_architecture_score(x),
reverse=True
)
selected.update({arch.id: arch for arch in overall_sorted[:remaining_slots]})
return list(selected.values())
class SearchSpaceManager:
"""Manages the search space definition and sampling"""
def __init__(self, config: Dict):
self.config = config
self.layer_types = config.get('layer_types', ['conv2d', 'linear', 'relu', 'pool'])
self.connection_patterns = config.get('connection_patterns', ['sequential', 'residual', 'dense'])
self.parameter_ranges = config.get('parameter_ranges', {})
def sample_architecture(self) -> ArchitectureSpec:
"""Sample random architecture from search space"""
# Sample number of layers
num_layers = np.random.randint(3, 20)
layers = []
for i in range(num_layers):
layer_type = np.random.choice(self.layer_types)
layer_config = self._sample_layer_config(layer_type)
layers.append({
'type': layer_type,
'config': layer_config
})
# Generate connections
connections = self._sample_connections(num_layers)
architecture_id = f"arch_{int(time.time() * 1000000) % 1000000}"
return ArchitectureSpec(
id=architecture_id,
layers=layers,
connections=connections,
metadata={'sampled': True}
)
def _sample_layer_config(self, layer_type: str) -> Dict:
"""Sample configuration for specific layer type"""
if layer_type == 'conv2d':
return {
'out_channels': np.random.choice([16, 32, 64, 128, 256]),
'kernel_size': np.random.choice([1, 3, 5, 7]),
'stride': np.random.choice([1, 2]),
'padding': 'same'
}
elif layer_type == 'linear':
return {
'out_features': np.random.choice([64, 128, 256, 512, 1024])
}
elif layer_type == 'pool':
return {
'type': np.random.choice(['max', 'avg']),
'kernel_size': np.random.choice([2, 3]),
'stride': np.random.choice([1, 2])
}
else:
return {}
def _sample_connections(self, num_layers: int) -> List[Tuple[int, int]]:
"""Sample connection pattern between layers"""
connections = []
# Sequential connections
for i in range(num_layers - 1):
connections.append((i, i + 1))
# Add skip connections randomly
skip_probability = 0.3
for i in range(num_layers):
for j in range(i + 2, num_layers):
if np.random.random() < skip_probability:
connections.append((i, j))
return connections
def mutate_architecture(self,
architecture: ArchitectureSpec,
mutation_rate: float = 0.1) -> ArchitectureSpec:
"""Mutate an existing architecture"""
mutated = deepcopy(architecture)
mutated.id = f"mutated_{mutated.id}_{int(time.time() * 1000000) % 1000000}"
# Mutate layers
for layer in mutated.layers:
if np.random.random() < mutation_rate:
layer['config'] = self._mutate_layer_config(layer['type'], layer['config'])
# Mutate connections
if np.random.random() < mutation_rate:
mutated.connections = self._mutate_connections(
mutated.connections,
len(mutated.layers)
)
# Possibly add/remove layers
if np.random.random() < mutation_rate / 2:
mutated = self._mutate_topology(mutated)
return mutated
def crossover_architectures(self,
parent1: ArchitectureSpec,
parent2: ArchitectureSpec) -> ArchitectureSpec:
"""Create offspring through crossover"""
# Simple crossover: take layers from both parents
min_layers = min(len(parent1.layers), len(parent2.layers))
crossover_point = np.random.randint(1, max(2, min_layers))  # Guard against single-layer parents
offspring_layers = (parent1.layers[:crossover_point] +
parent2.layers[crossover_point:])
# Rebuild sequential connections for the offspring (simplified: parents' skip connections are dropped
# so no connection can reference a layer index outside the offspring)
offspring_connections = [(i, i + 1) for i in range(len(offspring_layers) - 1)]
offspring_id = f"cross_{int(time.time() * 1000000) % 1000000}"
return ArchitectureSpec(
id=offspring_id,
layers=offspring_layers,
connections=offspring_connections,
metadata={
'parent1': parent1.id,
'parent2': parent2.id,
'crossover_point': crossover_point
}
)
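Here's a quick sketch of how these pieces wire together. Treat it as a hypothetical usage example rather than part of the framework: it assumes the helper methods referenced but not shown above (such as _check_platform_constraints and _log_search_progress) are implemented, and it stubs out the evaluator so the end-to-end flow stays visible. Swap in a real evaluator such as the ArchitectureEvaluator shown later in this guide.
# Hypothetical usage sketch (assumptions noted above)
import asyncio

class DummyEvaluator:
    """Stub evaluator with fixed metrics; stands in for a real evaluator."""
    async def evaluate(self, architecture, training_data, validation_data):
        return {'accuracy': 0.5, 'latency_ms': 10.0, 'parameters': 1_000_000}

search_space_config = {
    'layer_types': ['conv2d', 'linear', 'relu', 'pool'],
    'connection_patterns': ['sequential', 'residual'],
}
constraints = SearchConstraints(max_parameters=20_000_000,
                                max_latency_ms=50.0,
                                target_platform="gpu")

nas = NeuralArchitectureSearch(
    search_space_config=search_space_config,
    search_strategy=SearchStrategy.EVOLUTIONARY,
    constraints=constraints,
    performance_evaluator=DummyEvaluator(),
)

# Pass real training/validation DataLoaders in practice; the stub evaluator ignores them
best = asyncio.run(nas.search(training_data=None, validation_data=None,
                              search_budget=100, time_budget_hours=4.0))
for arch in best[:5]:
    print(arch.id, arch.performance_metrics)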
Evolutionary Architecture Search
# evolutionary_search.py
class EvolutionarySearcher:
def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
self.search_space = search_space
self.constraints = constraints
# Evolution parameters
self.population_size = 50
self.mutation_rate = 0.2
self.crossover_rate = 0.8
self.selection_pressure = 2
self.elitism_ratio = 0.1
# Evolution state
self.population = []
self.generation = 0
self.fitness_history = []
async def initialize(self, training_data, validation_data):
"""Initialize population"""
self.population = []
# Create initial population
for _ in range(self.population_size):
individual = self.search_space.sample_architecture()
self.population.append(individual)
logging.info(f"Initialized population with {len(self.population)} individuals")
async def generate_candidates(self,
iteration: int,
search_history: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
"""Generate candidates for next generation"""
# Selection
selected_parents = self._selection(self.population)
# Generate offspring
offspring = []
while len(offspring) < self.population_size - int(self.population_size * self.elitism_ratio):
# Select two parents
parent1 = random.choice(selected_parents)
parent2 = random.choice(selected_parents)
# Crossover
if random.random() < self.crossover_rate:
child = self.search_space.crossover_architectures(parent1, parent2)
else:
child = deepcopy(random.choice([parent1, parent2]))
child.id = f"clone_{int(time.time() * 1000000) % 1000000}"
# Mutation
if random.random() < self.mutation_rate:
child = self.search_space.mutate_architecture(child, self.mutation_rate)
offspring.append(child)
# Add elite individuals
elite_count = int(self.population_size * self.elitism_ratio)
elite_individuals = sorted(
self.population,
key=lambda x: self._calculate_fitness(x),
reverse=True
)[:elite_count]
offspring.extend(elite_individuals)
return offspring
async def update(self, evaluated_candidates: List[ArchitectureSpec], iteration: int):
"""Update population with evaluation results"""
# Update population
self.population = evaluated_candidates
self.generation += 1
# Record fitness history
fitnesses = [self._calculate_fitness(ind) for ind in self.population]
self.fitness_history.append({
'generation': self.generation,
'best_fitness': max(fitnesses),
'average_fitness': np.mean(fitnesses),
'worst_fitness': min(fitnesses)
})
# Log progress
logging.info(f"Generation {self.generation}: Best fitness = {max(fitnesses):.4f}")
def _selection(self, population: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
"""Tournament selection"""
selected = []
tournament_size = max(2, int(len(population) * 0.1))
for _ in range(len(population)):
# Tournament selection
tournament = random.sample(population, tournament_size)
winner = max(tournament, key=lambda x: self._calculate_fitness(x))
selected.append(winner)
return selected
def _calculate_fitness(self, individual: ArchitectureSpec) -> float:
"""Calculate fitness score for individual"""
if not individual.performance_metrics:
return 0.0
# Multi-objective fitness
accuracy = individual.performance_metrics.get('accuracy', 0.0)
efficiency = self._calculate_efficiency(individual)
# Weighted combination
fitness = 0.7 * accuracy + 0.3 * efficiency
return fitness
def _calculate_efficiency(self, individual: ArchitectureSpec) -> float:
"""Calculate efficiency score (latency/parameters penalty)"""
latency = individual.performance_metrics.get('latency_ms', float('inf'))
parameters = individual.performance_metrics.get('parameters', float('inf'))
# Normalize to 0-1 range
latency_score = max(0, 1.0 - (latency / 1000))
param_score = max(0, 1.0 - (parameters / self.constraints.max_parameters))
return (latency_score + param_score) / 2
class RLSearcher:
"""Reinforcement Learning based architecture search"""
def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
self.search_space = search_space
self.constraints = constraints
# RL parameters
self.state_size = 128
self.action_size = 64
self.learning_rate = 0.001
# Neural network controller
self.controller = self._build_controller()
self.optimizer = optim.Adam(self.controller.parameters(), lr=self.learning_rate)
# Experience buffer
self.experience_buffer = []
self.buffer_size = 1000
def _build_controller(self) -> nn.Module:
"""Build LSTM controller network"""
class ArchitectureController(nn.Module):
def __init__(self, state_size: int, action_size: int, hidden_size: int = 256):
super().__init__()
self.hidden_size = hidden_size
self.lstm = nn.LSTM(state_size, hidden_size, batch_first=True)
self.action_head = nn.Linear(hidden_size, action_size)
self.value_head = nn.Linear(hidden_size, 1)
def forward(self, x, hidden=None):
lstm_out, hidden = self.lstm(x, hidden)
actions = torch.softmax(self.action_head(lstm_out), dim=-1)
values = self.value_head(lstm_out)
return actions, values, hidden
return ArchitectureController(self.state_size, self.action_size)
async def initialize(self, training_data, validation_data):
"""Initialize RL controller"""
# Initialize experience buffer with random architectures
for _ in range(20):
arch = self.search_space.sample_architecture()
state = self._architecture_to_state(arch)
self.experience_buffer.append({
'state': state,
'action': np.random.randint(self.action_size),
'reward': 0.0,
'architecture': arch
})
async def generate_candidates(self,
iteration: int,
search_history: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
"""Generate architectures using RL controller"""
candidates = []
for _ in range(10): # Generate 10 candidates per iteration
architecture = await self._sample_architecture_with_controller()
candidates.append(architecture)
return candidates
async def _sample_architecture_with_controller(self) -> ArchitectureSpec:
"""Sample architecture using trained controller"""
architecture_tokens = []
hidden = None
# Generate architecture sequence
for step in range(20): # Max 20 decisions
# Current state (simplified)
state = torch.zeros(1, 1, self.state_size)
# Get action probabilities
action_probs, _, hidden = self.controller(state, hidden)
# Sample action
action = torch.multinomial(action_probs.squeeze(), 1).item()
architecture_tokens.append(action)
# Early stopping condition
if self._is_terminal_action(action):
break
# Convert tokens to architecture
architecture = self._tokens_to_architecture(architecture_tokens)
return architecture
def _architecture_to_state(self, architecture: ArchitectureSpec) -> np.ndarray:
"""Convert architecture to state representation"""
# Simplified state encoding
state = np.zeros(self.state_size)
# Encode basic architecture properties
state[0] = len(architecture.layers) / 50 # Normalized layer count
state[1] = len(architecture.connections) / 100 # Normalized connections
# Encode layer types
layer_type_counts = {}
for layer in architecture.layers:
layer_type = layer['type']
layer_type_counts[layer_type] = layer_type_counts.get(layer_type, 0) + 1
# Fill in layer type counts at fixed positions using the type mapping
type_mapping = {'conv2d': 2, 'linear': 3, 'relu': 4, 'pool': 5}
for layer_type, count in layer_type_counts.items():
idx = type_mapping.get(layer_type)
if idx is not None:
state[idx] = count / 10  # Normalized count
return state
def _tokens_to_architecture(self, tokens: List[int]) -> ArchitectureSpec:
"""Convert action tokens to architecture specification"""
layers = []
connections = []
# Decode tokens to architecture (simplified)
for i, token in enumerate(tokens):
if token < 20: # Layer tokens
layer_type = ['conv2d', 'linear', 'relu', 'pool'][token % 4]
config = self.search_space._sample_layer_config(layer_type)
layers.append({'type': layer_type, 'config': config})
elif token < 40: # Connection tokens
if i > 0 and i < len(layers):
connections.append((token % i, i))
# Ensure sequential connections
for i in range(len(layers) - 1):
connections.append((i, i + 1))
architecture_id = f"rl_{int(time.time() * 1000000) % 1000000}"
return ArchitectureSpec(
id=architecture_id,
layers=layers,
connections=connections,
metadata={'generated_by': 'rl_controller'}
)
class DifferentiableSearcher:
"""Differentiable Architecture Search (DARTS)"""
def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
self.search_space = search_space
self.constraints = constraints
# DARTS parameters
self.learning_rate_model = 0.025
self.learning_rate_arch = 3e-4
self.weight_decay = 3e-4
# Architecture parameters (learnable)
self.arch_params = None
self.model_params = None
async def initialize(self, training_data, validation_data):
"""Initialize DARTS search"""
# Initialize architecture parameters
# This would create alpha parameters for each possible operation
num_operations = 8 # Number of candidate operations
num_edges = 14 # Number of edges in search space
self.arch_params = torch.randn(num_edges, num_operations, requires_grad=True)
# Initialize optimizers
self.arch_optimizer = optim.Adam([self.arch_params], lr=self.learning_rate_arch)
async def generate_candidates(self,
iteration: int,
search_history: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
"""Generate candidates using current architecture parameters"""
# In DARTS, we continuously update a single super-network
# For this simplified version, we derive discrete architectures
candidates = []
# Sample architectures from current alpha parameters
for _ in range(5):
architecture = self._derive_architecture_from_alphas()
candidates.append(architecture)
return candidates
def _derive_architecture_from_alphas(self) -> ArchitectureSpec:
"""Derive discrete architecture from alpha parameters"""
layers = []
connections = []
# Convert continuous alpha parameters to discrete architecture
with torch.no_grad():
for edge_idx in range(len(self.arch_params)):
# Select operation with highest alpha
best_op_idx = torch.argmax(self.arch_params[edge_idx]).item()
# Map operation index to actual operation
operations = ['conv3x3', 'conv5x5', 'dil_conv3x3', 'dil_conv5x5',
'sep_conv3x3', 'sep_conv5x5', 'avg_pool3x3', 'max_pool3x3']
selected_op = operations[best_op_idx]
# Convert to layer specification
if 'conv' in selected_op:
kernel_size = 3 if '3x3' in selected_op else 5
layers.append({
'type': 'conv2d',
'config': {
'out_channels': 32,
'kernel_size': kernel_size,
'stride': 1
}
})
elif 'pool' in selected_op:
layers.append({
'type': 'pool',
'config': {
'type': 'avg' if 'avg' in selected_op else 'max',
'kernel_size': 3
}
})
# Create connections (simplified)
for i in range(len(layers) - 1):
connections.append((i, i + 1))
architecture_id = f"darts_{int(time.time() * 1000000) % 1000000}"
return ArchitectureSpec(
id=architecture_id,
layers=layers,
connections=connections,
metadata={'generated_by': 'darts', 'num_edges': len(connections)}
)
async def update(self, evaluated_candidates: List[ArchitectureSpec], iteration: int):
"""Update architecture parameters based on validation performance"""
# In real DARTS, this would update the alpha parameters
# based on validation loss gradients
# For this simplified version, we adjust alphas based on performance
if evaluated_candidates:
best_candidate = max(
evaluated_candidates,
key=lambda x: x.performance_metrics.get('accuracy', 0)
)
# Reward operations used in best architecture (simplified)
# In practice, this would involve proper gradient-based updates
pass
class ProgressiveSearcher:
"""Progressive search that starts simple and increases complexity"""
def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
self.search_space = search_space
self.constraints = constraints
# Progressive parameters
self.current_complexity = 1
self.max_complexity = 10
self.complexity_schedule = [1, 2, 3, 5, 7, 10]
async def initialize(self, training_data, validation_data):
"""Initialize progressive search"""
self.current_complexity = 1
logging.info("Starting progressive search with minimal complexity")
async def generate_candidates(self,
iteration: int,
search_history: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
"""Generate candidates with current complexity level"""
# Update complexity based on iteration
complexity_stage = min(iteration // 50, len(self.complexity_schedule) - 1)
self.current_complexity = self.complexity_schedule[complexity_stage]
candidates = []
for _ in range(10):
# Sample architecture with current complexity
arch = self._sample_architecture_with_complexity(self.current_complexity)
candidates.append(arch)
return candidates
def _sample_architecture_with_complexity(self, complexity: int) -> ArchitectureSpec:
"""Sample architecture with specific complexity level"""
# Number of layers based on complexity
num_layers = min(3 + complexity * 2, 30)
layers = []
for i in range(num_layers):
# Layer complexity increases with overall complexity
if complexity <= 2:
# Simple layers only
layer_type = np.random.choice(['conv2d', 'relu', 'pool'])
elif complexity <= 5:
# Add linear layers
layer_type = np.random.choice(['conv2d', 'linear', 'relu', 'pool'])
else:
# Full complexity
layer_type = np.random.choice(self.search_space.layer_types)
config = self.search_space._sample_layer_config(layer_type)
# Limit parameter counts based on complexity
if layer_type == 'conv2d' and 'out_channels' in config:
max_channels = min(config['out_channels'], 32 * complexity)
config['out_channels'] = max_channels
layers.append({
'type': layer_type,
'config': config
})
# Simple connections for low complexity
connections = []
for i in range(len(layers) - 1):
connections.append((i, i + 1))
# Add skip connections for higher complexity
if complexity > 3:
for i in range(len(layers)):
for j in range(i + 2, min(i + complexity, len(layers))):
if np.random.random() < 0.2: # 20% chance
connections.append((i, j))
architecture_id = f"prog_{complexity}_{int(time.time() * 1000000) % 1000000}"
return ArchitectureSpec(
id=architecture_id,
layers=layers,
connections=connections,
metadata={
'complexity_level': complexity,
'generated_by': 'progressive_search'
}
)
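The factory in _create_searcher also falls back to a RandomSearcher (and references a BayesianSearcher) that these listings don't show. A minimal random-search baseline following the same initialize / generate_candidates / update interface might look like the sketch below; it's an assumption rather than the original implementation, but it makes a useful sanity check against the smarter strategies.
# Hypothetical sketch of the RandomSearcher fallback (assumed interface)
class RandomSearcher:
    """Random search baseline: sample uniformly, ignore evaluation feedback"""
    def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
        self.search_space = search_space
        self.constraints = constraints

    async def initialize(self, training_data, validation_data):
        # No state to prepare for pure random sampling
        pass

    async def generate_candidates(self, iteration, search_history):
        # Sample a fresh batch of architectures uniformly from the search space
        return [self.search_space.sample_architecture() for _ in range(10)]

    async def update(self, evaluated_candidates, iteration):
        # Random search does not learn from previous evaluations
        pass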
Performance Evaluation and Optimization
Efficient Architecture Evaluation
# architecture_evaluation.py
class ArchitectureEvaluator:
def __init__(self,
device: str = "cuda",
evaluation_strategy: str = "weight_sharing"):
self.device = device
self.evaluation_strategy = evaluation_strategy
# Evaluation strategies
self.evaluators = {
'full_training': FullTrainingEvaluator(device),
'weight_sharing': WeightSharingEvaluator(device),
'early_stopping': EarlyStoppingEvaluator(device),
'proxy_tasks': ProxyTaskEvaluator(device),
'performance_prediction': PerformancePredictorEvaluator(device)
}
self.current_evaluator = self.evaluators[evaluation_strategy]
# Caching for efficiency
self.evaluation_cache = {}
self.architecture_embeddings = {}
async def evaluate(self,
architecture: ArchitectureSpec,
training_data,
validation_data,
budget_epochs: int = 50) -> Dict[str, float]:
"""Evaluate architecture performance"""
# Check cache first
cache_key = self._get_cache_key(architecture)
if cache_key in self.evaluation_cache:
cached_result = self.evaluation_cache[cache_key]
logging.info(f"Using cached evaluation for {architecture.id}")
return cached_result
# Build model from architecture
model = await self._build_model_from_architecture(architecture)
if model is None:
return {'accuracy': 0.0, 'loss': float('inf'), 'parameters': 0}
# Evaluate using selected strategy
performance = await self.current_evaluator.evaluate(
model,
architecture,
training_data,
validation_data,
budget_epochs
)
# Add architecture-specific metrics
performance['parameters'] = self._count_parameters(model)
performance['flops'] = await self._estimate_flops(model)
performance['latency_ms'] = await self._measure_latency(model)
performance['memory_mb'] = await self._measure_memory(model)
# Cache result
self.evaluation_cache[cache_key] = performance
return performance
async def _build_model_from_architecture(self,
architecture: ArchitectureSpec) -> Optional[nn.Module]:
"""Build PyTorch model from architecture specification"""
try:
return ArchitectureBuilder.build(architecture)
except Exception as e:
logging.error(f"Failed to build model: {e}")
return None
def _count_parameters(self, model: nn.Module) -> int:
"""Count total parameters in model"""
return sum(p.numel() for p in model.parameters())
async def _estimate_flops(self, model: nn.Module) -> float:
"""Estimate FLOPs for model"""
# Simplified FLOP estimation
total_flops = 0
for module in model.modules():
if isinstance(module, nn.Conv2d):
# Conv2d FLOPs: output_elements * (kernel_size^2 * in_channels + 1)
kernel_flops = (module.kernel_size[0] * module.kernel_size[1] *
module.in_channels)
output_elements = 224 * 224 * module.out_channels # Assuming 224x224 input
total_flops += output_elements * kernel_flops
elif isinstance(module, nn.Linear):
# Linear FLOPs: in_features * out_features
total_flops += module.in_features * module.out_features
return total_flops
async def _measure_latency(self, model: nn.Module, num_runs: int = 100) -> float:
"""Measure model inference latency"""
model.eval()
model.to(self.device)
# Warmup
dummy_input = torch.randn(1, 3, 224, 224).to(self.device)
with torch.no_grad():
for _ in range(10):
_ = model(dummy_input)
# Actual measurement
if self.device == 'cuda':
torch.cuda.synchronize()
start_time = time.time()
with torch.no_grad():
for _ in range(num_runs):
_ = model(dummy_input)
if self.device == 'cuda':
torch.cuda.synchronize()
end_time = time.time()
avg_latency_ms = ((end_time - start_time) / num_runs) * 1000
return avg_latency_ms
def _get_cache_key(self, architecture: ArchitectureSpec) -> str:
"""Generate cache key for architecture"""
# Create deterministic hash from architecture
arch_str = json.dumps(architecture.to_dict(), sort_keys=True)
return hashlib.md5(arch_str.encode()).hexdigest()
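# NOTE: evaluate() above also calls _measure_memory, which the original listing omits.
# The version below is a minimal sketch (an assumption): it reports peak CUDA allocation
# in MB for a single 224x224 forward pass and returns 0.0 when running on CPU.
async def _measure_memory(self, model: nn.Module) -> float:
    """Approximate peak memory usage (MB) for one forward pass"""
    if self.device != 'cuda' or not torch.cuda.is_available():
        return 0.0
    model.eval()
    model.to(self.device)
    torch.cuda.reset_peak_memory_stats()
    dummy_input = torch.randn(1, 3, 224, 224).to(self.device)
    with torch.no_grad():
        _ = model(dummy_input)
    return torch.cuda.max_memory_allocated() / (1024 * 1024)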
class WeightSharingEvaluator:
"""Evaluator using weight sharing across architectures"""
def __init__(self, device: str):
self.device = device
self.super_network = None
async def evaluate(self,
model: nn.Module,
architecture: ArchitectureSpec,
training_data,
validation_data,
budget_epochs: int) -> Dict[str, float]:
"""Evaluate using weight sharing super-network"""
# Initialize super-network if not done
if self.super_network is None:
await self._initialize_super_network(training_data)
# Extract subnet for this architecture
subnet = self._extract_subnet(architecture)
# Quick evaluation on validation set
accuracy = await self._evaluate_subnet(subnet, validation_data)
return {
'accuracy': accuracy,
'loss': 1.0 - accuracy, # Simplified
'evaluation_cost': 0.1 # Much lower than full training
}
async def _initialize_super_network(self, training_data):
"""Initialize and train super-network once"""
# Create large super-network containing all possible operations
self.super_network = self._build_super_network()
# Train super-network (this would be done once, then reused)
logging.info("Training super-network (one-time cost)...")
# Training code would go here
def _build_super_network(self) -> nn.Module:
"""Build super-network containing all candidate operations"""
class SuperNetwork(nn.Module):
def __init__(self):
super().__init__()
# Define all possible operations
self.operations = nn.ModuleList([
nn.Conv2d(3, 32, 3, padding=1),
nn.Conv2d(32, 64, 3, padding=1),
nn.Conv2d(64, 128, 3, padding=1),
nn.AdaptiveAvgPool2d(1),
nn.Linear(128, 10)
])
def forward(self, x, architecture_mask=None):
# Forward pass using an architecture-specific mask over the candidate operations
for i, op in enumerate(self.operations):
if architecture_mask is None or architecture_mask[i]:
# Flatten pooled feature maps before the final Linear classifier
if isinstance(op, nn.Linear):
x = torch.flatten(x, 1)
x = op(x)
return x
return SuperNetwork()
class EarlyStoppingEvaluator:
"""Evaluator using early stopping for efficiency"""
def __init__(self, device: str):
self.device = device
self.min_epochs = 5
self.patience = 3
async def evaluate(self,
model: nn.Module,
architecture: ArchitectureSpec,
training_data,
validation_data,
budget_epochs: int) -> Dict[str, float]:
"""Evaluate with early stopping"""
model.to(self.device)
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
criterion = nn.CrossEntropyLoss()
best_accuracy = 0.0
patience_counter = 0
for epoch in range(budget_epochs):
# Training phase
model.train()
train_loss = 0.0
for batch_idx, (data, target) in enumerate(training_data):
if batch_idx > 10: # Limit batches for efficiency
break
data, target = data.to(self.device), target.to(self.device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
train_loss += loss.item()
# Validation phase
if epoch >= self.min_epochs - 1:
model.eval()
correct = 0
total = 0
with torch.no_grad():
for batch_idx, (data, target) in enumerate(validation_data):
if batch_idx > 5: # Limit validation batches
break
data, target = data.to(self.device), target.to(self.device)
output = model(data)
_, predicted = torch.max(output.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
accuracy = correct / total
# Early stopping logic
if accuracy > best_accuracy:
best_accuracy = accuracy
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= self.patience:
logging.info(f"Early stopping at epoch {epoch + 1}")
break
return {
'accuracy': best_accuracy,
'loss': 1.0 - best_accuracy,
'epochs_trained': epoch + 1,
'early_stopped': patience_counter >= self.patience
}
class ArchitectureBuilder:
"""Builds PyTorch models from architecture specifications"""
@staticmethod
def build(architecture: ArchitectureSpec) -> nn.Module:
"""Build model from architecture specification"""
layers = []
layer_modules = {}
# Build individual layers
for i, layer_spec in enumerate(architecture.layers):
layer_type = layer_spec['type']
layer_config = layer_spec['config']
if layer_type == 'conv2d':
kernel_size = layer_config.get('kernel_size', 3)
padding = layer_config.get('padding', 1)
# PyTorch only supports padding='same' for stride 1; use kernel_size // 2 instead
if padding == 'same':
padding = kernel_size // 2
layer = nn.Conv2d(
in_channels=layer_config.get('in_channels', 3),
out_channels=layer_config['out_channels'],
kernel_size=kernel_size,
stride=layer_config.get('stride', 1),
padding=padding
)
elif layer_type == 'linear':
layer = nn.Linear(
in_features=layer_config.get('in_features', 512),
out_features=layer_config['out_features']
)
elif layer_type == 'relu':
layer = nn.ReLU(inplace=True)
elif layer_type == 'pool':
pool_type = layer_config.get('type', 'max')
kernel_size = layer_config.get('kernel_size', 2)
if pool_type == 'max':
layer = nn.MaxPool2d(kernel_size)
else:
layer = nn.AvgPool2d(kernel_size)
else:
continue # Skip unknown layer types
layer_modules[i] = layer
# Build model with connections
model = DynamicModel(layer_modules, architecture.connections)
return model
class DynamicModel(nn.Module):
"""Dynamic model that supports flexible connections"""
def __init__(self, layer_modules: Dict[int, nn.Module], connections: List[Tuple[int, int]]):
super().__init__()
self.layer_modules = nn.ModuleDict({str(k): v for k, v in layer_modules.items()})
self.connections = connections
# Analyze connection topology
self.execution_order = self._topological_sort()
def _topological_sort(self) -> List[int]:
"""Topological sort of layers for execution order"""
# Simple topological sort implementation
in_degree = {}
graph = {}
# Initialize
for layer_id in self.layer_modules.keys():
layer_id_int = int(layer_id)
in_degree[layer_id_int] = 0
graph[layer_id_int] = []
# Build graph and calculate in-degrees (skip connections that reference layers that were not built)
for src, dst in self.connections:
if src in graph and dst in graph:
graph[src].append(dst)
in_degree[dst] += 1
# Topological sort
queue = [node for node, degree in in_degree.items() if degree == 0]
execution_order = []
while queue:
current = queue.pop(0)
execution_order.append(current)
for neighbor in graph[current]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return execution_order
def forward(self, x):
"""Forward pass through dynamic architecture"""
layer_outputs = {}
for layer_id in self.execution_order:
layer_module = self.layer_modules[str(layer_id)]
# Determine input for this layer
if layer_id == 0:
# First layer uses network input
layer_input = x
else:
# Collect inputs from predecessor layers
predecessor_outputs = []
for src, dst in self.connections:
if dst == layer_id and src in layer_outputs:
predecessor_outputs.append(layer_outputs[src])
if len(predecessor_outputs) == 1:
layer_input = predecessor_outputs[0]
elif len(predecessor_outputs) > 1:
# Combine multiple inputs: concatenate along the channel dimension,
# falling back to element-wise addition if the shapes don't align
try:
layer_input = torch.cat(predecessor_outputs, dim=1)
except RuntimeError:
layer_input = sum(predecessor_outputs)
else:
# No valid input, skip this layer
continue
# Execute layer
try:
layer_output = layer_module(layer_input)
layer_outputs[layer_id] = layer_output
except Exception as e:
logging.warning(f"Layer {layer_id} execution failed: {e}")
layer_outputs[layer_id] = layer_input # Pass input through
# Return output from last layer
if self.execution_order:
final_output = layer_outputs.get(self.execution_order[-1], x)
else:
final_output = x
return final_output
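As a quick sanity check, you can sample an architecture, build it, and push a tensor through it. This is a hypothetical smoke test, not part of the original code: the search-space config, batch size, and 32x32 input are illustrative, and layers whose input shapes don't line up simply pass their input through thanks to the fallback in DynamicModel.forward.
# Hypothetical smoke test for the builder and dynamic model above
space = SearchSpaceManager({'layer_types': ['conv2d', 'relu', 'pool']})
arch = space.sample_architecture()
model = ArchitectureBuilder.build(arch)

print(f"{arch.id}: {sum(p.numel() for p in model.parameters()):,} parameters")

dummy = torch.randn(2, 3, 32, 32)  # small input keeps the smoke test fast
with torch.no_grad():
    out = model(dummy)
print(f"Output shape: {tuple(out.shape)}")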
Best Practices Checklist
Conclusion
Neural Architecture Search transforms the art of neural network design into an automated optimization process. By implementing systematic search strategies, efficient evaluation methods, and proper constraint handling, you can discover architectures that often exceed human-designed alternatives. The key to successful NAS is balancing exploration efficiency with search thoroughness while maintaining practical deployment constraints. Start with clear objectives, implement efficient evaluation, and let the algorithms discover optimal architectures for your specific use case and hardware constraints.