DynamoDB Design Patterns That Scale

David Childs

Master DynamoDB with advanced data modeling patterns, single-table design, and optimization strategies for high-performance NoSQL applications.

DynamoDB promises infinite scale, but poor design decisions will haunt you forever. After migrating systems handling billions of items, I've learned that DynamoDB success requires thinking differently about data. Here's your guide to DynamoDB patterns that actually work in production.

Single Table Design

The Power of Single Table

# single_table_design.py
class SingleTableDesign:
    """
    Single table design pattern for e-commerce application.

    PK and SK patterns:
    - USER#<id>           | PROFILE           -> User profile
    - USER#<id>           | ORDER#<timestamp> -> User orders
    - PRODUCT#<id>        | METADATA          -> Product info
    - PRODUCT#<id>        | REVIEW#<user_id>  -> Product reviews
    - ORDER#<id>          | METADATA          -> Order details
    - ORDER#<id>          | ITEM#<product_id> -> Order items

    The methods below only write and read the USER# and ORDER# patterns.
    Profile and order-metadata items additionally carry GSI1PK/GSI1SK
    attributes (EMAIL#<email> / USER#<id> and USER#<id> / ORDER#<timestamp>).
    """

    def __init__(self, table_name):
        """Bind to the DynamoDB table called ``table_name``."""
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def create_user(self, user_id, profile_data):
        """Create the profile item for ``user_id``.

        ``profile_data`` must provide the ``'email'`` and ``'name'`` keys;
        a missing key raises ``KeyError`` before anything is written.
        """
        email = profile_data['email']
        profile_item = {
            'PK': f'USER#{user_id}',
            'SK': 'PROFILE',
            'Type': 'UserProfile',
            'Email': email,
            'Name': profile_data['name'],
            'CreatedAt': datetime.now().isoformat(),
            'GSI1PK': f'EMAIL#{email}',
            'GSI1SK': f'USER#{user_id}'
        }

        with self.table.batch_writer() as batch:
            batch.put_item(Item=profile_item)

    def create_order(self, order_id, user_id, items):
        """Create an order, its line items and the user's order-list entry.

        ``items`` is a sequence of dicts with the keys ``'product_id'``,
        ``'name'``, ``'price'`` and ``'quantity'``. It is iterated more than
        once, so pass a list rather than a one-shot iterator.

        NOTE: the items are written through ``batch_writer`` and not through
        a transaction, so a failure part-way can leave a partial order.
        """
        timestamp = datetime.now().isoformat()
        # Computed once so the order metadata and the user's order-list
        # entry can never disagree about the total.
        total = sum(item['price'] * item['quantity'] for item in items)

        # Order metadata
        order_items = [{
            'PK': f'ORDER#{order_id}',
            'SK': 'METADATA',
            'Type': 'Order',
            'UserId': user_id,
            'Status': 'PENDING',
            'Total': total,
            'CreatedAt': timestamp,
            'GSI1PK': f'USER#{user_id}',
            'GSI1SK': f'ORDER#{timestamp}'
        }]

        # One item per order line, keyed by product id
        order_items.extend(
            {
                'PK': f'ORDER#{order_id}',
                'SK': f'ITEM#{item["product_id"]}',
                'Type': 'OrderItem',
                'ProductId': item['product_id'],
                'ProductName': item['name'],
                'Price': item['price'],
                'Quantity': item['quantity']
            }
            for item in items
        )

        # Entry under the user's partition so get_user_orders() can list it
        order_items.append({
            'PK': f'USER#{user_id}',
            'SK': f'ORDER#{timestamp}',
            'Type': 'UserOrder',
            'OrderId': order_id,
            'Total': total,
            'Status': 'PENDING'
        })

        with self.table.batch_writer() as batch:
            for item in order_items:
                batch.put_item(Item=item)

    def get_user_orders(self, user_id, limit=10):
        """Return up to ``limit`` of the user's order entries, newest first."""
        response = self.table.query(
            KeyConditionExpression=Key('PK').eq(f'USER#{user_id}') &
                                  Key('SK').begins_with('ORDER#'),
            ScanIndexForward=False,  # Descending sort key = most recent first
            Limit=limit
        )
        return response['Items']

    def get_order_with_items(self, order_id):
        """Return ``{'metadata': ..., 'items': [...]}`` for one order.

        ``metadata`` is ``None`` when the order partition has no METADATA
        item. Every result page is followed, so orders whose items span
        more than one query page are returned completely.
        """
        query_kwargs = {
            'KeyConditionExpression': Key('PK').eq(f'ORDER#{order_id}')
        }

        metadata = None
        items = []

        while True:
            response = self.table.query(**query_kwargs)

            # Separate metadata and items
            for item in response['Items']:
                if item['SK'] == 'METADATA':
                    metadata = item
                elif item['SK'].startswith('ITEM#'):
                    items.append(item)

            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            # Resume the query where the previous page stopped
            query_kwargs['ExclusiveStartKey'] = last_key

        return {'metadata': metadata, 'items': items}

