Optimize AI Models with Neural Search

David Childs

Automate model design with Neural Architecture Search using evolutionary algorithms, reinforcement learning, and efficient search strategies.

Neural Architecture Search has revolutionized how we design deep learning models, automating the discovery of optimal architectures that often surpass human-designed networks. After implementing NAS systems that have discovered state-of-the-art architectures for computer vision, NLP, and mobile deployment, I've learned that successful NAS requires balancing search efficiency, architecture diversity, and practical constraints. Here's your comprehensive guide to automated neural architecture design.

NAS Framework and Foundations

Comprehensive NAS Framework

# nas_framework.py
from typing import Dict, List, Optional, Any, Tuple, Callable, Union
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from enum import Enum
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from copy import deepcopy
import random
import time
import json
import logging

class SearchSpace(Enum):
    MACRO = "macro"  # Overall architecture topology
    MICRO = "micro"  # Cell/block internal structure
    MIXED = "mixed"   # Both macro and micro

class SearchStrategy(Enum):
    RANDOM = "random"
    EVOLUTIONARY = "evolutionary"
    REINFORCEMENT_LEARNING = "reinforcement_learning"
    BAYESIAN_OPTIMIZATION = "bayesian_optimization"
    DIFFERENTIABLE = "differentiable"
    PROGRESSIVE = "progressive"

@dataclass
class ArchitectureSpec:
    """Architecture specification"""
    id: str
    layers: List[Dict[str, Any]]
    connections: List[Tuple[int, int]]
    metadata: Dict[str, Any] = field(default_factory=dict)
    performance_metrics: Dict[str, float] = field(default_factory=dict)
    
    def to_dict(self) -> Dict:
        return {
            'id': self.id,
            'layers': self.layers,
            'connections': self.connections,
            'metadata': self.metadata,
            'performance_metrics': self.performance_metrics
        }

@dataclass
class SearchConstraints:
    """Constraints for architecture search"""
    max_layers: int = 50
    max_parameters: int = 50_000_000  # 50M parameters
    max_latency_ms: float = 100.0
    max_memory_mb: float = 1000.0
    min_accuracy: float = 0.90
    target_platform: str = "gpu"  # gpu, cpu, mobile, edge

