Table of Contents

    1. Introduction to PynamoDB
    2. Getting Started
    3. Basic Concepts
    4. Model Definition
    5. CRUD Operations
    6. Querying and Filtering
    7. Indexes
    8. Advanced Features
    9. Best Practices
    10. Real-World Examples
    11. Performance Optimization
    12. Testing
    13. Deployment Considerations

    1. Introduction to PynamoDB

    PynamoDB is a Pythonic interface to Amazon DynamoDB that provides an ORM-like abstraction layer, making DynamoDB more intuitive to work with for Python developers.

    What is DynamoDB?

    graph TB
        A[DynamoDB] --> B[NoSQL Database]
        A --> C[Fully Managed]
        A --> D[Serverless]
        A --> E[Auto-scaling]
        B --> F[Key-Value Store]
        B --> G[Document Store]
        C --> H[No Server Management]
        D --> I[Pay per Request]
        E --> J[Handles Traffic Spikes]

    Why PynamoDB?

    graph LR
        A[Raw Boto3] --> B[Complex Syntax]
        A --> C[Manual Type Conversion]
        A --> D[Error Prone]
    
        E[PynamoDB] --> F[Pythonic Interface]
        E --> G[Automatic Type Handling]
        E --> H[Model-based Approach]
        E --> I[Built-in Validation]
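
    To make the contrast concrete, here is the same write expressed with raw boto3 and with PynamoDB. This is a minimal sketch assuming a `users` table keyed on `user_id`:

    import boto3
    from pynamodb.models import Model
    from pynamodb.attributes import UnicodeAttribute, NumberAttribute
    
    # Raw boto3: build the table handle and the item dict by hand
    dynamodb = boto3.resource('dynamodb')
    dynamodb.Table('users').put_item(
        Item={'user_id': 'user123', 'name': 'John Doe', 'age': 30}
    )
    
    # PynamoDB: the same write through a typed model
    class User(Model):
        class Meta:
            table_name = 'users'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
        age = NumberAttribute()
    
    User(user_id='user123', name='John Doe', age=30).save()
    Python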

    2. Getting Started

    Installation

    pip install pynamodb
    Bash

    Basic Setup

    from pynamodb.models import Model
    from pynamodb.attributes import UnicodeAttribute, NumberAttribute
    import os
    
    # Configure AWS credentials (environment variables are shown for demonstration;
    # prefer IAM roles or a configured AWS profile in real deployments)
    os.environ['AWS_ACCESS_KEY_ID'] = 'your-access-key'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'your-secret-key'
    os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'
    Python

    Your First Model

    class User(Model):
        class Meta:
            table_name = 'users'
            region = 'us-west-2'
    
        email = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
        age = NumberAttribute()
    Python
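
    Before the first `save()` can succeed, the table itself must exist. A minimal sketch that creates it from the model definition (the capacity values are placeholders):

    # Create the table if it hasn't been created yet
    if not User.exists():
        User.create_table(read_capacity_units=1, write_capacity_units=1, wait=True)
    Python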

    3. Basic Concepts

    DynamoDB Key Concepts

    graph TD
        A[DynamoDB Table] --> B[Primary Key]
        B --> C[Hash Key - Partition Key]
        B --> D[Range Key - Sort Key - Optional]
        A --> E[Items - Records]
        A --> F[Attributes - Fields]
        A --> G[Secondary Indexes]
        G --> H[Global Secondary Index - GSI]
        G --> I[Local Secondary Index - LSI]

    PynamoDB Model Architecture

    classDiagram
        class Model {
            +Meta class
            +Attributes
            +save()
            +delete()
            +query()
            +scan()
        }
    
        class Meta {
            +table_name
            +region
            +host
            +billing_mode
        }
    
        class Attributes {
            +UnicodeAttribute
            +NumberAttribute
            +BinaryAttribute
            +BooleanAttribute
            +UTCDateTimeAttribute
            +ListAttribute
            +MapAttribute
        }
    
        Model --> Meta
        Model --> Attributes

    4. Model Definition

    Basic Model Structure

    from pynamodb.models import Model
    from pynamodb.attributes import (
        UnicodeAttribute, NumberAttribute, BinaryAttribute,
        BooleanAttribute, UTCDateTimeAttribute, ListAttribute,
        MapAttribute, JSONAttribute
    )
    from datetime import datetime
    
    class User(Model):
        class Meta:
            table_name = 'users'
            region = 'us-west-2'
            # Optional: for local development
            # host = 'http://localhost:8000'
    
        # Primary Key
        user_id = UnicodeAttribute(hash_key=True)
    
        # Attributes
        email = UnicodeAttribute()
        name = UnicodeAttribute()
        age = NumberAttribute()
        is_active = BooleanAttribute(default=True)
        created_at = UTCDateTimeAttribute(default=datetime.now)
        profile_data = JSONAttribute(null=True)
    Python

    Composite Primary Key

    class OrderItem(Model):
        class Meta:
            table_name = 'order_items'
            region = 'us-west-2'
    
        order_id = UnicodeAttribute(hash_key=True)
        item_id = UnicodeAttribute(range_key=True)
        quantity = NumberAttribute()
        price = NumberAttribute()
        created_at = UTCDateTimeAttribute(default=datetime.now)
    Python

    Attribute Types Deep Dive

    graph TB
        A[PynamoDB Attributes] --> B[Simple Types]
        A --> C[Complex Types]
        A --> D[Custom Types]
    
        B --> E[UnicodeAttribute]
        B --> F[NumberAttribute]
        B --> G[BinaryAttribute]
        B --> H[BooleanAttribute]
        B --> I[UTCDateTimeAttribute]
    
        C --> J[ListAttribute]
        C --> K[MapAttribute]
        C --> L[JSONAttribute]
        C --> M[SetAttribute]
    
        D --> N[Custom Serialization]
        D --> O[Inheritance]

    Complex Attributes Example

    class Address(MapAttribute):
        street = UnicodeAttribute()
        city = UnicodeAttribute()
        state = UnicodeAttribute()
        zip_code = UnicodeAttribute()
    
    class User(Model):
        class Meta:
            table_name = 'users'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
        addresses = ListAttribute(of=Address)
        tags = ListAttribute()
        metadata = JSONAttribute()
    Python
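
    The attribute diagram above also lists custom types. A minimal sketch of that pattern, subclassing Attribute and implementing serialize/deserialize; the EnumAttribute class and OrderStatus enum here are illustrative, not part of PynamoDB:

    from enum import Enum
    from pynamodb.attributes import Attribute
    from pynamodb.constants import STRING
    
    class OrderStatus(Enum):
        PENDING = 'pending'
        SHIPPED = 'shipped'
    
    class EnumAttribute(Attribute):
        """Stores an Enum member as its string value in DynamoDB."""
        attr_type = STRING
    
        def __init__(self, enum_cls, **kwargs):
            super().__init__(**kwargs)
            self.enum_cls = enum_cls
    
        def serialize(self, value):
            # Python Enum member -> string stored in DynamoDB
            return value.value
    
        def deserialize(self, value):
            # string from DynamoDB -> Enum member
            return self.enum_cls(value)
    
    # Usage on a model: status = EnumAttribute(OrderStatus, default=OrderStatus.PENDING)
    Python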

    5. CRUD Operations

    Create Operations

    sequenceDiagram
        participant App
        participant PynamoDB
        participant DynamoDB
    
        App->>PynamoDB: user.save()
        PynamoDB->>DynamoDB: PutItem Request
        DynamoDB->>PynamoDB: Success Response
        PynamoDB->>App: Model Instance

    # Creating a new user
    user = User(
        user_id='user123',
        email='john@example.com',
        name='John Doe',
        age=30
    )
    user.save()
    
    # Conditional creation (save only if the item doesn't already exist)
    from pynamodb.exceptions import PutError
    
    new_user = User(
        user_id='user125',
        email='bob@example.com',
        name='Bob Johnson',
        age=35
    )
    try:
        new_user.save(condition=User.user_id.does_not_exist())
    except PutError:
        print("User already exists")
    Python

    Read Operations

    # Get a single item
    try:
        user = User.get('user123')
        print(f"Found user: {user.name}")
    except User.DoesNotExist:
        print("User not found")
    
    # Get with range key
    try:
        order_item = OrderItem.get('order123', 'item456')
        print(f"Quantity: {order_item.quantity}")
    except OrderItem.DoesNotExist:
        print("Order item not found")
    
    # Batch get
    user_ids = ['user123', 'user124', 'user125']
    users = []
    for user in User.batch_get(user_ids):
        users.append(user)
    Python

    Update Operations

    # Simple update
    user = User.get('user123')
    user.age = 31
    user.save()
    
    # Conditional update
    user.update(actions=[
        User.age.set(32),
        User.name.set('John Updated')
    ], condition=User.age < 35)
    
    # Atomic operations
    user.update(actions=[
        User.age.add(1),  # Increment age by 1
        User.tags.append(['new-tag']),  # Add to list
        User.metadata.set({'last_login': datetime.now().isoformat()})
    ])
    Python

    Delete Operations

    # Delete a single item
    user = User.get('user123')
    user.delete()
    
    # Conditional delete
    user.delete(condition=User.is_active == True)
    
    # Delete by key without fetching the item first
    User('user124').delete()
    
    # Batch delete
    users_to_delete = ['user125', 'user126', 'user127']
    with User.batch_write() as batch:
        for user_id in users_to_delete:
            batch.delete(User(user_id=user_id))
    Python

    6. Querying and Filtering

    Query vs Scan

    graph LR
        A[Data Access Patterns] --> B[Query]
        A --> C[Scan]
    
        B --> D[Uses Primary Key]
        B --> E[Efficient]
        B --> F[Limited Results]
    
        C --> G[Examines All Items]
        C --> H[Expensive]
        C --> I[Flexible Filtering]

    Query Operations

    # Query by hash key
    users = User.query('user123')
    for user in users:
        print(user.name)
    
    # Query with range key condition
    order_items = OrderItem.query(
        'order123',
        OrderItem.item_id.startswith('ELEC')
    )
    
    # Query with filter
    recent_orders = OrderItem.query(
        'order123',
        filter_condition=OrderItem.created_at > datetime(2023, 1, 1)
    )
    
    # Query with pagination
    page_size = 10
    last_evaluated_key = None
    all_items = []
    
    while True:
        results = OrderItem.query(
            'order123',
            limit=page_size,
            last_evaluated_key=last_evaluated_key
        )
        all_items.extend(results)
    
        last_evaluated_key = results.last_evaluated_key
        if last_evaluated_key is None:
            break
    Python
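
    When only the number of matching items is needed, `Model.count()` avoids pulling the items back at all; a short sketch against the models above:

    # Count items under a hash key, optionally narrowed by a range key condition
    total_items = OrderItem.count('order123')
    electronics = OrderItem.count('order123', OrderItem.item_id.startswith('ELEC'))
    print(f"{electronics} of {total_items} items are electronics")
    Python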

    Scan Operations

    # Basic scan
    for user in User.scan():
        print(f"User: {user.name}, Age: {user.age}")
    
    # Scan with filter
    active_users = User.scan(User.is_active == True)
    
    # Scan with multiple conditions
    young_active_users = User.scan(
        (User.age < 30) & (User.is_active == True)
    )
    
    # Parallel scan for better performance
    def scan_segment(segment, total_segments):
        return User.scan(
            segment=segment,
            total_segments=total_segments
        )
    
    # Use ThreadPoolExecutor for parallel scanning
    from concurrent.futures import ThreadPoolExecutor
    import threading
    
    all_users = []
    lock = threading.Lock()
    
    def process_segment(segment):
        segment_users = list(scan_segment(segment, 4))
        with lock:
            all_users.extend(segment_users)
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_segment, i) for i in range(4)]
        for future in futures:
            future.result()
    Python

    Advanced Filtering

    # Complex filter expressions are built from attribute helper methods
    
    # Check if an attribute exists
    users_with_profile = User.scan(
        filter_condition=User.profile_data.exists()
    )
    
    # Check that an attribute is present (e.g. users that have tags set)
    users_with_tags = User.scan(
        filter_condition=User.tags.exists()
    )
    
    # Combining conditions
    complex_filter = (
        (User.age.between(25, 35)) &
        (User.is_active == True) &
        User.profile_data.exists()
    )
    
    filtered_users = User.scan(filter_condition=complex_filter)

    7. Indexes

    Index Types

    graph TB
        A[DynamoDB Indexes] --> B[Global Secondary Index - GSI]
        A --> C[Local Secondary Index - LSI]
    
        B --> D[Different Hash Key]
        B --> E[Different Range Key]
        B --> F[Separate RCU/WCU]
        B --> G[Eventually Consistent]
    
        C --> H[Same Hash Key]
        C --> I[Different Range Key]
        C --> J[Shares RCU/WCU]
        C --> K[Strongly Consistent Option]

    Global Secondary Index (GSI)

    from pynamodb.indexes import GlobalSecondaryIndex, AllProjection
    
    class EmailIndex(GlobalSecondaryIndex):
        class Meta:
            index_name = 'email-index'
            projection = AllProjection()
    
        email = UnicodeAttribute(hash_key=True)
    
    class DepartmentIndex(GlobalSecondaryIndex):
        class Meta:
            index_name = 'department-created-index'
            projection = AllProjection()
    
        department = UnicodeAttribute(hash_key=True)
        created_at = UTCDateTimeAttribute(range_key=True)
    
    class User(Model):
        class Meta:
            table_name = 'users'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        email = UnicodeAttribute()
        name = UnicodeAttribute()
        age = NumberAttribute()
        department = UnicodeAttribute()
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
        # GSI for querying by email (index classes must be defined before the model that uses them)
        email_index = EmailIndex()
    
        # GSI for querying by department and created_at
        department_index = DepartmentIndex()
    Python
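
    The indexes above use AllProjection, which copies every attribute into the index. When an index is only used to resolve keys, a keys-only projection keeps it smaller and cheaper; a sketch:

    from pynamodb.indexes import GlobalSecondaryIndex, KeysOnlyProjection
    
    class EmailKeysOnlyIndex(GlobalSecondaryIndex):
        class Meta:
            index_name = 'email-keys-only-index'
            projection = KeysOnlyProjection()
    
        email = UnicodeAttribute(hash_key=True)
    Python

    Queries against a keys-only index return just the key attributes, so fetching the full item still requires a follow-up `User.get()`.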

    Using Indexes

    # Query using GSI
    # Find user by email
    try:
        users = User.email_index.query('john@example.com')
        user = next(iter(users))
        print(f"Found user: {user.name}")
    except StopIteration:
        print("User not found")
    
    # Query department index
    engineering_users = User.department_index.query(
        'Engineering',
        User.created_at > datetime(2023, 1, 1)
    )
    
    for user in engineering_users:
        print(f"Engineer: {user.name}, Joined: {user.created_at}")
    Python

    Local Secondary Index (LSI)

    from pynamodb.indexes import LocalSecondaryIndex, AllProjection
    
    class OrderDateIndex(LocalSecondaryIndex):
        class Meta:
            index_name = 'customer-date-index'
            projection = AllProjection()
    
        customer_id = UnicodeAttribute(hash_key=True)
        order_date = UTCDateTimeAttribute(range_key=True)
    
    class OrderStatusIndex(LocalSecondaryIndex):
        class Meta:
            index_name = 'customer-status-index'
            projection = AllProjection()
    
        customer_id = UnicodeAttribute(hash_key=True)
        status = UnicodeAttribute(range_key=True)
    
    class Order(Model):
        class Meta:
            table_name = 'orders'
            region = 'us-west-2'
    
        customer_id = UnicodeAttribute(hash_key=True)
        order_id = UnicodeAttribute(range_key=True)
        order_date = UTCDateTimeAttribute()
        status = UnicodeAttribute()
        total_amount = NumberAttribute()
    
        # LSI for querying by customer and order_date
        date_index = OrderDateIndex()
    
        # LSI for querying by customer and status
        status_index = OrderStatusIndex()
    Python

    8. Advanced Features

    Transactions

    sequenceDiagram
        participant App
        participant PynamoDB
        participant DynamoDB
    
        App->>PynamoDB: TransactWrite block
        PynamoDB->>DynamoDB: TransactWriteItems
        alt Success
            DynamoDB->>PynamoDB: All operations succeed
            PynamoDB->>App: Success
        else Failure
            DynamoDB->>PynamoDB: Transaction fails
            PynamoDB->>App: Exception
        end

    from pynamodb.connection import Connection
    from pynamodb.transactions import TransactWrite
    
    # Transaction example: Transfer money between accounts
    class Account(Model):
        class Meta:
            table_name = 'accounts'
            region = 'us-west-2'
    
        account_id = UnicodeAttribute(hash_key=True)
        balance = NumberAttribute()
        version = NumberAttribute(default=0)
    
    def transfer_money(from_account_id, to_account_id, amount):
        # Get current account states
        from_account = Account.get(from_account_id)
        to_account = Account.get(to_account_id)
    
        # Check sufficient balance
        if from_account.balance < amount:
            raise ValueError("Insufficient balance")
    
        # Prepare the write transaction (all updates succeed or fail together)
        connection = Connection(region='us-west-2')
        with TransactWrite(connection=connection) as transaction:
            # Update from account
            transaction.update(
                from_account,
                actions=[
                    Account.balance.add(-amount),
                    Account.version.add(1)
                ],
                condition=Account.version == from_account.version
            )
    
            # Update to account
            transaction.update(
                to_account,
                actions=[
                    Account.balance.add(amount),
                    Account.version.add(1)
                ],
                condition=Account.version == to_account.version
            )
    
    # Usage
    try:
        transfer_money('account1', 'account2', 100)
        print("Transfer successful")
    except Exception as e:
        print(f"Transfer failed: {e}")
    Python
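
    Reads can be made transactional as well. A sketch using `TransactGet`, which hands back futures that are resolved when the context manager exits, assuming the same `Account` model as above:

    from pynamodb.connection import Connection
    from pynamodb.transactions import TransactGet
    
    connection = Connection(region='us-west-2')
    
    with TransactGet(connection=connection) as transaction:
        from_future = transaction.get(Account, 'account1')
        to_future = transaction.get(Account, 'account2')
    
    # The futures are resolved once the block exits
    from_account = from_future.get()
    to_account = to_future.get()
    print(from_account.balance, to_account.balance)
    Python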

    Conditional Operations

    # Optimistic locking with version field
    from pynamodb.exceptions import UpdateError
    
    class Document(Model):
        class Meta:
            table_name = 'documents'
            region = 'us-west-2'
    
        doc_id = UnicodeAttribute(hash_key=True)
        content = UnicodeAttribute()
        version = NumberAttribute(default=1)
        last_modified = UTCDateTimeAttribute(default=datetime.now)
    
    def update_document(doc_id, new_content):
        doc = Document.get(doc_id)
        current_version = doc.version
    
        try:
            doc.update(
                actions=[
                    Document.content.set(new_content),
                    Document.version.add(1),
                    Document.last_modified.set(datetime.now())
                ],
                condition=Document.version == current_version
            )
            return True
        except UpdateError:
            return False  # Document was modified by another process
    
    # Usage
    if update_document('doc123', 'Updated content'):
        print("Document updated successfully")
    else:
        print("Document was modified by another process, please retry")
    Python

    Batch Operations

    # Batch write operations
    def bulk_create_users(user_data_list):
        with User.batch_write() as batch:
            for user_data in user_data_list:
                user = User(**user_data)
                batch.save(user)
    
    # Batch read operations
    def get_users_batch(user_ids):
        users = {}
        for user in User.batch_get(user_ids):
            users[user.user_id] = user
        return users
    
    # Usage
    user_data = [
        {'user_id': 'user1', 'name': 'Alice', 'email': 'alice@example.com'},
        {'user_id': 'user2', 'name': 'Bob', 'email': 'bob@example.com'},
        {'user_id': 'user3', 'name': 'Charlie', 'email': 'charlie@example.com'},
    ]
    
    bulk_create_users(user_data)
    users = get_users_batch(['user1', 'user2', 'user3'])
    Python

    Streams and Change Data Capture

    # Enable streams on table
    class AuditableUser(Model):
        class Meta:
            table_name = 'auditable_users'
            region = 'us-west-2'
            stream_view_type = 'NEW_AND_OLD_IMAGES'  # Enable streams
    
        user_id = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
        email = UnicodeAttribute()
        updated_at = UTCDateTimeAttribute(default=datetime.now)
    
    # Stream processing function (typically used with AWS Lambda)
    def process_stream_record(record):
        event_name = record['eventName']
    
        if event_name == 'INSERT':
            new_image = record['dynamodb']['NewImage']
            print(f"New user created: {new_image}")
    
        elif event_name == 'MODIFY':
            old_image = record['dynamodb']['OldImage']
            new_image = record['dynamodb']['NewImage']
            print(f"User updated: {old_image} -> {new_image}")
    
        elif event_name == 'REMOVE':
            old_image = record['dynamodb']['OldImage']
            print(f"User deleted: {old_image}")
    Python
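
    When the stream is wired to AWS Lambda, the handler simply iterates over the batch of records and delegates to the function above; a minimal sketch (the handler name is whatever your Lambda configuration points at):

    def lambda_handler(event, context):
        """Entry point for a Lambda function triggered by the DynamoDB stream."""
        records = event.get('Records', [])
        for record in records:
            process_stream_record(record)
        return {'processed': len(records)}
    Python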

    9. Best Practices

    Design Patterns

    graph TB
        A[DynamoDB Design Patterns] --> B[Single Table Design]
        A --> C[Access Pattern First]
        A --> D[Denormalization]
        A --> E[Hot Partitions Avoidance]
    
        B --> F[Multiple Entity Types]
        B --> G[Overloaded Indexes]
    
        C --> H[Define Queries First]
        C --> I[Design Tables Second]
    
        D --> J[Duplicate Data]
        D --> K[Optimize for Reads]
    
        E --> L[Distribute Load]
        E --> M[Use Random Prefixes]
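
    The "Use Random Prefixes" idea in the diagram above, often called write sharding, can be sketched as follows. The shard-suffix convention, the NUM_SHARDS value, and the Event model mentioned in the comments are our own illustrative choices, not a PynamoDB feature:

    import random
    
    NUM_SHARDS = 10
    
    def sharded_key(logical_key: str) -> str:
        """Spread writes for a hot logical key across NUM_SHARDS partition keys."""
        return f"{logical_key}#{random.randint(0, NUM_SHARDS - 1)}"
    
    # Writing: each item lands on one of the shards, e.g.
    # Event(event_id=sharded_key('page-view'), ...).save()   # Event is a hypothetical model
    
    def read_all_shards(model_cls, logical_key):
        """Reading back requires gathering results from every shard."""
        for shard in range(NUM_SHARDS):
            yield from model_cls.query(f"{logical_key}#{shard}")
    Python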

    Model Design Best Practices

    # 1. Use meaningful naming conventions
    class UserProfile(Model):
        class Meta:
            table_name = 'user_profiles'
            region = 'us-west-2'
    
        # Use descriptive attribute names
        user_id = UnicodeAttribute(hash_key=True)
        full_name = UnicodeAttribute()
        email_address = UnicodeAttribute()
        date_of_birth = UTCDateTimeAttribute()
        account_status = UnicodeAttribute()  # active, inactive, suspended
    
        # Add metadata for tracking
        created_at = UTCDateTimeAttribute(default=datetime.now)
        updated_at = UTCDateTimeAttribute(default=datetime.now)
    
        def save(self, **kwargs):
            self.updated_at = datetime.now()
            super().save(**kwargs)
    
    # 2. Use enums for limited value sets
    from enum import Enum
    
    class AccountStatus(Enum):
        ACTIVE = "active"
        INACTIVE = "inactive"
        SUSPENDED = "suspended"
    
    class UserProfile(Model):
        # ... other attributes ...
        account_status = UnicodeAttribute()
    
        def set_status(self, status: AccountStatus):
            self.account_status = status.value
    
    # 3. Implement data validation
    from pynamodb.exceptions import DoesNotExist
    import re
    
    class UserProfile(Model):
        # ... attributes ...
    
        def clean(self):
            # Email validation
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            if not re.match(email_pattern, self.email_address):
                raise ValueError("Invalid email address format")
    
            # Age validation
            if self.date_of_birth and self.date_of_birth > datetime.now():
                raise ValueError("Date of birth cannot be in the future")
    
        def save(self, **kwargs):
            self.clean()
            super().save(**kwargs)
    Python

    Error Handling

    from pynamodb.exceptions import (
        DoesNotExist, UpdateError, DeleteError, 
        TransactWriteError, PutError
    )
    import logging
    
    logger = logging.getLogger(__name__)
    
    class UserService:
        @staticmethod
        def create_user(user_data):
            try:
                user = User(**user_data)
                user.save()
                logger.info(f"User created successfully: {user.user_id}")
                return user
            except PutError as e:
                logger.error(f"Failed to create user: {e}")
                raise
            except Exception as e:
                logger.error(f"Unexpected error creating user: {e}")
                raise
    
        @staticmethod
        def get_user(user_id):
            try:
                return User.get(user_id)
            except DoesNotExist:
                logger.warning(f"User not found: {user_id}")
                return None
            except Exception as e:
                logger.error(f"Error retrieving user {user_id}: {e}")
                raise
    
        @staticmethod
        def update_user(user_id, updates):
            try:
                user = User.get(user_id)
                for field, value in updates.items():
                    setattr(user, field, value)
                user.save()
                logger.info(f"User updated successfully: {user_id}")
                return user
            except DoesNotExist:
                logger.warning(f"Cannot update non-existent user: {user_id}")
                return None
            except UpdateError as e:
                logger.error(f"Failed to update user {user_id}: {e}")
                raise
    Python

    Connection Management

    # Connection pooling and configuration
    from pynamodb.connection import Connection
    from pynamodb.models import Model
    
    class BaseModel(Model):
        class Meta:
            abstract = True
            region = 'us-west-2'
            # Configure connection settings
            max_retry_attempts = 3
            base_backoff_ms = 25
    
        @classmethod
        def get_connection(cls):
            # Custom connection configuration
            if not hasattr(cls, '_connection'):
                cls._connection = Connection(
                    region=cls.Meta.region,
                    max_retry_attempts=cls.Meta.max_retry_attempts,
                    base_backoff_ms=cls.Meta.base_backoff_ms
                )
            return cls._connection
    
    class User(BaseModel):
        class Meta(BaseModel.Meta):
            table_name = 'users'
    
        user_id = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
    Python

    10. Real-World Examples

    E-commerce Application

    erDiagram
        USER ||--o{ ORDER : places
        ORDER ||--o{ ORDER_ITEM : contains
        PRODUCT ||--o{ ORDER_ITEM : "ordered as"
        USER ||--o{ CART_ITEM : has
        PRODUCT ||--o{ CART_ITEM : "added to"
    
        USER {
            string user_id PK
            string email
            string name
            datetime created_at
        }
    
        ORDER {
            string user_id FK
            string order_id PK
            string status
            decimal total_amount
            datetime created_at
        }
    
        ORDER_ITEM {
            string order_id FK
            string product_id FK
            int quantity
            decimal price
        }
    
        PRODUCT {
            string product_id PK
            string name
            string category
            decimal price
            int stock_quantity
        }
    
    # E-commerce models
    class User(Model):
        class Meta:
            table_name = 'ecommerce_users'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        email = UnicodeAttribute()
        name = UnicodeAttribute()
        address = JSONAttribute(null=True)
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
        # GSI for email lookup
        email_index = EmailIndex()
    
    class Product(Model):
        class Meta:
            table_name = 'products'
            region = 'us-west-2'
    
        product_id = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
        description = UnicodeAttribute()
        category = UnicodeAttribute()
        price = NumberAttribute()
        stock_quantity = NumberAttribute()
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
        # GSI for category browsing (CategoryIndex would be defined like the GSI classes in Section 7)
        category_index = CategoryIndex()
    
    class Order(Model):
        class Meta:
            table_name = 'orders'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        order_id = UnicodeAttribute(range_key=True)
        status = UnicodeAttribute()  # pending, confirmed, shipped, delivered
        total_amount = NumberAttribute()
        created_at = UTCDateTimeAttribute(default=datetime.now)
        updated_at = UTCDateTimeAttribute(default=datetime.now)
    
        # LSI for order date queries
        date_index = OrderDateIndex()
    
    class OrderItem(Model):
        class Meta:
            table_name = 'order_items'
            region = 'us-west-2'
    
        order_id = UnicodeAttribute(hash_key=True)
        product_id = UnicodeAttribute(range_key=True)
        quantity = NumberAttribute()
        price = NumberAttribute()
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
    # Service layer for business logic
    class OrderService:
        @staticmethod
        def create_order(user_id, items_data):
            """Create a new order with items"""
            import uuid
            from pynamodb.connection import Connection
            from pynamodb.transactions import TransactWrite
    
            order_id = str(uuid.uuid4())
            total_amount = 0
    
            # Calculate total and validate stock
            validated_items = []
            for item_data in items_data:
                product = Product.get(item_data['product_id'])
                if product.stock_quantity < item_data['quantity']:
                    raise ValueError(f"Insufficient stock for {product.name}")
    
                item_total = product.price * item_data['quantity']
                total_amount += item_total
    
                validated_items.append({
                    'product': product,
                    'quantity': item_data['quantity'],
                    'price': product.price
                })
    
            # Create order and items in a single write transaction
            with TransactWrite(connection=Connection(region='us-west-2')) as transaction:
                # Create order
                order = Order(
                    user_id=user_id,
                    order_id=order_id,
                    status='pending',
                    total_amount=total_amount
                )
                transaction.save(order)
    
                # Create order items and update stock
                for item in validated_items:
                    order_item = OrderItem(
                        order_id=order_id,
                        product_id=item['product'].product_id,
                        quantity=item['quantity'],
                        price=item['price']
                    )
                    transaction.save(order_item)
    
                    # Update product stock
                    transaction.update(
                        item['product'],
                        actions=[Product.stock_quantity.add(-item['quantity'])]
                    )
    
            return order
    
        @staticmethod
        def get_user_orders(user_id, limit=10):
            """Get user's recent orders"""
            orders = Order.query(
                user_id,
                scan_index_forward=False,  # Descending by order_id (use the date LSI for true most-recent-first)
                limit=limit
            )
            return list(orders)
    
        @staticmethod
        def update_order_status(user_id, order_id, new_status):
            """Update order status"""
            try:
                order = Order.get(user_id, order_id)
                order.status = new_status
                order.updated_at = datetime.now()
                order.save()
                return order
            except Order.DoesNotExist:
                raise ValueError("Order not found")
    Python

    Social Media Application

    # Social media models
    class User(Model):
        class Meta:
            table_name = 'social_users'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        username = UnicodeAttribute()
        email = UnicodeAttribute()
        bio = UnicodeAttribute(null=True)
        follower_count = NumberAttribute(default=0)
        following_count = NumberAttribute(default=0)
        post_count = NumberAttribute(default=0)
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
    class Post(Model):
        class Meta:
            table_name = 'posts'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        post_id = UnicodeAttribute(range_key=True)
        content = UnicodeAttribute()
        image_urls = ListAttribute(null=True)
        like_count = NumberAttribute(default=0)
        comment_count = NumberAttribute(default=0)
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
        # GSI for timeline queries (TimelineIndex would be defined like the GSI classes in Section 7)
        timeline_index = TimelineIndex()
    
    class Follow(Model):
        class Meta:
            table_name = 'follows'
            region = 'us-west-2'
    
        follower_id = UnicodeAttribute(hash_key=True)
        following_id = UnicodeAttribute(range_key=True)
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
    class Like(Model):
        class Meta:
            table_name = 'likes'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        post_id = UnicodeAttribute(range_key=True)
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
    # Social media service
    class SocialMediaService:
        @staticmethod
        def create_post(user_id, content, image_urls=None):
            """Create a new post"""
            import uuid
    
            post_id = str(uuid.uuid4())
            post = Post(
                user_id=user_id,
                post_id=post_id,
                content=content,
                image_urls=image_urls or []
            )
            post.save()
    
            # Update user's post count
            user = User.get(user_id)
            user.update(actions=[User.post_count.add(1)])
    
            return post
    
        @staticmethod
        def follow_user(follower_id, following_id):
            """Follow a user"""
            if follower_id == following_id:
                raise ValueError("Cannot follow yourself")
    
            try:
                # Check if already following
                Follow.get(follower_id, following_id)
                raise ValueError("Already following this user")
            except Follow.DoesNotExist:
                pass
    
            # Create follow relationship
            follow = Follow(
                follower_id=follower_id,
                following_id=following_id
            )
            follow.save()
    
            # Update counts
            follower = User.get(follower_id)
            following = User.get(following_id)
    
            follower.update(actions=[User.following_count.add(1)])
            following.update(actions=[User.follower_count.add(1)])
    
            return follow
    
        @staticmethod
        def get_user_timeline(user_id, limit=20):
            """Get user's timeline (their posts)"""
            posts = Post.query(
                user_id,
                scan_index_forward=False,  # Descending by post_id (use a time-sortable id for true recency)
                limit=limit
            )
            return list(posts)
    
        @staticmethod
        def like_post(user_id, post_user_id, post_id):
            """Like a post"""
            try:
                # Check if already liked
                Like.get(user_id, post_id)
                raise ValueError("Already liked this post")
            except Like.DoesNotExist:
                pass
    
            # Create like
            like = Like(user_id=user_id, post_id=post_id)
            like.save()
    
            # Update post like count
            post = Post.get(post_user_id, post_id)
            post.update(actions=[Post.like_count.add(1)])
    
            return like
    Python

    11. Performance Optimization

    Query Optimization

    graph TB
        A[Query Optimization] --> B[Use Primary Keys]
        A --> C[Leverage Indexes]
        A --> D[Batch Operations]
        A --> E[Pagination]
        A --> F[Projection]
    
        B --> G[Direct Access]
        B --> H[Consistent Performance]
    
        C --> I[GSI for Different Access Patterns]
        C --> J[LSI for Sort Variations]
    
        D --> K[Reduce API Calls]
        D --> L[Better Throughput]
    
        E --> M[Limit Result Sets]
        E --> N[Memory Efficiency]
    
        F --> O[Reduce Data Transfer]
        F --> P[Faster Queries]

    # Optimization techniques
    class OptimizedUserService:
    
        @staticmethod
        def get_users_by_ids(user_ids):
            """Efficient batch retrieval"""
            # Use batch_get instead of individual gets
            users = {}
    
            # Process in chunks of 100 (DynamoDB limit)
            chunk_size = 100
            for i in range(0, len(user_ids), chunk_size):
                chunk = user_ids[i:i + chunk_size]
                for user in User.batch_get(chunk):
                    users[user.user_id] = user
    
            return users
    
        @staticmethod
        def get_active_users_paginated(page_size=50, last_key=None):
            """Efficient pagination with filtering"""
            query_kwargs = {
                'filter_condition': User.is_active == True,
                'limit': page_size
            }
    
            if last_key:
                query_kwargs['last_evaluated_key'] = last_key
    
            results = User.scan(**query_kwargs)
            items = list(results)
    
            return {
                'users': items,
                'last_evaluated_key': results.last_evaluated_key,
                'has_more': len(items) == page_size
            }
    
        @staticmethod
        def get_user_summary(user_id):
            """Use projection to get only needed attributes"""
            try:
                # Only fetch specific attributes
                user = User.get(
                    user_id,
                    attributes_to_get=['user_id', 'name', 'email', 'is_active']
                )
                return {
                    'user_id': user.user_id,
                    'name': user.name,
                    'email': user.email,
                    'is_active': user.is_active
                }
            except User.DoesNotExist:
                return None
    
    # Connection optimization
    from pynamodb.connection import Connection
    
    class OptimizedConnection:
        _connections = {}
    
        @classmethod
        def get_connection(cls, region='us-west-2'):
            """Reuse connections per region"""
            if region not in cls._connections:
                cls._connections[region] = Connection(
                    region=region,
                    # Optimize connection settings
                    max_retry_attempts=5,
                    base_backoff_ms=100,
                    connect_timeout_seconds=10,
                    read_timeout_seconds=30
                )
            return cls._connections[region]
    
    # Caching layer
    import time
    from typing import Optional, Dict, Any
    
    class CacheService:
        """Simple in-memory cache with TTL"""
    
        def __init__(self):
            self._cache: Dict[str, Dict[str, Any]] = {}
    
        def get(self, key: str) -> Optional[Any]:
            if key in self._cache:
                entry = self._cache[key]
                if time.time() < entry['expires_at']:
                    return entry['value']
                else:
                    del self._cache[key]
            return None
    
        def set(self, key: str, value: Any, ttl: int = 300):
            """Set cache entry with TTL in seconds"""
            self._cache[key] = {
                'value': value,
                'expires_at': time.time() + ttl
            }
    
        def delete(self, key: str):
            if key in self._cache:
                del self._cache[key]
    
    # Cached user service
    class CachedUserService:
        def __init__(self):
            self.cache = CacheService()
    
        def get_user(self, user_id: str):
            """Get user with caching"""
            cache_key = f"user:{user_id}"
    
            # Try cache first
            cached_user = self.cache.get(cache_key)
            if cached_user:
                return cached_user
    
            # Fetch from database
            try:
                user = User.get(user_id)
                user_data = {
                    'user_id': user.user_id,
                    'name': user.name,
                    'email': user.email,
                    'is_active': user.is_active
                }
    
                # Cache for 5 minutes
                self.cache.set(cache_key, user_data, ttl=300)
                return user_data
            except User.DoesNotExist:
                return None
    
        def update_user(self, user_id: str, updates: Dict[str, Any]):
            """Update user and invalidate cache"""
            try:
                user = User.get(user_id)
                for field, value in updates.items():
                    setattr(user, field, value)
                user.save()
    
                # Invalidate cache
                cache_key = f"user:{user_id}"
                self.cache.delete(cache_key)
    
                return True
            except User.DoesNotExist:
                return False
    Python

    Capacity Planning

    # Capacity monitoring and auto-scaling
    import boto3
    from datetime import datetime, timedelta
    
    class CapacityMonitor:
        def __init__(self, table_name):
            self.table_name = table_name
            self.cloudwatch = boto3.client('cloudwatch')
    
        def get_consumption_metrics(self, hours=1):
            """Get read/write capacity consumption"""
            end_time = datetime.now()
            start_time = end_time - timedelta(hours=hours)
    
            metrics = {}
    
            for metric_name in ['ConsumedReadCapacityUnits', 'ConsumedWriteCapacityUnits']:
                response = self.cloudwatch.get_metric_statistics(
                    Namespace='AWS/DynamoDB',
                    MetricName=metric_name,
                    Dimensions=[
                        {'Name': 'TableName', 'Value': self.table_name}
                    ],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=300,  # 5 minutes
                    Statistics=['Sum', 'Average', 'Maximum']
                )
                metrics[metric_name] = response['Datapoints']
    
            return metrics
    
        def recommend_capacity(self, metrics):
            """Recommend capacity based on usage patterns"""
            # Simple recommendation logic
            recommendations = {}
    
            for metric_name, datapoints in metrics.items():
                if not datapoints:
                    continue
    
                max_consumption = max(point['Maximum'] for point in datapoints)
                avg_consumption = sum(point['Average'] for point in datapoints) / len(datapoints)
    
                # Recommend 20% above peak usage
                recommended = int(max_consumption * 1.2)
    
                capacity_type = 'read' if 'Read' in metric_name else 'write'
                recommendations[capacity_type] = {
                    'current_peak': max_consumption,
                    'average': avg_consumption,
                    'recommended': recommended
                }
    
            return recommendations
    Python
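
    Putting the monitor to work takes only a couple of lines; for example, for a table named 'users':

    monitor = CapacityMonitor('users')
    metrics = monitor.get_consumption_metrics(hours=24)
    
    for capacity_type, advice in monitor.recommend_capacity(metrics).items():
        print(f"{capacity_type}: peak={advice['current_peak']:.0f}, "
              f"avg={advice['average']:.1f}, recommended={advice['recommended']}")
    Python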

    12. Testing

    Unit Testing Setup

    import unittest
    from unittest.mock import patch, MagicMock
    from moto import mock_dynamodb
    import boto3
    from datetime import datetime
    
    from pynamodb.models import Model
    from pynamodb.attributes import UnicodeAttribute, UTCDateTimeAttribute
    
    # Test models
    class TestUser(Model):
        class Meta:
            table_name = 'test_users'
            region = 'us-east-1'
            # host = 'http://localhost:8000'  # point at DynamoDB Local instead of the moto mock
    
        user_id = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
        email = UnicodeAttribute()
        created_at = UTCDateTimeAttribute(default=datetime.now)
    
    @mock_dynamodb
    class TestUserModel(unittest.TestCase):
    
        def setUp(self):
            """Set up test environment"""
            # Create the table inside the class-level moto mock
            TestUser.create_table(read_capacity_units=1, write_capacity_units=1, wait=True)
    
        def test_create_user(self):
            """Test user creation"""
            user = TestUser(
                user_id='test123',
                name='Test User',
                email='test@example.com'
            )
            user.save()
    
            # Verify user was created
            retrieved_user = TestUser.get('test123')
            self.assertEqual(retrieved_user.name, 'Test User')
            self.assertEqual(retrieved_user.email, 'test@example.com')
    
        def test_user_not_found(self):
            """Test handling of non-existent user"""
            with self.assertRaises(TestUser.DoesNotExist):
                TestUser.get('nonexistent')
    
        def test_update_user(self):
            """Test user update"""
            # Create user
            user = TestUser(
                user_id='test123',
                name='Original Name',
                email='original@example.com'
            )
            user.save()
    
            # Update user
            user.name = 'Updated Name'
            user.save()
    
            # Verify update
            updated_user = TestUser.get('test123')
            self.assertEqual(updated_user.name, 'Updated Name')
    
        def test_delete_user(self):
            """Test user deletion"""
            # Create user
            user = TestUser(
                user_id='test123',
                name='Test User',
                email='test@example.com'
            )
            user.save()
    
            # Delete user
            user.delete()
    
            # Verify deletion
            with self.assertRaises(TestUser.DoesNotExist):
                TestUser.get('test123')
    
    # Integration testing
    @mock_dynamodb
    class TestUserService(unittest.TestCase):
    
        def setUp(self):
            TestUser.create_table(read_capacity_units=1, write_capacity_units=1, wait=True)
            self.user_service = UserService()
    
        def test_create_user_service(self):
            """Test user creation through service"""
            user_data = {
                'user_id': 'service_test',
                'name': 'Service User',
                'email': 'service@example.com'
            }
    
            user = self.user_service.create_user(user_data)
            self.assertIsNotNone(user)
            self.assertEqual(user.user_id, 'service_test')
    
        def test_get_user_service(self):
            """Test user retrieval through service"""
            # Create user first
            TestUser(
                user_id='service_test',
                name='Service User',
                email='service@example.com'
            ).save()
    
            user = self.user_service.get_user('service_test')
            self.assertIsNotNone(user)
            self.assertEqual(user.name, 'Service User')
    
        def test_user_not_found_service(self):
            """Test handling of non-existent user in service"""
            user = self.user_service.get_user('nonexistent')
            self.assertIsNone(user)
    
    # Performance testing
    import time
    
    @mock_dynamodb
    class TestPerformance(unittest.TestCase):
    
        def setUp(self):
            TestUser.create_table(read_capacity_units=5, write_capacity_units=5, wait=True)
    
        def test_batch_operations_performance(self):
            """Test batch operations performance"""
            start_time = time.time()
    
            # Create 100 users using batch write
            users_data = [
                {
                    'user_id': f'user_{i}',
                    'name': f'User {i}',
                    'email': f'user{i}@example.com'
                }
                for i in range(100)
            ]
    
            with TestUser.batch_write() as batch:
                for user_data in users_data:
                    user = TestUser(**user_data)
                    batch.save(user)
    
            batch_time = time.time() - start_time
    
            # Verify all users were created
            user_ids = [f'user_{i}' for i in range(100)]
            retrieved_users = list(TestUser.batch_get(user_ids))
    
            self.assertEqual(len(retrieved_users), 100)
            print(f"Batch operation completed in {batch_time:.2f} seconds")
    
    # Test utilities
    class TestUtilities:
    
        @staticmethod
        def create_test_data():
            """Create test data for development"""
            users_data = [
                {'user_id': 'alice', 'name': 'Alice Johnson', 'email': 'alice@example.com'},
                {'user_id': 'bob', 'name': 'Bob Smith', 'email': 'bob@example.com'},
                {'user_id': 'charlie', 'name': 'Charlie Brown', 'email': 'charlie@example.com'},
            ]
    
            for user_data in users_data:
                user = TestUser(**user_data)
                user.save()
    
            print(f"Created {len(users_data)} test users")
    
        @staticmethod
        def cleanup_test_data():
            """Clean up test data"""
            for user in TestUser.scan():
                user.delete()
    
            print("Cleaned up all test data")
    
    if __name__ == '__main__':
        unittest.main()
    Python

    Testing with Docker

    # Dockerfile for local DynamoDB testing
    FROM amazon/dynamodb-local:latest
    
    EXPOSE 8000
    
    CMD ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-inMemory"]
    Dockerfile

    # docker-compose.yml for testing environment
    version: '3.8'
    services:
      dynamodb-local:
        image: amazon/dynamodb-local:latest
        container_name: dynamodb-local
        ports:
          - "8000:8000"
        command: ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-inMemory"]
    
      app:
        build: .
        depends_on:
          - dynamodb-local
        environment:
          - DYNAMODB_ENDPOINT=http://dynamodb-local:8000
          - AWS_ACCESS_KEY_ID=fakeMyKeyId
          - AWS_SECRET_ACCESS_KEY=fakeSecretAccessKey
          - AWS_DEFAULT_REGION=us-east-1
    YAML
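
    Inside the app container, the models can read DYNAMODB_ENDPOINT so the same code targets DynamoDB Local whenever that variable is set; a sketch:

    import os
    from pynamodb.models import Model
    from pynamodb.attributes import UnicodeAttribute
    
    class LocalUser(Model):
        class Meta:
            table_name = 'users'
            region = os.getenv('AWS_DEFAULT_REGION', 'us-east-1')
            # When DYNAMODB_ENDPOINT is unset this stays None and the real AWS endpoint is used
            host = os.getenv('DYNAMODB_ENDPOINT')
    
        user_id = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
    Python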

    13. Deployment Considerations

    Environment Configuration

    import os
    from enum import Enum
    
    class Environment(Enum):
        DEVELOPMENT = "development"
        STAGING = "staging"
        PRODUCTION = "production"
    
    class Config:
        def __init__(self):
            self.environment = Environment(os.getenv('ENVIRONMENT', 'development'))
            self.aws_region = os.getenv('AWS_DEFAULT_REGION', 'us-west-2')
            self.dynamodb_endpoint = os.getenv('DYNAMODB_ENDPOINT')
    
        @property
        def table_prefix(self):
            """Get table prefix based on environment"""
            if self.environment == Environment.DEVELOPMENT:
                return "dev_"
            elif self.environment == Environment.STAGING:
                return "staging_"
            else:
                return ""
    
        @property
        def is_local(self):
            """Check if using local DynamoDB"""
            return self.dynamodb_endpoint is not None
    
    config = Config()
    
    # Base model with environment-aware configuration
    class BaseModel(Model):
        class Meta:
            abstract = True
            region = config.aws_region
    
            if config.is_local:
                host = config.dynamodb_endpoint
    
        @classmethod
        def get_table_name(cls):
            """Get table name with environment prefix"""
            base_name = cls.Meta.table_name
            return f"{config.table_prefix}{base_name}"
    
    # Environment-specific models
    class User(BaseModel):
        class Meta(BaseModel.Meta):
            table_name = 'users'
    
        user_id = UnicodeAttribute(hash_key=True)
        name = UnicodeAttribute()
        email = UnicodeAttribute()
    Python
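
    Because Config reads everything from the environment, switching environments is just a matter of setting variables before the models are loaded; for example:

    import os
    
    # Simulate a staging deployment (normally these come from the runtime environment)
    os.environ['ENVIRONMENT'] = 'staging'
    os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'
    
    config = Config()
    print(config.table_prefix)  # 'staging_'
    print(config.is_local)      # False unless DYNAMODB_ENDPOINT is also set
    Python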

    Infrastructure as Code

    # CloudFormation template for DynamoDB tables
    AWSTemplateFormatVersion: '2010-09-09'
    Description: 'DynamoDB tables for PynamoDB application'
    
    Parameters:
      Environment:
        Type: String
        Default: development
        AllowedValues: [development, staging, production]
    
      TablePrefix:
        Type: String
        Default: ''
    
    Resources:
      UsersTable:
        Type: AWS::DynamoDB::Table
        Properties:
          TableName: !Sub '${TablePrefix}users'
          BillingMode: PAY_PER_REQUEST
          AttributeDefinitions:
            - AttributeName: user_id
              AttributeType: S
            - AttributeName: email
              AttributeType: S
          KeySchema:
            - AttributeName: user_id
              KeyType: HASH
          GlobalSecondaryIndexes:
            - IndexName: email-index
              KeySchema:
                - AttributeName: email
                  KeyType: HASH
              Projection:
                ProjectionType: ALL
          StreamSpecification:
            StreamViewType: NEW_AND_OLD_IMAGES
          PointInTimeRecoverySpecification:
            PointInTimeRecoveryEnabled: true
          Tags:
            - Key: Environment
              Value: !Ref Environment
            - Key: Application
              Value: PynamoDBApp
    
      OrdersTable:
        Type: AWS::DynamoDB::Table
        Properties:
          TableName: !Sub '${TablePrefix}orders'
          BillingMode: PAY_PER_REQUEST
          AttributeDefinitions:
            - AttributeName: user_id
              AttributeType: S
            - AttributeName: order_id
              AttributeType: S
            - AttributeName: created_at
              AttributeType: S
          KeySchema:
            - AttributeName: user_id
              KeyType: HASH
            - AttributeName: order_id
              KeyType: RANGE
          LocalSecondaryIndexes:
            - IndexName: user-created-index
              KeySchema:
                - AttributeName: user_id
                  KeyType: HASH
                - AttributeName: created_at
                  KeyType: RANGE
              Projection:
                ProjectionType: ALL
    
    Outputs:
      UsersTableName:
        Description: 'Name of the Users table'
        Value: !Ref UsersTable
        Export:
          Name: !Sub '${AWS::StackName}-UsersTable'
    
      OrdersTableName:
        Description: 'Name of the Orders table'
        Value: !Ref OrdersTable
        Export:
          Name: !Sub '${AWS::StackName}-OrdersTable'
    YAML
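
    A sketch of creating the stack with boto3; the stack name and template file name below are placeholders:

    import boto3
    
    def deploy_tables(environment: str, prefix: str):
        """Create the DynamoDB stack from the template above."""
        cloudformation = boto3.client('cloudformation')
        with open('dynamodb-tables.yaml') as template:  # hypothetical file name for the template
            cloudformation.create_stack(
                StackName=f'pynamodb-tables-{environment}',
                TemplateBody=template.read(),
                Parameters=[
                    {'ParameterKey': 'Environment', 'ParameterValue': environment},
                    {'ParameterKey': 'TablePrefix', 'ParameterValue': prefix},
                ],
            )
    
    deploy_tables('staging', 'staging_')
    Python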

    Monitoring and Logging

    import logging
    import boto3
    from datetime import datetime, timedelta
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)
    
    class DynamoDBMonitor:
        def __init__(self, table_names):
            self.table_names = table_names
            self.cloudwatch = boto3.client('cloudwatch')
            self.dynamodb = boto3.client('dynamodb')
    
        def check_table_health(self):
            """Check health of all tables"""
            health_status = {}
    
            for table_name in self.table_names:
                try:
                    response = self.dynamodb.describe_table(TableName=table_name)
                    table_status = response['Table']['TableStatus']
    
                    health_status[table_name] = {
                        'status': table_status,
                        'healthy': table_status == 'ACTIVE'
                    }
    
                    if table_status != 'ACTIVE':
                        logger.warning(f"Table {table_name} is not active: {table_status}")
    
                except Exception as e:
                    logger.error(f"Error checking table {table_name}: {e}")
                    health_status[table_name] = {
                        'status': 'ERROR',
                        'healthy': False,
                        'error': str(e)
                    }
    
            return health_status
    
        def get_throttling_metrics(self, hours=1):
            """Check for throttling events"""
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(hours=hours)
    
            throttling_data = {}
    
            for table_name in self.table_names:
                for operation in ['Query', 'Scan', 'GetItem', 'PutItem', 'UpdateItem', 'DeleteItem']:
                    try:
                        response = self.cloudwatch.get_metric_statistics(
                            Namespace='AWS/DynamoDB',
                            MetricName='ThrottledRequests',
                            Dimensions=[
                                {'Name': 'TableName', 'Value': table_name},
                                {'Name': 'Operation', 'Value': operation}
                            ],
                            StartTime=start_time,
                            EndTime=end_time,
                            Period=300,
                            Statistics=['Sum']
                        )
    
                        datapoints = response.get('Datapoints', [])
                        if datapoints:
                            total_throttles = sum(point['Sum'] for point in datapoints)
                            if total_throttles > 0:
                                throttling_data[f"{table_name}_{operation}"] = total_throttles
                                logger.warning(f"Throttling detected: {table_name} {operation} - {total_throttles} events")
    
                    except Exception as e:
                        logger.error(f"Error getting throttling metrics for {table_name}: {e}")
    
            return throttling_data
    
    # Application metrics
    class ApplicationMetrics:
        def __init__(self):
            self.cloudwatch = boto3.client('cloudwatch')
    
        def record_operation_latency(self, operation_name, latency_ms):
            """Record operation latency"""
            try:
                self.cloudwatch.put_metric_data(
                    Namespace='PynamoDB/Application',
                    MetricData=[
                        {
                            'MetricName': 'OperationLatency',
                            'Dimensions': [
                                {
                                    'Name': 'Operation',
                                    'Value': operation_name
                                }
                            ],
                            'Value': latency_ms,
                            'Unit': 'Milliseconds',
                            'Timestamp': datetime.utcnow()
                        }
                    ]
                )
            except Exception as e:
                logger.error(f"Failed to record metric: {e}")
    
        def record_error(self, operation_name, error_type):
            """Record application errors"""
            try:
                self.cloudwatch.put_metric_data(
                    Namespace='PynamoDB/Application',
                    MetricData=[
                        {
                            'MetricName': 'Errors',
                            'Dimensions': [
                                {
                                    'Name': 'Operation',
                                    'Value': operation_name
                                },
                                {
                                    'Name': 'ErrorType',
                                    'Value': error_type
                                }
                            ],
                            'Value': 1,
                            'Unit': 'Count',
                            'Timestamp': datetime.utcnow()
                        }
                    ]
                )
            except Exception as e:
                logger.error(f"Failed to record error metric: {e}")
    
    # Decorator for monitoring
    import functools
    import time
    
    def monitor_performance(operation_name):
        """Decorator to monitor operation performance"""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                metrics = ApplicationMetrics()
                start_time = time.time()
    
                try:
                    result = func(*args, **kwargs)
    
                    # Record successful operation
                    latency = (time.time() - start_time) * 1000
                    metrics.record_operation_latency(operation_name, latency)
    
                    return result
    
                except Exception as e:
                    # Record error
                    error_type = type(e).__name__
                    metrics.record_error(operation_name, error_type)
                    raise
    
            return wrapper
        return decorator
    
    # Usage example
    class MonitoredUserService:
        @monitor_performance('create_user')
        def create_user(self, user_data):
            user = User(**user_data)
            user.save()
            return user
    
        @monitor_performance('get_user')
        def get_user(self, user_id):
            try:
                return User.get(user_id)
            except User.DoesNotExist:
                return None
    Python
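
    To act on these metrics, CloudWatch alarms can be attached to the custom PynamoDB/Application namespace published above. The sketch below shows one possible wiring for the latency metric, reusing the DynamoDBMonitor and operation names defined earlier; the SNS topic ARN and thresholds are placeholders, not values from this guide.

    import boto3

    def create_latency_alarm(operation_name, sns_topic_arn, threshold_ms=500):
        """Alarm when average latency for an operation stays above the threshold."""
        cloudwatch = boto3.client('cloudwatch')
        cloudwatch.put_metric_alarm(
            AlarmName=f'pynamodb-{operation_name}-latency',
            Namespace='PynamoDB/Application',
            MetricName='OperationLatency',
            # Dimensions must exactly match those used in put_metric_data
            Dimensions=[{'Name': 'Operation', 'Value': operation_name}],
            Statistic='Average',
            Period=300,
            EvaluationPeriods=3,
            Threshold=threshold_ms,
            ComparisonOperator='GreaterThanThreshold',
            TreatMissingData='notBreaching',
            AlarmActions=[sns_topic_arn]
        )

    # Example wiring: check table health and alarm on the monitored operations
    monitor = DynamoDBMonitor(['users', 'orders'])
    print(monitor.check_table_health())

    ops_topic = 'arn:aws:sns:us-west-2:123456789012:ops-alerts'  # placeholder ARN
    for operation in ['create_user', 'get_user']:
        create_latency_alarm(operation, ops_topic)
    Python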

    Security Best Practices

    import json
    import os
    from datetime import datetime, timezone

    import boto3
    from pynamodb.models import Model
    from pynamodb.attributes import UnicodeAttribute, UTCDateTimeAttribute

    # IAM Policy Templates
    def generate_application_policy(table_arns, environment):
        """Generate IAM policy for application access"""
    
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "dynamodb:GetItem",
                        "dynamodb:PutItem",
                        "dynamodb:UpdateItem",
                        "dynamodb:DeleteItem",
                        "dynamodb:Query",
                        "dynamodb:Scan",
                        "dynamodb:BatchGetItem",
                        "dynamodb:BatchWriteItem",
                        "dynamodb:TransactGetItems",
                        "dynamodb:TransactWriteItems"
                    ],
                    "Resource": table_arns
                },
                {
                    "Effect": "Allow",
                    "Action": [
                        "dynamodb:Query",
                        "dynamodb:Scan"
                    ],
                    "Resource": [f"{arn}/index/*" for arn in table_arns]
                }
            ]
        }
    
        # Add additional restrictions for production
        if environment == 'production':
            # Deny destructive operations on main table
            policy["Statement"].append({
                "Effect": "Deny",
                "Action": [
                    "dynamodb:DeleteTable",
                    "dynamodb:UpdateTable"
                ],
                "Resource": table_arns
            })
    
            # Add condition for source IP if needed
            # policy["Statement"][0]["Condition"] = {
            #     "IpAddress": {
            #         "aws:SourceIp": ["10.0.0.0/8"]
            #     }
            # }
    
        return policy
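
    # Example (illustrative): attach the generated policy to an application role.
    # The role name, policy name, and ARNs used here are placeholders.
    def attach_policy_to_role(role_name, table_arns, environment):
        iam = boto3.client('iam')
        policy = generate_application_policy(table_arns, environment)
        iam.put_role_policy(
            RoleName=role_name,
            PolicyName=f'dynamodb-access-{environment}',
            PolicyDocument=json.dumps(policy)
        )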
    
    # Data encryption utilities
    class EncryptionHelper:
        def __init__(self, kms_key_id=None):
            self.kms_key_id = kms_key_id
            self.kms_client = boto3.client('kms') if kms_key_id else None
    
        def encrypt_sensitive_data(self, data):
            """Encrypt sensitive data before storing in DynamoDB"""
            if not self.kms_client:
                raise ValueError("KMS key ID not configured")
    
            response = self.kms_client.encrypt(
                KeyId=self.kms_key_id,
                Plaintext=json.dumps(data).encode('utf-8')
            )
    
            import base64
            return base64.b64encode(response['CiphertextBlob']).decode('utf-8')
    
        def decrypt_sensitive_data(self, encrypted_data):
            """Decrypt sensitive data retrieved from DynamoDB"""
            if not self.kms_client:
                raise ValueError("KMS key ID not configured")
    
            import base64
            ciphertext_blob = base64.b64decode(encrypted_data.encode('utf-8'))
    
            response = self.kms_client.decrypt(CiphertextBlob=ciphertext_blob)
            return json.loads(response['Plaintext'].decode('utf-8'))
    
    # Secure model with encryption
    class SecureUserProfile(Model):
        class Meta:
            table_name = 'secure_user_profiles'
            region = 'us-west-2'
    
        user_id = UnicodeAttribute(hash_key=True)
        username = UnicodeAttribute()
        encrypted_email = UnicodeAttribute()  # Encrypted
        encrypted_personal_data = UnicodeAttribute()  # Encrypted PII
        created_at = UTCDateTimeAttribute(default=lambda: datetime.now(timezone.utc))  # aware UTC timestamp avoids naive/local-time ambiguity
    
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.encryption_helper = EncryptionHelper(
                kms_key_id=os.getenv('KMS_KEY_ID')
            )
    
        def set_email(self, email):
            """Set encrypted email"""
            self.encrypted_email = self.encryption_helper.encrypt_sensitive_data(email)
    
        def get_email(self):
            """Get decrypted email"""
            if self.encrypted_email:
                return self.encryption_helper.decrypt_sensitive_data(self.encrypted_email)
            return None
    
        def set_personal_data(self, data):
            """Set encrypted personal data"""
            self.encrypted_personal_data = self.encryption_helper.encrypt_sensitive_data(data)
    
        def get_personal_data(self):
            """Get decrypted personal data"""
            if self.encrypted_personal_data:
                return self.encryption_helper.decrypt_sensitive_data(self.encrypted_personal_data)
            return None
    Python
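
    Putting the pieces together, a round trip with SecureUserProfile might look like the sketch below. It assumes a KMS_KEY_ID environment variable pointing at a key the caller may use for Encrypt and Decrypt, and that the secure_user_profiles table already exists; the key alias shown is a placeholder.

    import os

    os.environ.setdefault('KMS_KEY_ID', 'alias/app-data-key')  # placeholder key alias

    profile = SecureUserProfile(user_id='user-123', username='alice')
    profile.set_email('alice@example.com')
    profile.set_personal_data({'phone': '+1-555-0100', 'dob': '1990-01-01'})
    profile.save()

    loaded = SecureUserProfile.get('user-123')
    print(loaded.get_email())          # 'alice@example.com'
    print(loaded.get_personal_data())  # {'phone': ..., 'dob': ...}
    Python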

    Backup and Disaster Recovery

    import boto3
    from datetime import datetime

    class BackupManager:
        def __init__(self, table_names):
            self.table_names = table_names
            # A single DynamoDB client handles both table and backup operations
            self.dynamodb = boto3.client('dynamodb')
            self.backup_client = self.dynamodb
    
        def create_backup(self, table_name, backup_name=None):
            """Create on-demand backup"""
            if not backup_name:
                backup_name = f"{table_name}-backup-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    
            try:
                response = self.backup_client.create_backup(
                    TableName=table_name,
                    BackupName=backup_name
                )
    
                backup_arn = response['BackupDetails']['BackupArn']
                print(f"Backup created: {backup_name} ({backup_arn})")
                return backup_arn
    
            except Exception as e:
                print(f"Failed to create backup for {table_name}: {e}")
                return None
    
        def list_backups(self, table_name=None):
            """List available backups"""
            kwargs = {}
            if table_name:
                kwargs['TableName'] = table_name
    
            try:
                response = self.backup_client.list_backups(**kwargs)
                return response['BackupSummaries']
            except Exception as e:
                print(f"Failed to list backups: {e}")
                return []
    
        def restore_from_backup(self, backup_arn, target_table_name):
            """Restore table from backup"""
            try:
                response = self.backup_client.restore_table_from_backup(
                    TargetTableName=target_table_name,
                    BackupArn=backup_arn
                )
    
                print(f"Restore initiated: {target_table_name}")
                return response['TableDescription']['TableArn']
    
            except Exception as e:
                print(f"Failed to restore from backup: {e}")
                return None
    
        def setup_point_in_time_recovery(self):
            """Enable point-in-time recovery for all tables"""
            for table_name in self.table_names:
                try:
                    self.dynamodb.update_continuous_backups(
                        TableName=table_name,
                        PointInTimeRecoverySpecification={
                            'PointInTimeRecoveryEnabled': True
                        }
                    )
                    print(f"Point-in-time recovery enabled for {table_name}")
                except Exception as e:
                    print(f"Failed to enable PITR for {table_name}: {e}")
    
    # Cross-region replication setup
    class ReplicationManager:
        def __init__(self, source_region, target_regions):
            self.source_region = source_region
            self.target_regions = target_regions
    
        def setup_global_tables(self, table_name):
            """Setup DynamoDB Global Tables for cross-region replication"""
            try:
                # Create replicas in target regions
                replica_updates = []
                for region in self.target_regions:
                    replica_updates.append({
                        'Create': {
                            'RegionName': region
                        }
                    })
    
                dynamodb = boto3.client('dynamodb', region_name=self.source_region)
    
                response = dynamodb.create_global_table(
                    GlobalTableName=table_name,
                    ReplicationGroup=[
                        {'RegionName': self.source_region},
                        *[{'RegionName': region} for region in self.target_regions]
                    ]
                )
    
                print(f"Global table created: {table_name}")
                return response['GlobalTableDescription']
    
            except Exception as e:
                print(f"Failed to create global table {table_name}: {e}")
                return None
    Python
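
    A small operational script, run on a schedule (for example from cron or a Lambda function), can tie these helpers together. This is a sketch under the assumption that the users and orders tables already exist and that the caller has backup, PITR, and Global Tables permissions.

    if __name__ == '__main__':
        tables = ['users', 'orders']
        backup_manager = BackupManager(tables)

        # Enable point-in-time recovery (safe to re-run)
        backup_manager.setup_point_in_time_recovery()

        # Take a dated on-demand backup of each table
        for table in tables:
            backup_manager.create_backup(table)

        # Optionally replicate to another region via Global Tables
        replication = ReplicationManager('us-west-2', ['us-east-1'])
        replication.setup_global_tables('users')
    Python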

    Conclusion

    This comprehensive guide has covered PynamoDB from basic concepts to advanced production deployment strategies. Key takeaways include:

    For Beginners:

    • Start with simple models and basic CRUD operations
    • Understand DynamoDB’s key concepts: hash keys, range keys, and indexes
    • Practice with local DynamoDB for development

    For Intermediate Users:

    • Master querying and filtering techniques
    • Implement proper error handling and validation
    • Use transactions for complex operations
    • Design efficient indexes for your access patterns

    For Advanced Users:

    • Optimize performance with caching and connection pooling
    • Implement comprehensive monitoring and alerting
    • Design for security with encryption and proper IAM policies
    • Plan for disaster recovery with backups and replication

    Production Readiness Checklist:

    graph TB
        A[Production Deployment] --> B[Security]
        A --> C[Monitoring]
        A --> D[Performance]
        A --> E[Reliability]
    
        B --> F[IAM Policies]
        B --> G[Data Encryption]
        B --> H[Network Security]
    
        C --> I[CloudWatch Metrics]
        C --> J[Custom Dashboards]
        C --> K[Alerting Rules]
    
        D --> L[Capacity Planning]
        D --> M[Connection Optimization]
        D --> N[Caching Strategy]
    
        E --> O[Backup Strategy]
        E --> P[Disaster Recovery]
        E --> Q[High Availability]

    PynamoDB provides a powerful, Pythonic way to work with DynamoDB while maintaining the flexibility and performance benefits of NoSQL databases. By following the patterns and practices outlined in this guide, you can build robust, scalable applications that leverage the full power of DynamoDB through PynamoDB’s elegant interface.

    Remember to always design your data models around your access patterns, implement proper error handling, monitor your applications in production, and maintain security best practices throughout your development lifecycle.