Access Patterns and GSI Design

Optimizing for Query Patterns

# access_patterns.py
class AccessPatternOptimizer:
    """
    GSI Design for common access patterns:
    GSI1: Email lookup (GSI1PK: EMAIL#<email>, GSI1SK: USER#<id>)
    GSI2: Status queries (GSI2PK: STATUS#<status>, GSI2SK: ORDER#<timestamp>)
    GSI3: Product sales (GSI3PK: PRODUCT#<id>, GSI3SK: SALE#<timestamp>)
    """

    def __init__(self, table_name):
        """Bind to the DynamoDB table called ``table_name``."""
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def get_user_profile(self, user_id):
        """Return the profile item (PK USER#<id>, SK PROFILE) or ``None``."""
        response = self.table.get_item(
            Key={
                'PK': f'USER#{user_id}',
                'SK': 'PROFILE'
            }
        )
        return response.get('Item')

    def find_user_by_email(self, email):
        """Use GSI1 to find a user by email.

        Returns the full profile item, or ``None`` when no item is indexed
        under that email.
        """
        response = self.table.query(
            IndexName='GSI1',
            KeyConditionExpression=Key('GSI1PK').eq(f'EMAIL#{email}')
        )

        matches = response['Items']
        if not matches:
            return None

        # GSI1SK is 'USER#<id>'; split on the first '#' only so ids that
        # themselves contain '#' are not truncated.
        _, _, user_id = matches[0]['GSI1SK'].partition('#')
        return self.get_user_profile(user_id)

    def get_orders_by_status(self, status, limit=50):
        """Use GSI2 to return up to ``limit`` orders with ``status``, newest first."""
        response = self.table.query(
            IndexName='GSI2',
            KeyConditionExpression=Key('GSI2PK').eq(f'STATUS#{status}'),
            ScanIndexForward=False,
            Limit=limit
        )
        return response['Items']

    def get_product_sales(self, product_id, start_date=None, end_date=None):
        """Use GSI3 to get a product's sales history.

        ``start_date`` and ``end_date`` are inclusive bounds compared as
        strings against the ``SALE#<timestamp>`` sort key. Either one may
        be given on its own. All result pages are collected.
        """
        key_condition = Key('GSI3PK').eq(f'PRODUCT#{product_id}')

        if start_date and end_date:
            key_condition = key_condition & Key('GSI3SK').between(
                f'SALE#{start_date}',
                f'SALE#{end_date}'
            )
        elif start_date:
            key_condition = key_condition & Key('GSI3SK').gte(f'SALE#{start_date}')
        elif end_date:
            key_condition = key_condition & Key('GSI3SK').lte(f'SALE#{end_date}')

        query_kwargs = {
            'IndexName': 'GSI3',
            'KeyConditionExpression': key_condition
        }

        sales = []
        while True:
            response = self.table.query(**query_kwargs)
            sales.extend(response['Items'])

            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            query_kwargs['ExclusiveStartKey'] = last_key

        return sales

    def update_order_status(self, order_id, new_status, old_status):
        """Set an order's status and move it to the matching GSI2 partition.

        Raises ``ValueError`` when the order's METADATA item does not exist.

        NOTE(review): ``old_status`` is accepted for interface compatibility
        but is not used; the update is unconditional, so two concurrent
        status changes are last-writer-wins.
        """
        order_key = {
            'PK': f'ORDER#{order_id}',
            'SK': 'METADATA'
        }

        # The stored CreatedAt is needed to rebuild the GSI2 sort key
        response = self.table.get_item(Key=order_key)

        if not response.get('Item'):
            raise ValueError('Order not found')

        order = response['Item']

        # Status is written through a '#status' placeholder; the GSI2 keys
        # are rewritten so the item appears under STATUS#<new_status>.
        self.table.update_item(
            Key=order_key,
            UpdateExpression='SET #status = :new_status, GSI2PK = :gsi2pk, GSI2SK = :gsi2sk',
            ExpressionAttributeNames={
                '#status': 'Status'
            },
            ExpressionAttributeValues={
                ':new_status': new_status,
                ':gsi2pk': f'STATUS#{new_status}',
                ':gsi2sk': f'ORDER#{order["CreatedAt"]}'
            }
        )

