Master DynamoDB with advanced data modeling patterns, single-table design, and optimization strategies for high-performance NoSQL applications.
DynamoDB promises near-infinite scale, but poor design decisions will haunt you forever: a bad key design is hard to change once a table holds production data. After migrating systems handling billions of items, I've learned that DynamoDB success requires thinking differently about data: you model for access patterns, not relationships. Here's a guide to DynamoDB patterns that actually work in production.
Single Table Design
The Power of a Single Table
# single_table_design.py
from datetime import datetime

import boto3
from boto3.dynamodb.conditions import Key

class SingleTableDesign:
"""
Single table design pattern for e-commerce application
PK and SK patterns:
- USER#<id> | PROFILE -> User profile
- USER#<id> | ORDER#<timestamp> -> User orders
- PRODUCT#<id> | METADATA -> Product info
- PRODUCT#<id> | REVIEW#<user_id> -> Product reviews
- ORDER#<id> | METADATA -> Order details
- ORDER#<id> | ITEM#<product_id> -> Order items
"""
def __init__(self, table_name):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
def create_user(self, user_id, profile_data):
"""Create user with profile"""
items = [
{
'PK': f'USER#{user_id}',
'SK': 'PROFILE',
'Type': 'UserProfile',
'Email': profile_data['email'],
'Name': profile_data['name'],
'CreatedAt': datetime.now().isoformat(),
'GSI1PK': f'EMAIL#{profile_data["email"]}',
'GSI1SK': f'USER#{user_id}'
}
]
# Write in batch
with self.table.batch_writer() as batch:
for item in items:
batch.put_item(Item=item)
def create_order(self, order_id, user_id, items):
"""Create order with items"""
timestamp = datetime.now().isoformat()
# Order metadata
order_items = [{
'PK': f'ORDER#{order_id}',
'SK': 'METADATA',
'Type': 'Order',
'UserId': user_id,
'Status': 'PENDING',
'Total': sum(item['price'] * item['quantity'] for item in items),
'CreatedAt': timestamp,
'GSI1PK': f'USER#{user_id}',
'GSI1SK': f'ORDER#{timestamp}'
}]
# Order items
for item in items:
order_items.append({
'PK': f'ORDER#{order_id}',
'SK': f'ITEM#{item["product_id"]}',
'Type': 'OrderItem',
'ProductId': item['product_id'],
'ProductName': item['name'],
'Price': item['price'],
'Quantity': item['quantity']
})
# Add to user's order list
order_items.append({
'PK': f'USER#{user_id}',
'SK': f'ORDER#{timestamp}',
'Type': 'UserOrder',
'OrderId': order_id,
'Total': sum(item['price'] * item['quantity'] for item in items),
'Status': 'PENDING'
})
# Write all items
with self.table.batch_writer() as batch:
for item in order_items:
batch.put_item(Item=item)
def get_user_orders(self, user_id, limit=10):
"""Get user's recent orders"""
response = self.table.query(
KeyConditionExpression=Key('PK').eq(f'USER#{user_id}') &
Key('SK').begins_with('ORDER#'),
ScanIndexForward=False, # Most recent first
Limit=limit
)
return response['Items']
def get_order_with_items(self, order_id):
"""Get order with all items"""
response = self.table.query(
KeyConditionExpression=Key('PK').eq(f'ORDER#{order_id}')
)
# Separate metadata and items
metadata = None
items = []
for item in response['Items']:
if item['SK'] == 'METADATA':
metadata = item
elif item['SK'].startswith('ITEM#'):
items.append(item)
return {'metadata': metadata, 'items': items}
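To make the access patterns concrete, here is a minimal usage sketch. It assumes the table above already exists; the table name, IDs, and sample products are placeholders.

# single_table_usage.py -- hypothetical driver for SingleTableDesign
import uuid

store = SingleTableDesign('app-table')  # assumed table name

# One user, one order with two line items
user_id = str(uuid.uuid4())
store.create_user(user_id, {'email': 'jane@example.com', 'name': 'Jane'})

order_id = str(uuid.uuid4())
store.create_order(order_id, user_id, [
    {'product_id': 'p-1', 'name': 'Keyboard', 'price': 49, 'quantity': 1},
    {'product_id': 'p-2', 'name': 'Mouse', 'price': 25, 'quantity': 2}
])

# Each access pattern is a single Query -- no joins, no scans
print(store.get_user_orders(user_id))
print(store.get_order_with_items(order_id))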
Access Patterns and GSI Design
Optimizing for Query Patterns
# access_patterns.py
import boto3
from boto3.dynamodb.conditions import Key

class AccessPatternOptimizer:
"""
GSI Design for common access patterns:
GSI1: Email lookup (GSI1PK: EMAIL#<email>, GSI1SK: USER#<id>)
GSI2: Status queries (GSI2PK: STATUS#<status>, GSI2SK: ORDER#<timestamp>)
GSI3: Product sales (GSI3PK: PRODUCT#<id>, GSI3SK: SALE#<timestamp>)
"""
def __init__(self, table_name):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
def find_user_by_email(self, email):
"""Use GSI1 to find user by email"""
response = self.table.query(
IndexName='GSI1',
KeyConditionExpression=Key('GSI1PK').eq(f'EMAIL#{email}')
)
if response['Items']:
user_id = response['Items'][0]['GSI1SK'].split('#')[1]
# Get full user profile
return self.get_user_profile(user_id)
        return None
    def get_user_profile(self, user_id):
        """Fetch the full profile item for a user (PK=USER#<id>, SK=PROFILE)"""
        response = self.table.get_item(
            Key={'PK': f'USER#{user_id}', 'SK': 'PROFILE'}
        )
        return response.get('Item')
def get_orders_by_status(self, status, limit=50):
"""Use GSI2 to query orders by status"""
response = self.table.query(
IndexName='GSI2',
KeyConditionExpression=Key('GSI2PK').eq(f'STATUS#{status}'),
ScanIndexForward=False,
Limit=limit
)
return response['Items']
def get_product_sales(self, product_id, start_date=None, end_date=None):
"""Use GSI3 to get product sales history"""
key_condition = Key('GSI3PK').eq(f'PRODUCT#{product_id}')
if start_date and end_date:
key_condition = key_condition & Key('GSI3SK').between(
f'SALE#{start_date}',
f'SALE#{end_date}'
)
response = self.table.query(
IndexName='GSI3',
KeyConditionExpression=key_condition
)
return response['Items']
def update_order_status(self, order_id, new_status, old_status):
"""Update order status with GSI update"""
# Get order metadata first
response = self.table.get_item(
Key={
'PK': f'ORDER#{order_id}',
'SK': 'METADATA'
}
)
if not response.get('Item'):
raise ValueError('Order not found')
order = response['Item']
        # Update status and GSI2 keys, guarding against concurrent status changes
        self.table.update_item(
            Key={
                'PK': f'ORDER#{order_id}',
                'SK': 'METADATA'
            },
            UpdateExpression='SET #status = :new_status, GSI2PK = :gsi2pk, GSI2SK = :gsi2sk',
            ConditionExpression='#status = :old_status',
            ExpressionAttributeNames={
                '#status': 'Status'
            },
            ExpressionAttributeValues={
                ':new_status': new_status,
                ':old_status': old_status,
                ':gsi2pk': f'STATUS#{new_status}',
                ':gsi2sk': f'ORDER#{order["CreatedAt"]}'
            }
        )
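The classes above assume the base table and its three GSIs already exist. For reference, here is one way such a table could be declared with boto3; the table name, on-demand billing, and ALL projections are assumptions, not requirements.

# table_definition.py -- one possible declaration of the single table and GSI1-GSI3
import boto3

def create_single_table(table_name='app-table'):
    """Create the base table plus GSI1-GSI3 (on-demand billing, ALL projections assumed)."""
    client = boto3.client('dynamodb')
    key_attrs = ['PK', 'SK', 'GSI1PK', 'GSI1SK', 'GSI2PK', 'GSI2SK', 'GSI3PK', 'GSI3SK']
    indexes = [
        {
            'IndexName': name,
            'KeySchema': [
                {'AttributeName': f'{name}PK', 'KeyType': 'HASH'},
                {'AttributeName': f'{name}SK', 'KeyType': 'RANGE'}
            ],
            'Projection': {'ProjectionType': 'ALL'}
        }
        for name in ('GSI1', 'GSI2', 'GSI3')
    ]
    return client.create_table(
        TableName=table_name,
        AttributeDefinitions=[{'AttributeName': a, 'AttributeType': 'S'} for a in key_attrs],
        KeySchema=[
            {'AttributeName': 'PK', 'KeyType': 'HASH'},
            {'AttributeName': 'SK', 'KeyType': 'RANGE'}
        ],
        GlobalSecondaryIndexes=indexes,
        BillingMode='PAY_PER_REQUEST'
    )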
Advanced Query Patterns
Hierarchical Data with Composite Keys
# hierarchical_data.py
import boto3
from boto3.dynamodb.conditions import Key

class HierarchicalDataPattern:
"""
Pattern for storing hierarchical data (e.g., categories, folders)
PK: TENANT#<id>
SK: PATH#/category/subcategory/item
"""
def __init__(self, table_name):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
def create_item(self, tenant_id, path, metadata):
"""Create item in hierarchy"""
self.table.put_item(
Item={
'PK': f'TENANT#{tenant_id}',
'SK': f'PATH#{path}',
'Type': 'HierarchicalItem',
'Metadata': metadata,
'Level': path.count('/'),
'Parent': '/'.join(path.split('/')[:-1]) or '/'
}
)
def get_children(self, tenant_id, parent_path):
"""Get direct children of a path"""
# Query for items that start with parent path
response = self.table.query(
KeyConditionExpression=(
Key('PK').eq(f'TENANT#{tenant_id}') &
Key('SK').begins_with(f'PATH#{parent_path}/')
)
)
# Filter for direct children only
parent_level = parent_path.count('/')
children = [
item for item in response['Items']
if item['Level'] == parent_level + 1
]
return children
def get_ancestors(self, tenant_id, item_path):
"""Get all ancestors of an item"""
ancestors = []
parts = item_path.split('/')
        # Build ancestor paths, skipping the empty prefix produced by the leading '/'
        for i in range(1, len(parts)):
            ancestor_path = '/'.join(parts[:i])
            if not ancestor_path:
                continue
            ancestors.append({
                'PK': f'TENANT#{tenant_id}',
                'SK': f'PATH#{ancestor_path}'
            })
# Batch get ancestors
if ancestors:
response = self.dynamodb.batch_get_item(
RequestItems={
self.table.name: {
'Keys': ancestors
}
}
)
return response['Responses'][self.table.name]
return []
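A short walk-through of the hierarchy pattern, assuming the same kind of table as above; the tenant ID and category paths are made up.

# hierarchy_usage.py -- hypothetical walk-through of the path-based hierarchy
tree = HierarchicalDataPattern('app-table')  # assumed table name

tree.create_item('t-1', '/electronics', {'label': 'Electronics'})
tree.create_item('t-1', '/electronics/audio', {'label': 'Audio'})
tree.create_item('t-1', '/electronics/audio/headphones', {'label': 'Headphones'})

# Direct children of /electronics -> just the 'audio' node
print(tree.get_children('t-1', '/electronics'))

# Ancestors of the headphones node -> /electronics and /electronics/audio
print(tree.get_ancestors('t-1', '/electronics/audio/headphones'))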
Time Series Data
# time_series.py
import random
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from decimal import Decimal

import boto3
from boto3.dynamodb.conditions import Key

class TimeSeriesPattern:
"""
Pattern for time series data with automatic sharding
PK: METRIC#<name>#SHARD#<n>
SK: TIMESTAMP#<iso_timestamp>
"""
def __init__(self, table_name, shards=10):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
self.shards = shards
def write_metric(self, metric_name, value, timestamp=None):
"""Write time series data point"""
if timestamp is None:
timestamp = datetime.now()
# Distribute writes across shards
shard = random.randint(0, self.shards - 1)
self.table.put_item(
Item={
'PK': f'METRIC#{metric_name}#SHARD#{shard}',
'SK': f'TIMESTAMP#{timestamp.isoformat()}',
'Value': Decimal(str(value)),
'TTL': int((timestamp + timedelta(days=30)).timestamp()) # 30 day retention
}
)
def query_metrics(self, metric_name, start_time, end_time):
"""Query metrics across all shards"""
all_items = []
# Query each shard in parallel
with ThreadPoolExecutor(max_workers=self.shards) as executor:
futures = []
for shard in range(self.shards):
future = executor.submit(
self._query_shard,
metric_name,
shard,
start_time,
end_time
)
futures.append(future)
# Collect results
for future in futures:
items = future.result()
all_items.extend(items)
# Sort by timestamp
all_items.sort(key=lambda x: x['SK'])
return all_items
def _query_shard(self, metric_name, shard, start_time, end_time):
"""Query single shard"""
response = self.table.query(
KeyConditionExpression=(
Key('PK').eq(f'METRIC#{metric_name}#SHARD#{shard}') &
Key('SK').between(
f'TIMESTAMP#{start_time.isoformat()}',
f'TIMESTAMP#{end_time.isoformat()}'
)
)
)
return response['Items']
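A sketch of the write/read cycle for the sharded metric, assuming a dedicated metrics table; the metric name and time window are illustrative.

# time_series_usage.py -- hypothetical write/read cycle for the sharded metric
from datetime import datetime, timedelta

metrics = TimeSeriesPattern('metrics-table', shards=10)  # assumed table name

# Writes land on a random shard, spreading load off a single hot key
metrics.write_metric('api.latency_ms', 42.5)
metrics.write_metric('api.latency_ms', 38.1)

# Reads scatter-gather across all shards and merge by timestamp
window_start = datetime.now() - timedelta(hours=1)
points = metrics.query_metrics('api.latency_ms', window_start, datetime.now())
print(len(points), 'data points')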
Transactions and Batch Operations
ACID Transactions
# transactions.py
import uuid
from datetime import datetime

import boto3
from botocore.exceptions import ClientError

class TransactionManager:
def __init__(self, table_name):
self.dynamodb = boto3.client('dynamodb')
self.table_name = table_name
def transfer_funds(self, from_account, to_account, amount):
"""Atomic fund transfer between accounts"""
try:
response = self.dynamodb.transact_write_items(
TransactItems=[
{
# Debit from account
'Update': {
'TableName': self.table_name,
'Key': {
'PK': {'S': f'ACCOUNT#{from_account}'},
'SK': {'S': 'BALANCE'}
},
'UpdateExpression': 'SET Balance = Balance - :amount',
'ConditionExpression': 'Balance >= :amount',
'ExpressionAttributeValues': {
':amount': {'N': str(amount)}
}
}
},
{
# Credit to account
'Update': {
'TableName': self.table_name,
'Key': {
'PK': {'S': f'ACCOUNT#{to_account}'},
'SK': {'S': 'BALANCE'}
},
'UpdateExpression': 'SET Balance = Balance + :amount',
'ExpressionAttributeValues': {
':amount': {'N': str(amount)}
}
}
},
{
# Create transaction record
'Put': {
'TableName': self.table_name,
'Item': {
'PK': {'S': f'TRANSACTION#{uuid.uuid4()}'},
'SK': {'S': f'TIMESTAMP#{datetime.now().isoformat()}'},
'FromAccount': {'S': from_account},
'ToAccount': {'S': to_account},
'Amount': {'N': str(amount)},
'Status': {'S': 'COMPLETED'}
}
}
}
]
)
return {'success': True, 'transaction_id': response['ResponseMetadata']['RequestId']}
except ClientError as e:
if e.response['Error']['Code'] == 'TransactionCanceledException':
return {'success': False, 'error': 'Insufficient funds or account not found'}
raise
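One refinement worth considering: TransactWriteItems accepts a ClientRequestToken, so a retried call with the same token within the deduplication window is not applied twice. Below is a sketch of how the transfer above could be made idempotent; the helper name and the assumption that TransactionManager exposes its client and table name are mine.

# idempotent_transfer.py -- sketch: the same transfer, made safe to retry
import uuid

def transfer_with_idempotency(manager, from_account, to_account, amount, token=None):
    """Pass a ClientRequestToken so a retried call is not applied twice."""
    token = token or str(uuid.uuid4())
    return manager.dynamodb.transact_write_items(
        ClientRequestToken=token,  # DynamoDB dedupes identical requests for a short window
        TransactItems=[
            {
                'Update': {
                    'TableName': manager.table_name,
                    'Key': {'PK': {'S': f'ACCOUNT#{from_account}'}, 'SK': {'S': 'BALANCE'}},
                    'UpdateExpression': 'SET Balance = Balance - :amount',
                    'ConditionExpression': 'Balance >= :amount',
                    'ExpressionAttributeValues': {':amount': {'N': str(amount)}}
                }
            },
            {
                'Update': {
                    'TableName': manager.table_name,
                    'Key': {'PK': {'S': f'ACCOUNT#{to_account}'}, 'SK': {'S': 'BALANCE'}},
                    'UpdateExpression': 'SET Balance = Balance + :amount',
                    'ExpressionAttributeValues': {':amount': {'N': str(amount)}}
                }
            }
        ]
    )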
Batch Operations with Error Handling
# batch_operations.py
import time

import boto3
from botocore.exceptions import ClientError

class BatchOperationManager:
def __init__(self, table_name):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
    def batch_write_with_retry(self, items, max_retries=3):
        """Batch write with exponential backoff on throttling"""
        unprocessed_items = []
        # Chunk into groups of 25 (the BatchWriteItem limit). batch_writer already
        # retries unprocessed items internally; explicit chunks let us report
        # exactly which items still failed after max_retries.
        for i in range(0, len(items), 25):
batch = items[i:i+25]
retry_count = 0
while retry_count < max_retries:
try:
with self.table.batch_writer() as writer:
for item in batch:
writer.put_item(Item=item)
break
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
# Exponential backoff
time.sleep(2 ** retry_count)
retry_count += 1
else:
raise
if retry_count == max_retries:
unprocessed_items.extend(batch)
return {
'processed': len(items) - len(unprocessed_items),
'unprocessed': unprocessed_items
}
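Batch reads need the same care: BatchGetItem returns UnprocessedKeys under pressure instead of raising. A companion sketch follows; the function name and retry policy are assumptions.

# batch_read.py -- companion sketch: batch reads with UnprocessedKeys retry
import time

import boto3

def batch_get_with_retry(table_name, keys, max_retries=3):
    """Read up to 100 keys per request, retrying whatever comes back unprocessed."""
    resource = boto3.resource('dynamodb')
    items = []
    for i in range(0, len(keys), 100):  # 100 keys per BatchGetItem request
        request = {table_name: {'Keys': keys[i:i+100]}}
        for attempt in range(max_retries + 1):
            response = resource.batch_get_item(RequestItems=request)
            items.extend(response['Responses'].get(table_name, []))
            request = response.get('UnprocessedKeys') or {}
            if not request:
                break
            time.sleep(2 ** attempt)  # back off before retrying the leftovers
    return items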
DynamoDB Streams
Change Data Capture
# streams_processor.py
from boto3.dynamodb.types import TypeDeserializer

def lambda_handler(event, context):
"""Process DynamoDB stream events"""
for record in event['Records']:
event_name = record['eventName']
if event_name == 'INSERT':
handle_insert(record['dynamodb']['NewImage'])
elif event_name == 'MODIFY':
handle_update(
record['dynamodb']['OldImage'],
record['dynamodb']['NewImage']
)
elif event_name == 'REMOVE':
handle_delete(record['dynamodb']['OldImage'])
def handle_insert(new_image):
"""Handle new item insertion"""
# Deserialize DynamoDB format
item = deserialize_dynamodb_item(new_image)
# Check if it's an order
if item.get('Type') == 'Order':
# Send order confirmation email
send_order_confirmation(item)
# Update inventory
update_inventory(item)
# Trigger fulfillment workflow
start_fulfillment(item)
def handle_update(old_image, new_image):
"""Handle item updates"""
old_item = deserialize_dynamodb_item(old_image)
new_item = deserialize_dynamodb_item(new_image)
# Check for status changes
if old_item.get('Status') != new_item.get('Status'):
notify_status_change(new_item)
def deserialize_dynamodb_item(dynamodb_item):
"""Convert DynamoDB format to Python dict"""
    deserializer = TypeDeserializer()
return {k: deserializer.deserialize(v) for k, v in dynamodb_item.items()}
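For reference, stream images arrive in DynamoDB's attribute-value format, which is why the deserializer above is needed. A tiny illustration with a made-up record:

# stream_record_example.py -- what a stream image looks like before deserialization
sample_new_image = {
    'PK': {'S': 'ORDER#123'},
    'SK': {'S': 'METADATA'},
    'Type': {'S': 'Order'},
    'Total': {'N': '74'},
    'Status': {'S': 'PENDING'}
}

# TypeDeserializer turns the attribute-value map into plain Python types
item = deserialize_dynamodb_item(sample_new_image)
print(item['Type'], item['Total'])  # -> Order Decimal('74')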
Performance Optimization
Hot Partition Management
# hot_partition_manager.py
import random
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

import boto3
from boto3.dynamodb.conditions import Key

class HotPartitionManager:
"""Manage hot partitions with adaptive sharding"""
def __init__(self, table_name):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
self.cloudwatch = boto3.client('cloudwatch')
    def write_with_jitter(self, partition_key, sort_key, item):
        """Add jitter to partition key for hot partitions"""
        original_pk = partition_key
        # Check if partition is hot
        if self.is_hot_partition(partition_key):
            # Add random suffix to spread load across shards
            suffix = random.randint(0, 9)
            partition_key = f"{partition_key}#{suffix}"
        # Keep the unsuffixed key so readers can reassemble the shards
        item['OriginalPK'] = original_pk
        item['PK'] = partition_key
        item['SK'] = sort_key
        self.table.put_item(Item=item)
def read_with_scatter_gather(self, original_pk, sk_prefix):
"""Read from sharded partitions"""
all_items = []
# Check if using sharding
if self.is_hot_partition(original_pk):
# Query all shards in parallel
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for shard in range(10):
pk = f"{original_pk}#{shard}"
future = executor.submit(
self._query_shard,
pk,
sk_prefix
)
futures.append(future)
for future in futures:
items = future.result()
all_items.extend(items)
else:
# Query single partition
all_items = self._query_shard(original_pk, sk_prefix)
        return all_items
    def _query_shard(self, pk, sk_prefix):
        """Query one (possibly suffixed) partition for sort keys with the given prefix"""
        response = self.table.query(
            KeyConditionExpression=(
                Key('PK').eq(pk) &
                Key('SK').begins_with(sk_prefix)
            )
        )
        return response['Items']
    def is_hot_partition(self, partition_key):
        """Heuristic: treat the key as hot when the table is being throttled.
        CloudWatch only exposes table-level metrics, so this is a coarse signal;
        consider caching the result rather than calling CloudWatch on every write."""
        # Get recent write throttle events for the table
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/DynamoDB',
            MetricName='WriteThrottleEvents',
            Dimensions=[
                {'Name': 'TableName', 'Value': self.table.name}
            ],
            StartTime=datetime.now() - timedelta(minutes=5),
            EndTime=datetime.now(),
            Period=60,
            Statistics=['Sum']
        )
        # Check for throttling
        throttle_count = sum(p['Sum'] for p in response['Datapoints'])
        return throttle_count > 10  # Threshold for treating the partition as hot
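A hypothetical driver for the jitter/scatter-gather pair above; the partition key and event payload are placeholders.

# hot_partition_usage.py -- hypothetical driver for write jitter and scatter-gather reads
from datetime import datetime

manager = HotPartitionManager('app-table')  # assumed table name

# Writes: a hot key like a celebrity user gets a random #0-#9 suffix
manager.write_with_jitter('USER#celebrity', f'EVENT#{datetime.now().isoformat()}',
                          {'Type': 'Event', 'Payload': 'view'})

# Reads: fan out across the suffixed partitions and merge the results
events = manager.read_with_scatter_gather('USER#celebrity', 'EVENT#')
print(len(events), 'events')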
On-Demand vs Provisioned Capacity
# capacity_optimizer.py
from datetime import datetime, timedelta

import boto3

class CapacityOptimizer:
def __init__(self, table_name):
self.dynamodb = boto3.client('dynamodb')
self.cloudwatch = boto3.client('cloudwatch')
self.table_name = table_name
def analyze_usage_pattern(self, days=7):
"""Analyze usage pattern to recommend capacity mode"""
# Get consumed capacity metrics
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/DynamoDB',
MetricName='ConsumedReadCapacityUnits',
Dimensions=[
{'Name': 'TableName', 'Value': self.table_name}
],
StartTime=start_time,
EndTime=end_time,
Period=3600, # Hourly
Statistics=['Sum', 'Average', 'Maximum']
)
# Calculate metrics
datapoints = response['Datapoints']
if not datapoints:
return {'recommendation': 'INSUFFICIENT_DATA'}
avg_consumption = sum(p['Average'] for p in datapoints) / len(datapoints)
max_consumption = max(p['Maximum'] for p in datapoints)
        # Peak-to-average ratio: a rough spikiness measure (not statistical variance)
        variance = max_consumption / avg_consumption if avg_consumption > 0 else 0
# Recommendation logic
if variance > 10:
# High variance - spiky traffic
recommendation = 'ON_DEMAND'
reason = 'Traffic pattern shows high variance (>10x)'
elif avg_consumption < 10:
# Low usage
recommendation = 'ON_DEMAND'
reason = 'Low average consumption (<10 RCU)'
else:
# Steady traffic
recommendation = 'PROVISIONED'
reason = 'Steady traffic pattern with predictable load'
return {
'recommendation': recommendation,
'reason': reason,
'avg_consumption': avg_consumption,
'max_consumption': max_consumption,
'variance': variance
}
def switch_to_on_demand(self):
"""Switch table to on-demand billing"""
self.dynamodb.update_table(
TableName=self.table_name,
BillingMode='PAY_PER_REQUEST'
)
def switch_to_provisioned(self, read_units, write_units):
"""Switch table to provisioned capacity"""
self.dynamodb.update_table(
TableName=self.table_name,
BillingMode='PROVISIONED',
ProvisionedThroughput={
'ReadCapacityUnits': read_units,
'WriteCapacityUnits': write_units
}
)
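Tying it together, a sketch of acting on the recommendation; the sizing heuristic is an assumption, and keep in mind that DynamoDB only allows switching billing mode once per 24 hours.

# capacity_usage.py -- sketch: act on the recommendation (thresholds are assumptions)
optimizer = CapacityOptimizer('app-table')  # assumed table name
analysis = optimizer.analyze_usage_pattern(days=7)

if analysis['recommendation'] == 'ON_DEMAND':
    optimizer.switch_to_on_demand()
elif analysis['recommendation'] == 'PROVISIONED':
    # Size provisioned capacity off the observed average with some headroom
    optimizer.switch_to_provisioned(
        read_units=max(5, int(analysis['avg_consumption'] * 1.5)),
        write_units=5
    )
print(analysis)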
Cost Optimization
TTL and Archival Strategy
# ttl_archival.py
import json
from datetime import datetime, timedelta

import boto3
from boto3.dynamodb.conditions import Attr

class TTLArchivalManager:
def __init__(self, table_name, archive_bucket):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
self.s3 = boto3.client('s3')
self.archive_bucket = archive_bucket
def set_ttl(self, item, days_to_live):
"""Set TTL on item"""
ttl_timestamp = int((datetime.now() + timedelta(days=days_to_live)).timestamp())
item['TTL'] = ttl_timestamp
return item
    def archive_old_data(self, days_old=90):
        """Archive old data to S3 before TTL deletion"""
        cutoff_date = datetime.now() - timedelta(days=days_old)
        # Paginated scan for old items (use with caution on large tables)
        scan_kwargs = {
            'FilterExpression': Attr('CreatedAt').lt(cutoff_date.isoformat())
        }
        while True:
            response = self.table.scan(**scan_kwargs)
            for item in response['Items']:
                # Serialize for S3 (Decimal and datetime values fall back to str)
                item_json = json.dumps(item, default=str)
                # Store in S3 with hierarchical key
                key = f"archive/{item['Type']}/{item['PK']}/{item['SK']}.json"
                self.s3.put_object(
                    Bucket=self.archive_bucket,
                    Key=key,
                    Body=item_json,
                    StorageClass='GLACIER'
                )
                # Set a short TTL so DynamoDB deletes the item after a grace period
                self.table.update_item(
                    Key={'PK': item['PK'], 'SK': item['SK']},
                    UpdateExpression='SET #ttl = :ttl',
                    ExpressionAttributeNames={'#ttl': 'TTL'},
                    ExpressionAttributeValues={
                        ':ttl': int((datetime.now() + timedelta(days=7)).timestamp())
                    }
                )
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
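A sketch of how the pieces fit, assuming a placeholder table and bucket. Remember that the TTL attribute must also be enabled on the table (UpdateTimeToLive) before DynamoDB will expire anything.

# ttl_usage.py -- hypothetical wiring of TTL plus archival (names are placeholders)
from datetime import datetime

archiver = TTLArchivalManager('app-table', archive_bucket='my-archive-bucket')

# New items get a 90-day TTL at write time
item = archiver.set_ttl({'PK': 'SESSION#abc', 'SK': 'METADATA', 'Type': 'Session',
                         'CreatedAt': datetime.now().isoformat()}, days_to_live=90)
archiver.table.put_item(Item=item)

# A scheduled job (e.g. a daily Lambda) copies aging items to S3 Glacier
# and stamps a short TTL so DynamoDB deletes them afterwards
archiver.archive_old_data(days_old=90)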
Best Practices Checklist
- Enumerate every access pattern before you design keys; model for queries, not relationships.
- Prefer Query over Scan; reserve Scan for rare administrative jobs such as archival.
- Use generic PK/SK attributes and item Type markers so one table can hold many entity types.
- Add a GSI per alternate access pattern, and keep GSI key attributes denormalized onto the items that need them.
- Write-shard hot keys with random suffixes and scatter-gather on read.
- Use transactions only where multi-item invariants demand them; use conditional writes elsewhere.
- Retry batch operations with exponential backoff and track unprocessed items.
- Use Streams for side effects (notifications, inventory, fulfillment) instead of coupling them to the write path.
- Revisit capacity mode against observed traffic: on-demand for spiky or low traffic, provisioned for steady load.
- Expire cold data with TTL and archive it to S3 before it disappears.
Conclusion
DynamoDB excels when you embrace its constraints and design for them. Think in terms of access patterns, not relationships. Use single table design judiciously, leverage streams for event-driven architectures, and always plan for scale from day one. The key to DynamoDB success is understanding that it's not a relational database—it's a high-performance key-value and document store that rewards good design with incredible scale and performance.