Deploy LLMs in Production Successfully

David Childs

Deploy LLMs at scale with proven production patterns, infrastructure strategies, and performance optimization for enterprise applications.

Deploying LLMs in production is vastly different from running demos. After building systems that serve millions of LLM requests daily, I've learned that success requires careful architecture, robust infrastructure, and deep understanding of both the models and their operational characteristics. Here's your complete guide to production LLM deployment.

LLM Architecture Patterns

Production LLM System Design

# llm_architecture.py
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import asyncio
import aiohttp
from enum import Enum

class ModelProvider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    HUGGINGFACE = "huggingface"
    SELF_HOSTED = "self_hosted"

@dataclass
class LLMConfig:
    provider: ModelProvider
    model_name: str
    max_tokens: int = 2048
    temperature: float = 0.7
    top_p: float = 1.0
    timeout: int = 30
    retry_count: int = 3
    rate_limit: int = 100  # requests per minute

class LLMOrchestrator:
    def __init__(self, configs: List[LLMConfig]):
        self.configs = configs
        self.providers = self._initialize_providers()
        self.request_queue = asyncio.Queue()
        self.response_cache = {}
        
    def _initialize_providers(self) -> Dict:
        """Initialize provider connections"""
        providers = {}
        
        for config in self.configs:
            if config.provider == ModelProvider.OPENAI:
                providers[config.model_name] = OpenAIProvider(config)
            elif config.provider == ModelProvider.ANTHROPIC:
                providers[config.model_name] = AnthropicProvider(config)
            elif config.provider == ModelProvider.HUGGINGFACE:
                providers[config.model_name] = HuggingFaceProvider(config)
            elif config.provider == ModelProvider.SELF_HOSTED:
                providers[config.model_name] = SelfHostedProvider(config)
        
        return providers
    
    async def process_request(self, prompt: str, model: Optional[str] = None,
                              context: Optional[Dict] = None) -> Dict:
        """Process LLM request with fallback and caching"""
        
        # Check cache
        cache_key = self._generate_cache_key(prompt, model, context)
        if cache_key in self.response_cache:
            return self.response_cache[cache_key]
        
        # Select provider
        provider = self._select_provider(model)
        
        # Process with retry logic
        for attempt in range(provider.config.retry_count):
            try:
                response = await provider.generate(prompt, context)
                
                # Cache response
                self.response_cache[cache_key] = response
                
                # Log metrics
                await self._log_metrics(provider, response)
                
                return response
                
            except Exception as e:
                if attempt == provider.config.retry_count - 1:
                    # Try fallback provider
                    return await self._fallback_request(prompt, context)
                
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
    
    def _select_provider(self, model: Optional[str] = None) -> Any:
        """Select provider based on model and load"""
        
        if model and model in self.providers:
            return self.providers[model]
        
        # Load balancing logic
        return self._get_least_loaded_provider()
    
    def _get_least_loaded_provider(self):
        """Get provider with lowest current load"""
        
        min_load = float('inf')
        selected_provider = None
        
        for provider in self.providers.values():
            current_load = provider.get_current_load()
            if current_load < min_load:
                min_load = current_load
                selected_provider = provider
        
        return selected_provider
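
The orchestrator above assumes each provider class exposes the same small interface: an async generate() method, a get_current_load() signal for load balancing, and the config it was constructed with. The provider classes themselves are not shown; a minimal sketch of that contract, with OpenAIProvider as an illustrative subclass and the actual API call stubbed out, could look like this:

# llm_providers.py (sketch -- the interface below is an assumption of this
# article's orchestrator, not part of any vendor SDK)
import time
from abc import ABC, abstractmethod
from typing import Dict, Optional

class BaseLLMProvider(ABC):
    def __init__(self, config: "LLMConfig"):
        self.config = config
        self.inflight_requests = 0

    def get_current_load(self) -> int:
        """Simple load signal consumed by the orchestrator's load balancer"""
        return self.inflight_requests

    @abstractmethod
    async def generate(self, prompt: str, context: Optional[Dict] = None) -> Dict:
        ...

class OpenAIProvider(BaseLLMProvider):
    async def generate(self, prompt: str, context: Optional[Dict] = None) -> Dict:
        self.inflight_requests += 1
        start = time.time()
        try:
            text, tokens = await self._call_api(prompt, context)
            # The response shape mirrors what process_request caches and logs
            return {
                "response": text,
                "tokens_used": tokens,
                "model": self.config.model_name,
                "latency": time.time() - start,
            }
        finally:
            self.inflight_requests -= 1

    async def _call_api(self, prompt: str, context: Optional[Dict]):
        # Placeholder: wire this to the vendor SDK (or aiohttp) in practice
        return f"[stubbed completion for: {prompt[:40]}]", 0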

Model Serving Infrastructure

# model_serving.py
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import Optional, Dict
from ray import serve

@serve.deployment(
    num_replicas=3,
    ray_actor_options={"num_gpus": 1},
    max_concurrent_queries=10,
)
class LLMModelServer:
    def __init__(self, model_name: str, quantization: str = None):
        self.model_name = model_name
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        # Load model with optimizations
        self.model = self._load_optimized_model(model_name, quantization)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        
        # Initialize metrics
        self.request_count = 0
        self.total_tokens = 0
        
    def _load_optimized_model(self, model_name: str, 
                             quantization: Optional[str] = None):
        """Load model with optimizations"""
        
        model_kwargs = {
            "torch_dtype": torch.float16,
            "device_map": "auto",
            "low_cpu_mem_usage": True,
        }
        
        if quantization == "8bit":
            model_kwargs["load_in_8bit"] = True
        elif quantization == "4bit":
            model_kwargs["load_in_4bit"] = True
            model_kwargs["bnb_4bit_compute_dtype"] = torch.float16
        
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            **model_kwargs
        )
        
        # Note: gradient checkpointing is a training-time memory optimization;
        # it does nothing useful for inference-only serving, so it is omitted here
        
        # Compile model for faster inference (PyTorch 2.0+)
        if hasattr(torch, "compile"):
            model = torch.compile(model)
        
        return model
    
    async def __call__(self, request: Dict) -> Dict:
        """Handle inference request"""
        
        prompt = request.get("prompt", "")
        max_length = request.get("max_length", 100)
        temperature = request.get("temperature", 0.7)
        top_p = request.get("top_p", 0.9)
        stream = request.get("stream", False)
        
        # Tokenize input
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=2048
        ).to(self.device)
        
        # Generate response
        if stream:
            # _stream_generate is an async generator, so return it directly
            # rather than awaiting it
            return self._stream_generate(inputs, max_length, temperature, top_p)
        else:
            return await self._batch_generate(inputs, max_length, temperature, top_p)
    
    async def _batch_generate(self, inputs, max_length, temperature, top_p):
        """Batch generation"""
        
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_length,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
            )
        
        response = self.tokenizer.decode(
            outputs[0][inputs['input_ids'].shape[1]:],
            skip_special_tokens=True
        )
        
        # Update metrics, counting only newly generated tokens (outputs include
        # the prompt tokens as a prefix)
        new_tokens = outputs.shape[1] - inputs['input_ids'].shape[1]
        self.request_count += 1
        self.total_tokens += new_tokens
        
        return {
            "response": response,
            "tokens_used": new_tokens,
            "model": self.model_name
        }
    
    async def _stream_generate(self, inputs, max_length, temperature, top_p):
        """Streaming generation"""
        
        from transformers import TextIteratorStreamer
        import threading
        
        streamer = TextIteratorStreamer(
            self.tokenizer,
            skip_prompt=True,  # stream only newly generated text, not the prompt
            skip_special_tokens=True
        )
        
        # Generate in separate thread
        generation_kwargs = {
            **inputs,
            "max_new_tokens": max_length,
            "temperature": temperature,
            "top_p": top_p,
            "do_sample": True,
            "streamer": streamer,
        }
        
        thread = threading.Thread(
            target=self.model.generate,
            kwargs=generation_kwargs
        )
        thread.start()
        
        # Stream tokens
        for token in streamer:
            yield {"token": token, "finished": False}
        
        yield {"token": "", "finished": True}

Scaling Strategies

Horizontal Scaling with Load Balancing

# scaling_manager.py
import asyncio
from typing import Dict
import aioredis
from kubernetes import client, config

class LLMScalingManager:
    def __init__(self, min_replicas: int = 1, max_replicas: int = 10):
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.current_replicas = min_replicas
        
        # Initialize Kubernetes client
        config.load_incluster_config()  # For in-cluster deployment
        self.k8s_apps = client.AppsV1Api()
        self.k8s_core = client.CoreV1Api()
        
        # Metrics storage
        self.redis = None
        self.metrics = {
            'request_count': 0,
            'avg_latency': 0,
            'queue_size': 0,
            'gpu_utilization': 0
        }
    
    async def initialize(self):
        """Initialize connections"""
        self.redis = await aioredis.create_redis_pool('redis://localhost')
    
    async def autoscale(self):
        """Autoscaling logic based on metrics"""
        
        while True:
            metrics = await self.collect_metrics()
            
            # Scaling decision
            target_replicas = self.calculate_target_replicas(metrics)
            
            if target_replicas != self.current_replicas:
                await self.scale_deployment(target_replicas)
            
            await asyncio.sleep(30)  # Check every 30 seconds
    
    def calculate_target_replicas(self, metrics: Dict) -> int:
        """Calculate target number of replicas"""
        
        # Scaling based on multiple metrics
        scale_factors = []
        
        # Queue size based scaling
        if metrics['queue_size'] > 100:
            scale_factors.append(min(
                metrics['queue_size'] / 50,
                self.max_replicas
            ))
        
        # Latency based scaling
        if metrics['avg_latency'] > 5000:  # 5 seconds
            scale_factors.append(min(
                metrics['avg_latency'] / 2000,
                self.max_replicas
            ))
        
        # GPU utilization based scaling
        if metrics['gpu_utilization'] > 80:
            scale_factors.append(self.current_replicas + 1)
        elif metrics['gpu_utilization'] < 30 and self.current_replicas > self.min_replicas:
            scale_factors.append(self.current_replicas - 1)
        
        if scale_factors:
            target = int(max(scale_factors))
        else:
            target = self.current_replicas
        
        return max(self.min_replicas, min(target, self.max_replicas))
    
    async def scale_deployment(self, replicas: int):
        """Scale Kubernetes deployment"""
        
        try:
            # Update deployment
            deployment = self.k8s_apps.read_namespaced_deployment(
                name="llm-server",
                namespace="default"
            )
            
            deployment.spec.replicas = replicas
            
            self.k8s_apps.patch_namespaced_deployment(
                name="llm-server",
                namespace="default",
                body=deployment
            )
            
            self.current_replicas = replicas
            
            # Log scaling event
            await self.redis.lpush(
                'scaling_events',
                f"Scaled to {replicas} replicas"
            )
            
        except Exception as e:
            print(f"Scaling error: {e}")
    
    async def collect_metrics(self) -> Dict:
        """Collect metrics from various sources"""
        
        metrics = {}
        
        # Get queue size from Redis
        queue_size = await self.redis.llen('request_queue')
        metrics['queue_size'] = queue_size
        
        # Get latency metrics
        latencies = await self.redis.lrange('latencies', 0, 100)
        if latencies:
            metrics['avg_latency'] = sum(map(float, latencies)) / len(latencies)
        else:
            metrics['avg_latency'] = 0
        
        # Get GPU metrics from pods
        gpu_utils = []
        pods = self.k8s_core.list_namespaced_pod(
            namespace="default",
            label_selector="app=llm-server"
        )
        
        for pod in pods.items:
            # Get GPU metrics (requires nvidia-smi or similar)
            gpu_util = await self.get_pod_gpu_utilization(pod.metadata.name)
            if gpu_util:
                gpu_utils.append(gpu_util)
        
        metrics['gpu_utilization'] = sum(gpu_utils) / len(gpu_utils) if gpu_utils else 0
        
        return metrics
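
The get_pod_gpu_utilization helper referenced in collect_metrics is left undefined above. One hedged way to fill it in, as a method on LLMScalingManager, is to exec nvidia-smi inside each pod through the Kubernetes API (this assumes the pod image ships nvidia-smi and that pod exec is permitted by RBAC):

    async def get_pod_gpu_utilization(self, pod_name: str):
        """Read GPU utilization (%) from a pod, or return None on failure"""
        from kubernetes.stream import stream

        command = [
            "nvidia-smi",
            "--query-gpu=utilization.gpu",
            "--format=csv,noheader,nounits",
        ]
        try:
            # The exec call is blocking, so run it in a thread to keep the
            # autoscaling loop responsive
            output = await asyncio.to_thread(
                stream,
                self.k8s_core.connect_get_namespaced_pod_exec,
                pod_name,
                "default",
                command=command,
                stderr=True, stdin=False, stdout=True, tty=False,
            )
            return float(output.strip().splitlines()[0])
        except Exception:
            return None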

Request Batching and Queueing

# request_batching.py
import asyncio
from typing import List, Dict, Any
import time
from collections import deque
import numpy as np

class BatchProcessor:
    def __init__(self, 
                 batch_size: int = 8,
                 max_wait_time: float = 0.1,
                 max_sequence_length: int = 2048):
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.max_sequence_length = max_sequence_length
        
        self.request_queue = deque()
        self.processing = False
        
    async def add_request(self, request: Dict) -> Any:
        """Add request to batch queue"""
        
        future = asyncio.Future()
        
        self.request_queue.append({
            'request': request,
            'future': future,
            'timestamp': time.time()
        })
        
        # Start the background batch loop if it is not already running; set the
        # flag before scheduling the task so concurrent callers don't spawn duplicates
        if not self.processing:
            self.processing = True
            asyncio.create_task(self._process_batches())
        
        return await future
    
    async def _process_batches(self):
        """Process requests in batches"""
        
        self.processing = True
        
        while self.request_queue:
            batch = []
            batch_start_time = time.time()
            
            # Collect a batch: take requests as they arrive, up to batch_size,
            # but don't wait longer than max_wait_time once the first one is in
            while len(batch) < self.batch_size:
                if self.request_queue:
                    batch.append(self.request_queue.popleft())
                elif not batch or (time.time() - batch_start_time) > self.max_wait_time:
                    break
                else:
                    await asyncio.sleep(0.005)
            
            if batch:
                # Process batch
                await self._process_batch(batch)
        
        self.processing = False
    
    async def _process_batch(self, batch: List[Dict]):
        """Process a batch of requests"""
        
        try:
            # Prepare batch inputs
            prompts = [item['request']['prompt'] for item in batch]
            
            # Pad sequences for efficient processing
            padded_inputs = self._pad_sequences(prompts)
            
            # Run inference
            results = await self._run_batch_inference(padded_inputs)
            
            # Distribute results
            for i, item in enumerate(batch):
                item['future'].set_result(results[i])
                
        except Exception as e:
            # Handle errors
            for item in batch:
                item['future'].set_exception(e)
    
    def _pad_sequences(self, sequences: List[str]) -> Dict:
        """Pad sequences to same length for batching"""
        
        # This would use actual tokenizer in production
        max_length = min(
            max(len(seq) for seq in sequences),
            self.max_sequence_length
        )
        
        padded = []
        attention_masks = []
        
        for seq in sequences:
            if len(seq) < max_length:
                padding_length = max_length - len(seq)
                padded_seq = seq + ' ' * padding_length
                mask = [1] * len(seq) + [0] * padding_length
            else:
                padded_seq = seq[:max_length]
                mask = [1] * max_length
            
            padded.append(padded_seq)
            attention_masks.append(mask)
        
        return {
            'sequences': padded,
            'attention_masks': attention_masks
        }
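
Wiring the batcher into an endpoint is mostly a matter of awaiting add_request from each handler; _run_batch_inference is the hook where the padded batch goes to the model. A brief usage sketch with the inference call stubbed out:

# batching_demo.py (sketch -- fake_inference stands in for a real model call)
import asyncio

from request_batching import BatchProcessor

async def main():
    processor = BatchProcessor(batch_size=4, max_wait_time=0.05)

    # Stub inference: return one result per request, preserving order
    async def fake_inference(padded_inputs):
        return [{"response": f"echo: {seq.strip()}"} for seq in padded_inputs["sequences"]]

    processor._run_batch_inference = fake_inference

    # Fire concurrent requests; the processor groups them into batches
    results = await asyncio.gather(
        *(processor.add_request({"prompt": f"question {i}"}) for i in range(10))
    )
    print(len(results), "responses")

asyncio.run(main())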

Caching and Optimization

Multi-Level Caching Strategy

# caching_system.py
import hashlib
import json
from typing import Optional, Dict
import redis

class LLMCacheSystem:
    def __init__(self, 
                 redis_host: str = "localhost",
                 redis_port: int = 6379,
                 ttl: int = 3600):
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )
        self.ttl = ttl
        
        # Local LRU cache for hot data
        self.local_cache_size = 1000
        self._local_cache = {}
        
        # Semantic cache for similar queries
        self.semantic_cache = SemanticCache()
        
    def generate_cache_key(self, prompt: str, params: Dict) -> str:
        """Generate deterministic cache key"""
        
        cache_data = {
            'prompt': prompt,
            'temperature': params.get('temperature', 0.7),
            'max_tokens': params.get('max_tokens', 100),
            'model': params.get('model', 'default')
        }
        
        cache_str = json.dumps(cache_data, sort_keys=True)
        return hashlib.sha256(cache_str.encode()).hexdigest()
    
    async def get_cached_response(self, prompt: str, params: Dict) -> Optional[Dict]:
        """Get response from cache hierarchy"""
        
        cache_key = self.generate_cache_key(prompt, params)
        
        # Check L1 cache (local memory)
        if cache_key in self._local_cache:
            self._update_cache_stats('l1_hit')
            return self._local_cache[cache_key]
        
        # Check L2 cache (Redis)
        cached = self.redis_client.get(cache_key)
        if cached:
            self._update_cache_stats('l2_hit')
            response = json.loads(cached)
            
            # Promote to L1
            self._local_cache[cache_key] = response
            self._manage_local_cache_size()
            
            return response
        
        # Check semantic cache
        similar_response = await self.semantic_cache.find_similar(prompt)
        if similar_response:
            self._update_cache_stats('semantic_hit')
            return similar_response
        
        self._update_cache_stats('miss')
        return None
    
    async def cache_response(self, prompt: str, params: Dict, 
                           response: Dict):
        """Cache response at multiple levels"""
        
        cache_key = self.generate_cache_key(prompt, params)
        
        # Cache in Redis with TTL
        self.redis_client.setex(
            cache_key,
            self.ttl,
            json.dumps(response)
        )
        
        # Cache locally
        self._local_cache[cache_key] = response
        self._manage_local_cache_size()
        
        # Add to semantic cache
        await self.semantic_cache.add(prompt, response)
    
    def _manage_local_cache_size(self):
        """Manage local cache size with LRU eviction"""
        
        if len(self._local_cache) > self.local_cache_size:
            # Remove oldest entries (simple FIFO for demonstration)
            # In production, use proper LRU implementation
            for key in list(self._local_cache.keys())[:100]:
                del self._local_cache[key]
    
    def _update_cache_stats(self, stat_type: str):
        """Update cache statistics"""
        
        self.redis_client.hincrby('cache_stats', stat_type, 1)
        self.redis_client.hincrby('cache_stats', 'total_requests', 1)

class SemanticCache:
    """Semantic similarity based caching"""
    
    def __init__(self, similarity_threshold: float = 0.95):
        self.similarity_threshold = similarity_threshold
        self.embeddings = {}  # In production, use vector DB
        
    async def find_similar(self, prompt: str) -> Optional[Dict]:
        """Find semantically similar cached response"""
        
        prompt_embedding = await self._get_embedding(prompt)
        
        for cached_prompt, (cached_embedding, response) in self.embeddings.items():
            similarity = self._cosine_similarity(prompt_embedding, cached_embedding)
            
            if similarity > self.similarity_threshold:
                return response
        
        return None
    
    async def add(self, prompt: str, response: Dict):
        """Add to semantic cache"""
        
        embedding = await self._get_embedding(prompt)
        self.embeddings[prompt] = (embedding, response)
        
        # Limit cache size
        if len(self.embeddings) > 10000:
            # Remove oldest entries
            oldest = list(self.embeddings.keys())[:1000]
            for key in oldest:
                del self.embeddings[key]
    
    async def _get_embedding(self, text: str):
        """Get text embedding (simplified)"""
        
        # In production, use actual embedding model
        # For now, return mock embedding
        import hashlib
        hash_obj = hashlib.md5(text.encode())
        hash_hex = hash_obj.hexdigest()
        
        # Convert to vector (mock)
        vector = [int(hash_hex[i:i+2], 16) / 255 for i in range(0, 32, 2)]
        return vector
    
    def _cosine_similarity(self, vec1, vec2):
        """Calculate cosine similarity"""
        
        dot_product = sum(a * b for a, b in zip(vec1, vec2))
        norm1 = sum(a ** 2 for a in vec1) ** 0.5
        norm2 = sum(b ** 2 for b in vec2) ** 0.5
        
        return dot_product / (norm1 * norm2) if norm1 * norm2 != 0 else 0
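
The hash-based mock embedding above will not actually place similar prompts near each other. A hedged drop-in for _get_embedding, assuming the sentence-transformers package is available (the model name is a common lightweight choice, not a requirement):

    async def _get_embedding(self, text: str):
        """Replace the mock embedding with a real sentence encoder"""
        import asyncio
        from sentence_transformers import SentenceTransformer

        if not hasattr(self, "_encoder"):
            # Load the encoder once per cache instance
            self._encoder = SentenceTransformer("all-MiniLM-L6-v2")

        # encode() is synchronous; run it in a thread to avoid blocking the loop
        vector = await asyncio.to_thread(self._encoder.encode, text)
        return vector.tolist()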

Monitoring and Observability

Comprehensive Monitoring System

# monitoring.py
import time
import asyncio
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from typing import Dict, Any
import logging
from dataclasses import dataclass
from datetime import datetime

@dataclass
class LLMMetrics:
    # Prometheus metrics
    request_counter = Counter('llm_requests_total', 'Total LLM requests', ['model', 'status'])
    request_duration = Histogram('llm_request_duration_seconds', 'Request duration', ['model'])
    tokens_processed = Counter('llm_tokens_processed_total', 'Total tokens processed', ['model'])
    active_requests = Gauge('llm_active_requests', 'Currently active requests', ['model'])
    cache_hits = Counter('llm_cache_hits_total', 'Cache hit rate', ['cache_level'])
    error_counter = Counter('llm_errors_total', 'Total errors', ['model', 'error_type'])
    gpu_utilization = Gauge('llm_gpu_utilization_percent', 'GPU utilization', ['gpu_id'])
    memory_usage = Gauge('llm_memory_usage_bytes', 'Memory usage', ['type'])

class LLMMonitor:
    def __init__(self):
        self.metrics = LLMMetrics()
        self.logger = self._setup_logging()
        
        # Start Prometheus metrics server
        start_http_server(8000)
        
        # Track request patterns
        self.request_patterns = {}
        
    def _setup_logging(self):
        """Setup structured logging"""
        
        logger = logging.getLogger('llm_monitor')
        logger.setLevel(logging.INFO)
        
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        return logger
    
    async def track_request(self, model: str, request: Dict) -> Any:
        """Track LLM request with metrics"""
        
        request_id = self._generate_request_id()
        
        # Increment active requests
        self.metrics.active_requests.labels(model=model).inc()
        
        # Log request
        self.logger.info(f"Request {request_id} started", extra={
            'request_id': request_id,
            'model': model,
            'prompt_length': len(request.get('prompt', '')),
            'timestamp': datetime.utcnow().isoformat()
        })
        
        start_time = time.time()
        
        try:
            # Process request (placeholder)
            response = await self._process_request(model, request)
            
            # Record success metrics
            duration = time.time() - start_time
            self.metrics.request_counter.labels(model=model, status='success').inc()
            self.metrics.request_duration.labels(model=model).observe(duration)
            
            # Track tokens
            tokens_used = response.get('tokens_used', 0)
            self.metrics.tokens_processed.labels(model=model).inc(tokens_used)
            
            # Log completion
            self.logger.info(f"Request {request_id} completed", extra={
                'request_id': request_id,
                'duration': duration,
                'tokens_used': tokens_used
            })
            
            return response
            
        except Exception as e:
            # Record error metrics
            self.metrics.request_counter.labels(model=model, status='error').inc()
            self.metrics.error_counter.labels(
                model=model,
                error_type=type(e).__name__
            ).inc()
            
            # Log error
            self.logger.error(f"Request {request_id} failed", extra={
                'request_id': request_id,
                'error': str(e),
                'error_type': type(e).__name__
            })
            
            raise
            
        finally:
            # Decrement active requests
            self.metrics.active_requests.labels(model=model).dec()
    
    async def monitor_system_resources(self):
        """Monitor system resources continuously"""
        
        import psutil
        import GPUtil
        
        while True:
            # CPU and Memory
            cpu_percent = psutil.cpu_percent()
            memory = psutil.virtual_memory()
            
            self.metrics.memory_usage.labels(type='system').set(memory.used)
            
            # GPU metrics
            try:
                gpus = GPUtil.getGPUs()
                for gpu in gpus:
                    self.metrics.gpu_utilization.labels(gpu_id=gpu.id).set(gpu.load * 100)
                    self.metrics.memory_usage.labels(type=f'gpu_{gpu.id}').set(
                        gpu.memoryUsed * 1024 * 1024
                    )
            except Exception:
                # GPU metrics are best-effort; skip the sample if GPUtil fails
                pass
            
            await asyncio.sleep(10)
    
    def _generate_request_id(self) -> str:
        """Generate unique request ID"""
        
        import uuid
        return str(uuid.uuid4())
    
    async def _process_request(self, model: str, request: Dict) -> Dict:
        """Process request (placeholder)"""
        
        # Simulate processing
        await asyncio.sleep(0.1)
        
        return {
            'response': 'Generated response',
            'tokens_used': 100
        }
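
A short usage sketch ties the pieces together: the monitor wraps every request, resource sampling runs in the background, and Prometheus scrapes port 8000 (so dashboards can read series such as llm_request_duration_seconds_bucket for latency percentiles). The model name and prompt below are illustrative:

# monitoring_demo.py (sketch -- assumes monitoring.py above plus the psutil
# and GPUtil packages used by monitor_system_resources)
import asyncio

from monitoring import LLMMonitor

async def main():
    monitor = LLMMonitor()  # also starts the Prometheus endpoint on :8000

    # Sample CPU/GPU/memory in the background
    asyncio.create_task(monitor.monitor_system_resources())

    # Every inference call goes through track_request so metrics and logs
    # stay consistent
    response = await monitor.track_request(
        "llama-2-7b", {"prompt": "Explain KV caching in one paragraph."}
    )
    print(response["tokens_used"])

asyncio.run(main())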

Cost Optimization

Cost Management System

# cost_optimization.py
from typing import Dict

class LLMCostOptimizer:
    def __init__(self):
        self.pricing = {
            'gpt-4': {'input': 0.03, 'output': 0.06},  # per 1K tokens
            'gpt-3.5-turbo': {'input': 0.001, 'output': 0.002},
            'claude-2': {'input': 0.008, 'output': 0.024},
            'self-hosted': {'input': 0.0001, 'output': 0.0001}  # Estimated
        }
        
        self.usage_tracker = {}
        self.cost_limits = {}
        
    async def route_request(self, request: Dict) -> str:
        """Route request to most cost-effective model"""
        
        complexity = self._assess_complexity(request['prompt'])
        
        if complexity == 'simple':
            # Use cheaper model for simple requests
            return 'gpt-3.5-turbo'
        elif complexity == 'medium':
            # Balance cost and quality
            return 'claude-2'
        else:
            # Use best model for complex requests
            return 'gpt-4'
    
    def _assess_complexity(self, prompt: str) -> str:
        """Assess prompt complexity"""
        
        # Simple heuristics (in production, use ML classifier)
        word_count = len(prompt.split())
        
        if word_count < 50:
            return 'simple'
        elif word_count < 200:
            return 'medium'
        else:
            return 'complex'
    
    def track_usage(self, model: str, input_tokens: int, output_tokens: int):
        """Track token usage for cost calculation"""
        
        if model not in self.usage_tracker:
            self.usage_tracker[model] = {'input': 0, 'output': 0}
        
        self.usage_tracker[model]['input'] += input_tokens
        self.usage_tracker[model]['output'] += output_tokens
    
    def calculate_costs(self) -> Dict:
        """Calculate current costs"""
        
        total_cost = 0
        breakdown = {}
        
        for model, usage in self.usage_tracker.items():
            if model in self.pricing:
                input_cost = (usage['input'] / 1000) * self.pricing[model]['input']
                output_cost = (usage['output'] / 1000) * self.pricing[model]['output']
                
                model_cost = input_cost + output_cost
                total_cost += model_cost
                
                breakdown[model] = {
                    'input_tokens': usage['input'],
                    'output_tokens': usage['output'],
                    'cost': model_cost
                }
        
        return {
            'total_cost': total_cost,
            'breakdown': breakdown
        }
    
    async def optimize_batch_size(self, current_batch_size: int,
                                 latency_target: float) -> int:
        """Dynamically optimize batch size for cost/performance"""
        
        # Measure current performance
        current_latency = await self._measure_latency(current_batch_size)
        
        if current_latency < latency_target * 0.8:
            # Can increase batch size for better efficiency
            return min(current_batch_size + 2, 32)
        elif current_latency > latency_target:
            # Need to reduce batch size
            return max(current_batch_size - 2, 1)
        else:
            return current_batch_size
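
Putting the optimizer into the request path means choosing a model first, recording usage when the call returns, and reporting costs periodically. A brief usage sketch with the provider call stubbed out:

# cost_tracking_demo.py (sketch -- the provider call is a stub; token counts
# would normally come from the provider's response)
import asyncio

from cost_optimization import LLMCostOptimizer

async def handle(prompt: str, optimizer: LLMCostOptimizer) -> dict:
    model = await optimizer.route_request({"prompt": prompt})
    # Stubbed provider response with illustrative token counts
    response = {"text": "...", "input_tokens": 42, "output_tokens": 118, "model": model}
    optimizer.track_usage(model, response["input_tokens"], response["output_tokens"])
    return response

async def main():
    optimizer = LLMCostOptimizer()
    await handle("Summarize this contract clause in one sentence.", optimizer)
    print(optimizer.calculate_costs())

asyncio.run(main())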

Deployment Configuration

Kubernetes Deployment

# llm-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-server
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-server
  template:
    metadata:
      labels:
        app: llm-server
    spec:
      nodeSelector:
        gpu: "true"
      containers:
      - name: llm-container
        image: llm-server:latest
        resources:
          requests:
            memory: "16Gi"
            cpu: "4"
            nvidia.com/gpu: "1"
          limits:
            memory: "32Gi"
            cpu: "8"
            nvidia.com/gpu: "1"
        env:
        - name: MODEL_NAME
          value: "llama-2-7b"
        - name: CACHE_ENABLED
          value: "true"
        - name: MAX_BATCH_SIZE
          value: "8"
        ports:
        - containerPort: 8080
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 300
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: llm-service
spec:
  selector:
    app: llm-server
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-server
  minReplicas: 2
  maxReplicas: 10
  metrics:
  # GPU utilization is not exposed by metrics-server as a Resource metric; this
  # Pods metric assumes a custom metrics pipeline (e.g. DCGM exporter plus
  # prometheus-adapter), and the metric name depends on that setup
  - type: Pods
    pods:
      metric:
        name: gpu_utilization
      target:
        type: AverageValue
        averageValue: "70"
  - type: Pods
    pods:
      metric:
        name: llm_queue_size
      target:
        type: AverageValue
        averageValue: "30"

Best Practices Checklist

  • Implement request batching for efficiency
  • Use multi-level caching strategy
  • Set up comprehensive monitoring
  • Implement autoscaling policies
  • Use model quantization when appropriate
  • Configure request routing based on complexity
  • Implement circuit breakers for resilience (see the sketch after this list)
  • Set up cost tracking and limits
  • Use streaming for long responses
  • Implement proper error handling and retries
  • Configure load balancing across models
  • Set up A/B testing framework
  • Monitor and optimize token usage
  • Implement rate limiting per user/API key
  • Regular model performance benchmarking
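
Circuit breakers and fallback routing pair naturally with the retry logic shown earlier. A minimal circuit breaker sketch (thresholds are illustrative) that could wrap each provider call:

# circuit_breaker.py (sketch -- failure threshold and cooldown are illustrative)
import time

class CircuitBreaker:
    """Trip after repeated failures, then reject calls until a cooldown passes"""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: allow a trial request once the cooldown has elapsed
        return (time.time() - self.opened_at) > self.reset_timeout

    def record_success(self):
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = time.time()

# Usage around a provider call:
#     if not breaker.allow_request():
#         return await fallback_provider.generate(prompt, context)
#     try:
#         response = await provider.generate(prompt, context)
#         breaker.record_success()
#     except Exception:
#         breaker.record_failure()
#         raise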

Conclusion

Deploying LLMs in production requires careful orchestration of models, infrastructure, and monitoring. Success comes from balancing performance, cost, and reliability while maintaining the flexibility to adapt to changing requirements. Start with a solid foundation, monitor everything, and continuously optimize based on real-world usage patterns. Remember, the goal is not just to deploy an LLM, but to create a sustainable, scalable AI system that delivers value reliably.