Advanced Query Patterns

Hierarchical Data with Composite Keys

# hierarchical_data.py
class HierarchicalDataPattern:
    """
    Pattern for storing hierarchical data (e.g., categories, folders)
    PK: TENANT#<id>
    SK: PATH#/category/subcategory/item
    """

    def __init__(self, table_name):
        """Bind to the DynamoDB table called ``table_name``."""
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def create_item(self, tenant_id, path, metadata):
        """Create an item at ``path`` in the tenant's hierarchy.

        ``Level`` is the number of '/' characters in ``path`` and ``Parent``
        is ``path`` without its last segment ('/' for top-level items).
        """
        self.table.put_item(
            Item={
                'PK': f'TENANT#{tenant_id}',
                'SK': f'PATH#{path}',
                'Type': 'HierarchicalItem',
                'Metadata': metadata,
                'Level': path.count('/'),
                'Parent': '/'.join(path.split('/')[:-1]) or '/'
            }
        )

    def get_children(self, tenant_id, parent_path):
        """Get the direct children of ``parent_path``.

        A trailing '/' on ``parent_path`` is ignored, so the root can be
        passed as '/' (the value ``create_item`` stores as ``Parent`` for
        top-level items). All result pages are collected.
        """
        # Without this, '/' would produce the prefix 'PATH#//' and a parent
        # level of 1, and no top-level item would ever match.
        normalized_parent = parent_path.rstrip('/')

        query_kwargs = {
            'KeyConditionExpression': (
                Key('PK').eq(f'TENANT#{tenant_id}') &
                Key('SK').begins_with(f'PATH#{normalized_parent}/')
            )
        }

        # The prefix query also returns deeper descendants; keep only the
        # items exactly one level below the parent.
        child_level = normalized_parent.count('/') + 1
        children = []

        while True:
            response = self.table.query(**query_kwargs)
            children.extend(
                item for item in response['Items']
                if item['Level'] == child_level
            )

            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            query_kwargs['ExclusiveStartKey'] = last_key

        return children

    def get_ancestors(self, tenant_id, item_path):
        """Get all stored ancestors of ``item_path``, shallowest first.

        Ancestor paths that have no item in the table are simply absent
        from the result. Returns ``[]`` for top-level paths.
        """
        parts = item_path.split('/')

        # Build ancestor keys from every proper prefix of the path
        keys = []
        for depth in range(1, len(parts)):
            ancestor_path = '/'.join(parts[:depth])
            if not ancestor_path:
                # A leading '/' yields an empty first segment; 'PATH#' is
                # not a path create_item can be expected to have written.
                continue
            keys.append({
                'PK': f'TENANT#{tenant_id}',
                'SK': f'PATH#{ancestor_path}'
            })

        if not keys:
            return []

        table_name = self.table.name
        found = []

        # BatchGetItem accepts at most 100 keys per request
        for start in range(0, len(keys), 100):
            pending = {table_name: {'Keys': keys[start:start + 100]}}
            while pending:
                response = self.dynamodb.batch_get_item(RequestItems=pending)
                found.extend(response['Responses'].get(table_name, []))
                # Keys the service did not get to are handed back here
                pending = response.get('UnprocessedKeys') or {}

        # Batch results come back in no particular order. Each ancestor's
        # SK is a prefix of the next, so SK length orders them by depth.
        found.sort(key=lambda item: len(item['SK']))
        return found

