AWS Lambda Patterns for Production

David Childs

Master serverless architecture with AWS Lambda patterns for event processing, API development, and cost-optimized computing at scale.

Serverless promised to eliminate server management, but it introduced new complexities. After building Lambda functions processing billions of events monthly, I've identified patterns that separate successful serverless applications from costly failures. Here's what actually works in production.

Lambda Architecture Patterns

Event-Driven Processing

# event_processor.py
import json

from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.logging import correlation_paths

logger = Logger()
tracer = Tracer()
metrics = Metrics(namespace="ecommerce")  # or set POWERTOOLS_METRICS_NAMESPACE

@tracer.capture_lambda_handler
@logger.inject_lambda_context(correlation_id_path=correlation_paths.EVENT_BRIDGE)
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event, context):
    """Process events from EventBridge"""
    
    try:
        # Parse event
        detail = event.get('detail', {})
        event_type = detail.get('eventType')
        
        # Route to appropriate processor
        processors = {
            'ORDER_CREATED': process_order,
            'PAYMENT_COMPLETED': process_payment,
            'SHIPMENT_DISPATCHED': process_shipment
        }
        
        processor = processors.get(event_type)
        if not processor:
            logger.warning(f"Unknown event type: {event_type}")
            return {'statusCode': 400, 'body': 'Unknown event type'}
        
        # Process event
        result = processor(detail)
        
        # Record metrics
        metrics.add_metric(name="EventProcessed", unit=MetricUnit.Count, value=1)
        metrics.add_metadata(key="event_type", value=event_type)
        
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
        
    except Exception as e:
        logger.exception(f"Error processing event: {str(e)}")
        metrics.add_metric(name="EventProcessingError", unit=MetricUnit.Count, value=1)
        raise

@tracer.capture_method
def process_order(order_detail):
    """Process order creation"""
    # Business logic here
    return {"status": "processed", "orderId": order_detail.get('orderId')}

API Gateway Integration

# api_handler.py
from typing import Dict, Any
import json
from decimal import Decimal

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver, Response
from aws_lambda_powertools.event_handler.exceptions import NotFoundError
from aws_lambda_powertools.logging import correlation_paths

logger = Logger()
tracer = Tracer()
app = APIGatewayRestResolver()

class DecimalEncoder(json.JSONEncoder):
    """Serialize DynamoDB Decimal values as JSON numbers"""
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)

@app.get("/products")
@tracer.capture_method
def get_products():
    """List all products with pagination"""
    query_params = app.current_event.query_string_parameters or {}
    page = int(query_params.get('page', 1))
    limit = int(query_params.get('limit', 10))
    
    # Implement pagination
    products = fetch_products_from_db(page, limit)
    
    return Response(
        status_code=200,
        content_type="application/json",
        body=json.dumps(products, cls=DecimalEncoder),
        headers={"X-Total-Count": str(len(products))}
    )

@app.post("/products")
@tracer.capture_method
def create_product():
    """Create new product"""
    try:
        product_data = app.current_event.json_body
        
        # Validate input
        validated_data = validate_product(product_data)
        
        # Save to database
        product = save_product(validated_data)
        
        return Response(
            status_code=201,
            content_type="application/json",
            body=json.dumps(product, cls=DecimalEncoder)
        )
    except ValueError as e:
        return Response(
            status_code=400,
            body=json.dumps({"error": str(e)})
        )

@app.get("/products/<product_id>")
@tracer.capture_method
def get_product(product_id: str):
    """Get single product"""
    try:
        product = fetch_product_by_id(product_id)
        if not product:
            raise NotFoundError(f"Product {product_id} not found")
        
        return Response(
            status_code=200,
            content_type="application/json",
            body=json.dumps(product, cls=DecimalEncoder)
        )
    except NotFoundError as e:
        return Response(
            status_code=404,
            body=json.dumps({"error": str(e)})
        )

@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    return app.resolve(event, context)

Cold Start Optimization

Reducing Initialization Time

# optimized_lambda.py
import os

# Lazy imports to reduce cold start
_boto3 = None
_pandas = None

def get_boto3():
    global _boto3
    if _boto3 is None:
        import boto3
        _boto3 = boto3
    return _boto3

def get_pandas():
    global _pandas
    if _pandas is None:
        import pandas
        _pandas = pandas
    return _pandas

# Pre-warm connections outside handler
WARM_CONNECTIONS = {}

def get_db_connection():
    """Reuse database connections across invocations"""
    conn = WARM_CONNECTIONS.get('db')
    # Recreate the connection if it was never made or went stale
    # while the execution environment was frozen
    if conn is None or conn.closed:
        import psycopg2
        conn = psycopg2.connect(
            host=os.environ['DB_HOST'],
            database=os.environ['DB_NAME'],
            user=os.environ['DB_USER'],
            password=os.environ['DB_PASSWORD']
        )
        WARM_CONNECTIONS['db'] = conn
    return conn

# Provisioned concurrency configuration
def configure_provisioned_concurrency():
    """Terraform configuration for provisioned concurrency"""
    return """
    resource "aws_lambda_provisioned_concurrency_config" "main" {
      function_name                     = aws_lambda_function.main.function_name
      provisioned_concurrent_executions = 5
      qualifier                         = aws_lambda_alias.live.name
    }
    
    resource "aws_appautoscaling_target" "lambda" {
      max_capacity       = 100
      min_capacity       = 5
      resource_id        = "function:${aws_lambda_function.main.function_name}:provisioned-concurrency/${aws_lambda_alias.live.name}"
      scalable_dimension = "lambda:function:ProvisionedConcurrency"
      service_namespace  = "lambda"
    }
    """

def lambda_handler(event, context):
    """Optimized handler with lazy loading"""
    
    # Use connection from warm start
    db = get_db_connection()
    
    # Lazy load heavy libraries only when needed
    if event.get('requiresAnalysis'):
        pd = get_pandas()
        # Use pandas here
    
    # Process event
    return {'statusCode': 200}
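
The embedded Terraform declares an autoscaling target but no policy to drive it. A target-tracking policy sketch completes the picture; the 0.7 utilization target is an assumption to tune per workload:

resource "aws_appautoscaling_policy" "lambda" {
  name               = "provisioned-concurrency-utilization"
  policy_type        = "TargetTrackingScaling"
  resource_id        = aws_appautoscaling_target.lambda.resource_id
  scalable_dimension = aws_appautoscaling_target.lambda.scalable_dimension
  service_namespace  = aws_appautoscaling_target.lambda.service_namespace

  target_tracking_scaling_policy_configuration {
    # Scale out when provisioned concurrency is 70% utilized
    target_value = 0.7
    predefined_metric_specification {
      predefined_metric_type = "LambdaProvisionedConcurrencyUtilization"
    }
  }
}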

Asynchronous Processing Patterns

SQS Integration with Dead Letter Queue

# sqs_processor.py
import json
from typing import Optional
from dataclasses import dataclass

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

logger = Logger()
processor = BatchProcessor(event_type=EventType.SQS)

@dataclass
class ProcessingResult:
    success: bool
    message_id: str
    error: Optional[str] = None

def record_handler(record: SQSRecord) -> ProcessingResult:
    """Process individual SQS message"""
    try:
        # Parse message body
        message = json.loads(record.body)
        
        # Process message based on type
        message_type = message.get('type')
        
        if message_type == 'email':
            send_email(message)
        elif message_type == 'sms':
            send_sms(message)
        else:
            raise ValueError(f"Unknown message type: {message_type}")
        
        logger.info(f"Successfully processed message {record.message_id}")
        return ProcessingResult(success=True, message_id=record.message_id)
        
    except Exception as e:
        logger.exception(f"Error processing message {record.message_id}: {str(e)}")
        # Message will go to DLQ after max retries
        raise

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    """Process batch of SQS messages"""
    return processor.response()

# Terraform configuration for SQS with DLQ
def sqs_configuration():
    return """
    resource "aws_sqs_queue" "main" {
      name                       = "processing-queue"
      visibility_timeout_seconds = 300
      message_retention_seconds  = 1209600
      redrive_policy = jsonencode({
        deadLetterTargetArn = aws_sqs_queue.dlq.arn
        maxReceiveCount     = 3
      })
    }
    
    resource "aws_sqs_queue" "dlq" {
      name                      = "processing-queue-dlq"
      message_retention_seconds = 1209600
    }
    
    resource "aws_lambda_event_source_mapping" "sqs" {
      event_source_arn                   = aws_sqs_queue.main.arn
      function_name                      = aws_lambda_function.processor.arn
      batch_size                         = 10
      maximum_batching_window_in_seconds = 5
      
      scaling_config {
        maximum_concurrency = 50
      }
    }
    """

Step Functions Integration

Complex Workflow Orchestration

{
  "Comment": "Order processing workflow",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789:function:validate-order",
      "Next": "ProcessPayment",
      "Catch": [{
        "ErrorEquals": ["ValidationError"],
        "Next": "OrderValidationFailed"
      }],
      "Retry": [{
        "ErrorEquals": ["States.TaskFailed"],
        "IntervalSeconds": 2,
        "MaxAttempts": 3,
        "BackoffRate": 2.0
      }]
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "process-payment",
        "Payload.$": "$"
      },
      "ResultPath": "$.paymentResult",
      "Next": "PaymentSuccess?"
    },
    "PaymentSuccess?": {
      "Type": "Choice",
      "Choices": [{
        "Variable": "$.paymentResult.status",
        "StringEquals": "SUCCESS",
        "Next": "FulfillOrder"
      }],
      "Default": "PaymentFailed"
    },
    "FulfillOrder": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "UpdateInventory",
          "States": {
            "UpdateInventory": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789:function:update-inventory",
              "End": true
            }
          }
        },
        {
          "StartAt": "SendConfirmation",
          "States": {
            "SendConfirmation": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789:function:send-confirmation",
              "End": true
            }
          }
        }
      ],
      "Next": "OrderComplete"
    },
    "OrderComplete": {
      "Type": "Succeed"
    },
    "PaymentFailed": {
      "Type": "Fail",
      "Error": "PaymentFailed",
      "Cause": "Payment processing failed"
    },
    "OrderValidationFailed": {
      "Type": "Fail",
      "Error": "ValidationError",
      "Cause": "Order validation failed"
    }
  }
}
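
Starting this workflow from application code is a single API call. A sketch using boto3; the state machine ARN and execution-naming scheme are placeholders:

# start_workflow.py
import json
import boto3

sfn = boto3.client('stepfunctions')

def start_order_workflow(order: dict) -> str:
    """Kick off the order-processing state machine"""
    response = sfn.start_execution(
        stateMachineArn='arn:aws:states:us-east-1:123456789:stateMachine:order-processing',
        name=f"order-{order['orderId']}",  # execution names must be unique per state machine
        input=json.dumps(order)
    )
    return response['executionArn']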

Lambda Layers and Shared Code

Creating Reusable Layers

#!/bin/bash
# create_layer.sh

# Create layer directory structure
mkdir -p layer/python/lib/python3.9/site-packages

# Install dependencies
pip install -r requirements.txt -t layer/python/lib/python3.9/site-packages/

# Create layer zip
cd layer
zip -r ../lambda-layer.zip .
cd ..

# Upload layer
aws lambda publish-layer-version \
    --layer-name shared-dependencies \
    --description "Shared dependencies for Lambda functions" \
    --zip-file fileb://lambda-layer.zip \
    --compatible-runtimes python3.9 \
    --compatible-architectures "x86_64" "arm64"

Layer Usage Pattern

# shared_layer/utils.py
import json
import boto3
from functools import lru_cache
from typing import Dict, Any

@lru_cache(maxsize=1)
def get_ssm_client():
    """Cached SSM client"""
    return boto3.client('ssm')

@lru_cache(maxsize=128)
def get_parameter(name: str, decrypt: bool = True) -> str:
    """Cached parameter retrieval"""
    ssm = get_ssm_client()
    response = ssm.get_parameter(
        Name=name,
        WithDecryption=decrypt
    )
    return response['Parameter']['Value']

def get_secret(secret_name: str) -> Dict[str, Any]:
    """Retrieve secret from Secrets Manager"""
    sm = boto3.client('secretsmanager')
    response = sm.get_secret_value(SecretId=secret_name)
    return json.loads(response['SecretString'])

# Lambda function using the layer
# /opt/python is already on sys.path when a layer is attached,
# so the layer's modules import directly
from utils import get_parameter, get_secret

def lambda_handler(event, context):
    api_key = get_parameter('/app/api_key')
    db_creds = get_secret('rds-credentials')
    # Use the retrieved values
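
Publishing the layer from the CLI works, but the attachment usually lives in Terraform next to the function definition. A sketch, with resource names as assumptions:

resource "aws_lambda_layer_version" "shared" {
  layer_name          = "shared-dependencies"
  filename            = "lambda-layer.zip"
  compatible_runtimes = ["python3.9"]
}

resource "aws_lambda_function" "worker" {
  function_name = "worker"
  runtime       = "python3.9"
  handler       = "app.lambda_handler"
  filename      = "function.zip"
  role          = aws_iam_role.lambda.arn

  # Attach the shared layer; its contents land under /opt at runtime
  layers = [aws_lambda_layer_version.shared.arn]
}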

Error Handling and Retries

Robust Error Handling

# error_handler.py
from enum import Enum
import json
from functools import wraps
from aws_lambda_powertools import Logger

logger = Logger()

class ErrorType(Enum):
    RETRYABLE = "RETRYABLE"
    NON_RETRYABLE = "NON_RETRYABLE"
    THROTTLE = "THROTTLE"

class LambdaError(Exception):
    def __init__(self, message: str, error_type: ErrorType, status_code: int = 500):
        self.message = message
        self.error_type = error_type
        self.status_code = status_code
        super().__init__(self.message)

def error_handler(func):
    """Decorator for consistent error handling"""
    @wraps(func)
    def wrapper(event, context):
        try:
            return func(event, context)
        except LambdaError as e:
            logger.error(f"Lambda error: {e.message}", extra={"error_type": e.error_type.value})
            
            if e.error_type == ErrorType.RETRYABLE:
                # Let Lambda retry
                raise
            
            return {
                'statusCode': e.status_code,
                'body': json.dumps({
                    'error': e.message,
                    'type': e.error_type.value
                })
            }
        except Exception as e:
            logger.exception("Unexpected error")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': 'Internal server error',
                    'type': ErrorType.NON_RETRYABLE.value
                })
            }
    return wrapper

@error_handler
def lambda_handler(event, context):
    """Main handler with error handling"""
    
    # Validate input
    if not event.get('userId'):
        raise LambdaError(
            "Missing userId",
            ErrorType.NON_RETRYABLE,
            400
        )
    
    # Simulate retryable error
    if should_retry():
        raise LambdaError(
            "Temporary service unavailable",
            ErrorType.RETRYABLE,
            503
        )
    
    return {'statusCode': 200, 'body': 'Success'}
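
Raising a RETRYABLE error only helps if the invocation source actually retries. For asynchronous invocations, Lambda retries twice by default; a sketch of tuning that behavior and routing exhausted events to a queue (resource names are assumptions):

resource "aws_lambda_function_event_invoke_config" "main" {
  function_name                = aws_lambda_function.main.function_name
  maximum_retry_attempts       = 2
  maximum_event_age_in_seconds = 3600

  destination_config {
    on_failure {
      # Failed events land here for inspection and replay
      destination = aws_sqs_queue.failures.arn
    }
  }
}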

Performance Optimization

Memory and CPU Optimization

# performance_optimizer.py
import time
from concurrent.futures import ThreadPoolExecutor
import multiprocessing

def optimize_memory(memory_mb: int, cpu_intensive: bool) -> int:
    """Suggest a memory setting based on workload type.

    Lambda allocates CPU in proportion to memory: roughly one full vCPU
    at 1,769 MB, scaling linearly up to 6 vCPUs at 10,240 MB.
    """
    # CPU-bound workloads: buy more vCPU by raising memory
    if cpu_intensive:
        return max(3538, memory_mb)  # roughly 2 vCPUs or more

    # I/O-bound workloads: extra memory is mostly wasted spend
    return min(512, memory_mb)

def parallel_processing(items: list, processor_func):
    """Process items in parallel using available cores"""
    
    # Thread pools help I/O-bound work; for CPU-bound tasks prefer
    # multiprocessing, since the GIL serializes Python bytecode
    max_workers = multiprocessing.cpu_count()
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(processor_func, items))
    
    return results

# Benchmark different memory configurations
def benchmark_configurations():
    """Test different memory allocations"""
    configurations = [128, 256, 512, 1024, 2048, 3008]
    results = {}
    
    for memory in configurations:
        start = time.time()
        
        # Your workload here
        perform_workload()
        
        duration = time.time() - start
        cost = calculate_cost(memory, duration)
        
        results[memory] = {
            'duration': duration,
            'cost': cost,
            'cost_per_million_invocations': cost * 1_000_000
        }
    
    return results
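
The benchmark leans on a calculate_cost helper. Lambda bills per request plus per GB-second of compute, so cost scales linearly with both memory and billed duration. A rough sketch; the rates below are the public us-east-1 x86 prices at the time of writing and will drift:

# cost_model.py
GB_SECOND_PRICE = 0.0000166667  # USD per GB-second (us-east-1, x86)
REQUEST_PRICE = 0.0000002       # USD per request ($0.20 per million)

def calculate_cost(memory_mb: int, duration_seconds: float) -> float:
    """Estimate the cost of a single invocation"""
    gb_seconds = (memory_mb / 1024) * duration_seconds
    return gb_seconds * GB_SECOND_PRICE + REQUEST_PRICE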

Cost Optimization Strategies

Request/Response Size Optimization

# compression.py
import gzip
import base64
import json

def compress_response(data: dict) -> dict:
    """Compress large responses for API Gateway"""
    json_str = json.dumps(data)

    # Skip compression for small payloads; the overhead isn't worth it
    if len(json_str) < 1000:
        return {
            'isBase64Encoded': False,
            'headers': {'Content-Type': 'application/json'},
            'body': json_str
        }

    compressed = gzip.compress(json_str.encode())
    encoded = base64.b64encode(compressed).decode()

    return {
        'isBase64Encoded': True,
        'headers': {
            'Content-Encoding': 'gzip',
            'Content-Type': 'application/json'
        },
        'body': encoded
    }

def batch_records(records: list, batch_size: int = 25):
    """Process records in batches to reduce per-invocation overhead"""

    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]

        # One invocation handles the whole batch, cutting the number
        # of requests, and therefore request cost, by up to 25x
        process_batch(batch)
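
A quick way to sanity-check the compressed path is to round-trip it the way a client would: base64-decode, then gunzip.

# Round-trip test for compress_response
import base64
import gzip
import json

payload = {'items': list(range(500))}  # large enough to trigger compression
response = compress_response(payload)
decoded = gzip.decompress(base64.b64decode(response['body']))
assert json.loads(decoded) == payload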

Reserved Concurrency Management

# concurrency.tf
resource "aws_lambda_function" "api" {
  function_name = "api-handler"
  runtime       = "python3.9"
  
  # Reserve concurrency for critical functions
  reserved_concurrent_executions = 100
  
  environment {
    variables = {
      # Environment-specific configuration
      STAGE = var.environment
    }
  }
}

resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.api.function_name
  principal     = "apigateway.amazonaws.com"
}

Testing Lambda Functions

Unit Testing

# test_lambda.py
import pytest
import json
from unittest.mock import patch, MagicMock
from lambda_function import lambda_handler

@pytest.fixture
def api_gateway_event():
    """Generate API Gateway event"""
    return {
        "httpMethod": "GET",
        "path": "/products",
        "queryStringParameters": {"page": "1"},
        "headers": {"Authorization": "Bearer token"},
        "body": None
    }

@patch('lambda_function.get_products_from_db')
def test_get_products(mock_db, api_gateway_event):
    """Test product listing"""
    mock_db.return_value = [
        {"id": 1, "name": "Product 1"},
        {"id": 2, "name": "Product 2"}
    ]
    
    response = lambda_handler(api_gateway_event, {})
    
    assert response['statusCode'] == 200
    body = json.loads(response['body'])
    assert len(body) == 2

@patch.dict('os.environ', {'DB_HOST': 'test-db'})
def test_environment_variables():
    """Test environment variable handling"""
    from lambda_function import get_db_config
    
    config = get_db_config()
    assert config['host'] == 'test-db'

Integration Testing

# integration_test.py
import boto3
import json

def test_lambda_integration():
    """Test Lambda function with real AWS services"""
    lambda_client = boto3.client('lambda')
    
    # Invoke function
    response = lambda_client.invoke(
        FunctionName='test-function',
        InvocationType='RequestResponse',
        Payload=json.dumps({
            'test': True,
            'data': 'test-data'
        })
    )
    
    # Verify response
    result = json.loads(response['Payload'].read())
    assert result['statusCode'] == 200
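
Before touching real AWS, the same function can be smoke-tested locally with the SAM CLI, assuming a template.yaml defines the function and an event fixture exists:

# Invoke locally in a Lambda-like container with a sample event
sam local invoke TestFunction --event events/test-event.json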

Monitoring and Observability

CloudWatch Insights Queries

# Find cold starts
fields @timestamp, @initDuration
| filter @type = "REPORT"
| filter @initDuration > 0
| stats count(), avg(@initDuration), max(@initDuration) by bin(5m)

# Analyze errors
fields @timestamp, @message
| filter @message like /ERROR/
| stats count() by @message
| sort count() desc

# Performance and memory utilization
fields @timestamp, @duration, @billedDuration, @memorySize, @maxMemoryUsed
| filter @type = "REPORT"
| stats avg(@duration), max(@duration), avg(@maxMemoryUsed / @memorySize) by bin(5m)
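
The same queries can run from code, which is handy for automated cold-start reports. A sketch using the CloudWatch Logs API; the log group name is an assumption:

# insights_runner.py
import time
import boto3

logs = boto3.client('logs')

def run_query(log_group: str, query: str, start: int, end: int) -> list:
    """Run a Logs Insights query and poll until it finishes"""
    query_id = logs.start_query(
        logGroupName=log_group,  # e.g. /aws/lambda/api-handler
        startTime=start,         # epoch seconds
        endTime=end,
        queryString=query
    )['queryId']

    while True:
        result = logs.get_query_results(queryId=query_id)
        if result['status'] in ('Complete', 'Failed', 'Cancelled'):
            return result['results']
        time.sleep(1)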

Best Practices Summary

  1. Keep functions small and focused
  2. Reuse connections across invocations
  3. Use environment variables for configuration
  4. Implement proper error handling and retries
  5. Optimize memory allocation for cost
  6. Use layers for shared dependencies
  7. Implement structured logging
  8. Monitor cold starts and optimize
  9. Use provisioned concurrency for critical paths
  10. Test thoroughly with unit and integration tests

Conclusion

AWS Lambda enables truly serverless architectures, but success requires understanding its operational model and limitations. Focus on cold start optimization, proper error handling, and cost management. Start with simple functions, then gradually adopt advanced patterns as your serverless maturity grows.