class NeuralArchitectureSearch:
    def __init__(self,
                 search_space_config: Dict,
                 search_strategy: SearchStrategy,
                 constraints: SearchConstraints,
                 performance_evaluator):
        
        self.search_space = SearchSpaceManager(search_space_config)
        self.search_strategy = search_strategy
        self.constraints = constraints
        self.evaluator = performance_evaluator
        
        # Initialize search algorithm
        self.searcher = self._create_searcher()
        
        # Search state
        self.search_history = []
        self.best_architectures = []
        self.current_population = []
        
        # Performance tracking
        self.search_metrics = {
            'architectures_evaluated': 0,
            'search_time_hours': 0,
            'best_accuracy': 0.0,
            'pareto_frontier': []
        }
    
    def _create_searcher(self):
        """Create searcher based on strategy"""
        
        if self.search_strategy == SearchStrategy.EVOLUTIONARY:
            return EvolutionarySearcher(self.search_space, self.constraints)
        elif self.search_strategy == SearchStrategy.REINFORCEMENT_LEARNING:
            return RLSearcher(self.search_space, self.constraints)
        elif self.search_strategy == SearchStrategy.BAYESIAN_OPTIMIZATION:
            return BayesianSearcher(self.search_space, self.constraints)
        elif self.search_strategy == SearchStrategy.DIFFERENTIABLE:
            return DifferentiableSearcher(self.search_space, self.constraints)
        elif self.search_strategy == SearchStrategy.PROGRESSIVE:
            return ProgressiveSearcher(self.search_space, self.constraints)
        else:
            return RandomSearcher(self.search_space, self.constraints)
    
    async def search(self,
                   training_data,
                   validation_data,
                   search_budget: int = 1000,
                   time_budget_hours: float = 24.0) -> List[ArchitectureSpec]:
        """Execute neural architecture search"""
        
        start_time = time.time()
        
        # Initialize search
        await self.searcher.initialize(training_data, validation_data)
        
        # Main search loop
        for iteration in range(search_budget):
            # Check time budget
            elapsed_hours = (time.time() - start_time) / 3600
            if elapsed_hours >= time_budget_hours:
                logging.info(f"Time budget exceeded: {elapsed_hours:.2f} hours")
                break
            
            # Generate candidate architectures
            candidates = await self.searcher.generate_candidates(
                iteration, 
                self.search_history
            )
            
            # Evaluate candidates
            evaluated_candidates = []
            for candidate in candidates:
                try:
                    # Check constraints before evaluation
                    if not self._check_constraints(candidate):
                        continue
                    
                    # Evaluate architecture
                    performance = await self.evaluator.evaluate(
                        candidate,
                        training_data,
                        validation_data
                    )
                    
                    candidate.performance_metrics = performance
                    evaluated_candidates.append(candidate)
                    
                    # Update metrics
                    self.search_metrics['architectures_evaluated'] += 1
                    
                    if performance.get('accuracy', 0) > self.search_metrics['best_accuracy']:
                        self.search_metrics['best_accuracy'] = performance['accuracy']
                    
                except Exception as e:
                    logging.error(f"Architecture evaluation failed: {e}")
                    continue
            
            # Update search state
            self.search_history.extend(evaluated_candidates)
            
            # Update searcher with results
            await self.searcher.update(evaluated_candidates, iteration)
            
            # Update best architectures
            self._update_best_architectures(evaluated_candidates)
            
            # Log progress
            if iteration % 10 == 0:
                self._log_search_progress(iteration, elapsed_hours)
        
        # Final search metrics
        self.search_metrics['search_time_hours'] = (time.time() - start_time) / 3600
        
        return self.best_architectures
    
    def _check_constraints(self, architecture: ArchitectureSpec) -> bool:
        """Check if architecture meets constraints"""
        
        # Parameter count constraint
        param_count = self._estimate_parameters(architecture)
        if param_count > self.constraints.max_parameters:
            return False
        
        # Layer count constraint
        if len(architecture.layers) > self.constraints.max_layers:
            return False
        
        # Platform-specific constraints
        if not self._check_platform_constraints(architecture):
            return False
        
        return True
    
    def _estimate_parameters(self, architecture: ArchitectureSpec) -> int:
        """Estimate parameter count for architecture"""
        
        total_params = 0
        layer_outputs = {}
        
        for i, layer in enumerate(architecture.layers):
            layer_type = layer['type']
            layer_config = layer['config']
            
            if layer_type == 'conv2d':
                in_channels = layer_config.get('in_channels', 3)
                out_channels = layer_config['out_channels']
                kernel_size = layer_config.get('kernel_size', 3)
                
                # Conv parameters: (kernel_size^2 * in_channels * out_channels) + bias
                conv_params = (kernel_size ** 2) * in_channels * out_channels + out_channels
                total_params += conv_params
                
                layer_outputs[i] = out_channels
                
            elif layer_type == 'linear':
                in_features = layer_config.get('in_features', 512)
                out_features = layer_config['out_features']
                
                # Linear parameters: (in_features * out_features) + bias
                linear_params = in_features * out_features + out_features
                total_params += linear_params
                
                layer_outputs[i] = out_features
            
            # Add other layer types as needed
        
        return total_params
    
    def _update_best_architectures(self, new_candidates: List[ArchitectureSpec]):
        """Update list of best architectures"""
        
        # Combine with existing best architectures
        all_candidates = self.best_architectures + new_candidates
        
        # Sort by performance (multi-objective)
        all_candidates.sort(
            key=lambda arch: self._calculate_architecture_score(arch),
            reverse=True
        )
        
        # Keep top architectures (Pareto frontier)
        self.best_architectures = self._select_pareto_optimal(
            all_candidates,
            max_size=20
        )
    
    def _calculate_architecture_score(self, architecture: ArchitectureSpec) -> float:
        """Calculate overall architecture score"""
        
        metrics = architecture.performance_metrics
        
        # Multi-objective scoring
        accuracy = metrics.get('accuracy', 0.0)
        latency = metrics.get('latency_ms', float('inf'))
        parameters = metrics.get('parameters', float('inf'))
        
        # Normalize metrics
        accuracy_score = accuracy
        latency_score = max(0, 1.0 - (latency / 1000))  # Penalty for high latency
        param_score = max(0, 1.0 - (parameters / self.constraints.max_parameters))
        
        # Weighted combination
        total_score = (0.6 * accuracy_score + 
                      0.2 * latency_score + 
                      0.2 * param_score)
        
        return total_score
    
    def _select_pareto_optimal(self, 
                              candidates: List[ArchitectureSpec],
                              max_size: int) -> List[ArchitectureSpec]:
        """Select Pareto optimal architectures"""
        
        if len(candidates) <= max_size:
            return candidates
        
        # For simplicity, select top performers across different objectives
        accuracy_sorted = sorted(
            candidates,
            key=lambda x: x.performance_metrics.get('accuracy', 0),
            reverse=True
        )
        
        latency_sorted = sorted(
            candidates,
            key=lambda x: x.performance_metrics.get('latency_ms', float('inf'))
        )
        
        param_sorted = sorted(
            candidates,
            key=lambda x: x.performance_metrics.get('parameters', float('inf'))
        )
        
        # Select diverse set
        selected = set()
        selected.update(accuracy_sorted[:max_size//3])
        selected.update(latency_sorted[:max_size//3])
        selected.update(param_sorted[:max_size//3])
        
        # Fill remaining with top overall scores
        remaining_slots = max_size - len(selected)
        if remaining_slots > 0:
            overall_sorted = sorted(
                [c for c in candidates if c not in selected],
                key=lambda x: self._calculate_architecture_score(x),
                reverse=True
            )
            selected.update(overall_sorted[:remaining_slots])
        
        return list(selected)

class SearchSpaceManager:
    """Manages the search space definition and sampling"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.layer_types = config.get('layer_types', ['conv2d', 'linear', 'relu', 'pool'])
        self.connection_patterns = config.get('connection_patterns', ['sequential', 'residual', 'dense'])
        self.parameter_ranges = config.get('parameter_ranges', {})
    
    def sample_architecture(self) -> ArchitectureSpec:
        """Sample random architecture from search space"""
        
        # Sample number of layers
        num_layers = np.random.randint(3, 20)
        
        layers = []
        for i in range(num_layers):
            layer_type = np.random.choice(self.layer_types)
            layer_config = self._sample_layer_config(layer_type)
            
            layers.append({
                'type': layer_type,
                'config': layer_config
            })
        
        # Generate connections
        connections = self._sample_connections(num_layers)
        
        architecture_id = f"arch_{int(time.time() * 1000000) % 1000000}"
        
        return ArchitectureSpec(
            id=architecture_id,
            layers=layers,
            connections=connections,
            metadata={'sampled': True}
        )
    
    def _sample_layer_config(self, layer_type: str) -> Dict:
        """Sample configuration for specific layer type"""
        
        if layer_type == 'conv2d':
            return {
                'out_channels': np.random.choice([16, 32, 64, 128, 256]),
                'kernel_size': np.random.choice([1, 3, 5, 7]),
                'stride': np.random.choice([1, 2]),
                'padding': 'same'
            }
        elif layer_type == 'linear':
            return {
                'out_features': np.random.choice([64, 128, 256, 512, 1024])
            }
        elif layer_type == 'pool':
            return {
                'type': np.random.choice(['max', 'avg']),
                'kernel_size': np.random.choice([2, 3]),
                'stride': np.random.choice([1, 2])
            }
        else:
            return {}
    
    def _sample_connections(self, num_layers: int) -> List[Tuple[int, int]]:
        """Sample connection pattern between layers"""
        
        connections = []
        
        # Sequential connections
        for i in range(num_layers - 1):
            connections.append((i, i + 1))
        
        # Add skip connections randomly
        skip_probability = 0.3
        for i in range(num_layers):
            for j in range(i + 2, num_layers):
                if np.random.random() < skip_probability:
                    connections.append((i, j))
        
        return connections
    
    def mutate_architecture(self, 
                           architecture: ArchitectureSpec,
                           mutation_rate: float = 0.1) -> ArchitectureSpec:
        """Mutate an existing architecture"""
        
        mutated = deepcopy(architecture)
        mutated.id = f"mutated_{mutated.id}_{int(time.time() * 1000000) % 1000000}"
        
        # Mutate layers
        for layer in mutated.layers:
            if np.random.random() < mutation_rate:
                layer['config'] = self._mutate_layer_config(layer['type'], layer['config'])
        
        # Mutate connections
        if np.random.random() < mutation_rate:
            mutated.connections = self._mutate_connections(
                mutated.connections,
                len(mutated.layers)
            )
        
        # Possibly add/remove layers
        if np.random.random() < mutation_rate / 2:
            mutated = self._mutate_topology(mutated)
        
        return mutated
    
    def crossover_architectures(self,
                               parent1: ArchitectureSpec,
                               parent2: ArchitectureSpec) -> ArchitectureSpec:
        """Create offspring through crossover"""
        
        # Simple crossover: take layers from both parents
        min_layers = min(len(parent1.layers), len(parent2.layers))
        crossover_point = np.random.randint(1, min_layers)
        
        offspring_layers = (parent1.layers[:crossover_point] + 
                          parent2.layers[crossover_point:])
        
        # Inherit connections from both parents (simplified)
        offspring_connections = parent1.connections[:len(offspring_layers)-1]
        
        offspring_id = f"cross_{int(time.time() * 1000000) % 1000000}"
        
        return ArchitectureSpec(
            id=offspring_id,
            layers=offspring_layers,
            connections=offspring_connections,
            metadata={
                'parent1': parent1.id,
                'parent2': parent2.id,
                'crossover_point': crossover_point
            }
        )

Evolutionary Architecture Search

# evolutionary_search.py
class EvolutionarySearcher:
    def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
        self.search_space = search_space
        self.constraints = constraints
        
        # Evolution parameters
        self.population_size = 50
        self.mutation_rate = 0.2
        self.crossover_rate = 0.8
        self.selection_pressure = 2
        self.elitism_ratio = 0.1
        
        # Evolution state
        self.population = []
        self.generation = 0
        self.fitness_history = []
    
    async def initialize(self, training_data, validation_data):
        """Initialize population"""
        
        self.population = []
        
        # Create initial population
        for _ in range(self.population_size):
            individual = self.search_space.sample_architecture()
            self.population.append(individual)
        
        logging.info(f"Initialized population with {len(self.population)} individuals")
    
    async def generate_candidates(self, 
                                iteration: int,
                                search_history: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
        """Generate candidates for next generation"""
        
        # Selection
        selected_parents = self._selection(self.population)
        
        # Generate offspring
        offspring = []
        
        while len(offspring) < self.population_size - int(self.population_size * self.elitism_ratio):
            # Select two parents
            parent1 = random.choice(selected_parents)
            parent2 = random.choice(selected_parents)
            
            # Crossover
            if random.random() < self.crossover_rate:
                child = self.search_space.crossover_architectures(parent1, parent2)
            else:
                child = deepcopy(random.choice([parent1, parent2]))
                child.id = f"clone_{int(time.time() * 1000000) % 1000000}"
            
            # Mutation
            if random.random() < self.mutation_rate:
                child = self.search_space.mutate_architecture(child, self.mutation_rate)
            
            offspring.append(child)
        
        # Add elite individuals
        elite_count = int(self.population_size * self.elitism_ratio)
        elite_individuals = sorted(
            self.population,
            key=lambda x: self._calculate_fitness(x),
            reverse=True
        )[:elite_count]
        
        offspring.extend(elite_individuals)
        
        return offspring
    
    async def update(self, evaluated_candidates: List[ArchitectureSpec], iteration: int):
        """Update population with evaluation results"""
        
        # Update population
        self.population = evaluated_candidates
        self.generation += 1
        
        # Record fitness history
        fitnesses = [self._calculate_fitness(ind) for ind in self.population]
        self.fitness_history.append({
            'generation': self.generation,
            'best_fitness': max(fitnesses),
            'average_fitness': np.mean(fitnesses),
            'worst_fitness': min(fitnesses)
        })
        
        # Log progress
        logging.info(f"Generation {self.generation}: Best fitness = {max(fitnesses):.4f}")
    
    def _selection(self, population: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
        """Tournament selection"""
        
        selected = []
        tournament_size = max(2, int(len(population) * 0.1))
        
        for _ in range(len(population)):
            # Tournament selection
            tournament = random.sample(population, tournament_size)
            winner = max(tournament, key=lambda x: self._calculate_fitness(x))
            selected.append(winner)
        
        return selected
    
    def _calculate_fitness(self, individual: ArchitectureSpec) -> float:
        """Calculate fitness score for individual"""
        
        if not individual.performance_metrics:
            return 0.0
        
        # Multi-objective fitness
        accuracy = individual.performance_metrics.get('accuracy', 0.0)
        efficiency = self._calculate_efficiency(individual)
        
        # Weighted combination
        fitness = 0.7 * accuracy + 0.3 * efficiency
        
        return fitness
    
    def _calculate_efficiency(self, individual: ArchitectureSpec) -> float:
        """Calculate efficiency score (latency/parameters penalty)"""
        
        latency = individual.performance_metrics.get('latency_ms', float('inf'))
        parameters = individual.performance_metrics.get('parameters', float('inf'))
        
        # Normalize to 0-1 range
        latency_score = max(0, 1.0 - (latency / 1000))
        param_score = max(0, 1.0 - (parameters / self.constraints.max_parameters))
        
        return (latency_score + param_score) / 2

class RLSearcher:
    """Reinforcement Learning based architecture search"""
    
    def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
        self.search_space = search_space
        self.constraints = constraints
        
        # RL parameters
        self.state_size = 128
        self.action_size = 64
        self.learning_rate = 0.001
        
        # Neural network controller
        self.controller = self._build_controller()
        self.optimizer = optim.Adam(self.controller.parameters(), lr=self.learning_rate)
        
        # Experience buffer
        self.experience_buffer = []
        self.buffer_size = 1000
    
    def _build_controller(self) -> nn.Module:
        """Build LSTM controller network"""
        
        class ArchitectureController(nn.Module):
            def __init__(self, state_size: int, action_size: int, hidden_size: int = 256):
                super().__init__()
                
                self.hidden_size = hidden_size
                self.lstm = nn.LSTM(state_size, hidden_size, batch_first=True)
                self.action_head = nn.Linear(hidden_size, action_size)
                self.value_head = nn.Linear(hidden_size, 1)
                
            def forward(self, x, hidden=None):
                lstm_out, hidden = self.lstm(x, hidden)
                actions = torch.softmax(self.action_head(lstm_out), dim=-1)
                values = self.value_head(lstm_out)
                
                return actions, values, hidden
        
        return ArchitectureController(self.state_size, self.action_size)
    
    async def initialize(self, training_data, validation_data):
        """Initialize RL controller"""
        
        # Initialize experience buffer with random architectures
        for _ in range(20):
            arch = self.search_space.sample_architecture()
            state = self._architecture_to_state(arch)
            self.experience_buffer.append({
                'state': state,
                'action': np.random.randint(self.action_size),
                'reward': 0.0,
                'architecture': arch
            })
    
    async def generate_candidates(self,
                                iteration: int,
                                search_history: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
        """Generate architectures using RL controller"""
        
        candidates = []
        
        for _ in range(10):  # Generate 10 candidates per iteration
            architecture = await self._sample_architecture_with_controller()
            candidates.append(architecture)
        
        return candidates
    
    async def _sample_architecture_with_controller(self) -> ArchitectureSpec:
        """Sample architecture using trained controller"""
        
        architecture_tokens = []
        hidden = None
        
        # Generate architecture sequence
        for step in range(20):  # Max 20 decisions
            # Current state (simplified)
            state = torch.zeros(1, 1, self.state_size)
            
            # Get action probabilities
            action_probs, _, hidden = self.controller(state, hidden)
            
            # Sample action
            action = torch.multinomial(action_probs.squeeze(), 1).item()
            architecture_tokens.append(action)
            
            # Early stopping condition
            if self._is_terminal_action(action):
                break
        
        # Convert tokens to architecture
        architecture = self._tokens_to_architecture(architecture_tokens)
        
        return architecture
    
    def _architecture_to_state(self, architecture: ArchitectureSpec) -> np.ndarray:
        """Convert architecture to state representation"""
        
        # Simplified state encoding
        state = np.zeros(self.state_size)
        
        # Encode basic architecture properties
        state[0] = len(architecture.layers) / 50  # Normalized layer count
        state[1] = len(architecture.connections) / 100  # Normalized connections
        
        # Encode layer types
        layer_type_counts = {}
        for layer in architecture.layers:
            layer_type = layer['type']
            layer_type_counts[layer_type] = layer_type_counts.get(layer_type, 0) + 1
        
        # Fill in layer type encodings
        type_mapping = {'conv2d': 2, 'linear': 3, 'relu': 4, 'pool': 5}
        for i, (layer_type, count) in enumerate(layer_type_counts.items()):
            if i < 10:  # Limit to first 10 positions
                state[2 + i] = count / 10  # Normalized count
        
        return state
    
    def _tokens_to_architecture(self, tokens: List[int]) -> ArchitectureSpec:
        """Convert action tokens to architecture specification"""
        
        layers = []
        connections = []
        
        # Decode tokens to architecture (simplified)
        for i, token in enumerate(tokens):
            if token < 20:  # Layer tokens
                layer_type = ['conv2d', 'linear', 'relu', 'pool'][token % 4]
                config = self.search_space._sample_layer_config(layer_type)
                layers.append({'type': layer_type, 'config': config})
            elif token < 40:  # Connection tokens
                if i > 0 and i < len(layers):
                    connections.append((token % i, i))
        
        # Ensure sequential connections
        for i in range(len(layers) - 1):
            connections.append((i, i + 1))
        
        architecture_id = f"rl_{int(time.time() * 1000000) % 1000000}"
        
        return ArchitectureSpec(
            id=architecture_id,
            layers=layers,
            connections=connections,
            metadata={'generated_by': 'rl_controller'}
        )

class DifferentiableSearcher:
    """Differentiable Architecture Search (DARTS)"""
    
    def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
        self.search_space = search_space
        self.constraints = constraints
        
        # DARTS parameters
        self.learning_rate_model = 0.025
        self.learning_rate_arch = 3e-4
        self.weight_decay = 3e-4
        
        # Architecture parameters (learnable)
        self.arch_params = None
        self.model_params = None
        
    async def initialize(self, training_data, validation_data):
        """Initialize DARTS search"""
        
        # Initialize architecture parameters
        # This would create alpha parameters for each possible operation
        num_operations = 8  # Number of candidate operations
        num_edges = 14      # Number of edges in search space
        
        self.arch_params = torch.randn(num_edges, num_operations, requires_grad=True)
        
        # Initialize optimizers
        self.arch_optimizer = optim.Adam([self.arch_params], lr=self.learning_rate_arch)
    
    async def generate_candidates(self,
                                iteration: int,
                                search_history: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
        """Generate candidates using current architecture parameters"""
        
        # In DARTS, we continuously update a single super-network
        # For this simplified version, we derive discrete architectures
        
        candidates = []
        
        # Sample architectures from current alpha parameters
        for _ in range(5):
            architecture = self._derive_architecture_from_alphas()
            candidates.append(architecture)
        
        return candidates
    
    def _derive_architecture_from_alphas(self) -> ArchitectureSpec:
        """Derive discrete architecture from alpha parameters"""
        
        layers = []
        connections = []
        
        # Convert continuous alpha parameters to discrete architecture
        with torch.no_grad():
            for edge_idx in range(len(self.arch_params)):
                # Select operation with highest alpha
                best_op_idx = torch.argmax(self.arch_params[edge_idx]).item()
                
                # Map operation index to actual operation
                operations = ['conv3x3', 'conv5x5', 'dil_conv3x3', 'dil_conv5x5',
                             'sep_conv3x3', 'sep_conv5x5', 'avg_pool3x3', 'max_pool3x3']
                
                selected_op = operations[best_op_idx]
                
                # Convert to layer specification
                if 'conv' in selected_op:
                    kernel_size = 3 if '3x3' in selected_op else 5
                    layers.append({
                        'type': 'conv2d',
                        'config': {
                            'out_channels': 32,
                            'kernel_size': kernel_size,
                            'stride': 1
                        }
                    })
                elif 'pool' in selected_op:
                    layers.append({
                        'type': 'pool',
                        'config': {
                            'type': 'avg' if 'avg' in selected_op else 'max',
                            'kernel_size': 3
                        }
                    })
        
        # Create connections (simplified)
        for i in range(len(layers) - 1):
            connections.append((i, i + 1))
        
        architecture_id = f"darts_{int(time.time() * 1000000) % 1000000}"
        
        return ArchitectureSpec(
            id=architecture_id,
            layers=layers,
            connections=connections,
            metadata={'generated_by': 'darts', 'iteration': len(connections)}
        )
    
    async def update(self, evaluated_candidates: List[ArchitectureSpec], iteration: int):
        """Update architecture parameters based on validation performance"""
        
        # In real DARTS, this would update the alpha parameters
        # based on validation loss gradients
        
        # For this simplified version, we adjust alphas based on performance
        if evaluated_candidates:
            best_candidate = max(
                evaluated_candidates,
                key=lambda x: x.performance_metrics.get('accuracy', 0)
            )
            
            # Reward operations used in best architecture (simplified)
            # In practice, this would involve proper gradient-based updates
            pass

class ProgressiveSearcher:
    """Progressive search that starts simple and increases complexity"""
    
    def __init__(self, search_space: SearchSpaceManager, constraints: SearchConstraints):
        self.search_space = search_space
        self.constraints = constraints
        
        # Progressive parameters
        self.current_complexity = 1
        self.max_complexity = 10
        self.complexity_schedule = [1, 2, 3, 5, 7, 10]
        
    async def initialize(self, training_data, validation_data):
        """Initialize progressive search"""
        
        self.current_complexity = 1
        logging.info("Starting progressive search with minimal complexity")
    
    async def generate_candidates(self,
                                iteration: int,
                                search_history: List[ArchitectureSpec]) -> List[ArchitectureSpec]:
        """Generate candidates with current complexity level"""
        
        # Update complexity based on iteration
        complexity_stage = min(iteration // 50, len(self.complexity_schedule) - 1)
        self.current_complexity = self.complexity_schedule[complexity_stage]
        
        candidates = []
        
        for _ in range(10):
            # Sample architecture with current complexity
            arch = self._sample_architecture_with_complexity(self.current_complexity)
            candidates.append(arch)
        
        return candidates
    
    def _sample_architecture_with_complexity(self, complexity: int) -> ArchitectureSpec:
        """Sample architecture with specific complexity level"""
        
        # Number of layers based on complexity
        num_layers = min(3 + complexity * 2, 30)
        
        layers = []
        for i in range(num_layers):
            # Layer complexity increases with overall complexity
            if complexity <= 2:
                # Simple layers only
                layer_type = np.random.choice(['conv2d', 'relu', 'pool'])
            elif complexity <= 5:
                # Add linear layers
                layer_type = np.random.choice(['conv2d', 'linear', 'relu', 'pool'])
            else:
                # Full complexity
                layer_type = np.random.choice(self.search_space.layer_types)
            
            config = self.search_space._sample_layer_config(layer_type)
            
            # Limit parameter counts based on complexity
            if layer_type == 'conv2d' and 'out_channels' in config:
                max_channels = min(config['out_channels'], 32 * complexity)
                config['out_channels'] = max_channels
            
            layers.append({
                'type': layer_type,
                'config': config
            })
        
        # Simple connections for low complexity
        connections = []
        for i in range(len(layers) - 1):
            connections.append((i, i + 1))
        
        # Add skip connections for higher complexity
        if complexity > 3:
            for i in range(len(layers)):
                for j in range(i + 2, min(i + complexity, len(layers))):
                    if np.random.random() < 0.2:  # 20% chance
                        connections.append((i, j))
        
        architecture_id = f"prog_{complexity}_{int(time.time() * 1000000) % 1000000}"
        
        return ArchitectureSpec(
            id=architecture_id,
            layers=layers,
            connections=connections,
            metadata={
                'complexity_level': complexity,
                'generated_by': 'progressive_search'
            }
        )

Performance Evaluation and Optimization

Efficient Architecture Evaluation

# architecture_evaluation.py
class ArchitectureEvaluator:
    def __init__(self, 
                 device: str = "cuda",
                 evaluation_strategy: str = "weight_sharing"):
        
        self.device = device
        self.evaluation_strategy = evaluation_strategy
        
        # Evaluation strategies
        self.evaluators = {
            'full_training': FullTrainingEvaluator(device),
            'weight_sharing': WeightSharingEvaluator(device),
            'early_stopping': EarlyStoppingEvaluator(device),
            'proxy_tasks': ProxyTaskEvaluator(device),
            'performance_prediction': PerformancePredictorEvaluator(device)
        }
        
        self.current_evaluator = self.evaluators[evaluation_strategy]
        
        # Caching for efficiency
        self.evaluation_cache = {}
        self.architecture_embeddings = {}
    
    async def evaluate(self,
                     architecture: ArchitectureSpec,
                     training_data,
                     validation_data,
                     budget_epochs: int = 50) -> Dict[str, float]:
        """Evaluate architecture performance"""
        
        # Check cache first
        cache_key = self._get_cache_key(architecture)
        if cache_key in self.evaluation_cache:
            cached_result = self.evaluation_cache[cache_key]
            logging.info(f"Using cached evaluation for {architecture.id}")
            return cached_result
        
        # Build model from architecture
        model = await self._build_model_from_architecture(architecture)
        
        if model is None:
            return {'accuracy': 0.0, 'loss': float('inf'), 'parameters': 0}
        
        # Evaluate using selected strategy
        performance = await self.current_evaluator.evaluate(
            model,
            architecture,
            training_data,
            validation_data,
            budget_epochs
        )
        
        # Add architecture-specific metrics
        performance['parameters'] = self._count_parameters(model)
        performance['flops'] = await self._estimate_flops(model)
        performance['latency_ms'] = await self._measure_latency(model)
        performance['memory_mb'] = await self._measure_memory(model)
        
        # Cache result
        self.evaluation_cache[cache_key] = performance
        
        return performance
    
    async def _build_model_from_architecture(self, 
                                           architecture: ArchitectureSpec) -> Optional[nn.Module]:
        """Build PyTorch model from architecture specification"""
        
        try:
            return ArchitectureBuilder.build(architecture)
        except Exception as e:
            logging.error(f"Failed to build model: {e}")
            return None
    
    def _count_parameters(self, model: nn.Module) -> int:
        """Count total parameters in model"""
        
        return sum(p.numel() for p in model.parameters())
    
    async def _estimate_flops(self, model: nn.Module) -> float:
        """Estimate FLOPs for model"""
        
        # Simplified FLOP estimation
        total_flops = 0
        
        for module in model.modules():
            if isinstance(module, nn.Conv2d):
                # Conv2d FLOPs: output_elements * (kernel_size^2 * in_channels + 1)
                kernel_flops = (module.kernel_size[0] * module.kernel_size[1] * 
                               module.in_channels)
                output_elements = 224 * 224 * module.out_channels  # Assuming 224x224 input
                total_flops += output_elements * kernel_flops
                
            elif isinstance(module, nn.Linear):
                # Linear FLOPs: in_features * out_features
                total_flops += module.in_features * module.out_features
        
        return total_flops
    
    async def _measure_latency(self, model: nn.Module, num_runs: int = 100) -> float:
        """Measure model inference latency"""
        
        model.eval()
        model.to(self.device)
        
        # Warmup
        dummy_input = torch.randn(1, 3, 224, 224).to(self.device)
        with torch.no_grad():
            for _ in range(10):
                _ = model(dummy_input)
        
        # Actual measurement
        torch.cuda.synchronize() if self.device == 'cuda' else None
        start_time = time.time()
        
        with torch.no_grad():
            for _ in range(num_runs):
                _ = model(dummy_input)
        
        torch.cuda.synchronize() if self.device == 'cuda' else None
        end_time = time.time()
        
        avg_latency_ms = ((end_time - start_time) / num_runs) * 1000
        
        return avg_latency_ms
    
    def _get_cache_key(self, architecture: ArchitectureSpec) -> str:
        """Generate cache key for architecture"""
        
        # Create deterministic hash from architecture
        arch_str = json.dumps(architecture.to_dict(), sort_keys=True)
        return hashlib.md5(arch_str.encode()).hexdigest()

class WeightSharingEvaluator:
    """Evaluator using weight sharing across architectures"""
    
    def __init__(self, device: str):
        self.device = device
        self.super_network = None
        
    async def evaluate(self,
                     model: nn.Module,
                     architecture: ArchitectureSpec,
                     training_data,
                     validation_data,
                     budget_epochs: int) -> Dict[str, float]:
        """Evaluate using weight sharing super-network"""
        
        # Initialize super-network if not done
        if self.super_network is None:
            await self._initialize_super_network(training_data)
        
        # Extract subnet for this architecture
        subnet = self._extract_subnet(architecture)
        
        # Quick evaluation on validation set
        accuracy = await self._evaluate_subnet(subnet, validation_data)
        
        return {
            'accuracy': accuracy,
            'loss': 1.0 - accuracy,  # Simplified
            'evaluation_cost': 0.1   # Much lower than full training
        }
    
    async def _initialize_super_network(self, training_data):
        """Initialize and train super-network once"""
        
        # Create large super-network containing all possible operations
        self.super_network = self._build_super_network()
        
        # Train super-network (this would be done once, then reused)
        logging.info("Training super-network (one-time cost)...")
        # Training code would go here
        
    def _build_super_network(self) -> nn.Module:
        """Build super-network containing all candidate operations"""
        
        class SuperNetwork(nn.Module):
            def __init__(self):
                super().__init__()
                # Define all possible operations
                self.operations = nn.ModuleList([
                    nn.Conv2d(3, 32, 3, padding=1),
                    nn.Conv2d(32, 64, 3, padding=1),
                    nn.Conv2d(64, 128, 3, padding=1),
                    nn.AdaptiveAvgPool2d(1),
                    nn.Linear(128, 10)
                ])
            
            def forward(self, x, architecture_mask=None):
                # Forward pass using architecture-specific mask
                for i, op in enumerate(self.operations):
                    if architecture_mask is None or architecture_mask[i]:
                        x = op(x)
                return x
        
        return SuperNetwork()

class EarlyStoppingEvaluator:
    """Evaluator using early stopping for efficiency"""
    
    def __init__(self, device: str):
        self.device = device
        self.min_epochs = 5
        self.patience = 3
        
    async def evaluate(self,
                     model: nn.Module,
                     architecture: ArchitectureSpec,
                     training_data,
                     validation_data,
                     budget_epochs: int) -> Dict[str, float]:
        """Evaluate with early stopping"""
        
        model.to(self.device)
        optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
        criterion = nn.CrossEntropyLoss()
        
        best_accuracy = 0.0
        patience_counter = 0
        
        for epoch in range(budget_epochs):
            # Training phase
            model.train()
            train_loss = 0.0
            
            for batch_idx, (data, target) in enumerate(training_data):
                if batch_idx > 10:  # Limit batches for efficiency
                    break
                
                data, target = data.to(self.device), target.to(self.device)
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()
            
            # Validation phase
            if epoch >= self.min_epochs - 1:
                model.eval()
                correct = 0
                total = 0
                
                with torch.no_grad():
                    for batch_idx, (data, target) in enumerate(validation_data):
                        if batch_idx > 5:  # Limit validation batches
                            break
                        
                        data, target = data.to(self.device), target.to(self.device)
                        output = model(data)
                        _, predicted = torch.max(output.data, 1)
                        total += target.size(0)
                        correct += (predicted == target).sum().item()
                
                accuracy = correct / total
                
                # Early stopping logic
                if accuracy > best_accuracy:
                    best_accuracy = accuracy
                    patience_counter = 0
                else:
                    patience_counter += 1
                    
                if patience_counter >= self.patience:
                    logging.info(f"Early stopping at epoch {epoch + 1}")
                    break
        
        return {
            'accuracy': best_accuracy,
            'loss': 1.0 - best_accuracy,
            'epochs_trained': epoch + 1,
            'early_stopped': patience_counter >= self.patience
        }

class ArchitectureBuilder:
    """Builds PyTorch models from architecture specifications"""
    
    @staticmethod
    def build(architecture: ArchitectureSpec) -> nn.Module:
        """Build model from architecture specification"""
        
        layers = []
        layer_modules = {}
        
        # Build individual layers
        for i, layer_spec in enumerate(architecture.layers):
            layer_type = layer_spec['type']
            layer_config = layer_spec['config']
            
            if layer_type == 'conv2d':
                layer = nn.Conv2d(
                    in_channels=layer_config.get('in_channels', 3),
                    out_channels=layer_config['out_channels'],
                    kernel_size=layer_config.get('kernel_size', 3),
                    stride=layer_config.get('stride', 1),
                    padding=layer_config.get('padding', 1)
                )
            elif layer_type == 'linear':
                layer = nn.Linear(
                    in_features=layer_config.get('in_features', 512),
                    out_features=layer_config['out_features']
                )
            elif layer_type == 'relu':
                layer = nn.ReLU(inplace=True)
            elif layer_type == 'pool':
                pool_type = layer_config.get('type', 'max')
                kernel_size = layer_config.get('kernel_size', 2)
                
                if pool_type == 'max':
                    layer = nn.MaxPool2d(kernel_size)
                else:
                    layer = nn.AvgPool2d(kernel_size)
            else:
                continue  # Skip unknown layer types
            
            layer_modules[i] = layer
        
        # Build model with connections
        model = DynamicModel(layer_modules, architecture.connections)
        
        return model

class DynamicModel(nn.Module):
    """Dynamic model that supports flexible connections"""
    
    def __init__(self, layer_modules: Dict[int, nn.Module], connections: List[Tuple[int, int]]):
        super().__init__()
        
        self.layer_modules = nn.ModuleDict({str(k): v for k, v in layer_modules.items()})
        self.connections = connections
        
        # Analyze connection topology
        self.execution_order = self._topological_sort()
        
    def _topological_sort(self) -> List[int]:
        """Topological sort of layers for execution order"""
        
        # Simple topological sort implementation
        in_degree = {}
        graph = {}
        
        # Initialize
        for layer_id in self.layer_modules.keys():
            layer_id_int = int(layer_id)
            in_degree[layer_id_int] = 0
            graph[layer_id_int] = []
        
        # Build graph and calculate in-degrees
        for src, dst in self.connections:
            graph[src].append(dst)
            in_degree[dst] += 1
        
        # Topological sort
        queue = [node for node, degree in in_degree.items() if degree == 0]
        execution_order = []
        
        while queue:
            current = queue.pop(0)
            execution_order.append(current)
            
            for neighbor in graph[current]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
        
        return execution_order
    
    def forward(self, x):
        """Forward pass through dynamic architecture"""
        
        layer_outputs = {}
        
        for layer_id in self.execution_order:
            layer_module = self.layer_modules[str(layer_id)]
            
            # Determine input for this layer
            if layer_id == 0:
                # First layer uses network input
                layer_input = x
            else:
                # Collect inputs from predecessor layers
                predecessor_outputs = []
                for src, dst in self.connections:
                    if dst == layer_id and src in layer_outputs:
                        predecessor_outputs.append(layer_outputs[src])
                
                if len(predecessor_outputs) == 1:
                    layer_input = predecessor_outputs[0]
                elif len(predecessor_outputs) > 1:
                    # Combine multiple inputs (simple concatenation or addition)
                    try:
                        layer_input = torch.cat(predecessor_outputs, dim=1)
                    except:
                        layer_input = sum(predecessor_outputs)
                else:
                    # No valid input, skip this layer
                    continue
            
            # Execute layer
            try:
                layer_output = layer_module(layer_input)
                layer_outputs[layer_id] = layer_output
            except Exception as e:
                logging.warning(f"Layer {layer_id} execution failed: {e}")
                layer_outputs[layer_id] = layer_input  # Pass input through
        
        # Return output from last layer
        if self.execution_order:
            final_output = layer_outputs.get(self.execution_order[-1], x)
        else:
            final_output = x
        
        return final_output

Best Practices Checklist

  • Define clear search space boundaries and constraints
  • Choose appropriate search strategy for your budget
  • Implement efficient architecture evaluation methods
  • Use weight sharing or early stopping for efficiency
  • Consider multiple objectives (accuracy, latency, size)
  • Implement proper architecture validation
  • Cache evaluation results to avoid recomputation
  • Monitor search progress and convergence
  • Validate discovered architectures on held-out data
  • Consider hardware-specific constraints early
  • Use progressive complexity for better exploration
  • Implement proper logging and reproducibility
  • Regular checkpointing of search progress
  • Post-process discovered architectures for deployment
  • Document search methodology and results

Conclusion

Neural Architecture Search transforms the art of neural network design into an automated optimization process. By implementing systematic search strategies, efficient evaluation methods, and proper constraint handling, you can discover architectures that often exceed human-designed alternatives. The key to successful NAS is balancing exploration efficiency with search thoroughness while maintaining practical deployment constraints. Start with clear objectives, implement efficient evaluation, and let the algorithms discover optimal architectures for your specific use case and hardware constraints.

Share this article

DC

David Childs

Consulting Systems Engineer with over 10 years of experience building scalable infrastructure and helping organizations optimize their technology stack.

Related Articles