Time Series Data

# time_series.py
class TimeSeriesPattern:
    """
    Pattern for time series data with automatic sharding
    PK: METRIC#<name>#SHARD#<n>
    SK: TIMESTAMP#<iso_timestamp>
    """

    def __init__(self, table_name, shards=10, retention_days=30):
        """Bind to ``table_name``.

        ``shards`` is the number of partitions each metric is spread over;
        ``retention_days`` is how far past its timestamp each point's TTL
        attribute is set. Raises ``ValueError`` if ``shards`` is below 1.
        """
        if shards < 1:
            raise ValueError('shards must be at least 1')

        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)
        self.shards = shards
        self.retention_days = retention_days

    def write_metric(self, metric_name, value, timestamp=None):
        """Write one data point; ``timestamp`` defaults to ``datetime.now()``.

        Two points for the same metric with an identical timestamp that land
        on the same shard share a key, so the later write replaces the
        earlier one.
        """
        if timestamp is None:
            timestamp = datetime.now()

        # Distribute writes across shards
        shard = random.randint(0, self.shards - 1)
        expires_at = timestamp + timedelta(days=self.retention_days)

        self.table.put_item(
            Item={
                'PK': f'METRIC#{metric_name}#SHARD#{shard}',
                'SK': f'TIMESTAMP#{timestamp.isoformat()}',
                # Routed through str() so the Decimal holds the float's
                # printed value rather than its binary expansion
                'Value': Decimal(str(value)),
                'TTL': int(expires_at.timestamp())
            }
        )

    def query_metrics(self, metric_name, start_time, end_time):
        """Return all points in [start_time, end_time] from every shard, sorted by SK."""
        # One worker per shard so the shard queries run concurrently
        with ThreadPoolExecutor(max_workers=self.shards) as executor:
            per_shard = executor.map(
                lambda shard: self._query_shard(
                    metric_name, shard, start_time, end_time
                ),
                range(self.shards)
            )
            all_items = [item for items in per_shard for item in items]

        # Each shard is sorted on its own; merge into one timeline
        all_items.sort(key=lambda x: x['SK'])

        return all_items

    def _query_shard(self, metric_name, shard, start_time, end_time):
        """Query a single shard, following every result page."""
        query_kwargs = {
            'KeyConditionExpression': (
                Key('PK').eq(f'METRIC#{metric_name}#SHARD#{shard}') &
                Key('SK').between(
                    f'TIMESTAMP#{start_time.isoformat()}',
                    f'TIMESTAMP#{end_time.isoformat()}'
                )
            )
        }

        items = []
        while True:
            response = self.table.query(**query_kwargs)
            items.extend(response['Items'])

            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            query_kwargs['ExclusiveStartKey'] = last_key

        return items

Transactions and Batch Operations

ACID Transactions

