The Complete Guide to Python Beanie: From Beginner to Expert

    Table of Contents

    1. Introduction to Beanie
    2. Setting Up Your Environment
    3. Basic Concepts and Models
    4. CRUD Operations
    5. Advanced Queries
    6. Relationships and References
    7. Aggregation Pipeline
    8. Validation and Serialization
    9. Indexes and Performance
    10. Authentication and Authorization
    11. Testing Strategies
    12. Best Practices and Patterns
    13. Advanced Topics
    14. Production Deployment

    Chapter 1: Introduction to Beanie

    What is Beanie?

    Beanie is an asynchronous Python Object Document Mapper (ODM) for MongoDB, built on top of Pydantic. It provides a pythonic way to work with MongoDB documents while leveraging the power of type hints and async/await syntax.

    graph TB
        A[Python Application] --> B[Beanie ODM]
        B --> C[Motor Driver]
        C --> D[MongoDB]
    
        B --> E[Pydantic Models]
        E --> F[Type Validation]
        E --> G[Serialization]
    
        style B fill:#e1f5fe
        style E fill:#f3e5f5

    Key Features

    • Async/Await Support: Built for modern async Python applications
    • Type Safety: Leverages Pydantic for robust type checking
    • MongoDB Native: Direct integration with MongoDB features
    • FastAPI Integration: Seamless integration with FastAPI framework
    • Aggregation Support: Full MongoDB aggregation pipeline support

    Why Choose Beanie?

    mindmap
      root((Beanie Benefits))
        Performance
          Async Operations
          Connection Pooling
          Lazy Loading
        Developer Experience
          Type Hints
          Auto-completion
          Runtime Validation
        MongoDB Features
          Aggregation Pipeline
          Indexes
          Transactions
        Framework Integration
          FastAPI
          Starlette
          Custom Apps

    Chapter 2: Setting Up Your Environment

    Installation

    pip install beanie

    For development with additional tools:

    pip install beanie[dev]

    Project Structure

    graph TD
        A[Project Root] --> B[app/]
        B --> C[models/]
        B --> D[routers/]
        B --> E[services/]
        B --> F[config/]
    
        C --> G[user.py]
        C --> H[product.py]
        C --> I[order.py]
    
        D --> J[users.py]
        D --> K[products.py]
    
        E --> L[auth.py]
        E --> M[database.py]
    
        F --> N[settings.py]
        F --> O[database.py]

    Basic Setup

    from beanie import init_beanie
    from motor.motor_asyncio import AsyncIOMotorClient
    from models import User, Product, Order
    
    async def init_db():
        # Create Motor client
        client = AsyncIOMotorClient("mongodb://localhost:27017")
    
        # Initialize beanie with the database and models
        await init_beanie(
            database=client.ecommerce_db,
            document_models=[User, Product, Order]
        )
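    Connection settings usually come from the environment rather than a hard-coded string. If the URI carries credentials, the username and password must be percent-escaped, or characters such as `@` and `:` break URI parsing. A minimal sketch using only the standard library; the environment variable names `MONGO_USER`, `MONGO_PASSWORD` and `MONGO_HOST` are hypothetical:

```python
import os
from urllib.parse import quote_plus


def build_mongo_uri(user: str, password: str, host: str = "localhost",
                    port: int = 27017, db: str = "ecommerce_db") -> str:
    """Build a MongoDB URI with percent-escaped credentials."""
    # quote_plus escapes reserved characters such as '@', ':' and '/'
    return f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{db}"


def uri_from_env() -> str:
    """Read hypothetical MONGO_* variables; fall back to an unauthenticated local URI."""
    user = os.environ.get("MONGO_USER")
    password = os.environ.get("MONGO_PASSWORD")
    host = os.environ.get("MONGO_HOST", "localhost")
    if user and password:
        return build_mongo_uri(user, password, host)
    return f"mongodb://{host}:27017/ecommerce_db"
```

    Naming the database in the URI (`/ecommerce_db`) also gives the client a default database, which `client.get_default_database()` returns.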

    Chapter 3: Basic Concepts and Models

    Document Models

    from beanie import Document, Indexed
    from pydantic import BaseModel, Field
    from typing import Optional, List
    from datetime import datetime
    from bson import ObjectId
    
    class User(Document):
        name: str
        # Indexed(..., unique=True) makes init_beanie() create a unique index;
        # Field(..., unique=True) is not a Pydantic option and creates no index
        email: Indexed(str, unique=True)
        age: Optional[int] = None
        created_at: datetime = Field(default_factory=datetime.utcnow)
        is_active: bool = True
    
        class Settings:
            name = "users"  # Collection name
            indexes = [
                "created_at",  # Single field index
                [("name", 1), ("age", -1)]  # Compound index
            ]
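    The compound index `[("name", 1), ("age", -1)]` can also serve sorts, but only some of them. Roughly, the sort keys must be a prefix of the index keys, with every direction either matching the index or every direction reversed. The hypothetical helper below encodes that simplified rule (it ignores the case where equality filters on leading keys let MongoDB sort on later keys):

```python
from typing import List, Tuple

IndexSpec = List[Tuple[str, int]]


def index_supports_sort(index: IndexSpec, sort: IndexSpec) -> bool:
    """Return True if `index` can return documents already ordered by `sort`.

    Simplified rule: sort keys form a prefix of the index keys, and either
    all directions match the index or all are reversed.
    """
    if not sort or len(sort) > len(index):
        return False
    prefix = index[: len(sort)]
    if [f for f, _ in prefix] != [f for f, _ in sort]:
        return False
    same = all(i == s for (_, i), (_, s) in zip(prefix, sort))
    reverse = all(i == -s for (_, i), (_, s) in zip(prefix, sort))
    return same or reverse
```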

    Model Inheritance

    classDiagram
        Document <|-- BaseUser
        BaseUser <|-- User
        BaseUser <|-- AdminUser
    
        class Document {
            +ObjectId id
            +save()
            +delete()
            +find()
        }
    
        class BaseUser {
            +str name
            +str email
            +datetime created_at
        }
    
        class User {
            +List~str~ interests
            +Optional~str~ bio
        }
    
        class AdminUser {
            +List~str~ permissions
            +int access_level
        }

    class BaseUser(Document):
        name: str
        email: str
        created_at: datetime = Field(default_factory=datetime.utcnow)
    
        class Settings:
            name = "users"  # One collection for the whole hierarchy
            is_root = True  # Subclasses are stored here, tagged with a _class_id field
    
    class User(BaseUser):
        interests: List[str] = []
        bio: Optional[str] = None
    
    class AdminUser(BaseUser):
        permissions: List[str] = []
        access_level: int = 1
    
    # Register the root and every subclass with init_beanie(). Querying through a
    # subclass returns only that type, e.g. await AdminUser.find_all().to_list()
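    Conceptually, `is_root = True` turns the hierarchy into one collection whose documents carry a class discriminator (Beanie stores it in a `_class_id` field; treat its exact value as an implementation detail). The plain-Python sketch below imitates that behavior with a hypothetical `InMemoryCollection` and dataclasses; it is not Beanie code and handles only one level of subclasses:

```python
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Type


@dataclass
class BaseUser:
    name: str
    email: str


@dataclass
class User(BaseUser):
    interests: List[str] = field(default_factory=list)


@dataclass
class AdminUser(BaseUser):
    access_level: int = 1


class InMemoryCollection:
    """Hypothetical single collection shared by a class hierarchy."""

    def __init__(self) -> None:
        self.docs: List[Dict] = []

    def insert(self, obj: BaseUser) -> None:
        # Tag each stored document with its concrete class name
        self.docs.append({**asdict(obj), "_class_id": type(obj).__name__})

    def find_all(self, cls: Type[BaseUser]) -> List[Dict]:
        # Querying via the root returns everything; via a subclass, only that type
        if cls is BaseUser:
            return list(self.docs)
        return [d for d in self.docs if d["_class_id"] == cls.__name__]
```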

    Embedded Documents

    class Address(BaseModel):
        street: str
        city: str
        country: str
        postal_code: str
    
    class User(Document):
        name: str
        email: str
        address: Optional[Address] = None
        shipping_addresses: List[Address] = []
    
        class Settings:
            name = "users"
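    Embedded models are stored as nested sub-documents, so queries and partial updates address their fields with dot notation, for example `{"address.city": "Berlin"}` or `{"$set": {"address.city": "Berlin"}}`. The hypothetical helper below shows which paths a nested document exposes:

```python
from typing import Any, Dict


def to_dot_paths(doc: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested sub-documents into MongoDB dot-notation paths.

    Lists are kept as values; MongoDB addresses array elements with
    positional paths such as "shipping_addresses.0.city" (not generated here).
    """
    flat: Dict[str, Any] = {}
    for key, value in doc.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and value:
            flat.update(to_dot_paths(value, path))
        else:
            flat[path] = value
    return flat
```

    In Beanie query expressions the same path can be written as `User.address.city`.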

    Chapter 4: CRUD Operations

    Create Operations

    sequenceDiagram
        participant App as Application
        participant Beanie as Beanie ODM
        participant Mongo as MongoDB
    
        App->>Beanie: user = User(name="John", email="john@example.com")
        App->>Beanie: await user.insert()
        Beanie->>Mongo: insertOne(document)
        Mongo-->>Beanie: {acknowledged: true, insertedId: ObjectId}
        Beanie-->>App: User instance with ID

    from pydantic import ValidationError
    
    # Run these snippets inside an async function (or an asyncio REPL: python -m asyncio)
    
    # Single document creation
    user = User(name="John Doe", email="john@example.com", age=30)
    await user.insert()
    
    # Bulk creation
    users = [
        User(name="Alice", email="alice@example.com"),
        User(name="Bob", email="bob@example.com")
    ]
    await User.insert_many(users)
    
    # Validation happens when the model is constructed, before any database call
    try:
        user = User(name="Jane", email="jane@example.com", age="not-a-number")  # age must be an int
        await user.insert()
    except ValidationError as e:
        print(f"Validation error: {e}")

    Read Operations

    # Find by ID
    user = await User.get(user_id)
    
    # Find one document
    user = await User.find_one(User.email == "john@example.com")
    
    # Find multiple documents
    users = await User.find(User.age >= 18).to_list()
    
    # Find all
    all_users = await User.find_all().to_list()
    
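    # Find with limit and skip (sort first so page contents are deterministic)
    page_users = await User.find().sort(User.created_at).skip(10).limit(5).to_list()
    
    # Find with sorting (ascending; use -User.created_at for newest first)
    sorted_users = await User.find().sort(User.created_at).to_list()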

    Update Operations

    flowchart TD
        A[Update Request] --> B{Update Type}
        B -->|Single Document| C[find_one + update fields + save]
        B -->|Multiple Documents| D[Update Many]
        B -->|Upsert| E[Update with upsert=True]
    
        C --> F["Document.save()"]
        D --> G["Document.update_many()"]
        E --> H["Document.update() with upsert"]
    
        F --> I[Updated Document]
        G --> J[Update Result]
        H --> K["Updated/Inserted Document"]

    # Update single document - Method 1
    user = await User.find_one(User.email == "john@example.com")
    user.age = 31
    await user.save()
    
    # Update single document - Method 2
    await User.find_one(User.email == "john@example.com").update(
        {"$set": {"age": 31}}
    )
    
    # Update multiple documents
    await User.find(User.age < 18).update_many(
        {"$set": {"is_minor": True}}
    )
    
    # Atomic updates
    await User.find_one(User.email == "john@example.com").update(
        {"$inc": {"login_count": 1}}
    )
    
    # Upsert operation
    await User.find_one(User.email == "new@example.com").upsert(
        {"$set": {"name": "New User", "age": 25}},
        on_insert=User(name="New User", email="new@example.com")
    )
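    Operators such as `$set` and `$inc` are applied by the server to a single document atomically, so two concurrent `$inc` calls cannot overwrite each other the way two read-modify-`save()` cycles can. To make their effect concrete, the hypothetical `apply_update()` below mimics `$set`, `$inc` and `$unset` on a plain dict (top-level fields only):

```python
import copy
from typing import Any, Dict


def apply_update(doc: Dict[str, Any], update: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Apply a subset of MongoDB update operators to a copy of `doc`."""
    result = copy.deepcopy(doc)
    for op, fields in update.items():
        for name, value in fields.items():
            if op == "$set":
                result[name] = value
            elif op == "$inc":
                # $inc on a missing field creates it with the increment value
                result[name] = result.get(name, 0) + value
            elif op == "$unset":
                result.pop(name, None)
            else:
                raise ValueError(f"unsupported operator: {op}")
    return result
```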

    Delete Operations

    # Delete single document
    user = await User.find_one(User.email == "john@example.com")
    if user is not None:  # find_one returns None when nothing matches
        await user.delete()
    
    # Delete by query
    await User.find_one(User.email == "john@example.com").delete()
    
    # Delete multiple documents (== False builds a query expression; `not` would not)
    result = await User.find(User.is_active == False).delete_many()  # noqa: E712
    print(f"Deleted {result.deleted_count} users")
    
    # Delete all documents (be careful!)
    await User.delete_all()

    Chapter 5: Advanced Queries

    Query Operators

    graph LR
        A[Query Operators] --> B[Comparison]
        A --> C[Logical]
        A --> D[Element]
        A --> E[Array]
    
        B --> B1[$eq, $ne, $gt, $gte, $lt, $lte]
        C --> C1[$and, $or, $not, $nor]
        D --> D1[$exists, $type]
        E --> E1[$in, $nin, $all, $size]

    # Comparison operators
    users = await User.find(User.age > 18).to_list()
    users = await User.find(User.age >= 18, User.age <= 65).to_list()
    users = await User.find(User.name != "admin").to_list()
    
    # Logical operators
    from beanie.operators import Or, And, Not
    
    users = await User.find(
        Or(User.age < 18, User.age > 65)
    ).to_list()
    
    users = await User.find(
        And(User.is_active == True, User.age >= 18)
    ).to_list()
    
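    # Element operators
    from beanie.operators import Exists
    
    users = await User.find(Exists(User.bio, True)).to_list()
    
    # Array operators
    from beanie.operators import In, All
    
    # In: interests contains at least one of the values
    users = await User.find(
        In(User.interests, ["python", "mongodb"])
    ).to_list()
    
    # All: interests contains every one of the values
    users = await User.find(
        All(User.interests, ["python", "mongodb"])
    ).to_list()
    
    # Text search (requires a text index on the collection, see Chapter 9)
    users = await User.find(
        {"$text": {"$search": "python developer"}}
    ).to_list()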
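    The operator semantics are easier to reason about in plain Python. The hypothetical `matches()` helper below evaluates a subset of the query language against dicts (top-level fields only, no type bracketing or collation); it is a teaching sketch, not how MongoDB or Beanie evaluate queries. Note how an array field such as `interests` matches `$in` when any element is in the list, and how `$ne` also matches documents where the field is missing:

```python
from typing import Any, Dict, List

_CMP = {
    "$gt": lambda a, b: a > b,
    "$gte": lambda a, b: a >= b,
    "$lt": lambda a, b: a < b,
    "$lte": lambda a, b: a <= b,
}


def _candidates(value: Any) -> List[Any]:
    # An array field matches if any element satisfies the condition
    return value if isinstance(value, list) else [value]


def _match_field(value: Any, present: bool, cond: Any) -> bool:
    # A bare value (no $-operators) means equality
    if not (isinstance(cond, dict) and cond and all(k.startswith("$") for k in cond)):
        cond = {"$eq": cond}
    for op, arg in cond.items():
        if op == "$exists":
            ok = present == bool(arg)
        elif op == "$eq":
            ok = value == arg or arg in _candidates(value)
        elif op == "$ne":
            ok = not (value == arg or arg in _candidates(value))
        elif op == "$in":
            ok = any(c in arg for c in _candidates(value))
        elif op == "$nin":
            ok = not any(c in arg for c in _candidates(value))
        elif op == "$all":
            ok = isinstance(value, list) and all(a in value for a in arg)
        elif op in _CMP:
            ok = any(c is not None and _CMP[op](c, arg) for c in _candidates(value))
        else:
            raise ValueError(f"unsupported operator: {op}")
        if not ok:
            return False
    return True


def matches(doc: Dict[str, Any], query: Dict[str, Any]) -> bool:
    """Return True if `doc` satisfies every clause of `query`."""
    for key, cond in query.items():
        if key == "$and":
            ok = all(matches(doc, q) for q in cond)
        elif key == "$or":
            ok = any(matches(doc, q) for q in cond)
        elif key == "$nor":
            ok = not any(matches(doc, q) for q in cond)
        else:
            ok = _match_field(doc.get(key), key in doc, cond)
        if not ok:
            return False
    return True
```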

    Aggregation Queries

    # Basic aggregation
    pipeline = [
        {"$match": {"age": {"$gte": 18}}},
        {"$group": {
            "_id": "$country",
            "count": {"$sum": 1},
            "avg_age": {"$avg": "$age"}
        }},
        {"$sort": {"count": -1}}
    ]
    
    results = await User.aggregate(pipeline).to_list()
    
    # Using Beanie aggregation methods
    results = await User.find(User.age >= 18).aggregate([
        {"$group": {
            "_id": "$country",
            "count": {"$sum": 1},
            "avg_age": {"$avg": "$age"}
        }}
    ]).to_list()
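    Each stage transforms the stream of documents produced by the previous one. As a cross-check of what the first pipeline computes, here is a plain-Python equivalent (a sketch for reasoning about results and writing test expectations; the database runs the real pipeline server-side):

```python
from collections import defaultdict
from typing import Any, Dict, List


def adults_by_country(users: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Plain-Python equivalent of the $match -> $group -> $sort pipeline."""
    groups: Dict[Any, List[int]] = defaultdict(list)
    for u in users:
        age = u.get("age")
        if age is not None and age >= 18:          # {"$match": {"age": {"$gte": 18}}}
            groups[u.get("country")].append(age)   # "_id": "$country" (missing -> None)
    results = [
        {"_id": country, "count": len(ages), "avg_age": sum(ages) / len(ages)}
        for country, ages in groups.items()
    ]
    # {"$sort": {"count": -1}}
    return sorted(results, key=lambda r: r["count"], reverse=True)
```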

    Complex Queries with Joins

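    from beanie import PydanticObjectId
    
    class Order(Document):
        user_id: PydanticObjectId  # Pydantic-compatible wrapper around bson.ObjectId
        product_ids: List[PydanticObjectId]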
        total: float
        status: str
        created_at: datetime = Field(default_factory=datetime.utcnow)
    
        class Settings:
            name = "orders"
    
    # Aggregation with lookup (join)
    pipeline = [
        {"$lookup": {
            "from": "users",
            "localField": "user_id",
            "foreignField": "_id",
            "as": "user"
        }},
        {"$unwind": "$user"},
        {"$match": {"user.age": {"$gte": 18}}},
        {"$project": {
            "total": 1,
            "user_name": "$user.name",
            "user_email": "$user.email"
        }}
    ]
    
    orders_with_users = await Order.aggregate(pipeline).to_list()
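    `$lookup` always produces an array (empty when nothing matches), and a plain `$unwind` then emits one document per array element, so orders without a matching user disappear from the result, much like an inner join. Adding `"preserveNullAndEmptyArrays": true` to the `$unwind` stage keeps them. A plain-Python sketch of the two stages, using a hypothetical `lookup_unwind()` helper:

```python
from typing import Any, Dict, List


def lookup_unwind(orders: List[Dict[str, Any]], users: List[Dict[str, Any]],
                  local_field: str = "user_id", foreign_field: str = "_id",
                  as_field: str = "user") -> List[Dict[str, Any]]:
    """Emulate {"$lookup": ...} followed by {"$unwind": "$user"}."""
    by_key: Dict[Any, List[Dict[str, Any]]] = {}
    for u in users:
        by_key.setdefault(u.get(foreign_field), []).append(u)
    joined = []
    for order in orders:
        matched = by_key.get(order.get(local_field), [])  # $lookup: always an array
        for user in matched:                               # $unwind: one row per element
            joined.append({**order, as_field: user})
        # An order with no match yields no rows: plain $unwind drops empty arrays
    return joined
```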

    Chapter 6: Relationships and References

    Document References

    erDiagram
        User ||--o{ Order : places
        Order ||--o{ OrderItem : contains
        Product ||--o{ OrderItem : referenced_in
        Category ||--o{ Product : contains
    
        User {
            ObjectId id
            string name
            string email
        }
    
        Order {
            ObjectId id
            ObjectId user_id
            datetime created_at
            float total
        }
    
        Product {
            ObjectId id
            string name
            float price
            ObjectId category_id
        }

    from beanie import Link
    from typing import Optional
    
    class Category(Document):
        name: str
        description: Optional[str] = None
    
        class Settings:
            name = "categories"
    
    class Product(Document):
        name: str
        price: float
        category: Link[Category]  # Reference to Category
        description: Optional[str] = None
    
        class Settings:
            name = "products"
    
    class OrderItem(BaseModel):
        product: Link[Product]
        quantity: int
        price: float
    
    class Order(Document):
        user: Link[User]
        items: List[OrderItem]
        total: float
        status: str = "pending"
        created_at: datetime = Field(default_factory=datetime.utcnow)
    
        class Settings:
            name = "orders"

    Working with References

    # Creating documents with references
    category = Category(name="Electronics")
    await category.insert()
    
    product = Product(
        name="Laptop",
        price=999.99,
        category=category  # Direct reference
    )
    await product.insert()
    
    # Fetching with references: fetch_links=True resolves Link fields in the same query
    product = await Product.find_one(Product.name == "Laptop", fetch_links=True)
    print(product.category.name)  # "Electronics"
    
    # Creating orders with references
    user = await User.find_one(User.email == "john@example.com")
    product = await Product.find_one(Product.name == "Laptop")
    
    order = Order(
        user=user,
        items=[
            OrderItem(product=product, quantity=1, price=product.price)
        ],
        total=product.price
    )
    await order.insert()

    # Fetch one specific link after loading the document
    order = await Order.find_one(Order.status == "pending")
    if order is not None:
        await order.fetch_link(Order.user)
    
        # Or resolve every Link field on the loaded document
        await order.fetch_all_links()
    
    # Resolve links as part of the query itself (Beanie uses $lookup for this)
    order = await Order.find_one(Order.status == "pending", fetch_links=True)
    
    # Eager loading for many documents
    orders = await Order.find(Order.status == "pending", fetch_links=True).to_list()
    
    # Lazy: without fetch_links, Link fields hold unresolved Link objects (see .ref)
    orders = await Order.find(Order.status == "pending").to_list()
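    Resolving links one document at a time (for example calling `fetch_link()` in a loop) costs one extra round trip per document. Beanie's `fetch_links=True` avoids that by joining server-side. If you resolve references yourself, batch them: collect the ids, fetch them with a single `$in` query, then join in memory. The sketch below keeps the database call behind a hypothetical `fetch_many` callable so it runs without MongoDB:

```python
from typing import Any, Callable, Dict, Iterable, List


def resolve_refs(orders: List[Dict[str, Any]],
                 fetch_many: Callable[[List[Any]], Iterable[Dict[str, Any]]],
                 ref_field: str = "user_id",
                 as_field: str = "user") -> List[Dict[str, Any]]:
    """Attach referenced documents using ONE batched fetch instead of one per order."""
    # Unique ids in first-seen order
    ids = list(dict.fromkeys(o[ref_field] for o in orders if o.get(ref_field) is not None))
    by_id = {doc["_id"]: doc for doc in fetch_many(ids)}
    # Missing references resolve to None instead of raising
    return [{**o, as_field: by_id.get(o.get(ref_field))} for o in orders]
```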

    Chapter 7: Aggregation Pipeline

    Basic Aggregation Concepts

    flowchart TD
        A[Input Documents] --> B[$match]
        B --> C[$group]
        C --> D[$sort]
        D --> E[$project]
        E --> F[$limit]
        F --> G[Output Documents]
    
        B1[Filter Documents] --> B
        C1[Group by Field] --> C
        D1[Sort Results] --> D
        E1[Select Fields] --> E
        F1[Limit Results] --> F

    Aggregation Stages

    # Match stage - filtering
    match_stage = {"$match": {"status": "completed", "total": {"$gte": 100}}}
    
    # Group stage - aggregating data
    group_stage = {
        "$group": {
            "_id": "$user_id",
            "total_orders": {"$sum": 1},
            "total_spent": {"$sum": "$total"},
            "avg_order_value": {"$avg": "$total"},
            "max_order": {"$max": "$total"},
            "min_order": {"$min": "$total"}
        }
    }
    
    # Sort stage
    sort_stage = {"$sort": {"total_spent": -1}}
    
    # Project stage - shaping output
    project_stage = {
        "$project": {
            "user_id": "$_id",
            "total_orders": 1,
            "total_spent": 1,
            "avg_order_value": {"$round": ["$avg_order_value", 2]},
            "_id": 0
        }
    }
    
    # Complete pipeline
    pipeline = [match_stage, group_stage, sort_stage, project_stage]
    results = await Order.aggregate(pipeline).to_list()

    Advanced Aggregation Patterns

    # Lookup with pipeline
    lookup_stage = {
        "$lookup": {
            "from": "users",
            "let": {"user_id": "$user_id"},
            "pipeline": [
                {"$match": {"$expr": {"$eq": ["$_id", "$$user_id"]}}},
                {"$project": {"name": 1, "email": 1, "created_at": 1}}
            ],
            "as": "user_info"
        }
    }
    
    # Unwind arrays
    unwind_stage = {"$unwind": "$items"}
    
    # Add computed fields
    add_fields_stage = {
        "$addFields": {
            "item_total": {"$multiply": ["$items.quantity", "$items.price"]},
            "order_year": {"$year": "$created_at"},
            "order_month": {"$month": "$created_at"}
        }
    }
    
    # Conditional operations
    conditional_stage = {
        "$addFields": {
            "order_size": {
                "$switch": {
                    "branches": [
                        {"case": {"$lt": ["$total", 50]}, "then": "small"},
                        {"case": {"$lt": ["$total", 200]}, "then": "medium"},
                        {"case": {"$lt": ["$total", 500]}, "then": "large"}
                    ],
                    "default": "extra_large"
                }
            }
        }
    }
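    `$switch` evaluates its branches in order and returns the first whose `case` is true, which makes the boundaries easy to misread: with strict `$lt` tests, a total of exactly 50 is "medium" and exactly 500 is "extra_large". If no branch matches and `default` is omitted, the stage raises an error. A plain-Python mirror of the same logic:

```python
def order_size(total: float) -> str:
    """Mirror of the $switch stage: branches are tested in order, first match wins."""
    branches = [(50, "small"), (200, "medium"), (500, "large")]
    for upper, label in branches:
        if total < upper:      # {"$lt": ["$total", upper]}
            return label
    return "extra_large"       # "default"
```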

    Aggregation with Beanie Methods

    class OrderAnalytics:
        @staticmethod
        async def get_user_spending_summary():
            return await Order.aggregate([
                {"$match": {"status": "completed"}},
                {"$group": {
                    "_id": "$user_id",
                    "total_spent": {"$sum": "$total"},
                    "order_count": {"$sum": 1}
                }},
                {"$lookup": {
                    "from": "users",
                    "localField": "_id",
                    "foreignField": "_id",
                    "as": "user"
                }},
                {"$unwind": "$user"},
                {"$project": {
                    "user_name": "$user.name",
                    "user_email": "$user.email",
                    "total_spent": 1,
                    "order_count": 1,
                    "avg_order_value": {"$divide": ["$total_spent", "$order_count"]}
                }},
                {"$sort": {"total_spent": -1}}
            ]).to_list()
    
        @staticmethod
        async def get_monthly_sales():
            return await Order.aggregate([
                {"$match": {"status": "completed"}},
                {"$group": {
                    "_id": {
                        "year": {"$year": "$created_at"},
                        "month": {"$month": "$created_at"}
                    },
                    "total_sales": {"$sum": "$total"},
                    "order_count": {"$sum": 1}
                }},
                {"$sort": {"_id.year": 1, "_id.month": 1}}
            ]).to_list()

    Chapter 8: Validation and Serialization

    Pydantic Validation

    graph TD
        A[Input Data] --> B[Pydantic Validation]
        B --> C{Valid?}
        C -->|Yes| D[Create Model Instance]
        C -->|No| E[Validation Error]
    
        D --> F[Type Conversion]
        F --> G[Custom Validators]
        G --> H[Final Model]
    
        style E fill:#ffebee
        style H fill:#e8f5e8

    from beanie import Document
    from pydantic import EmailStr, Field, field_validator
    from typing import List, Optional
    import re
    
    # Pydantic v2 syntax (on Pydantic v1, use @validator); EmailStr needs email-validator
    class User(Document):
        name: str = Field(..., min_length=2, max_length=50)
        email: EmailStr
        age: Optional[int] = Field(None, ge=0, le=150)
        phone: Optional[str] = None
        interests: List[str] = Field(default_factory=list, max_length=10)
    
        @field_validator('name')
        @classmethod
        def validate_name(cls, v: str) -> str:
            if not v.strip():
                raise ValueError('Name cannot be empty or only whitespace')
            if any(char.isdigit() for char in v):
                raise ValueError('Name cannot contain numbers')
            return v.strip().title()
    
        @field_validator('phone')
        @classmethod
        def validate_phone(cls, v: Optional[str]) -> Optional[str]:
            if v is None:
                return v
            # Simple phone validation
            phone_pattern = r'^\+?[\d\s\-\(\)]+$'
            if not re.match(phone_pattern, v):
                raise ValueError('Invalid phone number format')
            return v
    
        @field_validator('interests')
        @classmethod
        def validate_interests(cls, v: List[str]) -> List[str]:
            # Remove empty strings and duplicates; dict.fromkeys keeps the original order
            return list(dict.fromkeys(interest.strip() for interest in v if interest.strip()))
    
        class Settings:
            name = "users"
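    Validator logic is easier to unit-test when it lives in plain functions that the validators call. A sketch of the same rules as standalone functions (the names are hypothetical), runnable without Pydantic or a database:

```python
import re
from typing import List, Optional

PHONE_PATTERN = re.compile(r"^\+?[\d\s\-\(\)]+$")


def normalize_name(v: str) -> str:
    stripped = v.strip()
    if not stripped:
        raise ValueError("Name cannot be empty or only whitespace")
    if any(ch.isdigit() for ch in stripped):
        raise ValueError("Name cannot contain numbers")
    return stripped.title()


def check_phone(v: Optional[str]) -> Optional[str]:
    if v is not None and not PHONE_PATTERN.match(v):
        raise ValueError("Invalid phone number format")
    return v


def clean_interests(v: List[str]) -> List[str]:
    # dict.fromkeys drops duplicates but keeps first-seen order; set() would not
    return list(dict.fromkeys(s.strip() for s in v if s.strip()))
```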

    Custom Field Types

    from beanie import Document
    from pydantic import BaseModel, Field, field_validator, model_validator
    from datetime import date
    
    class DateRange(BaseModel):
        start_date: date
        end_date: date
    
        @model_validator(mode="after")
        def end_not_before_start(self) -> "DateRange":
            # Runs after both fields are validated, so both are available
            if self.end_date < self.start_date:
                raise ValueError('End date must not be before start date')
            return self
    
    class Money(BaseModel):
        amount: float = Field(..., ge=0)
        currency: str = Field(..., pattern=r'^[A-Z]{3}$')  # e.g. "EUR" (regex= on Pydantic v1)
    
        def __str__(self):
            return f"{self.amount} {self.currency}"
    
    class Event(Document):
        name: str
        date_range: DateRange
        ticket_price: Money
        max_attendees: int = Field(..., gt=0)
    
        @field_validator('ticket_price')
        @classmethod
        def validate_price(cls, v: Money) -> Money:
            if v.amount > 10000:
                raise ValueError('Ticket price cannot exceed 10000')
            return v
    
        class Settings:
            name = "events"

    Serialization and Deserialization

    from pydantic import BaseModel, EmailStr
    from typing import List, Optional
    from datetime import datetime
    from fastapi import FastAPI, HTTPException
    from pymongo.errors import DuplicateKeyError
    
    class UserResponse(BaseModel):
        id: str
        name: str
        email: str
        age: Optional[int]
        created_at: datetime
    
        @classmethod
        def from_document(cls, user: User):
            # Assumes a User document with a created_at field (as in Chapter 3)
            return cls(
                id=str(user.id),
                name=user.name,
                email=user.email,
                age=user.age,
                created_at=user.created_at
            )
    
    class UserCreate(BaseModel):
        name: str
        email: EmailStr
        age: Optional[int] = None
        interests: List[str] = []
    
        def to_document(self) -> User:
            return User(**self.model_dump())  # Pydantic v1: self.dict()
    
    # Usage in FastAPI
    app = FastAPI()
    
    @app.post("/users/", response_model=UserResponse)
    async def create_user(user_data: UserCreate):
        try:
            user = user_data.to_document()
            await user.insert()
            return UserResponse.from_document(user)
        except DuplicateKeyError:  # Only raised if email has a unique index
            raise HTTPException(status_code=400, detail="Email already exists")

    Advanced Validation Patterns

    from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator
    from typing import List, Optional
    
    class ProductVariant(BaseModel):
        size: Optional[str] = None
        color: Optional[str] = None
        material: Optional[str] = None
    
    class Product(Document):
        name: str
        base_price: float = Field(..., gt=0)
        discount_percentage: Optional[float] = Field(None, ge=0, le=100)
        final_price: Optional[float] = None
        category: str
        variants: List[ProductVariant] = []
        tags: List[str] = []
    
        @model_validator(mode="after")
        def calculate_final_price(self) -> "Product":
            # discount_percentage defaults to None, so fall back to 0 before doing math
            discount = self.discount_percentage or 0
            self.final_price = round(self.base_price * (1 - discount / 100), 2)
            return self
    
        @field_validator('variants')
        @classmethod
        def validate_variants(cls, v: List[ProductVariant], info: ValidationInfo) -> List[ProductVariant]:
            # info.data holds the fields declared (and validated) before 'variants'
            category = (info.data.get('category') or '').lower()
    
            if category == 'clothing':
                for variant in v:
                    if not variant.size:
                        raise ValueError('Clothing items must have size variants')
    
            return v
    
        @field_validator('tags', mode='before')
        @classmethod
        def normalize_tags(cls, v):
            # Accept "a, b" strings as well as lists; drop empty entries
            if isinstance(v, str):
                v = v.split(',')
            return [tag.strip().lower() for tag in v if tag.strip()]
    
        class Settings:
            name = "products"
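    `round()` works on the binary float value, so `round(2.675, 2)` returns 2.67 rather than 2.68. If prices should round half-up to cents, one option (an assumption, not part of the model above) is to compute with `decimal.Decimal` and convert at the end. The hypothetical helper also treats a `None` discount as 0:

```python
from decimal import ROUND_HALF_UP, Decimal
from typing import Optional


def final_price(base_price: float, discount_percentage: Optional[float]) -> float:
    """Apply an optional percentage discount and round half-up to cents."""
    discount = Decimal(str(discount_percentage or 0))
    # str() first, so Decimal sees 2.675 rather than its binary approximation
    price = Decimal(str(base_price)) * (Decimal(100) - discount) / Decimal(100)
    return float(price.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))
```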

    Chapter 9: Indexes and Performance

    Index Types and Strategies

    graph TB
        A[Index Types] --> B[Single Field]
        A --> C[Compound]
        A --> D[Text]
        A --> E[Geospatial]
        A --> F[Partial]
        A --> G[Sparse]
    
        B --> B1[Basic Lookup]
        C --> C1[Multi-field Queries]
        D --> D1[Text Search]
        E --> E1[Location Queries]
        F --> F1[Conditional Indexing]
        G --> G1[Null Value Handling]

    Creating Indexes

    from pymongo import IndexModel, ASCENDING, DESCENDING, TEXT
    from typing import Dict, List, Optional
    
    class User(Document):
        name: str
        email: str
        age: Optional[int] = None
        location: Optional[Dict] = None  # GeoJSON
        bio: Optional[str] = None
        tags: List[str] = []
        created_at: datetime = Field(default_factory=datetime.utcnow)
        last_login: Optional[datetime] = None
    
        class Settings:
            name = "users"
            indexes = [
                # Single field index (not unique; use Indexed(str, unique=True) for that)
                "email",
    
                # Compound indexes
                [("name", ASCENDING), ("age", DESCENDING)],
                [("created_at", DESCENDING), ("last_login", DESCENDING)],
    
                # Text index for search (MongoDB allows only one text index per collection)
                [("name", TEXT), ("bio", TEXT)],
    
                # Geospatial index
                [("location", "2dsphere")],
    
                # Partial index: only documents whose last_login is a date are indexed
                # ($exists would also match the nulls Beanie stores for unset Optional fields)
                IndexModel(
                    [("last_login", DESCENDING)],
                    partialFilterExpression={"last_login": {"$type": "date"}}
                ),
    
                # Sparse index: skips documents that lack the field entirely
                # (documents storing bio: null are still indexed)
                IndexModel([("bio", ASCENDING)], sparse=True),
    
                # TTL index: MongoDB deletes each document about 30 days after created_at.
                # It also serves queries on created_at; a second plain {created_at: 1}
                # index would conflict with it (same key pattern, different options).
                IndexModel(
                    [("created_at", ASCENDING)],
                    expireAfterSeconds=2592000  # 30 days
                )
            ]
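    When choosing the field order for a compound index, MongoDB's Equality, Sort, Range (ESR) guideline is a useful default: fields matched by equality first, then sort fields, then fields filtered by range. For a query such as `{"is_active": True, "age": {"$gte": 18}}` sorted by `created_at` descending, the hypothetical helper below produces the key list you would pass to `IndexModel`:

```python
from typing import List, Optional, Tuple


def esr_index(equality: List[str],
              sort: Optional[List[Tuple[str, int]]] = None,
              range_fields: Optional[List[str]] = None) -> List[Tuple[str, int]]:
    """Order compound-index keys by the Equality, Sort, Range guideline."""
    keys: List[Tuple[str, int]] = [(f, 1) for f in equality]
    seen = set(equality)
    for f, direction in sort or []:
        if f not in seen:              # sort fields keep their direction
            keys.append((f, direction))
            seen.add(f)
    for f in range_fields or []:
        if f not in seen:              # range fields go last
            keys.append((f, 1))
            seen.add(f)
    return keys
```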

    Performance Optimization

    from datetime import datetime, timedelta
    from typing import Optional
    from beanie.operators import And
    from bson import ObjectId
    from pydantic import BaseModel
    
    class UserSummary(BaseModel):
        # Projection model: only these fields are fetched from MongoDB
        name: str
        email: str
    
    class QueryOptimizer:
        @staticmethod
        async def efficient_pagination(page: int = 1, limit: int = 20):
            """Offset pagination: simple, but the server still walks every skipped document"""
            skip = (page - 1) * limit
    
            # Use projection to limit returned fields; sort so pages are stable
            users = await User.find(
                User.is_active == True  # noqa: E712
            ).sort(User.id).project(
                UserSummary
            ).skip(skip).limit(limit).to_list()
    
            return users
    
        @staticmethod
        async def cursor_based_pagination(last_id: Optional[str] = None, limit: int = 20):
            """Keyset pagination: seeks past last_id on the _id index instead of skipping"""
            query = User.is_active == True  # noqa: E712
    
            if last_id:
                query = And(query, User.id > ObjectId(last_id))
    
            users = await User.find(query).sort(
                User.id
            ).limit(limit).to_list()
    
            return users
    
        @staticmethod
        async def aggregation_with_indexes():
            """Filter first so the leading $match can use an index before $group"""
            pipeline = [
                # A leading $match can use an index (created_at here)
                {"$match": {
                    "created_at": {"$gte": datetime.utcnow() - timedelta(days=30)},
                    "is_active": True
                }},
    
                # Group after filtering
                {"$group": {
                    "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
                    "user_count": {"$sum": 1}
                }},
    
                {"$sort": {"_id": 1}}
            ]
    
            return await User.aggregate(pipeline).to_list()
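    Why the cursor-based version scales better: `skip()` makes the server walk past every skipped document, while `_id > last_id` seeks directly on the `_id` index. Keyset pages also stay consistent when documents are deleted between requests. The sketch below models the ids as sorted integers (ObjectIds are likewise ordered) and compares both approaches in plain Python; `keyset_page` is a hypothetical name:

```python
from bisect import bisect_right
from typing import List, Optional


def keyset_page(sorted_ids: List[int], last_id: Optional[int], limit: int) -> List[int]:
    """Return the next page of ids strictly greater than last_id.

    Models find({"_id": {"$gt": last_id}}).sort("_id").limit(limit).
    """
    start = 0 if last_id is None else bisect_right(sorted_ids, last_id)
    return sorted_ids[start:start + limit]
```

    Paging through all ids simply repeats the call with the last id of the previous page until an empty page comes back.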

    Index Analysis and Monitoring

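    from motor.motor_asyncio import AsyncIOMotorClient
    from datetime import datetime, timedelta
    from typing import List
    
    class IndexAnalyzer:
        def __init__(self, client: AsyncIOMotorClient, default_db: str = "ecommerce_db"):
            self.client = client
            # get_default_database() uses the database named in the URI, else default_db;
            # a bare "mongodb://localhost:27017" URI names no database
            self.db = client.get_default_database(default_db)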
    
        async def analyze_query_performance(self, collection_name: str, query: dict):
            """Analyze query execution plan"""
            collection = self.db[collection_name]
    
            # explain() returns "queryPlanner" and "executionStats" sections
            # (on a sharded cluster the statistics are reported per shard)
            explain_result = await collection.find(query).explain()
            stats = explain_result.get("executionStats", {})
    
            return {
                "execution_time_ms": stats.get("executionTimeMillis"),
                "total_docs_examined": stats.get("totalDocsExamined"),
                "total_docs_returned": stats.get("nReturned"),
                "total_keys_examined": stats.get("totalKeysExamined"),
                "winning_plan": explain_result.get("queryPlanner", {}).get("winningPlan")
            }
    
        async def get_index_usage_stats(self, collection_name: str):
            """Get index usage statistics"""
            collection = self.db[collection_name]
    
            # Get index stats
            index_stats = []
            async for stat in collection.aggregate([{"$indexStats": {}}]):
                index_stats.append(stat)
    
            return index_stats
    
        async def suggest_indexes(self, collection_name: str, sample_queries: List[dict]):
            """Suggest indexes based on query patterns"""
            suggestions = []
    
            for query in sample_queries:
                # Analyze query fields
                query_fields = self._extract_query_fields(query)
                performance = await self.analyze_query_performance(collection_name, query)
    
                if (performance["total_docs_examined"] or 0) > (performance["total_docs_returned"] or 0) * 10:
                    suggestions.append({
                        "query": query,
                        "suggested_index": query_fields,
                        "reason": "High document examination ratio"
                    })
    
            return suggestions
    
        def _extract_query_fields(self, query: dict) -> List[tuple]:
            """Extract fields used in query for index suggestion"""
            fields = []
    
            def extract_fields(obj, prefix=""):
                if isinstance(obj, dict):
                    for key, value in obj.items():
                        if key.startswith("$"):
                            continue
    
                        field_name = f"{prefix}.{key}" if prefix else key
    
                        if isinstance(value, dict):
                            # Check for range queries
                            if any(op in value for op in ["$gt", "$gte", "$lt", "$lte"]):
                                fields.append((field_name, 1))  # Ascending for range
                            elif "$eq" in value or not any(k.startswith("$") for k in value.keys()):
                                fields.append((field_name, 1))  # Equality
                            else:
                                extract_fields(value, field_name)
                        else:
                            fields.append((field_name, 1))  # Simple equality
    
            extract_fields(query)
            return fields
    
    # Usage
    async def optimize_user_queries():
        client = AsyncIOMotorClient("mongodb://localhost:27017")
        analyzer = IndexAnalyzer(client)
    
        # Sample queries to analyze
        sample_queries = [
            {"email": "john@example.com"},
            {"age": {"$gte": 18}, "is_active": True},
            {"created_at": {"$gte": datetime.utcnow() - timedelta(days=7)}},
            {"name": {"$regex": "John", "$options": "i"}}
        ]
    
        suggestions = await analyzer.suggest_indexes("users", sample_queries)
    
        for suggestion in suggestions:
            print(f"Query: {suggestion['query']}")
            print(f"Suggested Index: {suggestion['suggested_index']}")
            print(f"Reason: {suggestion['reason']}\n")
    Python
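    MongoDB's documentation recommends ordering compound-index keys by the Equality, Sort, Range (ESR) guideline: fields matched exactly come first, then the sort keys, then fields filtered by a range. The analyzer above does not look at sort specifications, so the standalone sketch below, a hypothetical `esr_index_keys` helper, shows the full ordering for a single filter plus an optional sort. It is a heuristic for choosing key order, not a substitute for checking `explain()` output.

```python
from typing import Any, Dict, List, Sequence, Tuple

RANGE_OPERATORS = {"$gt", "$gte", "$lt", "$lte"}


def esr_index_keys(
    query: Dict[str, Any], sort: Sequence[Tuple[str, int]] = ()
) -> List[Tuple[str, int]]:
    """Suggest compound-index keys in Equality, Sort, Range order (heuristic sketch)."""
    equality: List[Tuple[str, int]] = []
    ranges: List[Tuple[str, int]] = []
    for field, condition in query.items():
        if field.startswith("$"):
            continue  # top-level logical operators are out of scope for this sketch
        if isinstance(condition, dict) and condition.keys() & RANGE_OPERATORS:
            ranges.append((field, 1))
        else:
            equality.append((field, 1))  # literal value or non-range operator

    keys = list(equality)
    used = {field for field, _ in keys}
    for field, direction in sort:  # sort keys keep their requested direction
        if field not in used:
            keys.append((field, direction))
            used.add(field)
    for field, direction in ranges:  # range fields go last
        if field not in used:
            keys.append((field, direction))
            used.add(field)
    return keys


print(esr_index_keys({"age": {"$gte": 18}, "is_active": True}, sort=[("created_at", -1)]))
# [('is_active', 1), ('created_at', -1), ('age', 1)]
```

    A field that appears both in the sort and in a range filter is emitted once, at its sort position, which is where ESR places it.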

    Chapter 10: Authentication and Authorization

    User Authentication Models

    sequenceDiagram
        participant Client
        participant API
        participant Auth
        participant DB
    
        Client->>API: POST /login {email, password}
        API->>Auth: validate_credentials()
        Auth->>DB: find_user(email)
        DB-->>Auth: user_document
        Auth->>Auth: verify_password()
        Auth-->>API: access_token
        API-->>Client: {token, user_info}
    
        Client->>API: GET /protected (Authorization: Bearer token)
        API->>Auth: verify_token()
        Auth-->>API: user_data
        API-->>Client: protected_data
    import secrets
    from datetime import datetime, timedelta
    from typing import List, Optional
    
    from beanie import Document, Indexed, PydanticObjectId
    from passlib.context import CryptContext
    from pydantic import EmailStr, Field
    
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    class User(Document):
        # Indexed(..., unique=True) makes Beanie create a unique index during init_beanie();
        # Pydantic's Field() has no "unique" option, so Field(..., unique=True) creates no index
        email: Indexed(EmailStr, unique=True)
        username: Indexed(str, unique=True)
        hashed_password: str
        is_active: bool = True
        is_verified: bool = False
        roles: List[str] = Field(default_factory=lambda: ["user"])
        created_at: datetime = Field(default_factory=datetime.utcnow)
        last_login: Optional[datetime] = None
        password_reset_token: Optional[str] = None
        password_reset_expires: Optional[datetime] = None
    
        @staticmethod
        def hash_password(password: str) -> str:
            return pwd_context.hash(password)
    
        def verify_password(self, password: str) -> bool:
            return pwd_context.verify(password, self.hashed_password)
    
        def has_role(self, role: str) -> bool:
            return role in self.roles
    
        def add_role(self, role: str):
            if role not in self.roles:
                self.roles.append(role)
    
        def remove_role(self, role: str):
            if role in self.roles:
                self.roles.remove(role)
    
        class Settings:
            name = "users"
            # email and username are already indexed (uniquely) through Indexed() above
            indexes = [
                "password_reset_token"
            ]
    
    class RefreshToken(Document):
        # PydanticObjectId validates and serializes ObjectIds; a bare bson ObjectId
        # annotation is rejected by Pydantic unless arbitrary types are allowed
        user_id: PydanticObjectId
        token: Indexed(str, unique=True)
        expires_at: datetime
        is_revoked: bool = False
        created_at: datetime = Field(default_factory=datetime.utcnow)
    
        @classmethod
        async def create_for_user(cls, user: User, expires_in_days: int = 30):
            token = secrets.token_urlsafe(32)  # opaque, URL-safe random string
            refresh_token = cls(
                user_id=user.id,
                token=token,
                expires_at=datetime.utcnow() + timedelta(days=expires_in_days)
            )
            await refresh_token.insert()
            return refresh_token
    
        async def revoke(self):
            self.is_revoked = True
            await self.save()
    
        class Settings:
            name = "refresh_tokens"
            # token is already indexed (uniquely) through Indexed() above
            indexes = [
                "user_id",
                "expires_at"
            ]
    Python
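    The `User` model delegates hashing to passlib's `CryptContext` with bcrypt. To make the moving parts visible, here is a standard-library sketch of the same idea using PBKDF2-HMAC-SHA256, a different algorithm from bcrypt chosen only because it ships with Python: a random per-password salt, a deliberately slow key-derivation function, a self-describing stored string, and a constant-time comparison. The `pbkdf2_sha256$iterations$salt$digest` format and the iteration count are illustrative assumptions; the application itself should keep using passlib or another vetted library.

```python
import hashlib
import hmac
import secrets

DEFAULT_ITERATIONS = 600_000  # illustrative work factor; tune for your hardware


def hash_password(password: str, iterations: int = DEFAULT_ITERATIONS) -> str:
    """Return 'pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>'."""
    salt = secrets.token_bytes(16)  # unique random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Re-derive the digest with the stored salt and iteration count, then compare."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
    except ValueError:
        return False  # not in the expected format
    if scheme != "pbkdf2_sha256":
        return False
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    # compare_digest avoids leaking, through timing, how many bytes matched
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
```

    Because the salt and iteration count are stored with the digest, the work factor can be raised for new hashes without breaking verification of old ones.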

    Authentication Service

    import secrets
    from datetime import datetime, timedelta
    from typing import Optional
    
    from beanie.operators import Or
    from bson import ObjectId
    from jose import JWTError, jwt
    
    class AuthService:
        def __init__(self, secret_key: str, algorithm: str = "HS256"):
            self.secret_key = secret_key
            self.algorithm = algorithm
            self.access_token_expire_minutes = 30
            self.refresh_token_expire_days = 30
    
        async def register_user(self, email: str, username: str, password: str) -> User:
            """Register a new user"""
            # Check if user exists
            existing_user = await User.find_one(
                Or(User.email == email, User.username == username)
            )
    
            if existing_user:
                raise ValueError("User with this email or username already exists")
    
            # Create new user
            user = User(
                email=email,
                username=username,
                hashed_password=User.hash_password(password)
            )
    
            await user.insert()
            return user
    
        async def authenticate_user(self, email: str, password: str) -> Optional[User]:
            """Authenticate user with email and password"""
            user = await User.find_one(User.email == email)
    
            if not user or not user.verify_password(password):
                return None
    
            if not user.is_active:
                raise ValueError("User account is disabled")
    
            # Update last login
            user.last_login = datetime.utcnow()
            await user.save()
    
            return user
    
        def create_access_token(self, user: User) -> str:
            """Create JWT access token"""
            expire = datetime.utcnow() + timedelta(minutes=self.access_token_expire_minutes)
    
            to_encode = {
                "sub": str(user.id),
                "email": user.email,
                "username": user.username,
                "roles": user.roles,
                "exp": expire,
                "iat": datetime.utcnow(),
                "type": "access"
            }
    
            return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
    
        async def create_refresh_token(self, user: User) -> str:
            """Create refresh token"""
            refresh_token = await RefreshToken.create_for_user(user)
            return refresh_token.token
    
        async def verify_access_token(self, token: str) -> User:
            """Verify and decode access token"""
            try:
                payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
                user_id = payload.get("sub")
    
                if user_id is None or payload.get("type") != "access":
                    raise JWTError("Invalid token")
    
                user = await User.get(ObjectId(user_id))
                if user is None or not user.is_active:
                    raise JWTError("User not found or inactive")
    
                return user
    
            except JWTError:
                raise JWTError("Could not validate credentials")
    
        async def refresh_access_token(self, refresh_token: str) -> tuple[str, str]:
            """Create new access token using refresh token"""
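            token_doc = await RefreshToken.find_one(
                RefreshToken.token == refresh_token,
                # Keep "== False": Beanie turns the comparison into a query expression,
                # whereas "not ..." or "is False" would be evaluated by Python immediately
                RefreshToken.is_revoked == False,
                RefreshToken.expires_at > datetime.utcnow()
            )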
    
            if not token_doc:
                raise JWTError("Invalid or expired refresh token")
    
            user = await User.get(token_doc.user_id)
            if not user or not user.is_active:
                raise JWTError("User not found or inactive")
    
            # Create new tokens
            new_access_token = self.create_access_token(user)
            new_refresh_token = await self.create_refresh_token(user)
    
            # Revoke old refresh token
            await token_doc.revoke()
    
            return new_access_token, new_refresh_token
    
        async def logout_user(self, refresh_token: str):
            """Logout user by revoking refresh token"""
            token_doc = await RefreshToken.find_one(RefreshToken.token == refresh_token)
            if token_doc:
                await token_doc.revoke()
    Python
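    `create_access_token` hands its claims to python-jose's `jwt.encode` with HS256. Per RFC 7519 and RFC 7515, such a token is three base64url segments (header, payload, and an HMAC-SHA256 signature over the first two) joined by dots. The standard-library sketch below, with hypothetical `encode_hs256`/`decode_hs256` helpers, shows that structure and why verification must recompute the signature and check `exp`. It is for understanding only and skips checks a real library performs, so the service should keep using python-jose or PyJWT. Here `exp` is a number of seconds since the epoch, the form python-jose produces when given a `datetime`.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(data: bytes) -> str:
    # JWTs use base64url without "=" padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _sign(signing_input: bytes, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()


def encode_hs256(claims: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    ]
    signing_input = ".".join(segments).encode("ascii")
    return ".".join(segments + [_b64url_encode(_sign(signing_input, secret))])


def decode_hs256(token: str, secret: str) -> dict:
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, claims_b64, signature_b64 = parts
    # Recompute the signature over header.payload and compare in constant time
    expected = _sign(f"{header_b64}.{claims_b64}".encode("ascii"), secret)
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    claims = json.loads(_b64url_decode(claims_b64))
    if "exp" in claims and time.time() >= claims["exp"]:
        raise ValueError("token expired")
    return claims
```

    Note that the payload is only encoded, not encrypted: anyone holding the token can read the email, username and roles that `create_access_token` puts into it.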

    Role-Based Access Control

    from functools import wraps
    
    from bson import ObjectId
    from fastapi import HTTPException, Depends, status
    from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
    from jose import JWTError
    
    security = HTTPBearer()
    
    class Permission:
        READ = "read"
        WRITE = "write"
        DELETE = "delete"
        ADMIN = "admin"
    
    class Role:
        USER = "user"
        MODERATOR = "moderator"
        ADMIN = "admin"
        SUPER_ADMIN = "super_admin"
    
    ROLE_PERMISSIONS = {
        Role.USER: [Permission.READ],
        Role.MODERATOR: [Permission.READ, Permission.WRITE],
        Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE],
        Role.SUPER_ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN]
    }
    
    class AuthorizationService:
        @staticmethod
        def user_has_permission(user: User, permission: str) -> bool:
            """Check if user has specific permission"""
            user_permissions = set()
    
            for role in user.roles:
                if role in ROLE_PERMISSIONS:
                    user_permissions.update(ROLE_PERMISSIONS[role])
    
            return permission in user_permissions
    
        @staticmethod
        def require_permission(permission: str):
            """Decorator to require specific permission"""
            def decorator(func):
                @wraps(func)
                async def wrapper(*args, **kwargs):
                    # Extract user from kwargs (passed by get_current_user dependency)
                    user = kwargs.get('current_user')
                    if not user:
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Authentication required"
                        )
    
                    if not AuthorizationService.user_has_permission(user, permission):
                        raise HTTPException(
                            status_code=status.HTTP_403_FORBIDDEN,
                            detail="Insufficient permissions"
                        )
    
                    return await func(*args, **kwargs)
                return wrapper
            return decorator
    
        @staticmethod
        def require_roles(*required_roles):
            """Decorator to require specific roles"""
            def decorator(func):
                @wraps(func)
                async def wrapper(*args, **kwargs):
                    user = kwargs.get('current_user')
                    if not user:
                        raise HTTPException(
                            status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Authentication required"
                        )
    
                    if not any(role in user.roles for role in required_roles):
                        raise HTTPException(
                            status_code=status.HTTP_403_FORBIDDEN,
                            detail="Insufficient role permissions"
                        )
    
                    return await func(*args, **kwargs)
                return wrapper
            return decorator
    
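    # FastAPI dependencies
    # Placeholder key: load the real secret from configuration (never commit it), and
    # make sure tests sign tokens with the same key the app uses to verify them
    auth_service = AuthService("your-secret-key")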
    
    async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
        """FastAPI dependency to get current authenticated user"""
        try:
            user = await auth_service.verify_access_token(credentials.credentials)
            return user
        except JWTError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
    
    async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
        """Get current active user"""
        if not current_user.is_active:
            raise HTTPException(status_code=400, detail="Inactive user")
        return current_user
    
    # Usage in FastAPI routes
    from fastapi import FastAPI, Depends
    
    app = FastAPI()
    
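    # Fields that must never be returned to clients
    SENSITIVE_FIELDS = {"hashed_password", "password_reset_token", "password_reset_expires"}
    
    @app.get("/users/me")
    async def get_user_profile(current_user: User = Depends(get_current_active_user)):
        # model_dump(mode="json") assumes Pydantic v2; it converts ObjectId/datetime to JSON-safe values
        return current_user.model_dump(mode="json", exclude=SENSITIVE_FIELDS)
    
    @app.get("/admin/users")
    @AuthorizationService.require_permission(Permission.ADMIN)
    async def get_all_users(current_user: User = Depends(get_current_active_user)):
        users = await User.find_all().to_list()
        return [user.model_dump(mode="json", exclude=SENSITIVE_FIELDS) for user in users]
    
    @app.delete("/users/{user_id}")
    @AuthorizationService.require_roles(Role.ADMIN, Role.SUPER_ADMIN)
    async def delete_user(
        user_id: str,
        current_user: User = Depends(get_current_active_user)
    ):
        # Malformed ids and unknown users both get a 404 instead of a 500 or a false "success"
        user_to_delete = await User.get(ObjectId(user_id)) if ObjectId.is_valid(user_id) else None
        if user_to_delete is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
        await user_to_delete.delete()
        return {"message": "User deleted successfully"}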
    Python
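    The two decorators rely on two details. FastAPI discovers dependencies by inspecting the endpoint's signature, and `functools.wraps` sets `__wrapped__`, which `inspect.signature` follows, so the `current_user` parameter stays visible through the wrapper. FastAPI then calls the endpoint with keyword arguments, which is why `kwargs.get('current_user')` finds the user. The FastAPI-free sketch below (hypothetical `require_role` and `list_users`, with a plain dict standing in for `User`) demonstrates both points; it illustrates the mechanism and is not the guide's decorator.

```python
import asyncio
import functools
import inspect


def require_role(role: str):
    """Toy decorator mirroring require_roles: reads the user from keyword arguments."""
    def decorator(func):
        @functools.wraps(func)  # copies metadata and sets wrapper.__wrapped__ = func
        async def wrapper(*args, **kwargs):
            user = kwargs.get("current_user")
            if user is None or role not in user.get("roles", []):
                raise PermissionError(f"role {role!r} required")
            return await func(*args, **kwargs)
        return wrapper
    return decorator


@require_role("admin")
async def list_users(current_user: dict) -> list:
    return ["alice", "bob"]


# inspect.signature follows __wrapped__, so introspection sees the original parameters
print(inspect.signature(list_users))  # (current_user: dict) -> list
print(asyncio.run(list_users(current_user={"roles": ["admin"]})))  # ['alice', 'bob']
```

    Two consequences follow: a positional call such as `list_users({...})` would be rejected, because the wrapper only looks in `kwargs`, and the route decorator must stay on top (`@app.get(...)` above the permission decorator) so that FastAPI registers the wrapped function.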

    Chapter 11: Testing Strategies

    Test Environment Setup

    graph TD
        A[Test Suite] --> B[Unit Tests]
        A --> C[Integration Tests]
        A --> D[End-to-End Tests]
    
        B --> B1[Model Validation]
        B --> B2[Service Logic]
        B --> B3[Utility Functions]
    
        C --> C1[Database Operations]
        C --> C2[API Endpoints]
        C --> C3[Authentication]
    
        D --> D1[Complete User Flows]
        D --> D2[Performance Tests]
        D --> D3[Load Tests]
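    # These fixtures assume pytest-asyncio in "auto" mode (asyncio_mode = auto in the
    # pytest configuration), so async fixtures and tests need no explicit markers.
    # User, Product, Order, RefreshToken and AuthService are defined in earlier chapters.
    import asyncio
    
    import pytest
    from beanie import init_beanie
    from httpx import AsyncClient
    from motor.motor_asyncio import AsyncIOMotorClient
    
    # Test configuration: a dedicated database that is safe to wipe
    TEST_DATABASE_URL = "mongodb://localhost:27017/test_db"
    
    class TestConfig:
        mongodb_url = TEST_DATABASE_URL
        secret_key = "test-secret-key"
    
    @pytest.fixture(scope="session")
    def event_loop():
        """Share one event loop across the session so the session-scoped Motor client stays usable.
    
        Overriding event_loop like this is deprecated in recent pytest-asyncio releases,
        which configure the loop scope through settings instead.
        """
        loop = asyncio.get_event_loop_policy().new_event_loop()
        yield loop
        loop.close()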
    
    @pytest.fixture(scope="session")
    async def mongodb_client():
        """Create MongoDB client for testing"""
        client = AsyncIOMotorClient(TEST_DATABASE_URL)
        yield client
        client.close()
    
    @pytest.fixture(scope="function")
    async def database(mongodb_client):
        """Create and clean database for each test"""
        db = mongodb_client.test_db
    
        # Initialize Beanie
        await init_beanie(
            database=db,
            document_models=[User, Product, Order, RefreshToken]
        )
    
        yield db
    
        # Clean up after test
        await db.drop_collection("users")
        await db.drop_collection("products")
        await db.drop_collection("orders")
        await db.drop_collection("refresh_tokens")
    
    @pytest.fixture
    async def sample_user(database):
        """Create a sample user for testing"""
        user = User(
            email="test@example.com",
            username="testuser",
            hashed_password=User.hash_password("testpassword")
        )
        await user.insert()
        return user
    
    @pytest.fixture
    async def auth_service():
        """Create auth service for testing"""
        return AuthService(TestConfig.secret_key)
    Python

    Unit Tests

    import pytest
    from pydantic import ValidationError
    from datetime import datetime, timedelta
    
    class TestUserModel:
        async def test_user_creation(self, database):
            """Test basic user creation"""
            user = User(
                email="newuser@example.com",
                username="newuser",
                hashed_password="hashedpassword"
            )
    
            await user.insert()
    
            assert user.id is not None
            assert user.email == "newuser@example.com"
            assert user.is_active is True
            assert user.created_at is not None
    
        async def test_user_validation(self, database):
            """Test user validation"""
            # Test invalid email
            with pytest.raises(ValidationError):
                User(
                    email="invalid-email",
                    username="testuser",
                    hashed_password="password"
                )
    
            # Test duplicate email
            await User(
                email="duplicate@example.com",
                username="user1",
                hashed_password="password"
            ).insert()
    
            with pytest.raises(Exception):  # DuplicateKeyError
                await User(
                    email="duplicate@example.com",
                    username="user2",
                    hashed_password="password"
                ).insert()
    
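        async def test_password_hashing(self, database):
            """Test password hashing and verification"""
            password = "mypassword123"
            hashed = User.hash_password(password)
    
            assert hashed != password
    
            # verify_password is an instance method, so check it on a User; the database
            # fixture is requested because Beanie documents expect init_beanie() to have run
            user = User(email="hash@example.com", username="hashuser", hashed_password=hashed)
            assert user.verify_password(password) is True
            assert user.verify_password("wrongpassword") is False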
    
        async def test_user_roles(self, sample_user):
            """Test role management"""
            assert sample_user.has_role("user") is True
            assert sample_user.has_role("admin") is False
    
            sample_user.add_role("moderator")
            assert sample_user.has_role("moderator") is True
    
            sample_user.remove_role("user")
            assert sample_user.has_role("user") is False
    
    class TestAuthService:
        async def test_register_user(self, auth_service, database):
            """Test user registration"""
            user = await auth_service.register_user(
                email="register@example.com",
                username="registeruser",
                password="password123"
            )
    
            assert user.email == "register@example.com"
            assert user.username == "registeruser"
            assert user.verify_password("password123")
    
        async def test_authenticate_user(self, auth_service, sample_user):
            """Test user authentication"""
            # Test valid credentials
            user = await auth_service.authenticate_user(
                email="test@example.com",
                password="testpassword"
            )
            assert user is not None
            assert user.email == "test@example.com"
    
            # Test invalid credentials
            user = await auth_service.authenticate_user(
                email="test@example.com",
                password="wrongpassword"
            )
            assert user is None
    
        async def test_token_creation_and_verification(self, auth_service, sample_user):
            """Test JWT token creation and verification"""
            # Create access token
            token = auth_service.create_access_token(sample_user)
            assert isinstance(token, str)
            assert len(token) > 0
    
            # Verify token
            user = await auth_service.verify_access_token(token)
            assert user.id == sample_user.id
            assert user.email == sample_user.email
    
        async def test_refresh_token_flow(self, auth_service, sample_user, database):
            """Test refresh token creation and usage"""
            # Create refresh token
            refresh_token = await auth_service.create_refresh_token(sample_user)
            assert isinstance(refresh_token, str)
    
            # Use refresh token to get new access token
            new_access_token, new_refresh_token = await auth_service.refresh_access_token(refresh_token)
    
            assert isinstance(new_access_token, str)
            assert isinstance(new_refresh_token, str)
            assert new_refresh_token != refresh_token
    
            # Verify old refresh token is revoked
            with pytest.raises(Exception):
                await auth_service.refresh_access_token(refresh_token)
    Python

    Integration Tests

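    from httpx import ASGITransport, AsyncClient
    
    @pytest.fixture
    async def app():
        """Create FastAPI app for testing"""
        from main import create_app  # Your app factory
        app = create_app(TestConfig)
        return app
    
    @pytest.fixture
    async def client(app):
        """Create HTTP client for testing"""
        # httpx removed the AsyncClient(app=...) shortcut in 0.28; ASGITransport replaces it.
        # ASGITransport does not run lifespan events, so Beanie is initialized by the database fixture.
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
            yield ac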
    
    class TestUserEndpoints:
        async def test_register_user(self, client: AsyncClient, database):
            """Test user registration endpoint"""
            response = await client.post("/auth/register", json={
                "email": "newuser@example.com",
                "username": "newuser",
                "password": "password123"
            })
    
            assert response.status_code == 201
            data = response.json()
            assert data["email"] == "newuser@example.com"
            assert "access_token" in data
    
        async def test_login_user(self, client: AsyncClient, sample_user):
            """Test user login endpoint"""
            response = await client.post("/auth/login", json={
                "email": "test@example.com",
                "password": "testpassword"
            })
    
            assert response.status_code == 200
            data = response.json()
            assert "access_token" in data
            assert "refresh_token" in data
            assert data["user"]["email"] == "test@example.com"
    
        async def test_protected_endpoint(self, client: AsyncClient, sample_user, auth_service):
            """Test accessing protected endpoint"""
            # Get access token
            token = auth_service.create_access_token(sample_user)
    
            # Access protected endpoint
            headers = {"Authorization": f"Bearer {token}"}
            response = await client.get("/users/me", headers=headers)
    
            assert response.status_code == 200
            data = response.json()
            assert data["email"] == "test@example.com"
    
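        async def test_unauthorized_access(self, client: AsyncClient):
            """Test unauthorized access to protected endpoint"""
            response = await client.get("/users/me")
            # HTTPBearer rejects a missing Authorization header by itself; depending on the
            # FastAPI version that rejection is 403 (older releases) or 401 (newer releases)
            assert response.status_code in (401, 403)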
    
    class TestProductEndpoints:
        async def test_create_product(self, client: AsyncClient, sample_user, auth_service):
            """Test product creation"""
            token = auth_service.create_access_token(sample_user)
            headers = {"Authorization": f"Bearer {token}"}
    
            response = await client.post("/products/", 
                headers=headers,
                json={
                    "name": "Test Product",
                    "price": 99.99,
                    "description": "A test product"
                }
            )
    
            assert response.status_code == 201
            data = response.json()
            assert data["name"] == "Test Product"
            assert data["price"] == 99.99
    
        async def test_get_products(self, client: AsyncClient, database):
            """Test getting products list"""
            # Create test products
            products = [
                Product(name=f"Product {i}", price=float(i * 10))
                for i in range(1, 6)
            ]
            await Product.insert_many(products)
    
            response = await client.get("/products/")
            assert response.status_code == 200
    
            data = response.json()
            assert len(data) == 5
            assert data[0]["name"] == "Product 1"
    Python

    Performance Tests

    import time
    import asyncio
    
    class TestPerformance:
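        async def test_bulk_user_creation(self, database):
            """Test bulk user creation performance"""
            # Hash once, outside the timed section: bcrypt is slow by design and
            # 1000 separate hashes would dominate the measurement
            hashed_password = User.hash_password("password")
            start_time = time.time()
    
            users = [
                User(
                    email=f"user{i}@example.com",
                    username=f"user{i}",
                    hashed_password=hashed_password
                )
                for i in range(1000)
            ]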
    
            await User.insert_many(users)
    
            end_time = time.time()
            execution_time = end_time - start_time
    
            assert execution_time < 5.0  # Should complete within 5 seconds
    
            # Verify all users were created
            user_count = await User.count()
            assert user_count == 1000
    
        async def test_concurrent_reads(self, database):
            """Test concurrent read operations"""
            # Create test data
            hashed_password = User.hash_password("password")  # hash once; bcrypt is slow by design
            users = [
                User(
                    email=f"concurrent{i}@example.com",
                    username=f"concurrent{i}",
                    hashed_password=hashed_password
                )
                for i in range(100)
            ]
            await User.insert_many(users)
    
            async def read_users():
                return await User.find().limit(10).to_list()
    
            start_time = time.time()
    
            # Run 50 concurrent read operations
            tasks = [read_users() for _ in range(50)]
            results = await asyncio.gather(*tasks)
    
            end_time = time.time()
            execution_time = end_time - start_time
    
            assert execution_time < 3.0  # Should complete within 3 seconds
            assert len(results) == 50
            assert all(len(result) == 10 for result in results)
    
        async def test_aggregation_performance(self, database):
            """Test aggregation query performance"""
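            # Create test users (hash once; bcrypt is slow by design)
            hashed_password = User.hash_password("password")
            users = [
                User(email=f"agg{i}@example.com", username=f"agg{i}",
                     hashed_password=hashed_password)
                for i in range(100)
            ]
            result = await User.insert_many(users)
            # Copy the generated ids onto the Python objects so every order
            # below references a user with a real id
            for user, inserted_id in zip(users, result.inserted_ids):
                user.id = inserted_id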
    
            orders = []
            for i, user in enumerate(users):
                for j in range(10):  # 10 orders per user
                    orders.append(Order(
                        user=user,
                        items=[],
                        total=float((i + 1) * (j + 1) * 10),
                        status="completed"
                    ))
    
            await Order.insert_many(orders)
    
            start_time = time.time()
    
            # Complex aggregation query
            pipeline = [
                {"$match": {"status": "completed"}},
                {"$group": {
                    "_id": "$user",  # the orders above are created with user=..., so group on that field
                    "total_spent": {"$sum": "$total"},
                    "order_count": {"$sum": 1}
                }},
                {"$match": {"total_spent": {"$gte": 100}}},
                {"$sort": {"total_spent": -1}},
                {"$limit": 20}
            ]
    
            results = await Order.aggregate(pipeline).to_list()
    
            end_time = time.time()
            execution_time = end_time - start_time
    
            assert execution_time < 2.0  # Should complete within 2 seconds
            assert len(results) <= 20
    
    @pytest.mark.load
    class TestLoadTests:
        """Load tests - run with pytest -m load"""
    
        async def test_api_load(self, client: AsyncClient, sample_user, auth_service):
            """Test API under load"""
            token = auth_service.create_access_token(sample_user)
            headers = {"Authorization": f"Bearer {token}"}
    
            async def make_request():
                response = await client.get("/users/me", headers=headers)
                return response.status_code
    
            # Run 100 concurrent requests
            tasks = [make_request() for _ in range(100)]
            start_time = time.time()
    
            results = await asyncio.gather(*tasks)
    
            end_time = time.time()
            execution_time = end_time - start_time
    
            # All requests should succeed
            assert all(status == 200 for status in results)
    
            # Should handle 100 requests reasonably fast
            assert execution_time < 10.0
    
            # Calculate requests per second
            rps = len(results) / execution_time
            print(f"Requests per second: {rps:.2f}")
    
            # Should handle at least 10 RPS
            assert rps >= 10
    Python
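    The performance tests above time their work with `time.time()`, which reads the wall clock and can jump if the system clock is adjusted. `time.perf_counter()` is the standard-library clock intended for measuring short intervals. A small context manager, the hypothetical `time_budget` below, keeps the measurement and the budget assertion in one place; the budgets themselves remain machine-dependent, so treat them as smoke checks rather than benchmarks.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def time_budget(label: str, budget_seconds: float) -> Iterator[Dict[str, float]]:
    """Time the enclosed block with perf_counter and fail if it exceeds the budget."""
    timing: Dict[str, float] = {}
    start = time.perf_counter()
    yield timing  # if the block raises, the exception propagates and no check is made
    timing["elapsed"] = time.perf_counter() - start
    if timing["elapsed"] > budget_seconds:
        raise AssertionError(
            f"{label} took {timing['elapsed']:.3f}s, budget was {budget_seconds:.3f}s"
        )
```

    Inside an async test this reads `with time_budget("bulk insert", 5.0): await User.insert_many(users)`, and the yielded dict exposes `elapsed` afterwards, for example to compute requests per second.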

    Test Utilities

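    import time
    import uuid
    from typing import List, Optional
    
    class TestDataFactory:
        """Factory for creating test data"""
    
        @staticmethod
        async def create_user(
            email: Optional[str] = None,
            username: Optional[str] = None,
            password: str = "testpassword",
            roles: Optional[List[str]] = None,
            **kwargs
        ) -> User:
            """Create a test user"""
            # A random suffix keeps generated emails/usernames unique even when several
            # users are created within the same second (int(time.time()) would repeat)
            suffix = uuid.uuid4().hex[:8]
            if email is None:
                email = f"user_{suffix}@example.com"
            if username is None:
                username = f"user_{suffix}"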
            if roles is None:
                roles = ["user"]
    
            user = User(
                email=email,
                username=username,
                hashed_password=User.hash_password(password),
                roles=roles,
                **kwargs
            )
            await user.insert()
            return user
    
        @staticmethod
        async def create_product(
            name: str = None,
            price: float = 99.99,
            category: str = "test",
            **kwargs
        ) -> Product:
            """Create a test product"""
            if name is None:
                name = f"Product {int(time.time())}"
    
            product = Product(
                name=name,
                price=price,
                category=category,
                **kwargs
            )
            await product.insert()
            return product
    
        @staticmethod
        async def create_order(
            user: User,
            items: List[dict] = None,
            status: str = "pending",
            **kwargs
        ) -> Order:
            """Create a test order"""
            if items is None:
                product = await TestDataFactory.create_product()
                items = [{"product": product, "quantity": 1, "price": product.price}]
    
            total = sum(item["quantity"] * item["price"] for item in items)
    
            order = Order(
                user=user,
                items=items,
                total=total,
                status=status,
                **kwargs
            )
            await order.insert()
            return order
    
    class DatabaseTestUtils:
        """Utilities for database testing"""
    
        @staticmethod
        async def clear_collections(*models):
            """Clear specified collections"""
            for model in models:
                await model.delete_all()
    
        @staticmethod
        async def count_documents(model) -> int:
            """Count documents in collection"""
            return await model.count()
    
        @staticmethod
        async def create_indexes(model):
            """Ensure indexes are created for model"""
            if hasattr(model, 'Settings') and hasattr(model.Settings, 'indexes'):
                # Indexes are automatically created by Beanie during init
                pass
    Python
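    Generated test values must stay unique whenever the model enforces unique constraints on fields such as `email` and `username`. Random suffixes are one way to get there; a deterministic counter, similar in spirit to factory_boy's `Sequence`, is another and keeps test data readable. A minimal sketch with a hypothetical `sequence` helper:

```python
import itertools
from typing import Callable


def sequence(template: str, start: int = 1) -> Callable[[], str]:
    """Return a function that yields template.format(n) for n = start, start + 1, ..."""
    counter = itertools.count(start)
    return lambda: template.format(next(counter))


next_email = sequence("user{}@example.com")
next_username = sequence("user{}")

print(next_email(), next_email())  # user1@example.com user2@example.com
```

    Counters are per process, so they stay unique within one test session; when several processes share a database (for example with pytest-xdist), combine the counter with a random or worker-specific prefix.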

    Chapter 12: Best Practices and Patterns

    Project Organization

    graph TD
        A[Clean Architecture] --> B[Domain Layer]
        A --> C[Application Layer]
        A --> D[Infrastructure Layer]
        A --> E[Presentation Layer]
    
        B --> B1[Models/Entities]
        B --> B2[Business Logic]
    
        C --> C1[Services]
        C --> C2[Use Cases]
    
        D --> D1[Database]
        D --> D2[External APIs]
    
        E --> E1[FastAPI Routes]
        E --> E2[Request/Response Models]

    Repository Pattern

    from abc import ABC, abstractmethod
    from typing import List, Optional, Generic, TypeVar
    from beanie import Document
    from bson import ObjectId
    
    T = TypeVar('T', bound=Document)
    
    class BaseRepository(ABC, Generic[T]):
        """Abstract base repository"""
    
        def __init__(self, model: type[T]):
            self.model = model
    
        @abstractmethod
        async def create(self, entity: T) -> T:
            pass
    
        @abstractmethod
        async def get_by_id(self, id: str) -> Optional[T]:
            pass
    
        @abstractmethod
        async def update(self, entity: T) -> T:
            pass
    
        @abstractmethod
        async def delete(self, id: str) -> bool:
            pass
    
        @abstractmethod
        async def find_all(self, skip: int = 0, limit: int = 100) -> List[T]:
            pass
    
    class BeanieRepository(BaseRepository[T]):
        """Beanie implementation of repository pattern"""
    
        async def create(self, entity: T) -> T:
            await entity.insert()
            return entity
    
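        async def get_by_id(self, id: str) -> Optional[T]:
            # Reject malformed ids up front instead of using a bare except,
            # so genuine database errors still propagate to the caller
            if not ObjectId.is_valid(id):
                return None
            return await self.model.get(ObjectId(id))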
    
        async def update(self, entity: T) -> T:
            await entity.save()
            return entity
    
        async def delete(self, id: str) -> bool:
            entity = await self.get_by_id(id)
            if entity:
                await entity.delete()
                return True
            return False
    
        async def find_all(self, skip: int = 0, limit: int = 100) -> List[T]:
            return await self.model.find().skip(skip).limit(limit).to_list()
    
    class UserRepository(BeanieRepository[User]):
        """User-specific repository methods"""
    
        async def find_by_email(self, email: str) -> Optional[User]:
            return await self.model.find_one(self.model.email == email)
    
        async def find_active_users(self, skip: int = 0, limit: int = 100) -> List[User]:
            return await self.model.find(
                self.model.is_active == True
            ).skip(skip).limit(limit).to_list()
    
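        async def search_by_name(self, name: str) -> List[User]:
            # $text queries require a text index on the collection (e.g. declared
            # in User.Settings.indexes); without one MongoDB rejects the query
            return await self.model.find(
                {"$text": {"$search": name}}
            ).to_list()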
    
    class ProductRepository(BeanieRepository[Product]):
        """Product-specific repository methods"""
    
        async def find_by_category(self, category: str) -> List[Product]:
            return await self.model.find(
                self.model.category == category
            ).to_list()
    
        async def find_in_price_range(self, min_price: float, max_price: float) -> List[Product]:
            return await self.model.find(
                self.model.price >= min_price,
                self.model.price <= max_price
            ).to_list()
    
        async def get_top_products(self, limit: int = 10) -> List[Product]:
            # Assuming we have a rating field
            return await self.model.find().sort(-self.model.rating).limit(limit).to_list()
    Python
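    Because the services depend on the repository interface rather than on Beanie, unit tests can swap in an in-memory implementation. The sketch below is a hypothetical `InMemoryRepository` (not part of Beanie) with the same method names as `BaseRepository`; it stores a hypothetical `FakeUser` dataclass and uses string ids instead of `ObjectId`, so it runs without MongoDB.

```python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Dict, Generic, List, Optional, TypeVar


@dataclass
class FakeUser:
    # Hypothetical stand-in for the Beanie User document
    email: str
    is_active: bool = True
    id: Optional[str] = None


T = TypeVar("T")


class InMemoryRepository(Generic[T]):
    """Dict-backed repository with the same method names as BaseRepository"""

    def __init__(self) -> None:
        self._items: Dict[str, T] = {}

    async def create(self, entity: T) -> T:
        entity.id = uuid.uuid4().hex  # assign an id, as insert() would
        self._items[entity.id] = entity
        return entity

    async def get_by_id(self, id: str) -> Optional[T]:
        return self._items.get(id)

    async def update(self, entity: T) -> T:
        self._items[entity.id] = entity
        return entity

    async def delete(self, id: str) -> bool:
        return self._items.pop(id, None) is not None

    async def find_all(self, skip: int = 0, limit: int = 100) -> List[T]:
        # Dicts keep insertion order, so this pages in creation order
        return list(self._items.values())[skip:skip + limit]


async def demo() -> List[FakeUser]:
    repo: InMemoryRepository[FakeUser] = InMemoryRepository()
    alice = await repo.create(FakeUser(email="alice@example.com"))
    await repo.create(FakeUser(email="bob@example.com"))
    await repo.delete(alice.id)
    return await repo.find_all()


remaining = asyncio.run(demo())
```

    A service such as `UserService` can then be constructed with this fake in a test, keeping business-rule tests fast and independent of a running database.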

    Service Layer Pattern

    from typing import List, Optional
    from dataclasses import dataclass
    
    @dataclass
    class UserCreateRequest:
        email: str
        username: str
        password: str
        name: Optional[str] = None
    
    @dataclass
    class UserUpdateRequest:
        name: Optional[str] = None
        email: Optional[str] = None
        is_active: Optional[bool] = None
    
    class UserService:
        """Business logic for user operations"""
    
        def __init__(self, user_repo: UserRepository, auth_service: AuthService):
            self.user_repo = user_repo
            self.auth_service = auth_service
    
        async def create_user(self, request: UserCreateRequest) -> User:
            """Create a new user with business validation"""
            # Check if user already exists
            existing_user = await self.user_repo.find_by_email(request.email)
            if existing_user:
                raise ValueError("User with this email already exists")
    
            # Create user
            user = User(
                email=request.email,
                username=request.username,
                hashed_password=User.hash_password(request.password),
                name=request.name or request.username
            )
    
            return await self.user_repo.create(user)
    
        async def update_user(self, user_id: str, request: UserUpdateRequest) -> Optional[User]:
            """Update user with business validation"""
            user = await self.user_repo.get_by_id(user_id)
            if not user:
                return None
    
            # Update fields
            if request.name is not None:
                user.name = request.name
            if request.email is not None:
                # Check email uniqueness
                existing = await self.user_repo.find_by_email(request.email)
                if existing and existing.id != user.id:
                    raise ValueError("Email already in use")
                user.email = request.email
            if request.is_active is not None:
                user.is_active = request.is_active
    
            return await self.user_repo.update(user)
    
        async def get_user_profile(self, user_id: str) -> Optional[User]:
            """Get user profile with additional data"""
            user = await self.user_repo.get_by_id(user_id)
            if user:
                # Could add additional business logic here
                # e.g., calculating user statistics, recent activity, etc.
                pass
            return user
    
        async def deactivate_user(self, user_id: str) -> bool:
            """Deactivate user (soft delete)"""
            user = await self.user_repo.get_by_id(user_id)
            if user:
                user.is_active = False
                await self.user_repo.update(user)
                return True
            return False
    
    class OrderService:
        """Business logic for order operations"""
    
        def __init__(self, order_repo: 'OrderRepository', product_repo: ProductRepository, user_repo: UserRepository):
            self.order_repo = order_repo
            self.product_repo = product_repo
            self.user_repo = user_repo
    
        async def create_order(self, user_id: str, items: List[dict]) -> Order:
            """Create order with business validation"""
            # Validate user
            user = await self.user_repo.get_by_id(user_id)
            if not user or not user.is_active:
                raise ValueError("Invalid or inactive user")
    
            # Validate and calculate order
            validated_items = []
            total = 0.0
    
            for item in items:
                product = await self.product_repo.get_by_id(item['product_id'])
                if not product:
                    raise ValueError(f"Product {item['product_id']} not found")
    
                quantity = item['quantity']
                if quantity <= 0:
                    raise ValueError("Quantity must be positive")
    
                item_total = product.price * quantity
                total += item_total
    
                validated_items.append({
                    'product': product,
                    'quantity': quantity,
                    'price': product.price
                })
    
            # Create order
            order = Order(
                user=user,
                items=validated_items,
                total=total,
                status="pending"
            )
    
            return await self.order_repo.create(order)
    
        async def process_payment(self, order_id: str) -> bool:
            """Process order payment"""
            order = await self.order_repo.get_by_id(order_id)
            if not order or order.status != "pending":
                return False
    
            # Payment processing logic here
            # This could integrate with external payment services
    
            order.status = "paid"
            await self.order_repo.update(order)
            return True
    
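        async def get_user_orders(self, user_id: str) -> List[Order]:
            """Get all orders for a user"""
            # Belongs in OrderRepository, which is not shown in this chapter;
            # raising makes the gap explicit instead of silently returning None
            raise NotImplementedError("Add a user-filtered query to OrderRepository")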
    Python
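    Pulling the validation loop of `OrderService.create_order` into a pure function makes the pricing rules testable without a database. In this sketch `calculate_order` is a hypothetical helper, a plain `price_lookup` dict stands in for `ProductRepository`, and amounts use `Decimal` rather than `float`, an assumption worth considering for currency since `0.1 + 0.2 != 0.3` in binary floating point.

```python
from decimal import Decimal
from typing import Dict, List, Tuple


def calculate_order(
    items: List[dict], price_lookup: Dict[str, Decimal]
) -> Tuple[List[dict], Decimal]:
    """Validate order lines and compute the total (mirrors OrderService.create_order)"""
    validated_items = []
    total = Decimal("0")
    for item in items:
        product_id = item["product_id"]
        if product_id not in price_lookup:
            raise ValueError(f"Product {product_id} not found")
        quantity = item["quantity"]
        if quantity <= 0:
            raise ValueError("Quantity must be positive")
        # Price comes from the catalogue, never from the client request
        price = price_lookup[product_id]
        total += price * quantity
        validated_items.append(
            {"product_id": product_id, "quantity": quantity, "price": price}
        )
    return validated_items, total


prices = {"p1": Decimal("19.99"), "p2": Decimal("5.00")}
lines, total = calculate_order(
    [{"product_id": "p1", "quantity": 2}, {"product_id": "p2", "quantity": 1}],
    prices,
)
```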

    Error Handling Patterns

    from enum import Enum
    from typing import Optional, Any
    from fastapi import HTTPException, status
    
    class ErrorCode(Enum):
        USER_NOT_FOUND = "USER_NOT_FOUND"
        USER_ALREADY_EXISTS = "USER_ALREADY_EXISTS"
        INVALID_CREDENTIALS = "INVALID_CREDENTIALS"
        INSUFFICIENT_PERMISSIONS = "INSUFFICIENT_PERMISSIONS"
        VALIDATION_ERROR = "VALIDATION_ERROR"
        INTERNAL_ERROR = "INTERNAL_ERROR"
    
    class BusinessException(Exception):
        """Base exception for business logic errors"""
    
        def __init__(self, error_code: ErrorCode, message: str, details: Optional[Any] = None):
            self.error_code = error_code
            self.message = message
            self.details = details
            super().__init__(message)
    
    class UserNotFoundException(BusinessException):
        def __init__(self, user_id: str):
            super().__init__(
                ErrorCode.USER_NOT_FOUND,
                f"User with ID {user_id} not found"
            )
    
    class UserAlreadyExistsException(BusinessException):
        def __init__(self, email: str):
            super().__init__(
                ErrorCode.USER_ALREADY_EXISTS,
                f"User with email {email} already exists"
            )
    
    class ErrorHandler:
        """Global error handling for API"""
    
        @staticmethod
        def handle_business_exception(exc: BusinessException) -> HTTPException:
            """Convert business exceptions to HTTP exceptions"""
            error_mappings = {
                ErrorCode.USER_NOT_FOUND: status.HTTP_404_NOT_FOUND,
                ErrorCode.USER_ALREADY_EXISTS: status.HTTP_409_CONFLICT,
                ErrorCode.INVALID_CREDENTIALS: status.HTTP_401_UNAUTHORIZED,
                ErrorCode.INSUFFICIENT_PERMISSIONS: status.HTTP_403_FORBIDDEN,
                ErrorCode.VALIDATION_ERROR: status.HTTP_422_UNPROCESSABLE_ENTITY,
                ErrorCode.INTERNAL_ERROR: status.HTTP_500_INTERNAL_SERVER_ERROR,
            }
    
            status_code = error_mappings.get(exc.error_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
    
            return HTTPException(
                status_code=status_code,
                detail={
                    "error_code": exc.error_code.value,
                    "message": exc.message,
                    "details": exc.details
                }
            )
    
    # Usage in FastAPI
    from fastapi import FastAPI, Request
    from fastapi.responses import JSONResponse
    from pydantic import ValidationError
    
    app = FastAPI()
    
    @app.exception_handler(BusinessException)
    async def business_exception_handler(request: Request, exc: BusinessException):
        http_exc = ErrorHandler.handle_business_exception(exc)
        return JSONResponse(
            status_code=http_exc.status_code,
            content=http_exc.detail
        )
    
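    # pydantic's ValidationError is raised inside endpoints (e.g. when building a
    # Document from bad data); FastAPI reports invalid request bodies with
    # RequestValidationError instead, which has its own default handler
    @app.exception_handler(ValidationError)
    async def validation_exception_handler(request: Request, exc: ValidationError):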
        return JSONResponse(
            status_code=422,
            content={
                "error_code": "VALIDATION_ERROR",
                "message": "Validation failed",
                "details": exc.errors()
            }
        )
    Python
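    The status mapping itself does not depend on FastAPI, so it can be unit-tested in isolation. Below is a trimmed-down sketch using the standard library's `HTTPStatus`; `to_error_response` and `STATUS_BY_CODE` are hypothetical names, and only a subset of the error codes is repeated.

```python
from enum import Enum
from http import HTTPStatus
from typing import Any, Dict, Optional, Tuple


class ErrorCode(Enum):
    USER_NOT_FOUND = "USER_NOT_FOUND"
    USER_ALREADY_EXISTS = "USER_ALREADY_EXISTS"
    INTERNAL_ERROR = "INTERNAL_ERROR"


class BusinessException(Exception):
    def __init__(self, error_code: ErrorCode, message: str, details: Optional[Any] = None):
        self.error_code = error_code
        self.message = message
        self.details = details
        super().__init__(message)


# Codes missing from this table fall back to 500
STATUS_BY_CODE = {
    ErrorCode.USER_NOT_FOUND: HTTPStatus.NOT_FOUND,
    ErrorCode.USER_ALREADY_EXISTS: HTTPStatus.CONFLICT,
}


def to_error_response(exc: BusinessException) -> Tuple[int, Dict[str, Any]]:
    """Translate a business exception into (status code, JSON body)"""
    status = STATUS_BY_CODE.get(exc.error_code, HTTPStatus.INTERNAL_SERVER_ERROR)
    body = {
        "error_code": exc.error_code.value,
        "message": exc.message,
        "details": exc.details,
    }
    return int(status), body


status_code, body = to_error_response(
    BusinessException(ErrorCode.USER_ALREADY_EXISTS, "User with email a@b.c already exists")
)
```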

    Caching Strategies

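    import pickle
    from typing import Any, Optional, Callable
    from functools import wraps
    import redis.asyncio as redis
    
    # pickle lets Beanie documents round-trip through Redis, but unpickling runs
    # code embedded in the payload: only use it with a Redis instance you control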
    
    class CacheService:
        """Redis-based caching service"""
    
        def __init__(self, redis_client: redis.Redis):
            self.redis = redis_client
    
        async def get(self, key: str) -> Optional[Any]:
            """Get value from cache"""
            try:
                data = await self.redis.get(key)
                if data:
                    return pickle.loads(data)
            except Exception:
                pass
            return None
    
        async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
            """Set value in cache with TTL"""
            try:
                data = pickle.dumps(value)
                return await self.redis.setex(key, ttl, data)
            except Exception:
                return False
    
        async def delete(self, key: str) -> bool:
            """Delete key from cache"""
            try:
                return await self.redis.delete(key) > 0
            except Exception:
                return False
    
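        async def clear_pattern(self, pattern: str) -> int:
            """Clear all keys matching pattern"""
            try:
                # SCAN walks the keyspace incrementally; KEYS blocks the server
                # while it scans every key, which is risky in production
                keys = [key async for key in self.redis.scan_iter(match=pattern)]
                if keys:
                    return await self.redis.delete(*keys)
                return 0
            except Exception:
                return 0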
    
    def cache_result(ttl: int = 3600, key_prefix: str = ""):
        """Cache the result of an async method on an object that has a `cache` attribute"""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(self, *args, **kwargs):
                # Readable, deterministic key such as "user:get_user_profile:<user_id>".
                # Built-in hash() of a str is salted per process, so keys built from it
                # are not shared between workers and cannot be matched by clear_pattern()
                parts = [str(a) for a in args] + [f"{k}={v}" for k, v in sorted(kwargs.items())]
                cache_key = ":".join([key_prefix, func.__name__, *parts])
    
                # Try to get from cache
                cache_service = getattr(self, "cache", None)
                if cache_service:
                    cached_result = await cache_service.get(cache_key)
                    if cached_result is not None:
                        return cached_result
    
                # Execute function
                result = await func(self, *args, **kwargs)
    
                # Cache result
                if cache_service and result is not None:
                    await cache_service.set(cache_key, result, ttl)
    
                return result
            return wrapper
        return decorator
    
    class CachedUserService(UserService):
        """User service with caching"""
    
        def __init__(self, user_repo: UserRepository, auth_service: AuthService, cache_service: CacheService):
            super().__init__(user_repo, auth_service)
            self.cache = cache_service
    
        @cache_result(ttl=1800, key_prefix="user")
        async def get_user_profile(self, user_id: str) -> Optional[User]:
            """Get user profile with caching (key: user:get_user_profile:<user_id>)"""
            return await super().get_user_profile(user_id)
    
        async def update_user(self, user_id: str, request: UserUpdateRequest) -> Optional[User]:
            """Update user and invalidate cache"""
            user = await super().update_user(user_id, request)
            if user:
                # Removes every cached entry whose key contains this user's id
                await self.cache.clear_pattern(f"user:*{user_id}*")
            return user
    Python
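    For tests or single-process development, a dict-backed cache exposing the same `get`/`set`/`delete`/`clear_pattern` methods can stand in for `CacheService`. This is a hypothetical `InMemoryCache`, not a Redis API: it uses `time.monotonic()` for expiry and `fnmatch.fnmatchcase` to approximate Redis glob patterns (the two agree on `*` and `?`, while bracket-expression syntax differs slightly).

```python
import asyncio
import fnmatch
import time
from typing import Any, Dict, Optional, Tuple


class InMemoryCache:
    """Dict-backed stand-in for CacheService (single process only)"""

    def __init__(self) -> None:
        # key -> (expiry time on the time.monotonic() clock, value)
        self._data: Dict[str, Tuple[float, Any]] = {}

    async def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._data[key]  # lazily evict expired entries
            return None
        return value

    async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
        self._data[key] = (time.monotonic() + ttl, value)
        return True

    async def delete(self, key: str) -> bool:
        return self._data.pop(key, None) is not None

    async def clear_pattern(self, pattern: str) -> int:
        # fnmatchcase is case-sensitive on every OS, like Redis key matching
        matches = [k for k in self._data if fnmatch.fnmatchcase(k, pattern)]
        for k in matches:
            del self._data[k]
        return len(matches)


async def demo() -> Tuple[Any, int, Any]:
    cache = InMemoryCache()
    await cache.set("user:get_user_profile:42", {"name": "Ada"}, ttl=60)
    await cache.set("product:list", ["p1"], ttl=60)
    hit = await cache.get("user:get_user_profile:42")
    cleared = await cache.clear_pattern("user:*42*")
    miss = await cache.get("user:get_user_profile:42")
    return hit, cleared, miss


hit, cleared, miss = asyncio.run(demo())
```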

    Chapter 13: Advanced Topics

    Database Transactions

    from contextlib import asynccontextmanager
    from typing import List
    from bson import ObjectId
    from motor.motor_asyncio import AsyncIOMotorClient
    
    class TransactionManager:
        """Manager for database transactions"""
    
        def __init__(self, client: AsyncIOMotorClient):
            self.client = client
    
        @asynccontextmanager
        async def transaction(self):
            """Context manager for transactions"""
            # Transactions need a replica set or sharded cluster, not a standalone mongod
            async with await self.client.start_session() as session:
                # start_transaction() commits when the block exits normally and
                # aborts if an exception propagates out of it
                async with session.start_transaction():
                    yield session
    
    class TransactionalOrderService:
        """Order service with transaction support"""
    
        def __init__(self, transaction_manager: TransactionManager):
            self.transaction_manager = transaction_manager
    
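        async def process_order_with_inventory(self, user_id: str, items: List[dict]) -> Order:
            """Process order with inventory update in transaction"""
            async with self.transaction_manager.transaction() as session:
                # Load the user inside the transaction; Order references the user
                # document, as in OrderService.create_order
                user = await User.get(ObjectId(user_id), session=session)
                if user is None:
                    raise ValueError("Invalid user")
    
                # Create order
                order = Order(
                    user=user,
                    items=items,
                    total=sum(item['quantity'] * item['price'] for item in items),
                    status="processing"
                )
                await order.insert(session=session)
    
                # Update inventory for each product; any ValueError below aborts
                # the transaction, rolling back the order and earlier stock changes
                for item in items:
                    product = await Product.get(
                        ObjectId(item['product_id']),
                        session=session
                    )
                    if product is None:
                        raise ValueError(f"Product {item['product_id']} not found")
    
                    if product.inventory < item['quantity']:
                        raise ValueError(f"Insufficient inventory for product {product.name}")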
    
                    product.inventory -= item['quantity']
                    await product.save(session=session)
    
                # Update order status
                order.status = "confirmed"
                await order.save(session=session)
    
                return order
    Python
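    MongoDB may abort a transaction with an error carrying the `TransientTransactionError` label (a write conflict with a concurrent transaction is a common cause), and the recommended response is to retry the whole transaction from the start. The helper below, `retry_transaction`, is a hypothetical generic sketch; with PyMongo/Motor the predicate would typically check `exc.has_error_label("TransientTransactionError")` on a `PyMongoError`, but the demo uses a stand-in exception so it runs without a database. Motor sessions also offer `with_transaction()`, which implements a retry loop of this kind for you.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

R = TypeVar("R")


async def retry_transaction(
    run_once: Callable[[], Awaitable[R]],
    is_transient: Callable[[BaseException], bool],
    max_attempts: int = 3,
    base_delay: float = 0.05,
) -> R:
    """Re-run a whole transaction while it fails with a transient error"""
    for attempt in range(1, max_attempts + 1):
        try:
            return await run_once()
        except Exception as exc:
            if attempt == max_attempts or not is_transient(exc):
                raise
            # Exponential backoff before starting a fresh transaction
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


class FakeTransientError(Exception):
    """Stand-in for a PyMongoError labelled TransientTransactionError"""


attempts = 0


async def flaky_transaction() -> str:
    global attempts
    attempts += 1
    if attempts < 3:
        raise FakeTransientError("write conflict")
    return "committed"


result = asyncio.run(
    retry_transaction(
        flaky_transaction,
        is_transient=lambda exc: isinstance(exc, FakeTransientError),
        base_delay=0.001,
    )
)
```

    Each call of `run_once` should open a fresh session and transaction (for instance by entering `TransactionManager.transaction()` inside it), because an aborted transaction cannot be resumed.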

    Event-Driven Architecture

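    from typing import Dict, List, Callable
    from datetime import datetime, timezone
    from abc import ABC, abstractmethod
    import asyncio
    import uuid
    from bson import ObjectId
    
    class DomainEvent(ABC):
        """Base class for domain events"""
    
        def __init__(self):
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            self.occurred_at = datetime.now(timezone.utc)
            self.event_id = str(uuid.uuid4())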
    
    class UserCreatedEvent(DomainEvent):
        def __init__(self, user: User):
            super().__init__()
            self.user = user
            self.user_id = str(user.id)
    
    class OrderPlacedEvent(DomainEvent):
        def __init__(self, order: Order):
            super().__init__()
            self.order = order
            self.order_id = str(order.id)
            self.user_id = str(order.user.id)
    
    class EventBus:
        """Simple in-memory event bus"""
    
        def __init__(self):
            self._handlers: Dict[type, List[Callable]] = {}
    
        def subscribe(self, event_type: type, handler: Callable):
            """Subscribe to an event type"""
            if event_type not in self._handlers:
                self._handlers[event_type] = []
            self._handlers[event_type].append(handler)
    
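        async def publish(self, event: DomainEvent):
            """Publish an event to subscribers of its exact type (subclasses are not matched)"""
            event_type = type(event)
            if event_type in self._handlers:
                tasks = []
                for handler in self._handlers[event_type]:
                    tasks.append(asyncio.create_task(handler(event)))
    
                if tasks:
                    # return_exceptions=True waits for every handler and returns their
                    # exceptions as results instead of raising them, so handler failures
                    # never reach the caller; inspect and log the results to surface them
                    await asyncio.gather(*tasks, return_exceptions=True)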
    
    class EventDrivenUserService(UserService):
        """User service with event publishing"""
    
        def __init__(self, user_repo: UserRepository, auth_service: AuthService, event_bus: EventBus):
            super().__init__(user_repo, auth_service)
            self.event_bus = event_bus
    
        async def create_user(self, request: UserCreateRequest) -> User:
            """Create user and publish event"""
            user = await super().create_user(request)
    
            # Publish event
            event = UserCreatedEvent(user)
            await self.event_bus.publish(event)
    
            return user
    
    # Event handlers
    class EmailNotificationHandler:
        """Handle email notifications"""
    
        def __init__(self, email_service: 'EmailService'):
            self.email_service = email_service
    
        async def handle_user_created(self, event: UserCreatedEvent):
            """Send welcome email when user is created"""
            await self.email_service.send_welcome_email(event.user.email, event.user.name)
    
        async def handle_order_placed(self, event: OrderPlacedEvent):
            """Send order confirmation email"""
            user = await User.get(ObjectId(event.user_id))
            if user is None:
                return  # user was deleted after the order was placed
            await self.email_service.send_order_confirmation(user.email, event.order_id)
    
    class AnalyticsHandler:
        """Handle analytics events"""
    
        def __init__(self, analytics_service: 'AnalyticsService'):
            self.analytics_service = analytics_service
    
        async def handle_user_created(self, event: UserCreatedEvent):
            """Track user registration"""
            await self.analytics_service.track_event("user_registered", {
                "user_id": event.user_id,
                "timestamp": event.occurred_at.isoformat()
            })
    
        async def handle_order_placed(self, event: OrderPlacedEvent):
            """Track order placement"""
            await self.analytics_service.track_event("order_placed", {
                "order_id": event.order_id,
                "user_id": event.user_id,
                "order_total": event.order.total,
                "timestamp": event.occurred_at.isoformat()
            })
    
    # Setup event bus
    def setup_event_bus(email_service, analytics_service) -> EventBus:
        event_bus = EventBus()
    
        email_handler = EmailNotificationHandler(email_service)
        analytics_handler = AnalyticsHandler(analytics_service)
    
        # Subscribe handlers
        event_bus.subscribe(UserCreatedEvent, email_handler.handle_user_created)
        event_bus.subscribe(UserCreatedEvent, analytics_handler.handle_user_created)
        event_bus.subscribe(OrderPlacedEvent, email_handler.handle_order_placed)
        event_bus.subscribe(OrderPlacedEvent, analytics_handler.handle_order_placed)
    
        return event_bus
    Python
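    The publish/subscribe flow can be exercised with plain-Python events, which also makes the effect of `return_exceptions=True` visible: a failing handler does not stop the others, but its exception comes back as a result rather than being raised. `MiniEventBus` and the `UserCreated` event here are hypothetical simplifications that return the failures so the caller can log them.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class UserCreated:
    user_id: str


Handler = Callable[[Any], Awaitable[None]]


class MiniEventBus:
    def __init__(self) -> None:
        self._handlers: Dict[type, List[Handler]] = {}

    def subscribe(self, event_type: type, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Any) -> List[BaseException]:
        handlers = self._handlers.get(type(event), [])
        results = await asyncio.gather(
            *(handler(event) for handler in handlers), return_exceptions=True
        )
        # Hand failures back to the caller instead of silently dropping them
        return [r for r in results if isinstance(r, BaseException)]


sent: List[str] = []


async def send_welcome(event: UserCreated) -> None:
    sent.append(event.user_id)


async def broken_analytics(event: UserCreated) -> None:
    raise RuntimeError("analytics backend unavailable")


bus = MiniEventBus()
bus.subscribe(UserCreated, send_welcome)
bus.subscribe(UserCreated, broken_analytics)
failures = asyncio.run(bus.publish(UserCreated(user_id="u1")))
```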

    Advanced Query Patterns

    from datetime import datetime, timedelta
    from typing import TypeVar, Generic, List, Optional, Union
    from beanie import Document
    from beanie.operators import In, RegEx, GTE, LTE
    
    T = TypeVar('T', bound=Document)
    
    class QueryBuilder(Generic[T]):
        """Fluent query builder for complex queries"""
    
        def __init__(self, model: type[T]):
            self.model = model
            self._filters = []
            self._sort_criteria = []
            self._skip_count = 0
            self._limit_count = None
            self._populate_fields = []
    
        def where(self, condition) -> 'QueryBuilder[T]':
            """Add a filter condition"""
            self._filters.append(condition)
            return self
    
        def where_in(self, field, values: List) -> 'QueryBuilder[T]':
            """Add IN condition"""
            self._filters.append(In(field, values))
            return self
    
        def where_regex(self, field, pattern: str, options: str = "i") -> 'QueryBuilder[T]':
            """Add regex condition"""
            self._filters.append(RegEx(field, pattern, options))
            return self
    
        def where_range(self, field, min_value, max_value) -> 'QueryBuilder[T]':
            """Add range condition"""
            if min_value is not None:
                self._filters.append(GTE(field, min_value))
            if max_value is not None:
                self._filters.append(LTE(field, max_value))
            return self
    
        def order_by(self, field, descending: bool = False) -> 'QueryBuilder[T]':
            """Add sort criteria"""
            direction = -1 if descending else 1
            self._sort_criteria.append((field, direction))
            return self
    
        def skip(self, count: int) -> 'QueryBuilder[T]':
            """Set skip count"""
            self._skip_count = count
            return self
    
        def limit(self, count: int) -> 'QueryBuilder[T]':
            """Set limit count"""
            self._limit_count = count
            return self
    
        def populate(self, *fields) -> 'QueryBuilder[T]':
            """Request that linked documents be fetched"""
            self._populate_fields.extend(fields)
            return self
    
        async def to_list(self) -> List[T]:
            """Execute query and return list"""
            # Beanie has no per-field populate(); fetch_links=True resolves every
            # Link field of the model, so the listed fields only act as an on/off switch
            query = self.model.find(*self._filters, fetch_links=bool(self._populate_fields))
    
            if self._sort_criteria:
                for field, direction in self._sort_criteria:
                    query = query.sort((field, direction))
    
            if self._skip_count:
                query = query.skip(self._skip_count)
    
            if self._limit_count:
                query = query.limit(self._limit_count)
    
            return await query.to_list()
    
        async def first(self) -> Optional[T]:
            """Get first result"""
            results = await self.limit(1).to_list()
            return results[0] if results else None
    
        async def count(self) -> int:
            """Count matching documents"""
            return await self.model.find(*self._filters).count()
    
    # Usage examples
    async def advanced_user_search(
        name_pattern: str = None,
        age_range: tuple = None,
        roles: List[str] = None,
        is_active: bool = None,
        page: int = 1,
        page_size: int = 20
    ) -> List[User]:
        """Advanced user search with multiple filters"""
    
        query = QueryBuilder(User)
    
        if name_pattern:
            query = query.where_regex(User.name, name_pattern)
    
        if age_range:
            min_age, max_age = age_range
            query = query.where_range(User.age, min_age, max_age)
    
        if roles:
            query = query.where_in(User.roles, roles)
    
        if is_active is not None:
            query = query.where(User.is_active == is_active)
    
        skip = (page - 1) * page_size
    
        return await query.order_by(User.created_at, descending=True)\
                         .skip(skip)\
                         .limit(page_size)\
                         .to_list()
    
    async def get_recent_orders_with_users(days: int = 30) -> List[Order]:
        """Get recent orders with user information"""
        cutoff_date = datetime.utcnow() - timedelta(days=days)
    
        return await QueryBuilder(Order)\
            .where(Order.created_at >= cutoff_date)\
            .where(Order.status == "completed")\
            .order_by(Order.created_at, descending=True)\
            .populate(Order.user)\
            .to_list()
    Python
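    Beanie compiles the conditions collected by `QueryBuilder` into a MongoDB filter document, and seeing a logically equivalent raw filter helps when reading server logs or `explain()` output. The sketch below is a hypothetical `build_user_filter`, assuming the user fields are stored as `name`, `age`, `roles` and `is_active`; Beanie itself may combine the conditions under `$and` rather than merging them into one dict.

```python
import re
from typing import Any, Dict, List, Optional, Tuple


def build_user_filter(
    name_pattern: Optional[str] = None,
    age_range: Optional[Tuple[Optional[int], Optional[int]]] = None,
    roles: Optional[List[str]] = None,
    is_active: Optional[bool] = None,
) -> Dict[str, Any]:
    """Build a raw MongoDB filter for the user search (field names assumed)"""
    query: Dict[str, Any] = {}
    if name_pattern:
        # Escape user input so characters like '.' or '*' match literally
        query["name"] = {"$regex": re.escape(name_pattern), "$options": "i"}
    if age_range:
        min_age, max_age = age_range
        bounds: Dict[str, int] = {}
        if min_age is not None:
            bounds["$gte"] = min_age
        if max_age is not None:
            bounds["$lte"] = max_age
        if bounds:
            query["age"] = bounds
    if roles:
        # On an array field, $in matches documents sharing at least one role
        query["roles"] = {"$in": roles}
    if is_active is not None:
        query["is_active"] = is_active
    return query


flt = build_user_filter(name_pattern="ann", age_range=(18, None), roles=["admin"], is_active=True)
```

    Unlike `where_regex` above, this sketch escapes the search text, so input such as `a.b` matches literally; whether to escape is a design choice. Case-insensitive regex filters generally cannot use an index efficiently, so they scale poorly on large collections.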

    Chapter 14: Production Deployment

    Configuration Management

    graph TD
        A[Configuration] --> B[Environment Variables]
        A --> C[Config Files]
        A --> D[Secret Management]
    
        B --> B1[Development]
        B --> B2[Staging]
        B --> B3[Production]
    
        C --> C1[YAML/JSON]
        C --> C2[TOML]
    
        D --> D1[HashiCorp Vault]
        D --> D2[AWS Secrets Manager]
        D --> D3[Azure Key Vault]

    from typing import List, Optional
    import os
    
    from pydantic import Field
    from pydantic_settings import BaseSettings, SettingsConfigDict
    
    # Pydantic v2 moved BaseSettings into the separate pydantic-settings package.
    # Fields map to environment variables by name, case-insensitively
    # (mongodb_url <- MONGODB_URL); validation_alias is only needed where the
    # variable name differs from the field name.
    
    class EnvSettings(BaseSettings):
        """Shared base: read .env and ignore variables that belong to other sections"""
        model_config = SettingsConfigDict(
            env_file=".env",
            env_file_encoding="utf-8",
            case_sensitive=False,
            extra="ignore",
        )
    
    class DatabaseSettings(EnvSettings):
        """Database configuration"""
        mongodb_url: str
        database_name: str = "myapp"
        max_connections: int = Field(100, validation_alias="MONGODB_MAX_CONNECTIONS")
        min_connections: int = Field(10, validation_alias="MONGODB_MIN_CONNECTIONS")
    
    class RedisSettings(EnvSettings):
        """Redis configuration"""
        redis_url: str = "redis://localhost:6379"
        redis_db: int = 0
    
    class SecuritySettings(EnvSettings):
        """Security configuration"""
        secret_key: str
        jwt_algorithm: str = "HS256"
        access_token_expire_minutes: int = 30
        refresh_token_expire_days: int = 30
        # List values are read from the environment as JSON, e.g. ALLOWED_HOSTS='["api.example.com"]'
        allowed_hosts: List[str] = ["*"]
    
    class LoggingSettings(EnvSettings):
        """Logging configuration"""
        log_level: str = "INFO"
        log_format: str = "json"  # json or text
        log_file: Optional[str] = None
    
    class AppSettings(EnvSettings):
        """Main application settings"""
        app_name: str = "FastAPI Beanie App"
        debug: bool = False
        environment: str = "production"
    
        # Sub-configurations; default_factory builds them when AppSettings() is
        # created rather than at import time, when required variables may be unset
        database: DatabaseSettings = Field(default_factory=DatabaseSettings)
        redis: RedisSettings = Field(default_factory=RedisSettings)
        security: SecuritySettings = Field(default_factory=SecuritySettings)
        logging: LoggingSettings = Field(default_factory=LoggingSettings)
    
    # Environment-specific configuration loading
    def load_settings() -> AppSettings:
        """Load settings, reading an environment-specific .env file outside production"""
        env = os.getenv("ENVIRONMENT", "production")
        env_file = {
            "development": ".env.dev",
            "testing": ".env.test",
            "staging": ".env.staging",
        }.get(env)
    
        if env_file is None:
            return AppSettings()  # Production: environment variables (plus .env if present)
    
        # _env_file overrides the file per instance; pass it to each section
        # explicitly, because nested settings do not inherit it from AppSettings
        return AppSettings(
            _env_file=env_file,
            database=DatabaseSettings(_env_file=env_file),
            redis=RedisSettings(_env_file=env_file),
            security=SecuritySettings(_env_file=env_file),
            logging=LoggingSettings(_env_file=env_file),
        )
    
    # Global settings instance
    settings = load_settings()
    Python

    Logging Configuration

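    import logging
    import logging.config
    import sys
    import time
    from datetime import datetime, timezone
    from functools import wraps
    from typing import Dict, Any
    import json
    
    # Attributes present on every LogRecord; anything else arrived via `extra=`
    _STANDARD_RECORD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {"message", "asctime"}
    
    class JSONFormatter(logging.Formatter):
        """Custom JSON formatter for structured logging"""
    
        def format(self, record: logging.LogRecord) -> str:
            log_data = {
                # Time the event was logged, not the time it was formatted
                "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno
            }
    
            # Add exception info if present
            if record.exc_info:
                log_data["exception"] = self.formatException(record.exc_info)
    
            # Add only fields passed via `extra=`; copying every record attribute would
            # include values such as the exc_info tuple, which json.dumps cannot encode
            for key, value in record.__dict__.items():
                if key not in _STANDARD_RECORD_ATTRS and key not in log_data:
                    log_data[key] = value
    
            # default=str renders values like ObjectId or datetime instead of raising
            return json.dumps(log_data, default=str)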
    
    def setup_logging(settings: LoggingSettings):
        """Setup application logging"""
    
        # Determine formatter
        if settings.log_format == "json":
            formatter = JSONFormatter()
        else:
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
    
        # Setup handlers
        handlers = []
    
        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        handlers.append(console_handler)
    
        # File handler if specified
        if settings.log_file:
            file_handler = logging.FileHandler(settings.log_file)
            file_handler.setFormatter(formatter)
            handlers.append(file_handler)
    
        # Configure root logger
        logging.basicConfig(
            level=getattr(logging, settings.log_level.upper()),
            handlers=handlers,
            force=True
        )
    
        # Set specific loggers
        logging.getLogger("beanie").setLevel(logging.INFO)
        logging.getLogger("motor").setLevel(logging.WARNING)
        logging.getLogger("pymongo").setLevel(logging.WARNING)
    
    # Application logger
    logger = logging.getLogger("myapp")
    
    # Logging decorators
    def log_async_function(func):
        """Decorator to log async function calls"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
    
            logger.info(
                f"Starting {func.__name__}",
                extra={"function": func.__name__, "args_count": len(args)}
            )
    
            try:
                result = await func(*args, **kwargs)
                execution_time = time.time() - start_time
    
                logger.info(
                    f"Completed {func.__name__}",
                    extra={
                        "function": func.__name__,
                        "execution_time": execution_time,
                        "success": True
                    }
                )
    
                return result
    
            except Exception as e:
                execution_time = time.time() - start_time
    
                logger.error(
                    f"Failed {func.__name__}: {str(e)}",
                    extra={
                        "function": func.__name__,
                        "execution_time": execution_time,
                        "success": False,
                        "error": str(e)
                    },
                    exc_info=True
                )
                raise
    
        return wrapper
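
    `setup_logging` wires up handlers imperatively through `basicConfig`. The same configuration can also be written declaratively with `logging.config.dictConfig`, which keeps the whole setup in one data structure that is easy to load from a file. A minimal sketch, assuming a stripped-down `MinimalJSONFormatter` as a stand-in for `JSONFormatter` and a hypothetical `build_logging_config` helper (neither comes from Beanie or FastAPI):

```python
import json
import logging
import logging.config
from typing import Any, Dict


class MinimalJSONFormatter(logging.Formatter):
    """Hypothetical stand-in for JSONFormatter so this snippet runs on its own."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })


def build_logging_config(level: str = "INFO", log_format: str = "json") -> Dict[str, Any]:
    """Hypothetical helper: a declarative equivalent of setup_logging()."""
    return {
        "version": 1,
        # Keep loggers created before configuration (e.g. at import time) enabled
        "disable_existing_loggers": False,
        "formatters": {
            # The "()" key tells dictConfig to call this factory to build the formatter
            "json": {"()": MinimalJSONFormatter},
            "plain": {"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"},
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stdout",
                "formatter": log_format,
            },
        },
        # Same per-library levels as setup_logging()
        "loggers": {
            "beanie": {"level": "INFO"},
            "motor": {"level": "WARNING"},
            "pymongo": {"level": "WARNING"},
        },
        "root": {"level": level.upper(), "handlers": ["console"]},
    }


logging.config.dictConfig(build_logging_config(level="debug"))
logging.getLogger("myapp").info("logging configured")
# prints {"level": "INFO", "logger": "myapp", "message": "logging configured"}
```

    Which style to use is mostly a matter of taste; the dictionary form is convenient when the configuration should come from YAML or JSON rather than code.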

    Health Checks and Monitoring

    import asyncio
    import time
    from datetime import datetime, timezone
    from typing import Any, Dict
    
    import psutil
    import redis.asyncio as redis  # asyncio client, so ping() can be awaited
    from fastapi import FastAPI, status
    from fastapi.responses import JSONResponse
    from motor.motor_asyncio import AsyncIOMotorClient
    
    class HealthChecker:
        """Application health checker"""
    
        def __init__(self, db_client: AsyncIOMotorClient, redis_client: redis.Redis):
            self.db_client = db_client
            self.redis_client = redis_client
    
        async def check_database(self) -> Dict[str, Any]:
            """Check database connectivity"""
            try:
                start_time = time.perf_counter()
                # Lightweight round trip to the MongoDB server
                await self.db_client.admin.command('ping')
                response_time = time.perf_counter() - start_time
    
                return {
                    "status": "healthy",
                    "response_time_ms": round(response_time * 1000, 2)
                }
            except Exception as e:
                return {
                    "status": "unhealthy",
                    "error": str(e)
                }
    
        async def check_redis(self) -> Dict[str, Any]:
            """Check Redis connectivity"""
            try:
                start_time = time.perf_counter()
                await self.redis_client.ping()
                response_time = time.perf_counter() - start_time
    
                return {
                    "status": "healthy",
                    "response_time_ms": round(response_time * 1000, 2)
                }
            except Exception as e:
                return {
                    "status": "unhealthy",
                    "error": str(e)
                }
    
        def check_system_resources(self) -> Dict[str, Any]:
            """Check system resource usage (blocks for about 1 s while sampling CPU)"""
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
    
            return {
                "cpu_percent": cpu_percent,
                "memory_percent": memory.percent,
                "memory_available_gb": round(memory.available / (1024**3), 2),
                "disk_percent": disk.percent,
                "disk_free_gb": round(disk.free / (1024**3), 2)
            }
    
        async def check_all(self) -> Dict[str, Any]:
            """Perform all health checks"""
            checks = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "status": "healthy",
                "checks": {}
            }
    
            # Run checks in parallel
            db_check, redis_check = await asyncio.gather(
                self.check_database(),
                self.check_redis(),
                return_exceptions=True
            )
    
            checks["checks"]["database"] = db_check if not isinstance(db_check, Exception) else {
                "status": "unhealthy",
                "error": str(db_check)
            }
    
            checks["checks"]["redis"] = redis_check if not isinstance(redis_check, Exception) else {
                "status": "unhealthy",
                "error": str(redis_check)
            }
    
            # cpu_percent(interval=1) blocks, so run it in a worker thread
            # instead of stalling the event loop for every request
            checks["checks"]["system"] = await asyncio.to_thread(self.check_system_resources)
    
            # Determine overall status (the system entry has no "status" key)
            if any(check.get("status") == "unhealthy" for check in checks["checks"].values()):
                checks["status"] = "unhealthy"
    
            return checks
    
    # Health check endpoints
    def add_health_endpoints(app: FastAPI, health_checker: HealthChecker):
        """Add health check endpoints to FastAPI app"""
    
        @app.get("/health", tags=["health"])
        async def health_check():
            """Basic health check"""
            return {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()}
    
        @app.get("/health/detailed", tags=["health"])
        async def detailed_health_check():
            """Detailed health check with dependencies"""
            health_status = await health_checker.check_all()
    
            status_code = status.HTTP_200_OK if health_status["status"] == "healthy" else status.HTTP_503_SERVICE_UNAVAILABLE
    
            return JSONResponse(
                content=health_status,
                status_code=status_code
            )
    
        @app.get("/health/ready", tags=["health"])
        async def readiness_check():
            """Readiness check for Kubernetes"""
            # check_database() reports failures in its result instead of raising,
            # so inspect the returned status rather than waiting for an exception
            db_status = await health_checker.check_database()
            if db_status["status"] != "healthy":
                return JSONResponse(
                    content={"status": "not ready", "database": db_status},
                    status_code=status.HTTP_503_SERVICE_UNAVAILABLE
                )
            return {"status": "ready"}
    
        @app.get("/health/live", tags=["health"])
        async def liveness_check():
            """Liveness check for Kubernetes"""
            return {"status": "alive"}

    Docker Configuration

    # Multi-stage Dockerfile for production deployment
    FROM python:3.11-slim AS builder
    
    # Install build tools needed to compile packages with C extensions
    RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc \
        g++ \
        && rm -rf /var/lib/apt/lists/*
    
    # Set working directory
    WORKDIR /app
    
    # Copy requirements and install Python dependencies
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    # Production stage
    FROM python:3.11-slim
    
    # Create non-root user
    RUN groupadd -r appuser && useradd -r -g appuser appuser
    
    # Install runtime dependencies (curl is used by the HEALTHCHECK below)
    RUN apt-get update && apt-get install -y --no-install-recommends \
        curl \
        && rm -rf /var/lib/apt/lists/*
    
    # Set working directory
    WORKDIR /app
    
    # Copy Python packages from builder stage
    COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
    COPY --from=builder /usr/local/bin /usr/local/bin
    
    # Copy application code
    COPY . .
    
    # Change ownership to appuser
    RUN chown -R appuser:appuser /app
    
    # Switch to non-root user
    USER appuser
    
    # Expose port
    EXPOSE 8000
    
    # Health check
    HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
        CMD curl -f http://localhost:8000/health || exit 1
    
    # Run application
    CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
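
    The runtime stage installs `curl` only so that `HEALTHCHECK` can call `/health`. Because the image already contains a Python interpreter, a small standard-library script can do the same job and keep curl out of the final image. A minimal sketch, assuming the script is saved as a hypothetical `healthcheck.py` next to the application code:

```python
import http.client
import sys
import urllib.request

HEALTH_URL = "http://localhost:8000/health"  # matches the port exposed in the Dockerfile


def probe(url: str = HEALTH_URL, timeout: float = 3.0) -> int:
    """Return 0 if the endpoint answers with a 2xx status, 1 otherwise (Docker's convention)."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 0 if 200 <= response.status < 300 else 1
    except (OSError, http.client.HTTPException):
        # URLError, HTTPError (4xx/5xx), refused connections and timeouts all land here
        return 1


if __name__ == "__main__":
    sys.exit(probe())
```

    It would be wired in with `HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 CMD ["python", "healthcheck.py"]`, after which the curl install step in the runtime stage could be dropped.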

    Kubernetes Deployment

    # kubernetes/deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: beanie-app
      labels:
        app: beanie-app
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: beanie-app
      template:
        metadata:
          labels:
            app: beanie-app
        spec:
          containers:
          - name: beanie-app
            image: myregistry/beanie-app:latest  # prefer an immutable version tag or digest in production
            ports:
            - containerPort: 8000
            env:
            - name: MONGODB_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: mongodb-url
            - name: SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: secret-key
            - name: ENVIRONMENT
              value: "production"
            resources:
              requests:
                memory: "256Mi"
                cpu: "250m"
              limits:
                memory: "512Mi"
                cpu: "500m"
            livenessProbe:
              httpGet:
                path: /health/live
                port: 8000
              initialDelaySeconds: 30
              periodSeconds: 10
            readinessProbe:
              httpGet:
                path: /health/ready
                port: 8000
              initialDelaySeconds: 5
              periodSeconds: 5
            lifecycle:
              preStop:
                exec:
                  command: ["/bin/sh", "-c", "sleep 15"]
    
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: beanie-app-service
    spec:
      selector:
        app: beanie-app
      ports:
        - protocol: TCP
          port: 80
          targetPort: 8000
      type: ClusterIP
    
    ---
    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      name: beanie-app-ingress
      annotations:
        cert-manager.io/cluster-issuer: letsencrypt-prod
    spec:
      # ingressClassName replaces the deprecated kubernetes.io/ingress.class annotation
      ingressClassName: nginx
      tls:
      - hosts:
        - api.myapp.com
        secretName: beanie-app-tls
      rules:
      - host: api.myapp.com
        http:
          paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: beanie-app-service
                port:
                  number: 80

    Performance Optimization

    import uvicorn
    from fastapi import FastAPI
    from fastapi.middleware.cors import CORSMiddleware
    from fastapi.middleware.gzip import GZipMiddleware
    
    # Assumes `settings` is the AppSettings instance from the configuration section
    
    def create_optimized_app() -> FastAPI:
        """Create FastAPI app with performance optimizations"""
    
        app = FastAPI(
            title="Beanie FastAPI App",
            description="Production-ready FastAPI app with Beanie ODM",
            version="1.0.0",
            docs_url="/docs" if settings.debug else None,  # Disable docs in production
            redoc_url="/redoc" if settings.debug else None
        )
    
        # Skip compression for responses smaller than 1000 bytes
        app.add_middleware(GZipMiddleware, minimum_size=1000)
        # allow_origins expects full origins including the scheme (e.g. "https://app.example.com"),
        # not bare host names; avoid "*" here because allow_credentials=True
        app.add_middleware(
            CORSMiddleware,
            allow_origins=settings.security.allowed_hosts,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
    
        return app
    
    # Uvicorn configuration for production
    def run_production_server():
        """Run server with production configuration"""
        uvicorn.run(
            "main:app",  # an import string is required when workers > 1
            host="0.0.0.0",
            port=8000,
            workers=4,  # Number of worker processes; size this to the CPU actually available
            loop="uvloop",  # requires uvloop (installed with uvicorn[standard])
            http="httptools",  # requires httptools (installed with uvicorn[standard])
            access_log=False,  # Disable access logs to reduce per-request overhead
            log_config=None,  # Keep the logging configured by setup_logging()
        )
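
    `workers=4` is a fixed guess, and it interacts with the Kubernetes Deployment: each pod is limited to `cpu: "500m"`, so four worker processes would share half a core. A common alternative is to read the count from the `WEB_CONCURRENCY` environment variable (uvicorn's CLI uses it as the default for `--workers`) and otherwise fall back to the CPUs the process may run on. The helper below, `resolve_worker_count`, is hypothetical. Neither `os.sched_getaffinity` nor `os.cpu_count()` sees a cgroup CPU quota, so setting the variable explicitly in the manifest is the more reliable option inside Kubernetes.

```python
import os


def resolve_worker_count(env_var: str = "WEB_CONCURRENCY", cap: int = 8) -> int:
    """Hypothetical helper: pick a uvicorn worker count for this host or container."""
    raw = os.environ.get(env_var, "").strip()
    if raw:
        # An explicit setting wins (e.g. set from the Kubernetes manifest)
        return max(1, int(raw))
    try:
        # CPUs this process is allowed to run on (Linux only)
        cpus = len(os.sched_getaffinity(0))
    except AttributeError:
        # macOS and Windows do not provide sched_getaffinity
        cpus = os.cpu_count() or 1
    # One worker per available CPU, capped to keep memory use predictable
    return max(1, min(cpus, cap))
```

    In `run_production_server` this would replace the literal with `workers=resolve_worker_count()`; the cap is an arbitrary safety limit, not a uvicorn recommendation.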

    Conclusion

    This comprehensive guide covered Python Beanie from basic concepts to advanced production deployment strategies. Key takeaways include:

    Best Practices Summary:

    1. Model Design: Use proper inheritance, validation, and indexes
    2. Query Optimization: Leverage indexes, aggregation, and efficient pagination
    3. Architecture: Implement repository pattern, service layer, and event-driven design
    4. Testing: Comprehensive test coverage with proper fixtures and performance tests
    5. Production: Proper configuration management, logging, monitoring, and deployment

    Performance Tips:

    • Use indexes strategically
    • Implement proper caching strategies
    • Optimize aggregation pipelines
    • Use cursor-based pagination for large datasets
    • Monitor query performance regularly

    Security Considerations:

    • Implement proper authentication and authorization
    • Use environment variables for sensitive configuration
    • Validate all input data
    • Implement rate limiting and request validation
    • Conduct regular security audits

    This guide provides a solid foundation for building production-ready applications with Python Beanie and MongoDB.


    Discover more from Altgr Blog
