Table of Contents
- Introduction to PynamoDB
- Getting Started
- Basic Concepts
- Model Definition
- CRUD Operations
- Querying and Filtering
- Indexes
- Advanced Features
- Best Practices
- Real-World Examples
- Performance Optimization
- Testing
- Deployment Considerations
1. Introduction to PynamoDB
PynamoDB is a Pythonic interface to Amazon DynamoDB: an ORM-style object-mapper layer (DynamoDB itself is not relational) that makes working with DynamoDB more intuitive for Python developers.
What is DynamoDB?
graph TB
A[DynamoDB] --> B[NoSQL Database]
A --> C[Fully Managed]
A --> D[Serverless]
A --> E[Auto-scaling]
B --> F[Key-Value Store]
B --> G[Document Store]
C --> H[No Server Management]
D --> I[Pay per Request]
E --> J[Handles Traffic Spikes]
Why PynamoDB?
graph LR
A[Raw Boto3] --> B[Complex Syntax]
A --> C[Manual Type Conversion]
A --> D[Error Prone]
E[PynamoDB] --> F[Pythonic Interface]
E --> G[Automatic Type Handling]
E --> H[Model-based Approach]
E --> I[Built-in Validation]
2. Getting Started
Installation
pip install pynamodb
Basic Setup
from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute
import os
# Configure AWS credentials
os.environ['AWS_ACCESS_KEY_ID'] = 'your-access-key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your-secret-key'
os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'  # for demos only; prefer IAM roles or AWS profiles
Your First Model
class User(Model):
class Meta:
table_name = 'users'
region = 'us-west-2'
email = UnicodeAttribute(hash_key=True)
name = UnicodeAttribute()
age = NumberAttribute()
3. Basic Concepts
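PynamoDB's automatic type handling (mentioned above) boils down to mapping Python values onto DynamoDB's typed wire format. The sketch below is illustrative only, not the library's actual serializer:

```python
def to_dynamodb_value(value):
    """Map a Python value to DynamoDB's typed attribute format (simplified)."""
    if isinstance(value, bool):          # bool before int: bool is an int subclass
        return {"BOOL": value}
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}         # DynamoDB transmits numbers as strings
    if isinstance(value, bytes):
        return {"B": value}
    if isinstance(value, list):
        return {"L": [to_dynamodb_value(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: to_dynamodb_value(v) for k, v in value.items()}}
    if value is None:
        return {"NULL": True}
    raise TypeError(f"Unsupported type: {type(value)!r}")

item = {"email": "john@example.com", "age": 30, "is_active": True}
wire = {k: to_dynamodb_value(v) for k, v in item.items()}
```

Note the `bool` check must come before the numeric check, since `bool` subclasses `int` in Python — the same kind of edge case an attribute layer has to absorb so you never think about it.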
DynamoDB Key Concepts
graph TD
A[DynamoDB Table] --> B[Primary Key]
B --> C[Hash Key - Partition Key]
B --> D[Range Key - Sort Key - Optional]
A --> E[Items - Records]
A --> F[Attributes - Fields]
A --> G[Secondary Indexes]
G --> H[Global Secondary Index - GSI]
G --> I[Local Secondary Index - LSI]
PynamoDB Model Architecture
classDiagram
class Model {
+Meta class
+Attributes
+save()
+delete()
+query()
+scan()
}
class Meta {
+table_name
+region
+host
+billing_mode
}
class Attributes {
+UnicodeAttribute
+NumberAttribute
+BinaryAttribute
+BooleanAttribute
+UTCDateTimeAttribute
+ListAttribute
+MapAttribute
}
Model --> Meta
Model --> Attributes
4. Model Definition
Basic Model Structure
from pynamodb.models import Model
from pynamodb.attributes import (
UnicodeAttribute, NumberAttribute, BinaryAttribute,
BooleanAttribute, UTCDateTimeAttribute, ListAttribute,
MapAttribute, JSONAttribute
)
from datetime import datetime
class User(Model):
class Meta:
table_name = 'users'
region = 'us-west-2'
# Optional: for local development
# host = 'http://localhost:8000'
# Primary Key
user_id = UnicodeAttribute(hash_key=True)
# Attributes
email = UnicodeAttribute()
name = UnicodeAttribute()
age = NumberAttribute()
is_active = BooleanAttribute(default=True)
created_at = UTCDateTimeAttribute(default=datetime.now)
profile_data = JSONAttribute(null=True)
Composite Primary Key
class OrderItem(Model):
class Meta:
table_name = 'order_items'
region = 'us-west-2'
order_id = UnicodeAttribute(hash_key=True)
item_id = UnicodeAttribute(range_key=True)
quantity = NumberAttribute()
price = NumberAttribute()
created_at = UTCDateTimeAttribute(default=datetime.now)
Attribute Types Deep Dive
graph TB
A[PynamoDB Attributes] --> B[Simple Types]
A --> C[Complex Types]
A --> D[Custom Types]
B --> E[UnicodeAttribute]
B --> F[NumberAttribute]
B --> G[BinaryAttribute]
B --> H[BooleanAttribute]
B --> I[UTCDateTimeAttribute]
C --> J[ListAttribute]
C --> K[MapAttribute]
C --> L[JSONAttribute]
C --> M[SetAttribute]
D --> N[Custom Serialization]
D --> O[Inheritance]
Complex Attributes Example
class Address(MapAttribute):
street = UnicodeAttribute()
city = UnicodeAttribute()
state = UnicodeAttribute()
zip_code = UnicodeAttribute()
class User(Model):
class Meta:
table_name = 'users'
region = 'us-west-2'
user_id = UnicodeAttribute(hash_key=True)
name = UnicodeAttribute()
addresses = ListAttribute(of=Address)
tags = ListAttribute()
metadata = JSONAttribute()
5. CRUD Operations
Create Operations
sequenceDiagram
participant App
participant PynamoDB
participant DynamoDB
App->>PynamoDB: user.save()
PynamoDB->>DynamoDB: PutItem Request
DynamoDB->>PynamoDB: Success Response
PynamoDB->>App: Model Instance
# Creating a new user
user = User(
user_id='user123',
email='john@example.com',
name='John Doe',
age=30
)
user.save()
# PynamoDB has no Model.create() classmethod; instantiate and save
user = User(
    user_id='user124',
    email='jane@example.com',
    name='Jane Smith',
    age=25
)
user.save()
# Conditional creation (save only if the item doesn't already exist)
from pynamodb.exceptions import PutError
user = User(
    user_id='user125',
    email='bob@example.com',
    name='Bob Johnson',
    age=35
)
try:
    user.save(condition=User.user_id.does_not_exist())
except PutError:
    print("User already exists")
Read Operations
# Get a single item
try:
user = User.get('user123')
print(f"Found user: {user.name}")
except User.DoesNotExist:
print("User not found")
# Get with range key
try:
order_item = OrderItem.get('order123', 'item456')
print(f"Quantity: {order_item.quantity}")
except OrderItem.DoesNotExist:
print("Order item not found")
# Batch get
user_ids = ['user123', 'user124', 'user125']
users = []
for user in User.batch_get(user_ids):
users.append(user)
Update Operations
# Simple update
user = User.get('user123')
user.age = 31
user.save()
# Conditional update
user.update(actions=[
User.age.set(32),
User.name.set('John Updated')
], condition=User.age < 35)
# Atomic operations
user.update(actions=[
    User.age.add(1),  # Increment age by 1
    User.tags.set(User.tags.append(['new-tag'])),  # list_append: add to the end of a list
    User.metadata.set({'last_login': datetime.now().isoformat()})
])
Delete Operations
# Delete a single item
user = User.get('user123')
user.delete()
# Conditional delete
user.delete(condition=User.is_active == True)
# Delete without a prior read: construct an instance with just the key
User(user_id='user124').delete()
# Batch delete
users_to_delete = ['user125', 'user126', 'user127']
with User.batch_write() as batch:
for user_id in users_to_delete:
batch.delete(User(user_id=user_id))
6. Querying and Filtering
Query vs Scan
graph LR
A[Data Access Patterns] --> B[Query]
A --> C[Scan]
B --> D[Uses Primary Key]
B --> E[Efficient]
B --> F[Limited Results]
C --> G[Examines All Items]
C --> H[Expensive]
C --> I[Flexible Filtering]
Query Operations
# Query by hash key
users = User.query('user123')
for user in users:
print(user.name)
# Query with range key condition
order_items = OrderItem.query(
'order123',
OrderItem.item_id.startswith('ELEC')
)
# Query with filter
recent_orders = OrderItem.query(
'order123',
filter_condition=OrderItem.created_at > datetime(2023, 1, 1)
)
# Query with pagination
page_size = 10
last_evaluated_key = None
all_items = []
while True:
if last_evaluated_key:
results = OrderItem.query(
'order123',
limit=page_size,
last_evaluated_key=last_evaluated_key
)
else:
results = OrderItem.query('order123', limit=page_size)
items = list(results)
all_items.extend(items)
if len(items) < page_size:
break
last_evaluated_key = results.last_evaluated_key
Scan Operations
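Query and scan results are paged the same way. The pagination loop shown above can be distilled into a reusable pure-Python driver; `fake_query` below is a stand-in for a PynamoDB call, not a real API:

```python
def paginate(query_fn, page_size=10):
    """Drive a paged query to exhaustion, following last_evaluated_key.

    query_fn(limit, last_evaluated_key) must return (items, next_key);
    it stands in for a PynamoDB query or scan call here.
    """
    items, last_key = [], None
    while True:
        page, last_key = query_fn(limit=page_size, last_evaluated_key=last_key)
        items.extend(page)
        if last_key is None:
            return items

# A fake data source standing in for DynamoDB
DATA = list(range(25))

def fake_query(limit, last_evaluated_key):
    start = last_evaluated_key or 0
    page = DATA[start:start + limit]
    next_key = start + limit if start + limit < len(DATA) else None
    return page, next_key

all_items = paginate(fake_query, page_size=10)
```

The key design point is that the caller never inspects page boundaries: the loop terminates exactly when the service stops returning a continuation key.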
# Basic scan
for user in User.scan():
print(f"User: {user.name}, Age: {user.age}")
# Scan with filter
active_users = User.scan(User.is_active == True)
# Scan with multiple conditions
young_active_users = User.scan(
(User.age < 30) & (User.is_active == True)
)
# Parallel scan for better performance
def scan_segment(segment, total_segments):
return User.scan(
segment=segment,
total_segments=total_segments
)
# Use ThreadPoolExecutor for parallel scanning
from concurrent.futures import ThreadPoolExecutor
import threading
all_users = []
lock = threading.Lock()
def process_segment(segment):
segment_users = list(scan_segment(segment, 4))
with lock:
all_users.extend(segment_users)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(process_segment, i) for i in range(4)]
for future in futures:
future.result()
Advanced Filtering
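Filter conditions compose with `&`, `|` and `~`. PynamoDB compiles these into a server-side filter expression rather than evaluating anything locally; purely as an illustration of how such predicate objects can combine, here is a tiny operator-overloading sketch:

```python
class Condition:
    """Tiny predicate object supporting &, |, ~ like PynamoDB conditions (sketch)."""
    def __init__(self, fn):
        self.fn = fn
    def __call__(self, item):
        return self.fn(item)
    def __and__(self, other):
        return Condition(lambda item: self(item) and other(item))
    def __or__(self, other):
        return Condition(lambda item: self(item) or other(item))
    def __invert__(self):
        return Condition(lambda item: not self(item))

age_between = Condition(lambda u: 25 <= u["age"] <= 35)
is_active = Condition(lambda u: bool(u["is_active"]))
has_profile = Condition(lambda u: u.get("profile_data") is not None)

complex_filter = age_between & is_active & has_profile
```

The real library builds an expression string plus attribute-value placeholders out of the same composition; the local-evaluation version just makes the semantics visible.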
# Complex filters use the attributes' condition methods
# Check if an attribute exists
users_with_profile = User.scan(
    filter_condition=User.profile_data.exists()
)
# Check that an attribute is absent
users_without_profile = User.scan(
    filter_condition=User.profile_data.does_not_exist()
)
# Combining conditions
complex_filter = (
    User.age.between(25, 35) &
    (User.is_active == True) &
    User.profile_data.exists()
)
filtered_users = User.scan(filter_condition=complex_filter)
7. Indexes
Index Types
graph TB
A[DynamoDB Indexes] --> B[Global Secondary Index - GSI]
A --> C[Local Secondary Index - LSI]
B --> D[Different Hash Key]
B --> E[Different Range Key]
B --> F[Separate RCU/WCU]
B --> G[Eventually Consistent]
C --> H[Same Hash Key]
C --> I[Different Range Key]
C --> J[Shares RCU/WCU]
C --> K[Strongly Consistent Option]
Global Secondary Index (GSI)
from pynamodb.indexes import GlobalSecondaryIndex, AllProjection

# Index classes must be defined before the model that references them
class EmailIndex(GlobalSecondaryIndex):
    class Meta:
        index_name = 'email-index'
        projection = AllProjection()
    email = UnicodeAttribute(hash_key=True)

class DepartmentIndex(GlobalSecondaryIndex):
    class Meta:
        index_name = 'department-created-index'
        projection = AllProjection()
    department = UnicodeAttribute(hash_key=True)
    created_at = UTCDateTimeAttribute(range_key=True)

class User(Model):
    class Meta:
        table_name = 'users'
        region = 'us-west-2'
    user_id = UnicodeAttribute(hash_key=True)
    email = UnicodeAttribute()
    name = UnicodeAttribute()
    age = NumberAttribute()
    department = UnicodeAttribute()
    created_at = UTCDateTimeAttribute(default=datetime.now)
    # GSI for querying by email
    email_index = EmailIndex()
    # GSI for querying by department and created_at
    department_index = DepartmentIndex()
Using Indexes
# Query using GSI
# Find user by email
try:
users = User.email_index.query('john@example.com')
user = next(iter(users))
print(f"Found user: {user.name}")
except StopIteration:
print("User not found")
# Query department index
engineering_users = User.department_index.query(
'Engineering',
User.created_at > datetime(2023, 1, 1)
)
for user in engineering_users:
print(f"Engineer: {user.name}, Joined: {user.created_at}")
Local Secondary Index (LSI)
from pynamodb.indexes import LocalSecondaryIndex, AllProjection

# Index classes first, then the model that uses them
class OrderDateIndex(LocalSecondaryIndex):
    class Meta:
        index_name = 'customer-date-index'
        projection = AllProjection()
    customer_id = UnicodeAttribute(hash_key=True)
    order_date = UTCDateTimeAttribute(range_key=True)

class OrderStatusIndex(LocalSecondaryIndex):
    class Meta:
        index_name = 'customer-status-index'
        projection = AllProjection()
    customer_id = UnicodeAttribute(hash_key=True)
    status = UnicodeAttribute(range_key=True)

class Order(Model):
    class Meta:
        table_name = 'orders'
        region = 'us-west-2'
    customer_id = UnicodeAttribute(hash_key=True)
    order_id = UnicodeAttribute(range_key=True)
    order_date = UTCDateTimeAttribute()
    status = UnicodeAttribute()
    total_amount = NumberAttribute()
    # LSI for querying by customer and order_date
    date_index = OrderDateIndex()
    # LSI for querying by customer and status
    status_index = OrderStatusIndex()
8. Advanced Features
Transactions
sequenceDiagram
participant App
participant PynamoDB
participant DynamoDB
App->>PynamoDB: Transaction.write()
PynamoDB->>DynamoDB: TransactWriteItems
alt Success
DynamoDB->>PynamoDB: All operations succeed
PynamoDB->>App: Success
else Failure
DynamoDB->>PynamoDB: Transaction fails
PynamoDB->>App: Exception
end
from pynamodb.connection import Connection
from pynamodb.transactions import TransactWrite
# Transaction example: Transfer money between accounts
class Account(Model):
class Meta:
table_name = 'accounts'
region = 'us-west-2'
account_id = UnicodeAttribute(hash_key=True)
balance = NumberAttribute()
version = NumberAttribute(default=0)
def transfer_money(from_account_id, to_account_id, amount):
# Get current account states
from_account = Account.get(from_account_id)
to_account = Account.get(to_account_id)
# Check sufficient balance
if from_account.balance < amount:
raise ValueError("Insufficient balance")
# Execute both updates atomically (TransactWrite requires an explicit connection)
connection = Connection(region='us-west-2')
with TransactWrite(connection=connection) as transaction:
# Update from account
transaction.update(
from_account,
actions=[
Account.balance.add(-amount),
Account.version.add(1)
],
condition=Account.version == from_account.version
)
# Update to account
transaction.update(
to_account,
actions=[
Account.balance.add(amount),
Account.version.add(1)
],
condition=Account.version == to_account.version
)
# Usage
try:
transfer_money('account1', 'account2', 100)
print("Transfer successful")
except Exception as e:
print(f"Transfer failed: {e}")
Conditional Operations
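Conditional writes are the foundation of optimistic locking. Before the PynamoDB version, the idea can be simulated against a plain in-memory store — illustrative only, with `VersionConflict` playing the role of a failed condition expression:

```python
class VersionConflict(Exception):
    pass

class InMemoryStore:
    """Tiny in-memory stand-in for a table supporting conditional writes."""
    def __init__(self):
        self.items = {}

    def put(self, key, content, expected_version):
        # Same role as condition=Model.version == expected_version
        current = self.items.get(key, {"version": 0})
        if current["version"] != expected_version:
            raise VersionConflict(key)
        self.items[key] = {"content": content, "version": expected_version + 1}

store = InMemoryStore()
store.put("doc123", "first draft", expected_version=0)

# A writer that read version 1 succeeds; a stale writer that also read version 1 does not
store.put("doc123", "second draft", expected_version=1)
try:
    store.put("doc123", "stale draft", expected_version=1)
    conflict = False
except VersionConflict:
    conflict = True
```

The stale writer loses because the version it read no longer matches; that is exactly the check DynamoDB performs server-side when the condition expression fails.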
# Optimistic locking with version field
from pynamodb.exceptions import UpdateError
class Document(Model):
class Meta:
table_name = 'documents'
region = 'us-west-2'
doc_id = UnicodeAttribute(hash_key=True)
content = UnicodeAttribute()
version = NumberAttribute(default=1)
last_modified = UTCDateTimeAttribute(default=datetime.now)
def update_document(doc_id, new_content):
doc = Document.get(doc_id)
current_version = doc.version
try:
doc.update(
actions=[
Document.content.set(new_content),
Document.version.add(1),
Document.last_modified.set(datetime.now())
],
condition=Document.version == current_version
)
return True
except UpdateError:  # from pynamodb.exceptions; raised when the condition fails
return False  # Document was modified by another process
# Usage
if update_document('doc123', 'Updated content'):
print("Document updated successfully")
else:
print("Document was modified by another process, please retry")
Batch Operations
# Batch write operations
def bulk_create_users(user_data_list):
with User.batch_write() as batch:
for user_data in user_data_list:
user = User(**user_data)
batch.save(user)
# Batch read operations
def get_users_batch(user_ids):
users = {}
for user in User.batch_get(user_ids):
users[user.user_id] = user
return users
# Usage
user_data = [
{'user_id': 'user1', 'name': 'Alice', 'email': 'alice@example.com'},
{'user_id': 'user2', 'name': 'Bob', 'email': 'bob@example.com'},
{'user_id': 'user3', 'name': 'Charlie', 'email': 'charlie@example.com'},
]
bulk_create_users(user_data)
users = get_users_batch(['user1', 'user2', 'user3'])
Streams and Change Data Capture
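Records delivered by a DynamoDB stream arrive in the AWS wire format. The sample event below is hand-written to match the documented shape (`eventName`, `dynamodb.NewImage`/`OldImage`); a tiny summarizer shows the kind of dispatch a Lambda handler performs:

```python
# A sample batch shaped like a DynamoDB Streams Lambda event (field names per AWS docs)
sample_event = {
    "Records": [
        {"eventName": "INSERT",
         "dynamodb": {"NewImage": {"user_id": {"S": "u1"}, "name": {"S": "Ann"}}}},
        {"eventName": "MODIFY",
         "dynamodb": {"OldImage": {"name": {"S": "Ann"}},
                      "NewImage": {"name": {"S": "Anne"}}}},
        {"eventName": "REMOVE",
         "dynamodb": {"OldImage": {"user_id": {"S": "u1"}}}},
    ]
}

def summarize_stream(event):
    """Count stream records by event type -- the dispatch a handler typically starts with."""
    counts = {"INSERT": 0, "MODIFY": 0, "REMOVE": 0}
    for record in event["Records"]:
        counts[record["eventName"]] += 1
    return counts

counts = summarize_stream(sample_event)
```

Because the record bodies are plain dictionaries, stream-processing logic like this is easy to unit test without any AWS infrastructure.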
# Enable streams on table
class AuditableUser(Model):
class Meta:
table_name = 'auditable_users'
region = 'us-west-2'
stream_view_type = 'NEW_AND_OLD_IMAGES' # Enable streams
user_id = UnicodeAttribute(hash_key=True)
name = UnicodeAttribute()
email = UnicodeAttribute()
updated_at = UTCDateTimeAttribute(default=datetime.now)
# Stream processing function (typically used with AWS Lambda)
def process_stream_record(record):
event_name = record['eventName']
if event_name == 'INSERT':
new_image = record['dynamodb']['NewImage']
print(f"New user created: {new_image}")
elif event_name == 'MODIFY':
old_image = record['dynamodb']['OldImage']
new_image = record['dynamodb']['NewImage']
print(f"User updated: {old_image} -> {new_image}")
elif event_name == 'REMOVE':
old_image = record['dynamodb']['OldImage']
print(f"User deleted: {old_image}")
9. Best Practices
Design Patterns
graph TB
A[DynamoDB Design Patterns] --> B[Single Table Design]
A --> C[Access Pattern First]
A --> D[Denormalization]
A --> E[Hot Partitions Avoidance]
B --> F[Multiple Entity Types]
B --> G[Overloaded Indexes]
C --> H[Define Queries First]
C --> I[Design Tables Second]
D --> J[Duplicate Data]
D --> K[Optimize for Reads]
E --> L[Distribute Load]
E --> M[Use Random Prefixes]
Model Design Best Practices
# 1. Use meaningful naming conventions
class UserProfile(Model):
class Meta:
table_name = 'user_profiles'
region = 'us-west-2'
# Use descriptive attribute names
user_id = UnicodeAttribute(hash_key=True)
full_name = UnicodeAttribute()
email_address = UnicodeAttribute()
date_of_birth = UTCDateTimeAttribute()
account_status = UnicodeAttribute() # active, inactive, suspended
# Add metadata for tracking
created_at = UTCDateTimeAttribute(default=datetime.now)
updated_at = UTCDateTimeAttribute(default=datetime.now)
def save(self, **kwargs):
self.updated_at = datetime.now()
super().save(**kwargs)
# 2. Use enums for limited value sets
from enum import Enum
class AccountStatus(Enum):
ACTIVE = "active"
INACTIVE = "inactive"
SUSPENDED = "suspended"
class UserProfile(Model):
# ... other attributes ...
account_status = UnicodeAttribute()
def set_status(self, status: AccountStatus):
self.account_status = status.value
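Storing `status.value` keeps the table free of Python-specific types, and the enum is recoverable on read. A self-contained round-trip (re-declaring the enum so the snippet runs on its own):

```python
from enum import Enum

class AccountStatus(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    SUSPENDED = "suspended"

# Write path: persist the plain string
stored = AccountStatus.ACTIVE.value

# Read path: the Enum constructor looks members up by value
recovered = AccountStatus(stored)
```

An unknown string raises `ValueError` on the read path, which surfaces bad data early instead of letting a typo like "actve" flow silently through the application.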
# 3. Implement data validation
from pynamodb.exceptions import DoesNotExist
import re
class UserProfile(Model):
# ... attributes ...
def clean(self):
# Email validation
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, self.email_address):
raise ValueError("Invalid email address format")
# Age validation
if self.date_of_birth and self.date_of_birth > datetime.now():
raise ValueError("Date of birth cannot be in the future")
def save(self, **kwargs):
self.clean()
super().save(**kwargs)
Error Handling
from pynamodb.exceptions import (
DoesNotExist, UpdateError, DeleteError,
TransactWriteError, PutError
)
import logging
logger = logging.getLogger(__name__)
class UserService:
@staticmethod
def create_user(user_data):
try:
user = User(**user_data)
user.save()
logger.info(f"User created successfully: {user.user_id}")
return user
except PutError as e:
logger.error(f"Failed to create user: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error creating user: {e}")
raise
@staticmethod
def get_user(user_id):
try:
return User.get(user_id)
except DoesNotExist:
logger.warning(f"User not found: {user_id}")
return None
except Exception as e:
logger.error(f"Error retrieving user {user_id}: {e}")
raise
@staticmethod
def update_user(user_id, updates):
try:
user = User.get(user_id)
for field, value in updates.items():
setattr(user, field, value)
user.save()
logger.info(f"User updated successfully: {user_id}")
return user
except DoesNotExist:
logger.warning(f"Cannot update non-existent user: {user_id}")
return None
except UpdateError as e:
logger.error(f"Failed to update user {user_id}: {e}")
raise
Connection Management
# Connection pooling and configuration
from pynamodb.connection import Connection
from pynamodb.models import Model
class BaseModel(Model):
class Meta:
abstract = True  # informational only: PynamoDB has no special handling for this flag
region = 'us-west-2'
# Configure connection settings
max_retry_attempts = 3
base_backoff_ms = 25
max_backoff_ms = 1000
@classmethod
def get_connection(cls):
# Custom connection configuration
if not hasattr(cls, '_connection'):
cls._connection = Connection(
region=cls.Meta.region,
max_retry_attempts=cls.Meta.max_retry_attempts,
base_backoff_ms=cls.Meta.base_backoff_ms,
max_backoff_ms=cls.Meta.max_backoff_ms
)
return cls._connection
class User(BaseModel):
class Meta:
table_name = 'users'
user_id = UnicodeAttribute(hash_key=True)
name = UnicodeAttribute()
10. Real-World Examples
E-commerce Application
erDiagram
USER ||--o{ ORDER : places
ORDER ||--o{ ORDER_ITEM : contains
PRODUCT ||--o{ ORDER_ITEM : "ordered as"
USER ||--o{ CART_ITEM : has
PRODUCT ||--o{ CART_ITEM : "added to"
USER {
string user_id PK
string email
string name
datetime created_at
}
ORDER {
string user_id FK
string order_id PK
string status
decimal total_amount
datetime created_at
}
ORDER_ITEM {
string order_id FK
string product_id FK
int quantity
decimal price
}
PRODUCT {
string product_id PK
string name
string category
decimal price
int stock_quantity
}
# E-commerce models
class User(Model):
class Meta:
table_name = 'ecommerce_users'
region = 'us-west-2'
user_id = UnicodeAttribute(hash_key=True)
email = UnicodeAttribute()
name = UnicodeAttribute()
address = JSONAttribute(null=True)
created_at = UTCDateTimeAttribute(default=datetime.now)
# GSI for email lookup
email_index = EmailIndex()
class Product(Model):
class Meta:
table_name = 'products'
region = 'us-west-2'
product_id = UnicodeAttribute(hash_key=True)
name = UnicodeAttribute()
description = UnicodeAttribute()
category = UnicodeAttribute()
price = NumberAttribute()
stock_quantity = NumberAttribute()
created_at = UTCDateTimeAttribute(default=datetime.now)
# GSI for category browsing (assumes a CategoryIndex class defined like EmailIndex above)
category_index = CategoryIndex()
class Order(Model):
class Meta:
table_name = 'orders'
region = 'us-west-2'
user_id = UnicodeAttribute(hash_key=True)
order_id = UnicodeAttribute(range_key=True)
status = UnicodeAttribute() # pending, confirmed, shipped, delivered
total_amount = NumberAttribute()
created_at = UTCDateTimeAttribute(default=datetime.now)
updated_at = UTCDateTimeAttribute(default=datetime.now)
# LSI for order date queries (assumes an OrderDateIndex variant keyed by user_id,
# since an LSI must share its table's hash key)
date_index = OrderDateIndex()
class OrderItem(Model):
class Meta:
table_name = 'order_items'
region = 'us-west-2'
order_id = UnicodeAttribute(hash_key=True)
product_id = UnicodeAttribute(range_key=True)
quantity = NumberAttribute()
price = NumberAttribute()
created_at = UTCDateTimeAttribute(default=datetime.now)
# Service layer for business logic
class OrderService:
@staticmethod
def create_order(user_id, items_data):
"""Create a new order with items"""
import uuid
from pynamodb.connection import Connection
from pynamodb.transactions import TransactWrite
order_id = str(uuid.uuid4())
total_amount = 0
# Calculate total and validate stock
validated_items = []
for item_data in items_data:
product = Product.get(item_data['product_id'])
if product.stock_quantity < item_data['quantity']:
raise ValueError(f"Insufficient stock for {product.name}")
item_total = product.price * item_data['quantity']
total_amount += item_total
validated_items.append({
'product': product,
'quantity': item_data['quantity'],
'price': product.price
})
# Create order and items in transaction
with TransactWrite(connection=Connection(region='us-west-2')) as transaction:
# Create order
order = Order(
user_id=user_id,
order_id=order_id,
status='pending',
total_amount=total_amount
)
transaction.save(order)
# Create order items and update stock
for item in validated_items:
order_item = OrderItem(
order_id=order_id,
product_id=item['product'].product_id,
quantity=item['quantity'],
price=item['price']
)
transaction.save(order_item)
# Update product stock
transaction.update(
item['product'],
actions=[Product.stock_quantity.add(-item['quantity'])]
)
return order
@staticmethod
def get_user_orders(user_id, limit=10):
"""Get user's recent orders"""
orders = Order.query(
user_id,
scan_index_forward=False, # Most recent first
limit=limit
)
return list(orders)
@staticmethod
def update_order_status(user_id, order_id, new_status):
"""Update order status"""
try:
order = Order.get(user_id, order_id)
order.status = new_status
order.updated_at = datetime.now()
order.save()
return order
except Order.DoesNotExist:
raise ValueError("Order not found")
Social Media Application
# Social media models
class User(Model):
class Meta:
table_name = 'social_users'
region = 'us-west-2'
user_id = UnicodeAttribute(hash_key=True)
username = UnicodeAttribute()
email = UnicodeAttribute()
bio = UnicodeAttribute(null=True)
follower_count = NumberAttribute(default=0)
following_count = NumberAttribute(default=0)
post_count = NumberAttribute(default=0)
created_at = UTCDateTimeAttribute(default=datetime.now)
class Post(Model):
class Meta:
table_name = 'posts'
region = 'us-west-2'
user_id = UnicodeAttribute(hash_key=True)
post_id = UnicodeAttribute(range_key=True)
content = UnicodeAttribute()
image_urls = ListAttribute(null=True)
like_count = NumberAttribute(default=0)
comment_count = NumberAttribute(default=0)
created_at = UTCDateTimeAttribute(default=datetime.now)
# GSI for timeline queries (assumes a TimelineIndex class defined like the GSI examples above)
timeline_index = TimelineIndex()
class Follow(Model):
class Meta:
table_name = 'follows'
region = 'us-west-2'
follower_id = UnicodeAttribute(hash_key=True)
following_id = UnicodeAttribute(range_key=True)
created_at = UTCDateTimeAttribute(default=datetime.now)
class Like(Model):
class Meta:
table_name = 'likes'
region = 'us-west-2'
user_id = UnicodeAttribute(hash_key=True)
post_id = UnicodeAttribute(range_key=True)
created_at = UTCDateTimeAttribute(default=datetime.now)
# Social media service
class SocialMediaService:
@staticmethod
def create_post(user_id, content, image_urls=None):
"""Create a new post"""
import uuid
post_id = str(uuid.uuid4())
post = Post(
user_id=user_id,
post_id=post_id,
content=content,
image_urls=image_urls or []
)
post.save()
# Update user's post count
user = User.get(user_id)
user.update(actions=[User.post_count.add(1)])
return post
@staticmethod
def follow_user(follower_id, following_id):
"""Follow a user"""
if follower_id == following_id:
raise ValueError("Cannot follow yourself")
try:
# Check if already following
Follow.get(follower_id, following_id)
raise ValueError("Already following this user")
except Follow.DoesNotExist:
pass
# Create follow relationship
follow = Follow(
follower_id=follower_id,
following_id=following_id
)
follow.save()
# Update counts
follower = User.get(follower_id)
following = User.get(following_id)
follower.update(actions=[User.following_count.add(1)])
following.update(actions=[User.follower_count.add(1)])
return follow
@staticmethod
def get_user_timeline(user_id, limit=20):
"""Get user's timeline (their posts)"""
posts = Post.query(
user_id,
scan_index_forward=False, # Most recent first
limit=limit
)
return list(posts)
@staticmethod
def like_post(user_id, post_user_id, post_id):
"""Like a post"""
try:
# Check if already liked
Like.get(user_id, post_id)
raise ValueError("Already liked this post")
except Like.DoesNotExist:
pass
# Create like
like = Like(user_id=user_id, post_id=post_id)
like.save()
# Update post like count
post = Post.get(post_user_id, post_id)
post.update(actions=[Post.like_count.add(1)])
return like
11. Performance Optimization
Query Optimization
graph TB
A[Query Optimization] --> B[Use Primary Keys]
A --> C[Leverage Indexes]
A --> D[Batch Operations]
A --> E[Pagination]
A --> F[Projection]
B --> G[Direct Access]
B --> H[Consistent Performance]
C --> I[GSI for Different Access Patterns]
C --> J[LSI for Sort Variations]
D --> K[Reduce API Calls]
D --> L[Better Throughput]
E --> M[Limit Result Sets]
E --> N[Memory Efficiency]
F --> O[Reduce Data Transfer]
F --> P[Faster Queries]
# Optimization techniques
class OptimizedUserService:
@staticmethod
def get_users_by_ids(user_ids):
"""Efficient batch retrieval"""
# Use batch_get instead of individual gets
users = {}
# Process in chunks of 100 (DynamoDB limit)
chunk_size = 100
for i in range(0, len(user_ids), chunk_size):
chunk = user_ids[i:i + chunk_size]
for user in User.batch_get(chunk):
users[user.user_id] = user
return users
@staticmethod
def get_active_users_paginated(page_size=50, last_key=None):
"""Efficient pagination with filtering"""
query_kwargs = {
'filter_condition': User.is_active == True,
'limit': page_size
}
if last_key:
query_kwargs['last_evaluated_key'] = last_key
results = User.scan(**query_kwargs)
items = list(results)
return {
'users': items,
'last_evaluated_key': results.last_evaluated_key,
'has_more': len(items) == page_size
}
@staticmethod
def get_user_summary(user_id):
"""Use projection to get only needed attributes"""
try:
# Only fetch specific attributes
user = User.get(
user_id,
attributes_to_get=['user_id', 'name', 'email', 'is_active']
)
return {
'user_id': user.user_id,
'name': user.name,
'email': user.email,
'is_active': user.is_active
}
except User.DoesNotExist:
return None
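The 100-key chunking inside `get_users_by_ids` is worth factoring into a helper, since BatchGetItem caps each request at 100 items:

```python
def chunked(keys, size=100):
    """Split a key list into chunks of at most `size` items (BatchGetItem's limit)."""
    return [keys[i:i + size] for i in range(0, len(keys), size)]

batches = chunked([f"user_{i}" for i in range(250)])
```

Each inner list can then be passed to one `batch_get` call; the remainder (here 50 keys) simply becomes a smaller final batch.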
# Connection optimization
class OptimizedConnection:
_connections = {}
@classmethod
def get_connection(cls, region='us-west-2'):
"""Reuse connections per region"""
if region not in cls._connections:
cls._connections[region] = Connection(
region=region,
# Optimize connection settings
max_retry_attempts=5,
base_backoff_ms=100,
max_backoff_ms=5000,
connect_timeout_seconds=10,
read_timeout_seconds=30
)
return cls._connections[region]
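The retry settings above imply an exponential backoff between attempts. One common scheme is "full jitter" — the delay bound doubles per attempt up to a cap, and the actual sleep is drawn uniformly below that bound. This is illustrative arithmetic, not PynamoDB's exact retry code:

```python
import random

def backoff_delay_ms(attempt, base_ms=100, cap_ms=5000):
    """Full-jitter backoff: bound doubles per attempt, capped, then randomized."""
    bound = min(cap_ms, base_ms * (2 ** attempt))
    return random.uniform(0, bound)

# Deterministic upper bounds per attempt, for inspection
bounds = [min(5000, 100 * 2 ** a) for a in range(8)]
```

The jitter matters under contention: if many throttled clients retried on identical schedules, their retries would collide again in synchronized waves.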
# Caching layer
import time
from typing import Optional, Dict, Any
class CacheService:
"""Simple in-memory cache with TTL"""
def __init__(self):
self._cache: Dict[str, Dict[str, Any]] = {}
def get(self, key: str) -> Optional[Any]:
if key in self._cache:
entry = self._cache[key]
if time.time() < entry['expires_at']:
return entry['value']
else:
del self._cache[key]
return None
def set(self, key: str, value: Any, ttl: int = 300):
"""Set cache entry with TTL in seconds"""
self._cache[key] = {
'value': value,
'expires_at': time.time() + ttl
}
def delete(self, key: str):
if key in self._cache:
del self._cache[key]
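The expiry behaviour of a TTL cache like `CacheService` is easiest to verify with an injectable clock; this self-contained variant demonstrates the pattern:

```python
import time

class TTLCache:
    """Minimal TTL cache; the clock is injectable so expiry is testable."""
    def __init__(self, clock=time.time):
        self.clock = clock
        self.entries = {}

    def set(self, key, value, ttl=300):
        self.entries[key] = (value, self.clock() + ttl)

    def get(self, key):
        entry = self.entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self.clock() >= expires_at:
            del self.entries[key]   # expired: evict and report a miss
            return None
        return value

now = [0.0]                              # a fake clock we can advance by hand
cache = TTLCache(clock=lambda: now[0])
cache.set("user:1", {"name": "Ann"}, ttl=300)
hit = cache.get("user:1")                # within TTL
now[0] = 301.0                           # advance past expiry
miss = cache.get("user:1")
```

Without the injected clock, the only way to test expiry is to `sleep()` through the TTL, which makes the test both slow and flaky.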
# Cached user service
class CachedUserService:
def __init__(self):
self.cache = CacheService()
def get_user(self, user_id: str):
"""Get user with caching"""
cache_key = f"user:{user_id}"
# Try cache first
cached_user = self.cache.get(cache_key)
if cached_user:
return cached_user
# Fetch from database
try:
user = User.get(user_id)
user_data = {
'user_id': user.user_id,
'name': user.name,
'email': user.email,
'is_active': user.is_active
}
# Cache for 5 minutes
self.cache.set(cache_key, user_data, ttl=300)
return user_data
except User.DoesNotExist:
return None
def update_user(self, user_id: str, updates: Dict[str, Any]):
"""Update user and invalidate cache"""
try:
user = User.get(user_id)
for field, value in updates.items():
setattr(user, field, value)
user.save()
# Invalidate cache
cache_key = f"user:{user_id}"
self.cache.delete(cache_key)
return True
except User.DoesNotExist:
return False
Capacity Planning
# Capacity monitoring and auto-scaling
import boto3
from datetime import datetime, timedelta

class CapacityMonitor:
    def __init__(self, table_name):
        self.table_name = table_name
        self.cloudwatch = boto3.client('cloudwatch')
def get_consumption_metrics(self, hours=1):
"""Get read/write capacity consumption"""
end_time = datetime.now()
start_time = end_time - timedelta(hours=hours)
metrics = {}
for metric_name in ['ConsumedReadCapacityUnits', 'ConsumedWriteCapacityUnits']:
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/DynamoDB',
MetricName=metric_name,
Dimensions=[
{'Name': 'TableName', 'Value': self.table_name}
],
StartTime=start_time,
EndTime=end_time,
Period=300, # 5 minutes
Statistics=['Sum', 'Average', 'Maximum']
)
metrics[metric_name] = response['Datapoints']
return metrics
def recommend_capacity(self, metrics):
"""Recommend capacity based on usage patterns"""
# Simple recommendation logic
recommendations = {}
for metric_name, datapoints in metrics.items():
if not datapoints:
continue
max_consumption = max(point['Maximum'] for point in datapoints)
avg_consumption = sum(point['Average'] for point in datapoints) / len(datapoints)
# Recommend 20% above peak usage
recommended = int(max_consumption * 1.2)
capacity_type = 'read' if 'Read' in metric_name else 'write'
recommendations[capacity_type] = {
'current_peak': max_consumption,
'average': avg_consumption,
'recommended': recommended
}
return recommendations
12. Testing
Unit Testing Setup
import unittest
from unittest.mock import patch, MagicMock
from moto import mock_dynamodb  # moto 4.x; moto 5 renamed the decorators to mock_aws
import boto3
from datetime import datetime
# Test models
class TestUser(Model):
class Meta:
table_name = 'test_users'
region = 'us-east-1'
# host = 'http://localhost:8000'  # uncomment only when testing against DynamoDB Local instead of moto
user_id = UnicodeAttribute(hash_key=True)
name = UnicodeAttribute()
email = UnicodeAttribute()
created_at = UTCDateTimeAttribute(default=datetime.now)
# Decorate the class, not each method: with per-method decorators, setUp and
# the test run in separate mock contexts and the table created in setUp vanishes.
@mock_dynamodb
class TestUserModel(unittest.TestCase):
    def setUp(self):
        """Set up test environment"""
        TestUser.create_table(read_capacity_units=1, write_capacity_units=1, wait=True)

    def test_create_user(self):
        """Test user creation"""
        user = TestUser(
            user_id='test123',
            name='Test User',
            email='test@example.com'
        )
        user.save()
        # Verify user was created
        retrieved_user = TestUser.get('test123')
        self.assertEqual(retrieved_user.name, 'Test User')
        self.assertEqual(retrieved_user.email, 'test@example.com')

    def test_user_not_found(self):
        """Test handling of non-existent user"""
        with self.assertRaises(TestUser.DoesNotExist):
            TestUser.get('nonexistent')

    def test_update_user(self):
        """Test user update"""
        user = TestUser(
            user_id='test123',
            name='Original Name',
            email='original@example.com'
        )
        user.save()
        user.name = 'Updated Name'
        user.save()
        updated_user = TestUser.get('test123')
        self.assertEqual(updated_user.name, 'Updated Name')

    def test_delete_user(self):
        """Test user deletion"""
        user = TestUser(
            user_id='test123',
            name='Test User',
            email='test@example.com'
        )
        user.save()
        user.delete()
        with self.assertRaises(TestUser.DoesNotExist):
            TestUser.get('test123')
```python
# Integration testing
@mock_dynamodb
class TestUserService(unittest.TestCase):
    def setUp(self):
        TestUser.create_table(read_capacity_units=1, write_capacity_units=1, wait=True)
        self.user_service = UserService()  # service class defined earlier in the guide

    def test_create_user_service(self):
        """Test user creation through service"""
        user_data = {
            'user_id': 'service_test',
            'name': 'Service User',
            'email': 'service@example.com'
        }
        user = self.user_service.create_user(user_data)
        self.assertIsNotNone(user)
        self.assertEqual(user.user_id, 'service_test')

    def test_get_user_service(self):
        """Test user retrieval through service"""
        # Create user first
        TestUser(
            user_id='service_test',
            name='Service User',
            email='service@example.com'
        ).save()

        user = self.user_service.get_user('service_test')
        self.assertIsNotNone(user)
        self.assertEqual(user.name, 'Service User')

    def test_user_not_found_service(self):
        """Test handling of non-existent user in service"""
        user = self.user_service.get_user('nonexistent')
        self.assertIsNone(user)
```
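The service tests above lean on a `UserService` defined earlier in the guide, and on a specific contract: `create_user` returns the saved user, and `get_user` returns `None` for a missing id rather than raising. A hypothetical in-memory stand-in (illustrative only, not part of PynamoDB) makes that contract explicit and lets you test service callers without any table at all:

```python
class InMemoryUserService:
    """Hypothetical stand-in mirroring the contract the tests rely on:
    create_user returns the stored user, get_user returns None when missing."""

    def __init__(self):
        self._store = {}

    def create_user(self, user_data):
        # Store a copy so later mutation of user_data can't corrupt the "table"
        self._store[user_data["user_id"]] = dict(user_data)
        return self._store[user_data["user_id"]]

    def get_user(self, user_id):
        return self._store.get(user_id)

svc = InMemoryUserService()
svc.create_user({"user_id": "u1", "name": "Ada"})
print(svc.get_user("u1")["name"])   # Ada
print(svc.get_user("missing"))      # None
```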
```python
# Performance testing
import time

@mock_dynamodb
class TestPerformance(unittest.TestCase):
    def setUp(self):
        TestUser.create_table(read_capacity_units=5, write_capacity_units=5, wait=True)

    def test_batch_operations_performance(self):
        """Test batch operations performance"""
        start_time = time.time()

        # Create 100 users using batch write
        users_data = [
            {
                'user_id': f'user_{i}',
                'name': f'User {i}',
                'email': f'user{i}@example.com'
            }
            for i in range(100)
        ]
        with TestUser.batch_write() as batch:
            for user_data in users_data:
                user = TestUser(**user_data)
                batch.save(user)
        batch_time = time.time() - start_time

        # Verify all users were created
        user_ids = [f'user_{i}' for i in range(100)]
        retrieved_users = list(TestUser.batch_get(user_ids))
        self.assertEqual(len(retrieved_users), 100)
        print(f"Batch operation completed in {batch_time:.2f} seconds")
```
```python
# Test utilities
class TestUtilities:
    @staticmethod
    def create_test_data():
        """Create test data for development"""
        users_data = [
            {'user_id': 'alice', 'name': 'Alice Johnson', 'email': 'alice@example.com'},
            {'user_id': 'bob', 'name': 'Bob Smith', 'email': 'bob@example.com'},
            {'user_id': 'charlie', 'name': 'Charlie Brown', 'email': 'charlie@example.com'},
        ]
        for user_data in users_data:
            user = TestUser(**user_data)
            user.save()
        print(f"Created {len(users_data)} test users")

    @staticmethod
    def cleanup_test_data():
        """Clean up test data"""
        for user in TestUser.scan():
            user.delete()
        print("Cleaned up all test data")

if __name__ == '__main__':
    unittest.main()
```

Testing with Docker
```dockerfile
# Dockerfile for local DynamoDB testing
FROM amazon/dynamodb-local:latest
EXPOSE 8000
CMD ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-inMemory"]
```

docker-compose.yml for the testing environment:
```yaml
version: '3.8'
services:
  dynamodb-local:
    image: amazon/dynamodb-local:latest
    container_name: dynamodb-local
    ports:
      - "8000:8000"
    command: ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-inMemory"]
  app:
    build: .
    depends_on:
      - dynamodb-local
    environment:
      - DYNAMODB_ENDPOINT=http://dynamodb-local:8000
      - AWS_ACCESS_KEY_ID=fakeMyKeyId
      - AWS_SECRET_ACCESS_KEY=fakeSecretAccessKey
      - AWS_DEFAULT_REGION=us-east-1
```

13. Deployment Considerations
Environment Configuration
```python
import os
from enum import Enum

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class Config:
    def __init__(self):
        self.environment = Environment(os.getenv('ENVIRONMENT', 'development'))
        self.aws_region = os.getenv('AWS_DEFAULT_REGION', 'us-west-2')
        self.dynamodb_endpoint = os.getenv('DYNAMODB_ENDPOINT')

    @property
    def table_prefix(self):
        """Get table prefix based on environment"""
        if self.environment == Environment.DEVELOPMENT:
            return "dev_"
        elif self.environment == Environment.STAGING:
            return "staging_"
        else:
            return ""

    @property
    def is_local(self):
        """Check if using local DynamoDB"""
        return self.dynamodb_endpoint is not None

config = Config()
```
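The prefixing rule in `table_prefix` is worth pinning down in isolation, since a wrong prefix silently targets the wrong table. A self-contained sketch of the same logic (the `table_name` helper is hypothetical, for illustration):

```python
import os
from enum import Enum

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

# Mirrors Config.table_prefix: dev and staging tables are namespaced,
# production tables keep their bare names.
PREFIXES = {
    Environment.DEVELOPMENT: "dev_",
    Environment.STAGING: "staging_",
    Environment.PRODUCTION: "",
}

def table_name(base, env):
    """Build the physical table name for a logical base name."""
    return f"{PREFIXES[env]}{base}"

print(table_name("users", Environment.DEVELOPMENT))  # dev_users
print(table_name("users", Environment.STAGING))      # staging_users
print(table_name("users", Environment.PRODUCTION))   # users
```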
```python
# Environment-aware model configuration. PynamoDB has no 'abstract' Meta
# option, and it reads Meta.table_name directly, so the prefix must be
# applied when each model's Meta is built rather than via a helper method.
def make_meta(base_name):
    """Build a Meta class with the environment prefix and endpoint applied"""
    attrs = {
        'table_name': f"{config.table_prefix}{base_name}",
        'region': config.aws_region,
    }
    if config.is_local:
        attrs['host'] = config.dynamodb_endpoint
    return type('Meta', (), attrs)

# Environment-specific models
class User(Model):
    Meta = make_meta('users')

    user_id = UnicodeAttribute(hash_key=True)
    name = UnicodeAttribute()
    email = UnicodeAttribute()
```

Infrastructure as Code
```yaml
# CloudFormation template for DynamoDB tables
AWSTemplateFormatVersion: '2010-09-09'
Description: 'DynamoDB tables for PynamoDB application'

Parameters:
  Environment:
    Type: String
    Default: development
    AllowedValues: [development, staging, production]
  TablePrefix:
    Type: String
    Default: ''

Resources:
  UsersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub '${TablePrefix}users'
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: user_id
          AttributeType: S
        - AttributeName: email
          AttributeType: S
      KeySchema:
        - AttributeName: user_id
          KeyType: HASH
      GlobalSecondaryIndexes:
        - IndexName: email-index
          KeySchema:
            - AttributeName: email
              KeyType: HASH
          Projection:
            ProjectionType: ALL
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      Tags:
        - Key: Environment
          Value: !Ref Environment
        - Key: Application
          Value: PynamoDBApp

  OrdersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Sub '${TablePrefix}orders'
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: user_id
          AttributeType: S
        - AttributeName: order_id
          AttributeType: S
        - AttributeName: created_at
          AttributeType: S
      KeySchema:
        - AttributeName: user_id
          KeyType: HASH
        - AttributeName: order_id
          KeyType: RANGE
      LocalSecondaryIndexes:
        - IndexName: user-created-index
          KeySchema:
            - AttributeName: user_id
              KeyType: HASH
            - AttributeName: created_at
              KeyType: RANGE
          Projection:
            ProjectionType: ALL

Outputs:
  UsersTableName:
    Description: 'Name of the Users table'
    Value: !Ref UsersTable
    Export:
      Name: !Sub '${AWS::StackName}-UsersTable'
  OrdersTableName:
    Description: 'Name of the Orders table'
    Value: !Ref OrdersTable
    Export:
      Name: !Sub '${AWS::StackName}-OrdersTable'
```

Monitoring and Logging
```python
import logging
from datetime import datetime, timedelta

import boto3

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class DynamoDBMonitor:
    def __init__(self, table_names):
        self.table_names = table_names
        self.cloudwatch = boto3.client('cloudwatch')
        self.dynamodb = boto3.client('dynamodb')

    def check_table_health(self):
        """Check health of all tables"""
        health_status = {}
        for table_name in self.table_names:
            try:
                response = self.dynamodb.describe_table(TableName=table_name)
                table_status = response['Table']['TableStatus']
                health_status[table_name] = {
                    'status': table_status,
                    'healthy': table_status == 'ACTIVE'
                }
                if table_status != 'ACTIVE':
                    logger.warning(f"Table {table_name} is not active: {table_status}")
            except Exception as e:
                logger.error(f"Error checking table {table_name}: {e}")
                health_status[table_name] = {
                    'status': 'ERROR',
                    'healthy': False,
                    'error': str(e)
                }
        return health_status

    def get_throttling_metrics(self, hours=1):
        """Check for throttling events"""
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=hours)
        throttling_data = {}
        for table_name in self.table_names:
            for operation in ['Query', 'Scan', 'GetItem', 'PutItem', 'UpdateItem', 'DeleteItem']:
                try:
                    response = self.cloudwatch.get_metric_statistics(
                        Namespace='AWS/DynamoDB',
                        MetricName='ThrottledRequests',
                        Dimensions=[
                            {'Name': 'TableName', 'Value': table_name},
                            {'Name': 'Operation', 'Value': operation}
                        ],
                        StartTime=start_time,
                        EndTime=end_time,
                        Period=300,
                        Statistics=['Sum']
                    )
                    datapoints = response.get('Datapoints', [])
                    if datapoints:
                        total_throttles = sum(point['Sum'] for point in datapoints)
                        if total_throttles > 0:
                            throttling_data[f"{table_name}_{operation}"] = total_throttles
                            logger.warning(
                                f"Throttling detected: {table_name} {operation} - "
                                f"{total_throttles} events"
                            )
                except Exception as e:
                    logger.error(f"Error getting throttling metrics for {table_name}: {e}")
        return throttling_data
```
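When `get_throttling_metrics` starts reporting events, the standard client-side mitigation is retrying with exponential backoff and jitter. boto3's retry modes already do this for individual calls, but application-level loops (batch imports, backfills) often need their own. A minimal, dependency-free sketch of the idea (`with_backoff` is a hypothetical helper, not a PynamoDB API):

```python
import random
import time

def with_backoff(fn, max_attempts=5, base_delay=0.05, _sleep=time.sleep):
    """Retry fn on exception using exponential backoff with full jitter.

    _sleep is injectable so tests don't actually wait.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Full jitter: sleep a random amount up to the exponential cap
            _sleep(random.uniform(0, base_delay * (2 ** attempt)))

calls = {"n": 0}
def flaky():
    """Fails twice, then succeeds — simulating transient throttling."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("ProvisionedThroughputExceededException (simulated)")
    return "ok"

print(with_backoff(flaky, _sleep=lambda s: None))  # ok
```

The AWS SDK's `standard` and `adaptive` retry modes implement the same pattern internally; a helper like this is only needed for retry loops the SDK cannot see, such as re-driving unprocessed batch items.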
```python
# Application metrics
import functools
import time

class ApplicationMetrics:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')

    def record_operation_latency(self, operation_name, latency_ms):
        """Record operation latency"""
        try:
            self.cloudwatch.put_metric_data(
                Namespace='PynamoDB/Application',
                MetricData=[
                    {
                        'MetricName': 'OperationLatency',
                        'Dimensions': [
                            {'Name': 'Operation', 'Value': operation_name}
                        ],
                        'Value': latency_ms,
                        'Unit': 'Milliseconds',
                        'Timestamp': datetime.utcnow()
                    }
                ]
            )
        except Exception as e:
            logger.error(f"Failed to record metric: {e}")

    def record_error(self, operation_name, error_type):
        """Record application errors"""
        try:
            self.cloudwatch.put_metric_data(
                Namespace='PynamoDB/Application',
                MetricData=[
                    {
                        'MetricName': 'Errors',
                        'Dimensions': [
                            {'Name': 'Operation', 'Value': operation_name},
                            {'Name': 'ErrorType', 'Value': error_type}
                        ],
                        'Value': 1,
                        'Unit': 'Count',
                        'Timestamp': datetime.utcnow()
                    }
                ]
            )
        except Exception as e:
            logger.error(f"Failed to record error metric: {e}")

# Decorator for monitoring
def monitor_performance(operation_name):
    """Decorator to monitor operation performance"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            metrics = ApplicationMetrics()
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                # Record successful operation
                latency = (time.time() - start_time) * 1000
                metrics.record_operation_latency(operation_name, latency)
                return result
            except Exception as e:
                # Record error
                error_type = type(e).__name__
                metrics.record_error(operation_name, error_type)
                raise
        return wrapper
    return decorator

# Usage example
class MonitoredUserService:
    @monitor_performance('create_user')
    def create_user(self, user_data):
        user = User(**user_data)
        user.save()
        return user

    @monitor_performance('get_user')
    def get_user(self, user_id):
        try:
            return User.get(user_id)
        except User.DoesNotExist:
            return None
```

Security Best Practices
```python
# IAM policy templates
def generate_application_policy(table_arns, environment):
    """Generate IAM policy for application access"""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:UpdateItem",
                    "dynamodb:DeleteItem",
                    "dynamodb:Query",
                    "dynamodb:Scan",
                    "dynamodb:BatchGetItem",
                    "dynamodb:BatchWriteItem",
                    "dynamodb:TransactGetItems",
                    "dynamodb:TransactWriteItems"
                ],
                "Resource": table_arns
            },
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:Query",
                    "dynamodb:Scan"
                ],
                "Resource": [f"{arn}/index/*" for arn in table_arns]
            }
        ]
    }

    # Add additional restrictions for production
    if environment == 'production':
        # Deny destructive table-level operations
        policy["Statement"].append({
            "Effect": "Deny",
            "Action": [
                "dynamodb:DeleteTable",
                "dynamodb:UpdateTable"
            ],
            "Resource": table_arns
        })
        # Add a source-IP condition if needed:
        # policy["Statement"][0]["Condition"] = {
        #     "IpAddress": {"aws:SourceIp": ["10.0.0.0/8"]}
        # }
    return policy
```
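A condensed, self-contained version of the generator above shows the key behavior worth verifying before deploying a policy: production policies gain an explicit `Deny` on table-level mutations. The action list is abbreviated here for illustration:

```python
def build_policy(table_arns, environment):
    """Abbreviated, illustrative version of generate_application_policy."""
    statements = [{
        "Effect": "Allow",
        "Action": ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query"],
        "Resource": list(table_arns),
    }]
    if environment == "production":
        # Production additionally denies destructive table-level operations
        statements.append({
            "Effect": "Deny",
            "Action": ["dynamodb:DeleteTable", "dynamodb:UpdateTable"],
            "Resource": list(table_arns),
        })
    return {"Version": "2012-10-17", "Statement": statements}

arns = ["arn:aws:dynamodb:us-west-2:123456789012:table/users"]
print(len(build_policy(arns, "development")["Statement"]))  # 1
print(len(build_policy(arns, "production")["Statement"]))   # 2
```

Because the policy is plain data, checks like these fit naturally into a unit test run before every deployment.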
```python
# Data encryption utilities
import base64
import json

import boto3

class EncryptionHelper:
    def __init__(self, kms_key_id=None):
        self.kms_key_id = kms_key_id
        self.kms_client = boto3.client('kms') if kms_key_id else None

    def encrypt_sensitive_data(self, data):
        """Encrypt sensitive data before storing in DynamoDB"""
        if not self.kms_client:
            raise ValueError("KMS key ID not configured")
        response = self.kms_client.encrypt(
            KeyId=self.kms_key_id,
            Plaintext=json.dumps(data).encode('utf-8')
        )
        return base64.b64encode(response['CiphertextBlob']).decode('utf-8')

    def decrypt_sensitive_data(self, encrypted_data):
        """Decrypt sensitive data retrieved from DynamoDB"""
        if not self.kms_client:
            raise ValueError("KMS key ID not configured")
        ciphertext_blob = base64.b64decode(encrypted_data.encode('utf-8'))
        response = self.kms_client.decrypt(CiphertextBlob=ciphertext_blob)
        return json.loads(response['Plaintext'].decode('utf-8'))
```
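The KMS calls above can't run without AWS access, but the surrounding plumbing — JSON-serialize, encrypt, base64-encode, and back — can be exercised locally by swapping in a stand-in cipher. The XOR below is emphatically *not* encryption; it only mimics the shape of the round trip:

```python
import base64
import json

def fake_encrypt(data, key=0x5A):
    """Stand-in for kms.encrypt — XOR is NOT encryption, it only mimics the flow."""
    raw = json.dumps(data).encode("utf-8")
    return base64.b64encode(bytes(b ^ key for b in raw)).decode("utf-8")

def fake_decrypt(token, key=0x5A):
    """Stand-in for kms.decrypt: reverse the base64 + XOR transform."""
    raw = bytes(b ^ key for b in base64.b64decode(token))
    return json.loads(raw.decode("utf-8"))

record = {"email": "alice@example.com"}
token = fake_encrypt(record)
print(fake_decrypt(token) == record)  # True
```

In production only the two cipher functions change (to the real KMS calls); the serialize/encode plumbing and the attribute types stored in DynamoDB stay identical.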
```python
# Secure model with encryption
class SecureUserProfile(Model):
    class Meta:
        table_name = 'secure_user_profiles'
        region = 'us-west-2'

    user_id = UnicodeAttribute(hash_key=True)
    username = UnicodeAttribute()
    encrypted_email = UnicodeAttribute()          # encrypted
    encrypted_personal_data = UnicodeAttribute()  # encrypted PII
    created_at = UTCDateTimeAttribute(default=datetime.now)

    @property
    def encryption_helper(self):
        # Lazy property instead of overriding __init__, which would break
        # PynamoDB's (hash_key, range_key, **attributes) constructor signature
        if not hasattr(self, '_encryption_helper'):
            self._encryption_helper = EncryptionHelper(
                kms_key_id=os.getenv('KMS_KEY_ID')
            )
        return self._encryption_helper

    def set_email(self, email):
        """Set encrypted email"""
        self.encrypted_email = self.encryption_helper.encrypt_sensitive_data(email)

    def get_email(self):
        """Get decrypted email"""
        if self.encrypted_email:
            return self.encryption_helper.decrypt_sensitive_data(self.encrypted_email)
        return None

    def set_personal_data(self, data):
        """Set encrypted personal data"""
        self.encrypted_personal_data = self.encryption_helper.encrypt_sensitive_data(data)

    def get_personal_data(self):
        """Get decrypted personal data"""
        if self.encrypted_personal_data:
            return self.encryption_helper.decrypt_sensitive_data(self.encrypted_personal_data)
        return None
```

Backup and Disaster Recovery
```python
class BackupManager:
    def __init__(self, table_names):
        self.table_names = table_names
        # A single client handles both table and backup operations
        self.dynamodb = boto3.client('dynamodb')

    def create_backup(self, table_name, backup_name=None):
        """Create an on-demand backup"""
        if not backup_name:
            backup_name = f"{table_name}-backup-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        try:
            response = self.dynamodb.create_backup(
                TableName=table_name,
                BackupName=backup_name
            )
            backup_arn = response['BackupDetails']['BackupArn']
            print(f"Backup created: {backup_name} ({backup_arn})")
            return backup_arn
        except Exception as e:
            print(f"Failed to create backup for {table_name}: {e}")
            return None

    def list_backups(self, table_name=None):
        """List available backups"""
        kwargs = {}
        if table_name:
            kwargs['TableName'] = table_name
        try:
            response = self.dynamodb.list_backups(**kwargs)
            return response['BackupSummaries']
        except Exception as e:
            print(f"Failed to list backups: {e}")
            return []

    def restore_from_backup(self, backup_arn, target_table_name):
        """Restore a table from a backup"""
        try:
            response = self.dynamodb.restore_table_from_backup(
                TargetTableName=target_table_name,
                BackupArn=backup_arn
            )
            print(f"Restore initiated: {target_table_name}")
            return response['TableDescription']['TableArn']
        except Exception as e:
            print(f"Failed to restore from backup: {e}")
            return None

    def setup_point_in_time_recovery(self):
        """Enable point-in-time recovery for all tables"""
        for table_name in self.table_names:
            try:
                self.dynamodb.update_continuous_backups(
                    TableName=table_name,
                    PointInTimeRecoverySpecification={
                        'PointInTimeRecoveryEnabled': True
                    }
                )
                print(f"Point-in-time recovery enabled for {table_name}")
            except Exception as e:
                print(f"Failed to enable PITR for {table_name}: {e}")
```
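The timestamped naming scheme that `create_backup` falls back to is easy to verify in isolation; a small self-contained sketch of the same rule (the `backup_name` helper is illustrative, not an AWS API):

```python
from datetime import datetime

def backup_name(table_name, now=None):
    """Same naming rule as BackupManager.create_backup's default."""
    now = now or datetime.now()
    return f"{table_name}-backup-{now.strftime('%Y%m%d-%H%M%S')}"

print(backup_name("users", datetime(2024, 1, 2, 3, 4, 5)))  # users-backup-20240102-030405
```

Deterministic names like this make it trivial to locate, sort, and expire backups by parsing the suffix back into a timestamp.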
```python
# Cross-region replication setup
class ReplicationManager:
    def __init__(self, source_region, target_regions):
        self.source_region = source_region
        self.target_regions = target_regions

    def setup_global_tables(self, table_name):
        """Set up DynamoDB Global Tables for cross-region replication.

        Note: create_global_table is the legacy (2017.11.29) Global Tables
        API; new tables should generally use version 2019.11.21 replicas
        via update_table's ReplicaUpdates instead.
        """
        try:
            dynamodb = boto3.client('dynamodb', region_name=self.source_region)
            response = dynamodb.create_global_table(
                GlobalTableName=table_name,
                ReplicationGroup=[
                    {'RegionName': self.source_region},
                    *[{'RegionName': region} for region in self.target_regions]
                ]
            )
            print(f"Global table created: {table_name}")
            return response['GlobalTableDescription']
        except Exception as e:
            print(f"Failed to create global table {table_name}: {e}")
            return None
```

Conclusion
This comprehensive guide has covered PynamoDB from basic concepts to advanced production deployment strategies. Key takeaways include:
For Beginners:
- Start with simple models and basic CRUD operations
- Understand DynamoDB’s key concepts: hash keys, range keys, and indexes
- Practice with local DynamoDB for development
For Intermediate Users:
- Master querying and filtering techniques
- Implement proper error handling and validation
- Use transactions for complex operations
- Design efficient indexes for your access patterns
For Advanced Users:
- Optimize performance with caching and connection pooling
- Implement comprehensive monitoring and alerting
- Design for security with encryption and proper IAM policies
- Plan for disaster recovery with backups and replication
Production Readiness Checklist:
graph TB
A[Production Deployment] --> B[Security]
A --> C[Monitoring]
A --> D[Performance]
A --> E[Reliability]
B --> F[IAM Policies]
B --> G[Data Encryption]
B --> H[Network Security]
C --> I[CloudWatch Metrics]
C --> J[Custom Dashboards]
C --> K[Alerting Rules]
D --> L[Capacity Planning]
D --> M[Connection Optimization]
D --> N[Caching Strategy]
E --> O[Backup Strategy]
E --> P[Disaster Recovery]
E --> Q[High Availability]

PynamoDB provides a powerful, Pythonic way to work with DynamoDB while maintaining the flexibility and performance benefits of NoSQL databases. By following the patterns and practices outlined in this guide, you can build robust, scalable applications that leverage the full power of DynamoDB through PynamoDB’s elegant interface.
Remember to always design your data models around your access patterns, implement proper error handling, monitor your applications in production, and maintain security best practices throughout your development lifecycle.