# transactions.py
class TransactionManager:
    """Atomic multi-item writes using the low-level DynamoDB client."""

    def __init__(self, table_name):
        """Remember ``table_name`` and create the DynamoDB client."""
        self.dynamodb = boto3.client('dynamodb')
        self.table_name = table_name

    def transfer_funds(self, from_account, to_account, amount):
        """Atomically move ``amount`` between accounts and record the transfer.

        Returns ``{'success': True, 'transaction_id': ..., 'request_id': ...}``
        where ``transaction_id`` is the id stored in the TRANSACTION#<id>
        record, or ``{'success': False, 'error': ...}`` when DynamoDB cancels
        the transaction.

        Raises ``ValueError`` for a non-positive amount or when both accounts
        are the same; any other ``ClientError`` propagates.
        """
        # A negative amount would turn the debit into a credit and make the
        # 'Balance >= :amount' check trivially true, so reject it up front.
        if amount <= 0:
            raise ValueError('Transfer amount must be positive')
        # A transaction may not operate on the same item twice
        if from_account == to_account:
            raise ValueError('Source and destination accounts must differ')

        transaction_id = str(uuid.uuid4())
        amount_value = {'N': str(amount)}

        try:
            response = self.dynamodb.transact_write_items(
                TransactItems=[
                    {
                        # Debit from account; fails if funds are insufficient
                        'Update': {
                            'TableName': self.table_name,
                            'Key': {
                                'PK': {'S': f'ACCOUNT#{from_account}'},
                                'SK': {'S': 'BALANCE'}
                            },
                            'UpdateExpression': 'SET Balance = Balance - :amount',
                            'ConditionExpression': 'Balance >= :amount',
                            'ExpressionAttributeValues': {
                                ':amount': amount_value
                            }
                        }
                    },
                    {
                        # Credit to account; the account must already exist
                        'Update': {
                            'TableName': self.table_name,
                            'Key': {
                                'PK': {'S': f'ACCOUNT#{to_account}'},
                                'SK': {'S': 'BALANCE'}
                            },
                            'UpdateExpression': 'SET Balance = Balance + :amount',
                            'ConditionExpression': 'attribute_exists(PK)',
                            'ExpressionAttributeValues': {
                                ':amount': amount_value
                            }
                        }
                    },
                    {
                        # Create transaction record
                        'Put': {
                            'TableName': self.table_name,
                            'Item': {
                                'PK': {'S': f'TRANSACTION#{transaction_id}'},
                                'SK': {'S': f'TIMESTAMP#{datetime.now().isoformat()}'},
                                'FromAccount': {'S': from_account},
                                'ToAccount': {'S': to_account},
                                'Amount': amount_value,
                                'Status': {'S': 'COMPLETED'}
                            }
                        }
                    }
                ]
            )
        except ClientError as e:
            if e.response['Error']['Code'] == 'TransactionCanceledException':
                return {'success': False, 'error': 'Insufficient funds or account not found'}
            raise

        return {
            'success': True,
            # The id of the stored record, so callers can look it up later
            'transaction_id': transaction_id,
            'request_id': response['ResponseMetadata']['RequestId']
        }

Batch Operations with Error Handling

# batch_operations.py
class BatchOperationManager:
    """Bulk writes with retry on throughput errors."""

    def __init__(self, table_name):
        """Bind to the DynamoDB table called ``table_name``."""
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def batch_write_with_retry(self, items, max_retries=3):
        """Write ``items`` in chunks of 25, retrying throttled chunks.

        Each chunk is attempted up to ``max_retries`` times, with exponential
        back-off between attempts. A chunk that is still throttled on its
        last attempt is reported whole in ``'unprocessed'``. Any error other
        than ``ProvisionedThroughputExceededException`` propagates.

        Returns ``{'processed': <count>, 'unprocessed': [<items>]}``.
        """
        unprocessed_items = []

        # Process in batches of 25 (DynamoDB limit)
        for start in range(0, len(items), 25):
            batch = items[start:start + 25]
            written = False

            for attempt in range(max_retries):
                try:
                    with self.table.batch_writer() as writer:
                        for item in batch:
                            writer.put_item(Item=item)
                except ClientError as e:
                    if e.response['Error']['Code'] != 'ProvisionedThroughputExceededException':
                        raise
                    # Back off only when another attempt will follow;
                    # sleeping before giving up just delays the caller.
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)
                else:
                    written = True
                    break

            if not written:
                unprocessed_items.extend(batch)

        return {
            'processed': len(items) - len(unprocessed_items),
            'unprocessed': unprocessed_items
        }

DynamoDB Streams

Change Data Capture

# streams_processor.py
def lambda_handler(event, context):
    """Route each DynamoDB stream record to the handler for its event type.

    INSERT records pass their NewImage to ``handle_insert``, MODIFY records
    pass OldImage and NewImage to ``handle_update``, and REMOVE records pass
    their OldImage to ``handle_delete``. Records with any other event name
    are skipped. ``context`` is unused.
    """
    for stream_record in event['Records']:
        kind = stream_record['eventName']

        if kind not in ('INSERT', 'MODIFY', 'REMOVE'):
            continue

        # Only looked up for recognized event types
        change = stream_record['dynamodb']

        if kind == 'INSERT':
            handle_insert(change['NewImage'])
        elif kind == 'MODIFY':
            handle_update(change['OldImage'], change['NewImage'])
        else:
            handle_delete(change['OldImage'])

def handle_insert(new_image):
    """React to a newly inserted item.

    The stream image is deserialized first. Items whose ``Type`` is not
    ``'Order'`` are ignored; for orders the confirmation email, inventory
    update and fulfillment workflow are triggered, in that order.
    """
    order = deserialize_dynamodb_item(new_image)

    # Only orders trigger downstream work
    if order.get('Type') != 'Order':
        return

    send_order_confirmation(order)
    update_inventory(order)
    start_fulfillment(order)

def handle_update(old_image, new_image):
    """Notify about a status change between the old and new item images.

    Both images are deserialized; ``notify_status_change`` is called with
    the new item only when the ``Status`` values differ.
    """
    before = deserialize_dynamodb_item(old_image)
    after = deserialize_dynamodb_item(new_image)

    status_changed = before.get('Status') != after.get('Status')
    if status_changed:
        notify_status_change(after)

def deserialize_dynamodb_item(dynamodb_item):
    """Convert a DynamoDB-typed attribute map to a plain Python dict.

    ``dynamodb_item`` maps attribute names to typed values such as
    ``{'S': 'abc'}``; each value is run through ``TypeDeserializer``.
    """
    # Imported explicitly: 'import boto3' alone does not load the
    # boto3.dynamodb.types submodule, so reaching it by attribute access
    # fails unless something else has already imported it.
    from boto3.dynamodb.types import TypeDeserializer

    deserializer = TypeDeserializer()
    return {k: deserializer.deserialize(v) for k, v in dynamodb_item.items()}

Performance Optimization

Hot Partition Management

# hot_partition_manager.py
class HotPartitionManager:
    """Manage hot partitions with adaptive sharding"""

    # Throttle events in the lookback window above which the table is
    # treated as hot.
    HOT_THROTTLE_THRESHOLD = 10

    def __init__(self, table_name, shards=10):
        """Bind to ``table_name``; ``shards`` is the number of key suffixes."""
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)
        self.cloudwatch = boto3.client('cloudwatch')
        self.shards = shards

    def write_with_jitter(self, partition_key, sort_key, item):
        """Write ``item``, spreading it over suffixed keys while the table is hot.

        ``item`` is modified in place: ``PK`` and ``SK`` are set, and when
        the write is sharded ``OriginalPK`` records the unsuffixed key.
        """
        if self.is_hot_partition(partition_key):
            # Record the key before the suffix is appended; taking it from
            # the suffixed key would keep only the text before the first '#'.
            item['OriginalPK'] = partition_key
            # Add random suffix to distribute load
            suffix = random.randint(0, self.shards - 1)
            partition_key = f"{partition_key}#{suffix}"

        item['PK'] = partition_key
        item['SK'] = sort_key

        self.table.put_item(Item=item)

    def read_with_scatter_gather(self, original_pk, sk_prefix):
        """Read every item for ``original_pk`` whose SK starts with ``sk_prefix``.

        Whether an item was written under the plain key or a suffixed one
        depended on the table's state at write time, which cannot be known
        at read time. The plain key and every shard are therefore always
        queried, in parallel.
        """
        partition_keys = [original_pk]
        partition_keys.extend(
            f"{original_pk}#{shard}" for shard in range(self.shards)
        )

        with ThreadPoolExecutor(max_workers=len(partition_keys)) as executor:
            per_partition = executor.map(
                lambda pk: self._query_shard(pk, sk_prefix),
                partition_keys
            )
            return [item for items in per_partition for item in items]

    def _query_shard(self, pk, sk_prefix):
        """Query one partition key, following every result page."""
        query_kwargs = {
            'KeyConditionExpression': (
                Key('PK').eq(pk) & Key('SK').begins_with(sk_prefix)
            )
        }

        items = []
        while True:
            response = self.table.query(**query_kwargs)
            items.extend(response['Items'])

            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            query_kwargs['ExclusiveStartKey'] = last_key

        return items

    def is_hot_partition(self, partition_key):
        """Report whether the table was throttled in the last five minutes.

        The metrics are per table, so ``partition_key`` does not influence
        the answer. Each call issues two CloudWatch requests.
        """
        from datetime import timezone

        # Timezone-aware so the window is not shifted on hosts whose local
        # time is not UTC.
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(minutes=5)

        throttle_count = 0
        for metric_name in ('ReadThrottleEvents', 'WriteThrottleEvents'):
            response = self.cloudwatch.get_metric_statistics(
                Namespace='AWS/DynamoDB',
                MetricName=metric_name,
                Dimensions=[
                    {'Name': 'TableName', 'Value': self.table.name}
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=60,
                Statistics=['Sum']
            )
            throttle_count += sum(p['Sum'] for p in response['Datapoints'])

        return throttle_count > self.HOT_THROTTLE_THRESHOLD

On-Demand vs Provisioned Capacity

# capacity_optimizer.py
class CapacityOptimizer:
    """Recommend and switch a table's billing mode from its read usage."""

    def __init__(self, table_name):
        """Remember ``table_name`` and create the DynamoDB and CloudWatch clients."""
        self.dynamodb = boto3.client('dynamodb')
        self.cloudwatch = boto3.client('cloudwatch')
        self.table_name = table_name

    def analyze_usage_pattern(self, days=7):
        """Analyze read usage over ``days`` and recommend a capacity mode.

        Returns ``{'recommendation': 'INSUFFICIENT_DATA'}`` when CloudWatch
        has no datapoints; otherwise a dict with ``recommendation``,
        ``reason``, ``avg_consumption`` and ``max_consumption`` (read
        capacity units per second, averaged over each hour) and ``variance``
        (the peak-to-average ratio of those hourly rates).

        NOTE(review): only read capacity is analyzed, and the average is
        taken over the hours that produced a datapoint.
        """
        from datetime import timezone

        period_seconds = 3600  # Hourly

        # Timezone-aware so the window is not shifted on hosts whose local
        # time is not UTC.
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=days)

        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/DynamoDB',
            MetricName='ConsumedReadCapacityUnits',
            Dimensions=[
                {'Name': 'TableName', 'Value': self.table_name}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=period_seconds,
            Statistics=['Sum']
        )

        datapoints = response['Datapoints']

        if not datapoints:
            return {'recommendation': 'INSUFFICIENT_DATA'}

        # For this metric the Average and Maximum statistics describe
        # individual requests; throughput is Sum divided by the period.
        hourly_rates = [p['Sum'] / period_seconds for p in datapoints]

        avg_consumption = sum(hourly_rates) / len(hourly_rates)
        max_consumption = max(hourly_rates)
        variance = max_consumption / avg_consumption if avg_consumption > 0 else 0

        # Recommendation logic
        if variance > 10:
            # High variance - spiky traffic
            recommendation = 'ON_DEMAND'
            reason = 'Traffic pattern shows high variance (>10x)'
        elif avg_consumption < 10:
            # Low usage
            recommendation = 'ON_DEMAND'
            reason = 'Low average consumption (<10 RCU)'
        else:
            # Steady traffic
            recommendation = 'PROVISIONED'
            reason = 'Steady traffic pattern with predictable load'

        return {
            'recommendation': recommendation,
            'reason': reason,
            'avg_consumption': avg_consumption,
            'max_consumption': max_consumption,
            'variance': variance
        }

    def switch_to_on_demand(self):
        """Switch table to on-demand billing"""
        self.dynamodb.update_table(
            TableName=self.table_name,
            BillingMode='PAY_PER_REQUEST'
        )

    def switch_to_provisioned(self, read_units, write_units):
        """Switch table to provisioned capacity with the given read/write units."""
        self.dynamodb.update_table(
            TableName=self.table_name,
            BillingMode='PROVISIONED',
            ProvisionedThroughput={
                'ReadCapacityUnits': read_units,
                'WriteCapacityUnits': write_units
            }
        )

Cost Optimization

TTL and Archival Strategy

# ttl_archival.py
class TTLArchivalManager:
    """Copy old items to S3 and mark them for TTL deletion."""

    def __init__(self, table_name, archive_bucket):
        """Bind to ``table_name`` and remember the S3 ``archive_bucket``."""
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)
        self.s3 = boto3.client('s3')
        self.archive_bucket = archive_bucket

    def set_ttl(self, item, days_to_live):
        """Set ``item['TTL']`` to now + ``days_to_live`` (epoch seconds).

        ``item`` is modified in place and also returned.
        """
        ttl_timestamp = int((datetime.now() + timedelta(days=days_to_live)).timestamp())
        item['TTL'] = ttl_timestamp
        return item

    def archive_old_data(self, days_old=90):
        """Archive items older than ``days_old`` to S3 and schedule deletion.

        Items whose ``CreatedAt`` string sorts before the cutoff are written
        to S3 and then given a TTL seven days out. Every page of the scan is
        processed. Returns the number of items archived.

        A scan reads the whole table - use with caution on large tables.
        """
        cutoff_date = datetime.now() - timedelta(days=days_old)

        scan_kwargs = {
            'FilterExpression': Attr('CreatedAt').lt(cutoff_date.isoformat())
        }

        archived = 0
        while True:
            response = self.table.scan(**scan_kwargs)

            for item in response['Items']:
                self._archive_item(item)
                archived += 1

            # The filter runs after a page is read, so a page can be empty
            # while more data remains; keep going until the key is gone.
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            scan_kwargs['ExclusiveStartKey'] = last_key

        return archived

    def _archive_item(self, item):
        """Write one item to S3, then set its TTL so DynamoDB deletes it."""
        # default=str renders values json cannot encode (e.g. Decimal) as
        # their string form.
        item_json = json.dumps(item, default=str)

        # Store in S3 with hierarchical key; items without a Type attribute
        # are filed under 'Unknown' rather than aborting the run.
        item_type = item.get('Type', 'Unknown')
        key = f"archive/{item_type}/{item['PK']}/{item['SK']}.json"

        self.s3.put_object(
            Bucket=self.archive_bucket,
            Key=key,
            Body=item_json,
            StorageClass='GLACIER'
        )

        # The TTL is set only after the S3 write has returned
        self.table.update_item(
            Key={'PK': item['PK'], 'SK': item['SK']},
            UpdateExpression='SET #ttl = :ttl',
            ExpressionAttributeNames={'#ttl': 'TTL'},
            ExpressionAttributeValues={
                ':ttl': int((datetime.now() + timedelta(days=7)).timestamp())
            }
        )

Best Practices Checklist

  • Design for your access patterns, not your entities
  • Use single table design when appropriate
  • Leverage GSIs for alternative access patterns
  • Implement proper error handling and retries
  • Use transactions for ACID operations
  • Enable point-in-time recovery for production
  • Configure auto-scaling or use on-demand
  • Implement TTL for data lifecycle
  • Use DynamoDB Streams for event-driven architectures
  • Monitor hot partitions with CloudWatch
  • Use batch operations for efficiency
  • Use conditional writes to prevent race conditions
  • Archive old data to S3
  • Enable encryption at rest
  • Schedule regular backups with AWS Backup

Conclusion

DynamoDB excels when you embrace its constraints and design for them. Think in terms of access patterns, not relationships. Use single table design judiciously, leverage streams for event-driven architectures, and always plan for scale from day one. The key to DynamoDB success is understanding that it's not a relational database—it's a high-performance key-value and document store that rewards good design with incredible scale and performance.

Share this article

DC

David Childs

Consulting Systems Engineer with over 10 years of experience building scalable infrastructure and helping organizations optimize their technology stack.

Related Articles