The Complete Guide to Python Beanie: From Beginner to Expert
Table of Contents
- Introduction to Beanie
- Setting Up Your Environment
- Basic Concepts and Models
- CRUD Operations
- Advanced Queries
- Relationships and References
- Aggregation Pipeline
- Validation and Serialization
- Indexes and Performance
- Authentication and Authorization
- Testing Strategies
- Best Practices and Patterns
- Advanced Topics
- Production Deployment
Chapter 1: Introduction to Beanie
What is Beanie?
Beanie is an asynchronous Python Object Document Mapper (ODM) for MongoDB, built on top of Pydantic. It provides a pythonic way to work with MongoDB documents while leveraging the power of type hints and async/await syntax.
graph TB
A[Python Application] --> B[Beanie ODM]
B --> C[Motor Driver]
C --> D[MongoDB]
B --> E[Pydantic Models]
E --> F[Type Validation]
E --> G[Serialization]
style B fill:#e1f5fe
style E fill:#f3e5f5
Key Features
- Async/Await Support: Built for modern async Python applications
- Type Safety: Leverages Pydantic for robust type checking
- MongoDB Native: Direct integration with MongoDB features
- FastAPI Integration: Seamless integration with FastAPI framework
- Aggregation Support: Full MongoDB aggregation pipeline support
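If async/await itself is new, the core pattern behind every Beanie call is simply awaiting a coroutine inside an event loop. A minimal stdlib-only sketch — `fetch_user` here is a hypothetical stand-in for a real `await User.get(...)` call, not Beanie API:

```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    # Stand-in for an awaited database call; a real driver would
    # yield control here while waiting on network I/O.
    await asyncio.sleep(0)
    return {"id": user_id, "name": "Ada"}

async def main() -> None:
    user = await fetch_user(1)
    print(user["name"])  # Ada

asyncio.run(main())
```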
Why Choose Beanie?
mindmap
root((Beanie Benefits))
Performance
Async Operations
Connection Pooling
Lazy Loading
Developer Experience
Type Hints
Auto-completion
Runtime Validation
MongoDB Features
Aggregation Pipeline
Indexes
Transactions
Framework Integration
FastAPI
Starlette
Custom Apps
Chapter 2: Setting Up Your Environment
Installation
pip install beanie
For development with additional tools:
pip install "beanie[dev]"
Project Structure
graph TD
A[Project Root] --> B[app/]
B --> C[models/]
B --> D[routers/]
B --> E[services/]
B --> F[config/]
C --> G[user.py]
C --> H[product.py]
C --> I[order.py]
D --> J[users.py]
D --> K[products.py]
E --> L[auth.py]
E --> M[database.py]
F --> N[settings.py]
F --> O[database.py]
Basic Setup
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from models import User, Product, Order
async def init_db():
# Create Motor client
client = AsyncIOMotorClient("mongodb://localhost:27017")
# Initialize beanie with the database and models
await init_beanie(
database=client.ecommerce_db,
document_models=[User, Product, Order]
)
Chapter 3: Basic Concepts and Models
Document Models
from beanie import Document, Indexed
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime

class User(Document):
    name: str
    # Field(..., unique=True) is not enforced by Pydantic; use Indexed for a unique index
    email: Indexed(str, unique=True)
    age: Optional[int] = None
    created_at: datetime = Field(default_factory=datetime.utcnow)
    is_active: bool = True

    class Settings:
        name = "users"  # Collection name
        indexes = [
            [("name", 1), ("age", -1)],  # Compound index
        ]

Model Inheritance
classDiagram
Document <|-- BaseUser
BaseUser <|-- User
BaseUser <|-- AdminUser
class Document {
+ObjectId id
+save()
+delete()
+find()
}
class BaseUser {
+str name
+str email
+datetime created_at
}
class User {
+List~str~ interests
+Optional~str~ bio
}
class AdminUser {
+List~str~ permissions
+int access_level
}
class BaseUser(Document):
name: str
email: str
created_at: datetime = Field(default_factory=datetime.utcnow)
class Settings:
is_root = True # Marks the root of the document inheritance hierarchy
class User(BaseUser):
interests: List[str] = []
bio: Optional[str] = None
class Settings:
name = "users"
class AdminUser(BaseUser):
permissions: List[str] = []
access_level: int = 1
class Settings:
name = "admin_users"
Embedded Documents
class Address(BaseModel):
street: str
city: str
country: str
postal_code: str
class User(Document):
name: str
email: str
address: Optional[Address] = None
shipping_addresses: List[Address] = []
class Settings:
name = "users"
Chapter 4: CRUD Operations
Create Operations
sequenceDiagram
participant App as Application
participant Beanie as Beanie ODM
participant Mongo as MongoDB
App->>Beanie: user = User(name="John", email="john@example.com")
App->>Beanie: await user.insert()
Beanie->>Mongo: insertOne(document)
Mongo-->>Beanie: {acknowledged: true, insertedId: ObjectId}
Beanie-->>App: User instance with ID
# Single document creation
user = User(name="John Doe", email="john@example.com", age=30)
await user.insert()
# Bulk creation
users = [
User(name="Alice", email="alice@example.com"),
User(name="Bob", email="bob@example.com")
]
await User.insert_many(users)
# Insert with validation
try:
user = User(name="", email="invalid-email") # Invalid data
await user.insert()
except ValidationError as e:
print(f"Validation error: {e}")
Read Operations
# Find by ID
user = await User.get(user_id)
# Find one document
user = await User.find_one(User.email == "john@example.com")
# Find multiple documents
users = await User.find(User.age >= 18).to_list()
# Find all
all_users = await User.find_all().to_list()
# Find with limit and skip
page_users = await User.find().skip(10).limit(5).to_list()
# Find with sorting
sorted_users = await User.find().sort(User.created_at).to_list()
Update Operations
flowchart TD
A[Update Request] --> B{Update Type}
B -->|Single Document| C[find_one + update fields + save]
B -->|Multiple Documents| D[Update Many]
B -->|Upsert| E[Update with upsert=True]
C --> F["Document.save()"]
D --> G["Document.update_many()"]
E --> H["Document.update() with upsert"]
F --> I[Updated Document]
G --> J[Update Result]
H --> K["Updated/Inserted Document"]
# Update single document - Method 1
user = await User.find_one(User.email == "john@example.com")
user.age = 31
await user.save()
# Update single document - Method 2
await User.find_one(User.email == "john@example.com").update(
{"$set": {"age": 31}}
)
# Update multiple documents
await User.find(User.age < 18).update_many(
{"$set": {"is_minor": True}}
)
# Atomic updates
await User.find_one(User.email == "john@example.com").update(
{"$inc": {"login_count": 1}}
)
# Upsert operation
await User.find_one(User.email == "new@example.com").upsert(
{"$set": {"name": "New User", "age": 25}},
on_insert=User(name="New User", email="new@example.com")
)
Delete Operations
# Delete single document
user = await User.find_one(User.email == "john@example.com")
await user.delete()
# Delete by query
await User.find_one(User.email == "john@example.com").delete()
# Delete multiple documents
result = await User.find(User.is_active == False).delete_many()
print(f"Deleted {result.deleted_count} users")
# Delete all documents (be careful!)
await User.delete_all()
Chapter 5: Advanced Queries
Query Operators
graph LR
A[Query Operators] --> B[Comparison]
A --> C[Logical]
A --> D[Element]
A --> E[Array]
B --> B1[$eq, $ne, $gt, $gte, $lt, $lte]
C --> C1[$and, $or, $not, $nor]
D --> D1[$exists, $type]
E --> E1[$in, $nin, $all, $size]
# Comparison operators
users = await User.find(User.age > 18).to_list()
users = await User.find(User.age >= 18, User.age <= 65).to_list()
users = await User.find(User.name != "admin").to_list()
# Logical operators
from beanie.operators import Or, And, Not
users = await User.find(
Or(User.age < 18, User.age > 65)
).to_list()
users = await User.find(
And(User.is_active == True, User.age >= 18)
).to_list()
# Element operators
from beanie.operators import Exists
users = await User.find(Exists(User.bio, True)).to_list()
# Array operators
from beanie.operators import In, All
users = await User.find(
In(User.interests, ["python", "mongodb"])
).to_list()
# Text search (requires a text index on the searched fields)
users = await User.find(
{"$text": {"$search": "python developer"}}
).to_list()
Aggregation Queries
# Basic aggregation
pipeline = [
{"$match": {"age": {"$gte": 18}}},
{"$group": {
"_id": "$country",
"count": {"$sum": 1},
"avg_age": {"$avg": "$age"}
}},
{"$sort": {"count": -1}}
]
results = await User.aggregate(pipeline).to_list()
# Using Beanie aggregation methods
results = await User.find(User.age >= 18).aggregate([
{"$group": {
"_id": "$country",
"count": {"$sum": 1},
"avg_age": {"$avg": "$age"}
}}
]).to_list()
Complex Queries with Joins
class Order(Document):
user_id: ObjectId
product_ids: List[ObjectId]
total: float
status: str
created_at: datetime = Field(default_factory=datetime.utcnow)
class Settings:
name = "orders"
# Aggregation with lookup (join)
pipeline = [
{"$lookup": {
"from": "users",
"localField": "user_id",
"foreignField": "_id",
"as": "user"
}},
{"$unwind": "$user"},
{"$match": {"user.age": {"$gte": 18}}},
{"$project": {
"total": 1,
"user_name": "$user.name",
"user_email": "$user.email"
}}
]
orders_with_users = await Order.aggregate(pipeline).to_list()
Chapter 6: Relationships and References
Document References
erDiagram
User ||--o{ Order : places
Order ||--o{ OrderItem : contains
Product ||--o{ OrderItem : referenced_in
Category ||--o{ Product : contains
User {
ObjectId id
string name
string email
}
Order {
ObjectId id
ObjectId user_id
datetime created_at
float total
}
Product {
ObjectId id
string name
float price
ObjectId category_id
}
from beanie import Link
from typing import Optional
class Category(Document):
name: str
description: Optional[str] = None
class Settings:
name = "categories"
class Product(Document):
name: str
price: float
category: Link[Category] # Reference to Category
description: Optional[str] = None
class Settings:
name = "products"
class OrderItem(BaseModel):
product: Link[Product]
quantity: int
price: float
class Order(Document):
user: Link[User]
items: List[OrderItem]
total: float
status: str = "pending"
created_at: datetime = Field(default_factory=datetime.utcnow)
class Settings:
name = "orders"
Working with References
# Creating documents with references
category = Category(name="Electronics")
await category.insert()
product = Product(
name="Laptop",
price=999.99,
category=category # Direct reference
)
await product.insert()
# Fetching with references (resolve Link fields at query time)
product = await Product.find_one(Product.name == "Laptop", fetch_links=True)
print(product.category.name) # "Electronics"
# Creating orders with references
user = await User.find_one(User.email == "john@example.com")
product = await Product.find_one(Product.name == "Laptop")
order = Order(
user=user,
items=[
OrderItem(product=product, quantity=1, price=product.price)
],
total=product.price
)
await order.insert()
Fetch Links and Lazy Loading
# Resolve links as part of a query
order = await Order.find_one(Order.status == "pending", fetch_links=True)
# Works for multi-document queries too
orders = await Order.find(Order.total > 100, fetch_links=True).to_list()
# Resolve a single link on an already-loaded document
order = await Order.find_one()
await order.fetch_link(Order.user)
# Resolve every link on a document
await order.fetch_all_links()
# Without fetch_links, link fields stay as unresolved Link objects
order = await Order.find_one()
print(type(order.user)) # beanie Link, until fetched
Chapter 7: Aggregation Pipeline
Basic Aggregation Concepts
flowchart TD
A[Input Documents] --> B[$match]
B --> C[$group]
C --> D[$sort]
D --> E[$project]
E --> F[$limit]
F --> G[Output Documents]
B1[Filter Documents] --> B
C1[Group by Field] --> C
D1[Sort Results] --> D
E1[Select Fields] --> E
F1[Limit Results] --> F
Aggregation Stages
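Before writing the MongoDB syntax, it helps to see what each stage does to the data; the sketch below emulates $match, $group, and $sort on plain Python dicts (an illustration only — real pipelines run server-side in MongoDB):

```python
from collections import defaultdict

orders = [
    {"user_id": "u1", "status": "completed", "total": 120.0},
    {"user_id": "u2", "status": "completed", "total": 80.0},
    {"user_id": "u1", "status": "pending", "total": 300.0},
    {"user_id": "u1", "status": "completed", "total": 60.0},
]

# $match: keep completed orders worth at least 50
matched = [o for o in orders if o["status"] == "completed" and o["total"] >= 50]

# $group: one bucket per user_id, accumulating $sum-style totals
groups = defaultdict(lambda: {"total_spent": 0.0, "order_count": 0})
for o in matched:
    groups[o["user_id"]]["total_spent"] += o["total"]
    groups[o["user_id"]]["order_count"] += 1

# $sort: highest spender first (descending, like {"total_spent": -1})
result = sorted(
    ({"_id": uid, **agg} for uid, agg in groups.items()),
    key=lambda g: g["total_spent"],
    reverse=True,
)
print(result)
# [{'_id': 'u1', 'total_spent': 180.0, 'order_count': 2},
#  {'_id': 'u2', 'total_spent': 80.0, 'order_count': 1}]
```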
# Match stage - filtering
match_stage = {"$match": {"status": "completed", "total": {"$gte": 100}}}
# Group stage - aggregating data
group_stage = {
"$group": {
"_id": "$user_id",
"total_orders": {"$sum": 1},
"total_spent": {"$sum": "$total"},
"avg_order_value": {"$avg": "$total"},
"max_order": {"$max": "$total"},
"min_order": {"$min": "$total"}
}
}
# Sort stage
sort_stage = {"$sort": {"total_spent": -1}}
# Project stage - shaping output
project_stage = {
"$project": {
"user_id": "$_id",
"total_orders": 1,
"total_spent": 1,
"avg_order_value": {"$round": ["$avg_order_value", 2]},
"_id": 0
}
}
# Complete pipeline
pipeline = [match_stage, group_stage, sort_stage, project_stage]
results = await Order.aggregate(pipeline).to_list()
Advanced Aggregation Patterns
# Lookup with pipeline
lookup_stage = {
"$lookup": {
"from": "users",
"let": {"user_id": "$user_id"},
"pipeline": [
{"$match": {"$expr": {"$eq": ["$_id", "$$user_id"]}}},
{"$project": {"name": 1, "email": 1, "created_at": 1}}
],
"as": "user_info"
}
}
# Unwind arrays
unwind_stage = {"$unwind": "$items"}
# Add computed fields
add_fields_stage = {
"$addFields": {
"item_total": {"$multiply": ["$items.quantity", "$items.price"]},
"order_year": {"$year": "$created_at"},
"order_month": {"$month": "$created_at"}
}
}
# Conditional operations
conditional_stage = {
"$addFields": {
"order_size": {
"$switch": {
"branches": [
{"case": {"$lt": ["$total", 50]}, "then": "small"},
{"case": {"$lt": ["$total", 200]}, "then": "medium"},
{"case": {"$lt": ["$total", 500]}, "then": "large"}
],
"default": "extra_large"
}
}
}
}
Aggregation with Beanie Methods
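As a quick aside, the $switch stage shown above picks the first matching branch, exactly like a chain of if/elif in Python (an illustration of the semantics, not MongoDB code):

```python
def order_size(total: float) -> str:
    # First matching branch wins, mirroring $switch evaluation order
    if total < 50:
        return "small"
    if total < 200:
        return "medium"
    if total < 500:
        return "large"
    return "extra_large"  # the $switch "default" branch

print([order_size(t) for t in (25, 150, 450, 900)])
# ['small', 'medium', 'large', 'extra_large']
```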
class OrderAnalytics:
@staticmethod
async def get_user_spending_summary():
return await Order.aggregate([
{"$match": {"status": "completed"}},
{"$group": {
"_id": "$user_id",
"total_spent": {"$sum": "$total"},
"order_count": {"$sum": 1}
}},
{"$lookup": {
"from": "users",
"localField": "_id",
"foreignField": "_id",
"as": "user"
}},
{"$unwind": "$user"},
{"$project": {
"user_name": "$user.name",
"user_email": "$user.email",
"total_spent": 1,
"order_count": 1,
"avg_order_value": {"$divide": ["$total_spent", "$order_count"]}
}},
{"$sort": {"total_spent": -1}}
]).to_list()
@staticmethod
async def get_monthly_sales():
return await Order.aggregate([
{"$match": {"status": "completed"}},
{"$group": {
"_id": {
"year": {"$year": "$created_at"},
"month": {"$month": "$created_at"}
},
"total_sales": {"$sum": "$total"},
"order_count": {"$sum": 1}
}},
{"$sort": {"_id.year": 1, "_id.month": 1}}
]).to_list()
Chapter 8: Validation and Serialization
Pydantic Validation
graph TD
A[Input Data] --> B[Pydantic Validation]
B --> C{Valid?}
C -->|Yes| D[Create Model Instance]
C -->|No| E[Validation Error]
D --> F[Type Conversion]
F --> G[Custom Validators]
G --> H[Final Model]
style E fill:#ffebee
style H fill:#e8f5e8
from pydantic import validator, Field, EmailStr
from typing import List, Optional
import re
class User(Document):
name: str = Field(..., min_length=2, max_length=50)
email: EmailStr
age: Optional[int] = Field(None, ge=0, le=150)
phone: Optional[str] = None
interests: List[str] = Field(default_factory=list, max_items=10)
@validator('name')
def validate_name(cls, v):
if not v.strip():
raise ValueError('Name cannot be empty or only whitespace')
if any(char.isdigit() for char in v):
raise ValueError('Name cannot contain numbers')
return v.strip().title()
@validator('phone')
def validate_phone(cls, v):
if v is None:
return v
# Simple phone validation
phone_pattern = r'^\+?[\d\s\-\(\)]+$'
if not re.match(phone_pattern, v):
raise ValueError('Invalid phone number format')
return v
@validator('interests')
def validate_interests(cls, v):
# Remove duplicates and empty strings
cleaned = list(set(interest.strip() for interest in v if interest.strip()))
return cleaned
class Settings:
name = "users"
Custom Field Types
from pydantic import BaseModel, validator
from datetime import datetime, date
from typing import Any
class DateRange(BaseModel):
start_date: date
end_date: date
@validator('end_date')
def end_after_start(cls, v, values):
if 'start_date' in values and v < values['start_date']:
raise ValueError('End date must be after start date')
return v
class Money(BaseModel):
amount: float = Field(..., ge=0)
currency: str = Field(..., regex=r'^[A-Z]{3}$')
def __str__(self):
return f"{self.amount} {self.currency}"
class Event(Document):
name: str
date_range: DateRange
ticket_price: Money
max_attendees: int = Field(..., gt=0)
@validator('ticket_price')
def validate_price(cls, v):
if v.amount > 10000:
raise ValueError('Ticket price cannot exceed 10000')
return v
class Settings:
name = "events"
Serialization and Deserialization
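The section below defines dedicated request/response schemas; the underlying idea is just converting between stored documents and API-facing shapes. A stdlib-only sketch of that round trip — dataclasses stand in for the Pydantic models, and `to_response` is a hypothetical helper, not part of Beanie:

```python
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class UserOut:
    id: str
    name: str
    created_at: str  # ISO-8601 string, ready for JSON

def to_response(doc: dict) -> UserOut:
    # Convert a stored document into the API-facing shape:
    # ObjectId-like ids become strings, datetimes become ISO strings.
    return UserOut(
        id=str(doc["_id"]),
        name=doc["name"],
        created_at=doc["created_at"].isoformat(),
    )

doc = {"_id": 123, "name": "Ada", "created_at": datetime(2024, 1, 2, 3, 4, 5)}
print(asdict(to_response(doc)))
# {'id': '123', 'name': 'Ada', 'created_at': '2024-01-02T03:04:05'}
```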
from pydantic import BaseModel
from typing import Dict, Any
class UserResponse(BaseModel):
id: str
name: str
email: str
age: Optional[int]
created_at: datetime
@classmethod
def from_document(cls, user: User):
return cls(
id=str(user.id),
name=user.name,
email=user.email,
age=user.age,
created_at=user.created_at
)
class UserCreate(BaseModel):
name: str
email: EmailStr
age: Optional[int] = None
interests: List[str] = []
def to_document(self) -> User:
return User(**self.dict())
# Usage in FastAPI
from fastapi import FastAPI, HTTPException
from pymongo.errors import DuplicateKeyError
app = FastAPI()
@app.post("/users/", response_model=UserResponse)
async def create_user(user_data: UserCreate):
try:
user = user_data.to_document()
await user.insert()
return UserResponse.from_document(user)
except DuplicateKeyError:
raise HTTPException(status_code=400, detail="Email already exists")
Advanced Validation Patterns
from pydantic import root_validator, validator
from typing import Union
class ProductVariant(BaseModel):
size: Optional[str] = None
color: Optional[str] = None
material: Optional[str] = None
class Product(Document):
name: str
base_price: float = Field(..., gt=0)
discount_percentage: Optional[float] = Field(None, ge=0, le=100)
final_price: Optional[float] = None
category: str
variants: List[ProductVariant] = []
tags: List[str] = []
@root_validator
def calculate_final_price(cls, values):
base_price = values.get('base_price', 0)
discount = values.get('discount_percentage', 0)
if discount > 0:
final_price = base_price * (1 - discount / 100)
else:
final_price = base_price
values['final_price'] = round(final_price, 2)
return values
@validator('variants')
def validate_variants(cls, v, values):
category = values.get('category', '').lower()
if category == 'clothing':
for variant in v:
if not variant.size:
raise ValueError('Clothing items must have size variants')
return v
@validator('tags', pre=True)
def normalize_tags(cls, v):
if isinstance(v, str):
return [tag.strip().lower() for tag in v.split(',')]
return [tag.strip().lower() for tag in v if tag.strip()]
class Settings:
name = "products"
Chapter 9: Indexes and Performance
Index Types and Strategies
graph TB
A[Index Types] --> B[Single Field]
A --> C[Compound]
A --> D[Text]
A --> E[Geospatial]
A --> F[Partial]
A --> G[Sparse]
B --> B1[Basic Lookup]
C --> C1[Multi-field Queries]
D --> D1[Text Search]
E --> E1[Location Queries]
F --> F1[Conditional Indexing]
G --> G1[Null Value Handling]
Creating Indexes
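One rule worth internalizing before defining compound indexes: MongoDB can generally use a compound index only when the query constrains a leading prefix of its fields. A toy model of that prefix rule (deliberately simplified — the real query planner considers much more):

```python
def can_use_index(index_fields: tuple, query_fields: set) -> bool:
    # Count how many leading index fields the query constrains
    prefix_len = 0
    for field in index_fields:
        if field in query_fields:
            prefix_len += 1
        else:
            break
    # Usable only if the query hits a non-empty leading prefix
    return prefix_len > 0 and query_fields.issubset(set(index_fields[:prefix_len]))

idx = ("name", "age")
print(can_use_index(idx, {"name"}))         # True  (prefix)
print(can_use_index(idx, {"name", "age"}))  # True  (full match)
print(can_use_index(idx, {"age"}))          # False (skips the prefix)
```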
from pymongo import IndexModel, ASCENDING, DESCENDING, TEXT
class User(Document):
name: str
email: str
age: Optional[int] = None
location: Optional[Dict] = None # GeoJSON
bio: Optional[str] = None
tags: List[str] = []
created_at: datetime = Field(default_factory=datetime.utcnow)
last_login: Optional[datetime] = None
class Settings:
name = "users"
indexes = [
# Single field indexes
"email", # plain index; use Indexed(str, unique=True) on the field for uniqueness
"created_at",
# Compound indexes
[("name", ASCENDING), ("age", DESCENDING)],
[("created_at", DESCENDING), ("last_login", DESCENDING)],
# Text index for search
[("name", TEXT), ("bio", TEXT)],
# Geospatial index
[("location", "2dsphere")],
# Partial index (only indexes documents where condition is true)
IndexModel(
[("last_login", DESCENDING)],
partialFilterExpression={"last_login": {"$exists": True}}
),
# Sparse index (doesn't index null values)
IndexModel([("bio", TEXT)], sparse=True),
# TTL index (documents expire after the specified time —
# use with care on a users collection, since it deletes documents)
IndexModel(
[("created_at", ASCENDING)],
expireAfterSeconds=2592000 # 30 days
)
]
Performance Optimization
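The optimizer class below offers both skip/limit and cursor-based pagination; the trade-off is that skip still walks past every skipped entry, while a cursor seeks straight past the last seen sort key. A toy in-memory model of the two strategies (not Beanie code):

```python
from typing import List, Optional

def page_by_skip(ids: List[int], page: int, limit: int) -> List[int]:
    # skip/limit: MongoDB still scans past the first (page - 1) * limit entries
    skip = (page - 1) * limit
    return ids[skip:skip + limit]

def page_by_cursor(ids: List[int], last_id: Optional[int], limit: int) -> List[int]:
    # cursor: seek past last_id via the sort key (an index seek server-side)
    if last_id is None:
        start = 0
    else:
        start = next((i for i, v in enumerate(ids) if v > last_id), len(ids))
    return ids[start:start + limit]

ids = list(range(1, 101))  # pretend these are sorted _id values
print(page_by_skip(ids, page=3, limit=10))        # [21, ..., 30]
print(page_by_cursor(ids, last_id=20, limit=10))  # same page, without skipping
```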
class QueryOptimizer:
@staticmethod
async def efficient_pagination(page: int = 1, limit: int = 20):
"""Efficient pagination using skip and limit"""
skip = (page - 1) * limit
# Use projection to limit returned fields
users = await User.find(
User.is_active == True
).project(
UserResponse
).skip(skip).limit(limit).to_list()
return users
@staticmethod
async def cursor_based_pagination(last_id: Optional[str] = None, limit: int = 20):
"""More efficient pagination for large datasets"""
query = User.is_active == True
if last_id:
query = And(query, User.id > ObjectId(last_id))
users = await User.find(query).sort(
User.id
).limit(limit).to_list()
return users
@staticmethod
async def aggregation_with_indexes():
"""Optimized aggregation using indexes"""
pipeline = [
# Use indexed fields in early stages
{"$match": {"created_at": {"$gte": datetime.utcnow() - timedelta(days=30)}}},
{"$match": {"is_active": True}},
# Group after filtering
{"$group": {
"_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
"user_count": {"$sum": 1}
}},
{"$sort": {"_id": 1}}
]
return await User.aggregate(pipeline).to_list()
Index Analysis and Monitoring
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio
class IndexAnalyzer:
def __init__(self, client: AsyncIOMotorClient):
self.client = client
self.db = client.get_database()
async def analyze_query_performance(self, collection_name: str, query: dict):
"""Analyze query execution plan"""
collection = self.db[collection_name]
# Get query explanation (the timing/scan counters live under executionStats)
explain_result = await collection.find(query).explain()
stats = explain_result.get("executionStats", {})
return {
"execution_time_ms": stats.get("executionTimeMillis"),
"total_docs_examined": stats.get("totalDocsExamined"),
"total_docs_returned": stats.get("nReturned"),
"winning_plan": explain_result.get("queryPlanner", {}).get("winningPlan")
}
async def get_index_usage_stats(self, collection_name: str):
"""Get index usage statistics"""
collection = self.db[collection_name]
# Get index stats
index_stats = []
async for stat in collection.aggregate([{"$indexStats": {}}]):
index_stats.append(stat)
return index_stats
async def suggest_indexes(self, collection_name: str, sample_queries: List[dict]):
"""Suggest indexes based on query patterns"""
suggestions = []
for query in sample_queries:
# Analyze query fields
query_fields = self._extract_query_fields(query)
performance = await self.analyze_query_performance(collection_name, query)
if performance["total_docs_examined"] > performance["total_docs_returned"] * 10:
suggestions.append({
"query": query,
"suggested_index": query_fields,
"reason": "High document examination ratio"
})
return suggestions
def _extract_query_fields(self, query: dict) -> List[tuple]:
"""Extract fields used in query for index suggestion"""
fields = []
def extract_fields(obj, prefix=""):
if isinstance(obj, dict):
for key, value in obj.items():
if key.startswith("$"):
continue
field_name = f"{prefix}.{key}" if prefix else key
if isinstance(value, dict):
# Check for range queries
if any(op in value for op in ["$gt", "$gte", "$lt", "$lte"]):
fields.append((field_name, 1)) # Ascending for range
elif "$eq" in value or not any(k.startswith("$") for k in value.keys()):
fields.append((field_name, 1)) # Equality
else:
extract_fields(value, field_name)
else:
fields.append((field_name, 1)) # Simple equality
extract_fields(query)
return fields
# Usage
async def optimize_user_queries():
client = AsyncIOMotorClient("mongodb://localhost:27017")
analyzer = IndexAnalyzer(client)
# Sample queries to analyze
sample_queries = [
{"email": "john@example.com"},
{"age": {"$gte": 18}, "is_active": True},
{"created_at": {"$gte": datetime.utcnow() - timedelta(days=7)}},
{"name": {"$regex": "John", "$options": "i"}}
]
suggestions = await analyzer.suggest_indexes("users", sample_queries)
for suggestion in suggestions:
print(f"Query: {suggestion['query']}")
print(f"Suggested Index: {suggestion['suggested_index']}")
print(f"Reason: {suggestion['reason']}\n")
Chapter 10: Authentication and Authorization
User Authentication Models
sequenceDiagram
participant Client
participant API
participant Auth
participant DB
Client->>API: POST /login {email, password}
API->>Auth: validate_credentials()
Auth->>DB: find_user(email)
DB-->>Auth: user_document
Auth->>Auth: verify_password()
Auth-->>API: access_token
API-->>Client: {token, user_info}
Client->>API: GET /protected (Authorization: Bearer token)
API->>Auth: verify_token()
Auth-->>API: user_data
API-->>Client: protected_data
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional, List
import secrets
from beanie import Document, Indexed
from pydantic import EmailStr, Field
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User(Document):
email: Indexed(EmailStr, unique=True)
username: Indexed(str, unique=True)
hashed_password: str
is_active: bool = True
is_verified: bool = False
roles: List[str] = Field(default_factory=lambda: ["user"])
created_at: datetime = Field(default_factory=datetime.utcnow)
last_login: Optional[datetime] = None
password_reset_token: Optional[str] = None
password_reset_expires: Optional[datetime] = None
@staticmethod
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(self, password: str) -> bool:
return pwd_context.verify(password, self.hashed_password)
def has_role(self, role: str) -> bool:
return role in self.roles
def add_role(self, role: str):
if role not in self.roles:
self.roles.append(role)
def remove_role(self, role: str):
if role in self.roles:
self.roles.remove(role)
class Settings:
name = "users"
indexes = [
# email and username already get unique indexes via Indexed(...)
"password_reset_token"
]
class RefreshToken(Document):
user_id: ObjectId
token: str = Field(..., unique=True)
expires_at: datetime
is_revoked: bool = False
created_at: datetime = Field(default_factory=datetime.utcnow)
@classmethod
async def create_for_user(cls, user: User, expires_in_days: int = 30):
token = secrets.token_urlsafe(32)
refresh_token = cls(
user_id=user.id,
token=token,
expires_at=datetime.utcnow() + timedelta(days=expires_in_days)
)
await refresh_token.insert()
return refresh_token
async def revoke(self):
self.is_revoked = True
await self.save()
class Settings:
name = "refresh_tokens"
indexes = [
"token",
"user_id",
"expires_at"
]
Authentication Service
import secrets
from jose import JWTError, jwt
from datetime import datetime, timedelta
class AuthService:
def __init__(self, secret_key: str, algorithm: str = "HS256"):
self.secret_key = secret_key
self.algorithm = algorithm
self.access_token_expire_minutes = 30
self.refresh_token_expire_days = 30
async def register_user(self, email: str, username: str, password: str) -> User:
"""Register a new user"""
# Check if user exists
existing_user = await User.find_one(
Or(User.email == email, User.username == username)
)
if existing_user:
raise ValueError("User with this email or username already exists")
# Create new user
user = User(
email=email,
username=username,
hashed_password=User.hash_password(password)
)
await user.insert()
return user
async def authenticate_user(self, email: str, password: str) -> Optional[User]:
"""Authenticate user with email and password"""
user = await User.find_one(User.email == email)
if not user or not user.verify_password(password):
return None
if not user.is_active:
raise ValueError("User account is disabled")
# Update last login
user.last_login = datetime.utcnow()
await user.save()
return user
def create_access_token(self, user: User) -> str:
"""Create JWT access token"""
expire = datetime.utcnow() + timedelta(minutes=self.access_token_expire_minutes)
to_encode = {
"sub": str(user.id),
"email": user.email,
"username": user.username,
"roles": user.roles,
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
}
return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
async def create_refresh_token(self, user: User) -> str:
"""Create refresh token"""
refresh_token = await RefreshToken.create_for_user(user)
return refresh_token.token
async def verify_access_token(self, token: str) -> User:
"""Verify and decode access token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
user_id = payload.get("sub")
if user_id is None or payload.get("type") != "access":
raise JWTError("Invalid token")
user = await User.get(ObjectId(user_id))
if user is None or not user.is_active:
raise JWTError("User not found or inactive")
return user
except JWTError:
raise JWTError("Could not validate credentials")
async def refresh_access_token(self, refresh_token: str) -> tuple[str, str]:
"""Create new access token using refresh token"""
token_doc = await RefreshToken.find_one(
RefreshToken.token == refresh_token,
RefreshToken.is_revoked == False,
RefreshToken.expires_at > datetime.utcnow()
)
if not token_doc:
raise JWTError("Invalid or expired refresh token")
user = await User.get(token_doc.user_id)
if not user or not user.is_active:
raise JWTError("User not found or inactive")
# Create new tokens
new_access_token = self.create_access_token(user)
new_refresh_token = await self.create_refresh_token(user)
# Revoke old refresh token
await token_doc.revoke()
return new_access_token, new_refresh_token
async def logout_user(self, refresh_token: str):
"""Logout user by revoking refresh token"""
token_doc = await RefreshToken.find_one(RefreshToken.token == refresh_token)
if token_doc:
await token_doc.revoke()
Role-Based Access Control
from functools import wraps
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
class Permission:
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
class Role:
USER = "user"
MODERATOR = "moderator"
ADMIN = "admin"
SUPER_ADMIN = "super_admin"
ROLE_PERMISSIONS = {
Role.USER: [Permission.READ],
Role.MODERATOR: [Permission.READ, Permission.WRITE],
Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE],
Role.SUPER_ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN]
}
class AuthorizationService:
@staticmethod
def user_has_permission(user: User, permission: str) -> bool:
"""Check if user has specific permission"""
user_permissions = set()
for role in user.roles:
if role in ROLE_PERMISSIONS:
user_permissions.update(ROLE_PERMISSIONS[role])
return permission in user_permissions
@staticmethod
def require_permission(permission: str):
"""Decorator to require specific permission"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Extract user from kwargs (passed by get_current_user dependency)
user = kwargs.get('current_user')
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required"
)
if not AuthorizationService.user_has_permission(user, permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions"
)
return await func(*args, **kwargs)
return wrapper
return decorator
@staticmethod
def require_roles(*required_roles):
"""Decorator to require specific roles"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
user = kwargs.get('current_user')
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required"
)
if not any(role in user.roles for role in required_roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient role permissions"
)
return await func(*args, **kwargs)
return wrapper
return decorator
# FastAPI dependencies
auth_service = AuthService("your-secret-key")
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
"""FastAPI dependency to get current authenticated user"""
try:
user = await auth_service.verify_access_token(credentials.credentials)
return user
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
"""Get current active user"""
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# Usage in FastAPI routes
from fastapi import FastAPI, Depends
app = FastAPI()
@app.get("/users/me")
async def get_user_profile(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/admin/users")
@AuthorizationService.require_permission(Permission.ADMIN)
async def get_all_users(current_user: User = Depends(get_current_active_user)):
return await User.find_all().to_list()
@app.delete("/users/{user_id}")
@AuthorizationService.require_roles(Role.ADMIN, Role.SUPER_ADMIN)
async def delete_user(
user_id: str,
current_user: User = Depends(get_current_active_user)
):
user_to_delete = await User.get(ObjectId(user_id))
if user_to_delete:
await user_to_delete.delete()
return {"message": "User deleted successfully"}

Chapter 11: Testing Strategies
Test Environment Setup
graph TD
A[Test Suite] --> B[Unit Tests]
A --> C[Integration Tests]
A --> D[End-to-End Tests]
B --> B1[Model Validation]
B --> B2[Service Logic]
B --> B3[Utility Functions]
C --> C1[Database Operations]
C --> C2[API Endpoints]
C --> C3[Authentication]
D --> D1[Complete User Flows]
D --> D2[Performance Tests]
D --> D3[Load Tests]import pytest
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from beanie import init_beanie
from httpx import AsyncClient
from fastapi.testclient import TestClient
# Test configuration
TEST_DATABASE_URL = "mongodb://localhost:27017/test_db"
class TestConfig:
mongodb_url = TEST_DATABASE_URL
secret_key = "test-secret-key"
@pytest.fixture(scope="session")
def event_loop():
"""Create an instance of the default event loop for the test session."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
async def mongodb_client():
"""Create MongoDB client for testing"""
client = AsyncIOMotorClient(TEST_DATABASE_URL)
yield client
client.close()
@pytest.fixture(scope="function")
async def database(mongodb_client):
"""Create and clean database for each test"""
db = mongodb_client.test_db
# Initialize Beanie
await init_beanie(
database=db,
document_models=[User, Product, Order, RefreshToken]
)
yield db
# Clean up after test
await db.drop_collection("users")
await db.drop_collection("products")
await db.drop_collection("orders")
await db.drop_collection("refresh_tokens")
@pytest.fixture
async def sample_user(database):
"""Create a sample user for testing"""
user = User(
email="test@example.com",
username="testuser",
hashed_password=User.hash_password("testpassword")
)
await user.insert()
return user
@pytest.fixture
async def auth_service():
"""Create auth service for testing"""
return AuthService(TestConfig.secret_key)

Unit Tests
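The first tests below exercise `User.hash_password` and `User.verify_password`. As background, here is a dependency-free sketch of the usual mechanics using PBKDF2 from the standard library — the book's model may well use passlib or bcrypt instead, so treat the function bodies (and the `salt$hash` storage format) as illustrative, not as the model's actual implementation:

```python
import hashlib
import hmac
import os

def hash_password(password: str, *, iterations: int = 100_000) -> str:
    """Salted PBKDF2-HMAC-SHA256; stores salt and digest as '$'-separated hex."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"{salt.hex()}${digest.hex()}"

def verify_password(password: str, stored: str, *, iterations: int = 100_000) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), iterations
    )
    # hmac.compare_digest guards against timing attacks on the comparison
    return hmac.compare_digest(candidate.hex(), digest_hex)
```

Whatever the real implementation, the tests only depend on this contract: hashing is one-way, and verification succeeds only for the original password.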
import pytest
from pydantic import ValidationError
from datetime import datetime, timedelta
class TestUserModel:
async def test_user_creation(self, database):
"""Test basic user creation"""
user = User(
email="newuser@example.com",
username="newuser",
hashed_password="hashedpassword"
)
await user.insert()
assert user.id is not None
assert user.email == "newuser@example.com"
assert user.is_active is True
assert user.created_at is not None
async def test_user_validation(self, database):
"""Test user validation"""
# Test invalid email
with pytest.raises(ValidationError):
User(
email="invalid-email",
username="testuser",
hashed_password="password"
)
# Test duplicate email
await User(
email="duplicate@example.com",
username="user1",
hashed_password="password"
).insert()
with pytest.raises(Exception): # DuplicateKeyError
await User(
email="duplicate@example.com",
username="user2",
hashed_password="password"
).insert()
async def test_password_hashing(self):
"""Test password hashing and verification"""
password = "mypassword123"
hashed = User.hash_password(password)
assert hashed != password
assert User.verify_password(password, hashed) is True
assert User.verify_password("wrongpassword", hashed) is False
async def test_user_roles(self, sample_user):
"""Test role management"""
assert sample_user.has_role("user") is True
assert sample_user.has_role("admin") is False
sample_user.add_role("moderator")
assert sample_user.has_role("moderator") is True
sample_user.remove_role("user")
assert sample_user.has_role("user") is False
class TestAuthService:
async def test_register_user(self, auth_service, database):
"""Test user registration"""
user = await auth_service.register_user(
email="register@example.com",
username="registeruser",
password="password123"
)
assert user.email == "register@example.com"
assert user.username == "registeruser"
assert User.verify_password("password123", user.hashed_password)
async def test_authenticate_user(self, auth_service, sample_user):
"""Test user authentication"""
# Test valid credentials
user = await auth_service.authenticate_user(
email="test@example.com",
password="testpassword"
)
assert user is not None
assert user.email == "test@example.com"
# Test invalid credentials
user = await auth_service.authenticate_user(
email="test@example.com",
password="wrongpassword"
)
assert user is None
async def test_token_creation_and_verification(self, auth_service, sample_user):
"""Test JWT token creation and verification"""
# Create access token
token = auth_service.create_access_token(sample_user)
assert isinstance(token, str)
assert len(token) > 0
# Verify token
user = await auth_service.verify_access_token(token)
assert user.id == sample_user.id
assert user.email == sample_user.email
async def test_refresh_token_flow(self, auth_service, sample_user, database):
"""Test refresh token creation and usage"""
# Create refresh token
refresh_token = await auth_service.create_refresh_token(sample_user)
assert isinstance(refresh_token, str)
# Use refresh token to get new access token
new_access_token, new_refresh_token = await auth_service.refresh_access_token(refresh_token)
assert isinstance(new_access_token, str)
assert isinstance(new_refresh_token, str)
assert new_refresh_token != refresh_token
# Verify old refresh token is revoked
with pytest.raises(Exception):
await auth_service.refresh_access_token(refresh_token)

Integration Tests
from httpx import ASGITransport, AsyncClient
from fastapi import FastAPI
@pytest.fixture
async def app():
"""Create FastAPI app for testing"""
from main import create_app # Your app factory
app = create_app(TestConfig)
return app
@pytest.fixture
async def client(app):
"""Create HTTP client for testing"""
# Recent httpx releases removed the AsyncClient(app=...) shortcut;
# route requests to the app in-process through ASGITransport instead
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
yield ac
class TestUserEndpoints:
async def test_register_user(self, client: AsyncClient, database):
"""Test user registration endpoint"""
response = await client.post("/auth/register", json={
"email": "newuser@example.com",
"username": "newuser",
"password": "password123"
})
assert response.status_code == 201
data = response.json()
assert data["email"] == "newuser@example.com"
assert "access_token" in data
async def test_login_user(self, client: AsyncClient, sample_user):
"""Test user login endpoint"""
response = await client.post("/auth/login", json={
"email": "test@example.com",
"password": "testpassword"
})
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["user"]["email"] == "test@example.com"
async def test_protected_endpoint(self, client: AsyncClient, sample_user, auth_service):
"""Test accessing protected endpoint"""
# Get access token
token = auth_service.create_access_token(sample_user)
# Access protected endpoint
headers = {"Authorization": f"Bearer {token}"}
response = await client.get("/users/me", headers=headers)
assert response.status_code == 200
data = response.json()
assert data["email"] == "test@example.com"
async def test_unauthorized_access(self, client: AsyncClient):
"""Test unauthorized access to protected endpoint"""
response = await client.get("/users/me")
assert response.status_code == 401
class TestProductEndpoints:
async def test_create_product(self, client: AsyncClient, sample_user, auth_service):
"""Test product creation"""
token = auth_service.create_access_token(sample_user)
headers = {"Authorization": f"Bearer {token}"}
response = await client.post("/products/",
headers=headers,
json={
"name": "Test Product",
"price": 99.99,
"description": "A test product"
}
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test Product"
assert data["price"] == 99.99
async def test_get_products(self, client: AsyncClient, database):
"""Test getting products list"""
# Create test products
products = [
Product(name=f"Product {i}", price=float(i * 10))
for i in range(1, 6)
]
await Product.insert_many(products)
response = await client.get("/products/")
assert response.status_code == 200
data = response.json()
assert len(data) == 5
assert data[0]["name"] == "Product 1"

Performance Tests
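Each test in this section repeats the same start/stop arithmetic around `time.time()`. A small context manager built on `time.perf_counter` (an illustrative helper, not part of the book's codebase) centralizes both the measurement and the time-budget assertion:

```python
import time
from contextlib import contextmanager

@contextmanager
def stopwatch(budget_seconds: float):
    """Time the wrapped block and fail the test if it exceeds the budget."""
    timing = {}
    start = time.perf_counter()  # monotonic and higher resolution than time.time()
    try:
        yield timing
    finally:
        timing["elapsed"] = time.perf_counter() - start
    # runs only on normal exit; a failing block already raised its own error
    assert timing["elapsed"] <= budget_seconds, (
        f"took {timing['elapsed']:.3f}s, budget was {budget_seconds}s"
    )

# Usage mirrors the tests below:
with stopwatch(5.0) as timing:
    total = sum(range(100_000))
```

The `timing` dict lets the test body read the elapsed time afterwards, e.g. to compute requests per second.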
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
class TestPerformance:
async def test_bulk_user_creation(self, database):
"""Test bulk user creation performance"""
start_time = time.time()
users = [
User(
email=f"user{i}@example.com",
username=f"user{i}",
hashed_password=User.hash_password("password")
)
for i in range(1000)
]
await User.insert_many(users)
end_time = time.time()
execution_time = end_time - start_time
assert execution_time < 5.0 # Should complete within 5 seconds
# Verify all users were created
user_count = await User.count()
assert user_count == 1000
async def test_concurrent_reads(self, database):
"""Test concurrent read operations"""
# Create test data
users = [
User(
email=f"concurrent{i}@example.com",
username=f"concurrent{i}",
hashed_password=User.hash_password("password")
)
for i in range(100)
]
await User.insert_many(users)
async def read_users():
return await User.find().limit(10).to_list()
start_time = time.time()
# Run 50 concurrent read operations
tasks = [read_users() for _ in range(50)]
results = await asyncio.gather(*tasks)
end_time = time.time()
execution_time = end_time - start_time
assert execution_time < 3.0 # Should complete within 3 seconds
assert len(results) == 50
assert all(len(result) == 10 for result in results)
async def test_aggregation_performance(self, database):
"""Test aggregation query performance"""
# Create test orders
users = [
User(email=f"agg{i}@example.com", username=f"agg{i}",
hashed_password=User.hash_password("password"))
for i in range(100)
]
await User.insert_many(users)
orders = []
for i, user in enumerate(users):
for j in range(10): # 10 orders per user
orders.append(Order(
user=user,
items=[],
total=float((i + 1) * (j + 1) * 10),
status="completed"
))
await Order.insert_many(orders)
start_time = time.time()
# Complex aggregation query
pipeline = [
{"$match": {"status": "completed"}},
{"$group": {
"_id": "$user",  # orders store a Link under "user"; there is no flat user_id field
"total_spent": {"$sum": "$total"},
"order_count": {"$sum": 1}
}},
{"$match": {"total_spent": {"$gte": 100}}},
{"$sort": {"total_spent": -1}},
{"$limit": 20}
]
results = await Order.aggregate(pipeline).to_list()
end_time = time.time()
execution_time = end_time - start_time
assert execution_time < 2.0 # Should complete within 2 seconds
assert len(results) <= 20
@pytest.mark.load
class TestLoadTests:
"""Load tests - run with pytest -m load"""
async def test_api_load(self, client: AsyncClient, sample_user, auth_service):
"""Test API under load"""
token = auth_service.create_access_token(sample_user)
headers = {"Authorization": f"Bearer {token}"}
async def make_request():
response = await client.get("/users/me", headers=headers)
return response.status_code
# Run 100 concurrent requests
tasks = [make_request() for _ in range(100)]
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
execution_time = end_time - start_time
# All requests should succeed
assert all(status == 200 for status in results)
# Should handle 100 requests reasonably fast
assert execution_time < 10.0
# Calculate requests per second
rps = len(results) / execution_time
print(f"Requests per second: {rps:.2f}")
# Should handle at least 10 RPS
assert rps >= 10

Test Utilities
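One caveat about the factory below: default suffixes derived from `int(time.time())` collide whenever two objects are created within the same second, which bulk fixtures routinely do. A process-wide counter (illustrative helper names) sidesteps that:

```python
import itertools

# monotonically increasing suffix shared by all factories in the process
_seq = itertools.count(1)

def unique_suffix() -> str:
    """Unlike int(time.time()), consecutive calls can never collide."""
    return str(next(_seq))

def unique_email(prefix: str = "user") -> str:
    return f"{prefix}{unique_suffix()}@example.com"
```

Swapping the timestamp defaults for `unique_suffix()` makes the factories safe to call in tight loops.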
class TestDataFactory:
"""Factory for creating test data"""
@staticmethod
async def create_user(
email: str = None,
username: str = None,
password: str = "testpassword",
roles: List[str] = None,
**kwargs
) -> User:
"""Create a test user"""
if email is None:
email = f"user{int(time.time())}@example.com"
if username is None:
username = f"user{int(time.time())}"
if roles is None:
roles = ["user"]
user = User(
email=email,
username=username,
hashed_password=User.hash_password(password),
roles=roles,
**kwargs
)
await user.insert()
return user
@staticmethod
async def create_product(
name: str = None,
price: float = 99.99,
category: str = "test",
**kwargs
) -> Product:
"""Create a test product"""
if name is None:
name = f"Product {int(time.time())}"
product = Product(
name=name,
price=price,
category=category,
**kwargs
)
await product.insert()
return product
@staticmethod
async def create_order(
user: User,
items: List[dict] = None,
status: str = "pending",
**kwargs
) -> Order:
"""Create a test order"""
if items is None:
product = await TestDataFactory.create_product()
items = [{"product": product, "quantity": 1, "price": product.price}]
total = sum(item["quantity"] * item["price"] for item in items)
order = Order(
user=user,
items=items,
total=total,
status=status,
**kwargs
)
await order.insert()
return order
class DatabaseTestUtils:
"""Utilities for database testing"""
@staticmethod
async def clear_collections(*models):
"""Clear specified collections"""
for model in models:
await model.delete_all()
@staticmethod
async def count_documents(model) -> int:
"""Count documents in collection"""
return await model.count()
@staticmethod
async def create_indexes(model):
"""Ensure indexes are created for model"""
if hasattr(model, 'Settings') and hasattr(model.Settings, 'indexes'):
# Indexes are automatically created by Beanie during init
pass

Chapter 12: Best Practices and Patterns
Project Organization
graph TD
A[Clean Architecture] --> B[Domain Layer]
A --> C[Application Layer]
A --> D[Infrastructure Layer]
A --> E[Presentation Layer]
B --> B1[Models/Entities]
B --> B2[Business Logic]
C --> C1[Services]
C --> C2[Use Cases]
D --> D1[Database]
D --> D2[External APIs]
E --> E1[FastAPI Routes]
E --> E2[Request/Response Models]Repository Pattern
from abc import ABC, abstractmethod
from typing import List, Optional, Generic, TypeVar
from beanie import Document
T = TypeVar('T', bound=Document)
class BaseRepository(ABC, Generic[T]):
"""Abstract base repository"""
def __init__(self, model: type[T]):
self.model = model
@abstractmethod
async def create(self, entity: T) -> T:
pass
@abstractmethod
async def get_by_id(self, id: str) -> Optional[T]:
pass
@abstractmethod
async def update(self, entity: T) -> T:
pass
@abstractmethod
async def delete(self, id: str) -> bool:
pass
@abstractmethod
async def find_all(self, skip: int = 0, limit: int = 100) -> List[T]:
pass
class BeanieRepository(BaseRepository[T]):
"""Beanie implementation of repository pattern"""
async def create(self, entity: T) -> T:
await entity.insert()
return entity
async def get_by_id(self, id: str) -> Optional[T]:
try:
return await self.model.get(ObjectId(id))
except Exception:  # invalid ObjectId strings also resolve to None
return None
async def update(self, entity: T) -> T:
await entity.save()
return entity
async def delete(self, id: str) -> bool:
entity = await self.get_by_id(id)
if entity:
await entity.delete()
return True
return False
async def find_all(self, skip: int = 0, limit: int = 100) -> List[T]:
return await self.model.find().skip(skip).limit(limit).to_list()
class UserRepository(BeanieRepository[User]):
"""User-specific repository methods"""
async def find_by_email(self, email: str) -> Optional[User]:
return await self.model.find_one(self.model.email == email)
async def find_active_users(self, skip: int = 0, limit: int = 100) -> List[User]:
return await self.model.find(
self.model.is_active == True
).skip(skip).limit(limit).to_list()
async def search_by_name(self, name: str) -> List[User]:
return await self.model.find(
{"$text": {"$search": name}}
).to_list()
class ProductRepository(BeanieRepository[Product]):
"""Product-specific repository methods"""
async def find_by_category(self, category: str) -> List[Product]:
return await self.model.find(
self.model.category == category
).to_list()
async def find_in_price_range(self, min_price: float, max_price: float) -> List[Product]:
return await self.model.find(
self.model.price >= min_price,
self.model.price <= max_price
).to_list()
async def get_top_products(self, limit: int = 10) -> List[Product]:
# Assuming we have a rating field
return await self.model.find().sort(-self.model.rating).limit(limit).to_list()

Service Layer Pattern
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class UserCreateRequest:
email: str
username: str
password: str
name: Optional[str] = None
@dataclass
class UserUpdateRequest:
name: Optional[str] = None
email: Optional[str] = None
is_active: Optional[bool] = None
class UserService:
"""Business logic for user operations"""
def __init__(self, user_repo: UserRepository, auth_service: AuthService):
self.user_repo = user_repo
self.auth_service = auth_service
async def create_user(self, request: UserCreateRequest) -> User:
"""Create a new user with business validation"""
# Check if user already exists
existing_user = await self.user_repo.find_by_email(request.email)
if existing_user:
raise ValueError("User with this email already exists")
# Create user
user = User(
email=request.email,
username=request.username,
hashed_password=User.hash_password(request.password),
name=request.name or request.username
)
return await self.user_repo.create(user)
async def update_user(self, user_id: str, request: UserUpdateRequest) -> Optional[User]:
"""Update user with business validation"""
user = await self.user_repo.get_by_id(user_id)
if not user:
return None
# Update fields
if request.name is not None:
user.name = request.name
if request.email is not None:
# Check email uniqueness
existing = await self.user_repo.find_by_email(request.email)
if existing and existing.id != user.id:
raise ValueError("Email already in use")
user.email = request.email
if request.is_active is not None:
user.is_active = request.is_active
return await self.user_repo.update(user)
async def get_user_profile(self, user_id: str) -> Optional[User]:
"""Get user profile with additional data"""
user = await self.user_repo.get_by_id(user_id)
if user:
# Could add additional business logic here
# e.g., calculating user statistics, recent activity, etc.
pass
return user
async def deactivate_user(self, user_id: str) -> bool:
"""Deactivate user (soft delete)"""
user = await self.user_repo.get_by_id(user_id)
if user:
user.is_active = False
await self.user_repo.update(user)
return True
return False
class OrderService:
"""Business logic for order operations"""
def __init__(self, order_repo: 'OrderRepository', product_repo: ProductRepository, user_repo: UserRepository):
self.order_repo = order_repo
self.product_repo = product_repo
self.user_repo = user_repo
async def create_order(self, user_id: str, items: List[dict]) -> Order:
"""Create order with business validation"""
# Validate user
user = await self.user_repo.get_by_id(user_id)
if not user or not user.is_active:
raise ValueError("Invalid or inactive user")
# Validate and calculate order
validated_items = []
total = 0.0
for item in items:
product = await self.product_repo.get_by_id(item['product_id'])
if not product:
raise ValueError(f"Product {item['product_id']} not found")
quantity = item['quantity']
if quantity <= 0:
raise ValueError("Quantity must be positive")
item_total = product.price * quantity
total += item_total
validated_items.append({
'product': product,
'quantity': quantity,
'price': product.price
})
# Create order
order = Order(
user=user,
items=validated_items,
total=total,
status="pending"
)
return await self.order_repo.create(order)
async def process_payment(self, order_id: str) -> bool:
"""Process order payment"""
order = await self.order_repo.get_by_id(order_id)
if not order or order.status != "pending":
return False
# Payment processing logic here
# This could integrate with external payment services
order.status = "paid"
await self.order_repo.update(order)
return True
async def get_user_orders(self, user_id: str) -> List[Order]:
"""Get all orders for a user"""
# This would be implemented in OrderRepository
pass

Error Handling Patterns
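The machinery in this section boils down to one idea: a single lookup table from domain error codes to HTTP statuses, consulted in one place, so new error types cannot scatter ad-hoc status codes across handlers. A FastAPI-free miniature of that mapping (using an illustrative subset of the codes defined in this section):

```python
from enum import Enum

class ErrorCode(Enum):
    USER_NOT_FOUND = "USER_NOT_FOUND"
    USER_ALREADY_EXISTS = "USER_ALREADY_EXISTS"

# One table from domain error to transport status; unknown codes
# deliberately fall back to 500 rather than leaking internals.
STATUS_BY_CODE = {
    ErrorCode.USER_NOT_FOUND: 404,
    ErrorCode.USER_ALREADY_EXISTS: 409,
}

def to_http_status(code: ErrorCode) -> int:
    return STATUS_BY_CODE.get(code, 500)
```

The full implementation below wraps the same table in an exception hierarchy and a global FastAPI exception handler.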
from enum import Enum
from typing import Optional, Any
from fastapi import HTTPException, status
class ErrorCode(Enum):
USER_NOT_FOUND = "USER_NOT_FOUND"
USER_ALREADY_EXISTS = "USER_ALREADY_EXISTS"
INVALID_CREDENTIALS = "INVALID_CREDENTIALS"
INSUFFICIENT_PERMISSIONS = "INSUFFICIENT_PERMISSIONS"
VALIDATION_ERROR = "VALIDATION_ERROR"
INTERNAL_ERROR = "INTERNAL_ERROR"
class BusinessException(Exception):
"""Base exception for business logic errors"""
def __init__(self, error_code: ErrorCode, message: str, details: Optional[Any] = None):
self.error_code = error_code
self.message = message
self.details = details
super().__init__(message)
class UserNotFoundException(BusinessException):
def __init__(self, user_id: str):
super().__init__(
ErrorCode.USER_NOT_FOUND,
f"User with ID {user_id} not found"
)
class UserAlreadyExistsException(BusinessException):
def __init__(self, email: str):
super().__init__(
ErrorCode.USER_ALREADY_EXISTS,
f"User with email {email} already exists"
)
class ErrorHandler:
"""Global error handling for API"""
@staticmethod
def handle_business_exception(exc: BusinessException) -> HTTPException:
"""Convert business exceptions to HTTP exceptions"""
error_mappings = {
ErrorCode.USER_NOT_FOUND: status.HTTP_404_NOT_FOUND,
ErrorCode.USER_ALREADY_EXISTS: status.HTTP_409_CONFLICT,
ErrorCode.INVALID_CREDENTIALS: status.HTTP_401_UNAUTHORIZED,
ErrorCode.INSUFFICIENT_PERMISSIONS: status.HTTP_403_FORBIDDEN,
ErrorCode.VALIDATION_ERROR: status.HTTP_422_UNPROCESSABLE_ENTITY,
ErrorCode.INTERNAL_ERROR: status.HTTP_500_INTERNAL_SERVER_ERROR,
}
status_code = error_mappings.get(exc.error_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
return HTTPException(
status_code=status_code,
detail={
"error_code": exc.error_code.value,
"message": exc.message,
"details": exc.details
}
)
# Usage in FastAPI
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(BusinessException)
async def business_exception_handler(request: Request, exc: BusinessException):
http_exc = ErrorHandler.handle_business_exception(exc)
return JSONResponse(
status_code=http_exc.status_code,
content=http_exc.detail
)
@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
return JSONResponse(
status_code=422,
content={
"error_code": "VALIDATION_ERROR",
"message": "Validation failed",
"details": exc.errors()
}
)

Caching Strategies
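The Redis-backed service in this section implements a small contract: get, set-with-TTL, delete. Before wiring in Redis, it helps to see the same contract as a dependency-free in-process class (useful as a mental model or a test double; not the book's production implementation):

```python
import time

class TTLCache:
    """In-process stand-in for the Redis cache service: same
    get / set-with-TTL / delete contract, no external dependency."""
    def __init__(self):
        self._store = {}

    def set(self, key, value, ttl: float = 3600):
        # store the value together with its absolute expiry time
        self._store[key] = (value, time.monotonic() + ttl)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # lazy expiry on read, like Redis TTLs
            return None
        return value

    def delete(self, key) -> bool:
        return self._store.pop(key, None) is not None
```

Because the interface matches, a test suite can inject `TTLCache` where `CacheService` is expected without running a Redis server.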
import json
import pickle
from typing import Any, Optional, Callable
from functools import wraps
import redis.asyncio as redis
from datetime import timedelta
class CacheService:
"""Redis-based caching service"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
try:
data = await self.redis.get(key)
if data:
return pickle.loads(data)
except Exception:
pass
return None
async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
"""Set value in cache with TTL"""
try:
data = pickle.dumps(value)
return await self.redis.setex(key, ttl, data)
except Exception:
return False
async def delete(self, key: str) -> bool:
"""Delete key from cache"""
try:
return await self.redis.delete(key) > 0
except Exception:
return False
async def clear_pattern(self, pattern: str) -> int:
"""Clear all keys matching pattern"""
try:
keys = await self.redis.keys(pattern)
if keys:
return await self.redis.delete(*keys)
return 0
except Exception:
return 0
def cache_result(ttl: int = 3600, key_prefix: str = ""):
"""Decorator to cache function results"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
# Generate cache key
cache_key = f"{key_prefix}:{func.__name__}:{hash(str(args) + str(kwargs))}"
# Try to get from cache: look for an explicit cache_service kwarg
# first, then fall back to a `cache` attribute on the bound
# instance (args[0] for methods)
cache_service = kwargs.get('cache_service')
if cache_service is None and args:
cache_service = getattr(args[0], 'cache', None)
if cache_service:
cached_result = await cache_service.get(cache_key)
if cached_result is not None:
return cached_result
# Execute function
result = await func(*args, **kwargs)
# Cache result
if cache_service and result is not None:
await cache_service.set(cache_key, result, ttl)
return result
return wrapper
return decorator
class CachedUserService(UserService):
"""User service with caching"""
def __init__(self, user_repo: UserRepository, auth_service: AuthService, cache_service: CacheService):
super().__init__(user_repo, auth_service)
self.cache = cache_service
@cache_result(ttl=1800, key_prefix="user")
async def get_user_profile(self, user_id: str) -> Optional[User]:
"""Get user profile with caching"""
# Note: reassigning a local cache_service inside the body would be
# useless -- the decorator inspects the call arguments, not the body
return await super().get_user_profile(user_id)
async def update_user(self, user_id: str, request: UserUpdateRequest) -> Optional[User]:
"""Update user and invalidate cache"""
user = await super().update_user(user_id, request)
if user:
# Invalidate user cache
await self.cache.clear_pattern(f"user:*{user_id}*")
return user

Chapter 13: Advanced Topics
Database Transactions
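MongoDB multi-document transactions (available on replica sets and sharded clusters, not standalone servers) give the code below all-or-nothing semantics: every write in the session is visible together, or none is. As a mental model, here is a dependency-free sketch of commit-on-success / discard-on-error over a plain dict (illustrative only; the real mechanism is the Mongo session shown afterwards):

```python
from contextlib import contextmanager

@contextmanager
def fake_transaction(store: dict):
    """Mutate a scratch copy; publish it to the store only on success."""
    scratch = dict(store)
    yield scratch
    # reached only if the with-block raised nothing: "commit"
    store.clear()
    store.update(scratch)

inventory = {"widget": 5}
try:
    with fake_transaction(inventory) as txn:
        txn["widget"] -= 10
        if txn["widget"] < 0:
            raise ValueError("insufficient inventory")
except ValueError:
    pass
# the failed "transaction" left the store untouched
```

The inventory-update example below follows exactly this shape, with the session's transaction taking the place of the scratch copy.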
from motor.motor_asyncio import AsyncIOMotorClientSession
from contextlib import asynccontextmanager
class TransactionManager:
"""Manager for database transactions"""
def __init__(self, client: AsyncIOMotorClient):
self.client = client
@asynccontextmanager
async def transaction(self):
"""Context manager for transactions"""
async with await self.client.start_session() as session:
# start_transaction() is itself an async context manager: it
# commits on normal exit and aborts automatically when an
# exception propagates, so no manual commit/abort is needed
async with session.start_transaction():
yield session
class TransactionalOrderService:
"""Order service with transaction support"""
def __init__(self, transaction_manager: TransactionManager):
self.transaction_manager = transaction_manager
async def process_order_with_inventory(self, user_id: str, items: List[dict]) -> Order:
"""Process order with inventory update in transaction"""
async with self.transaction_manager.transaction() as session:
# Create order
order = Order(
user_id=ObjectId(user_id),
items=items,
total=sum(item['quantity'] * item['price'] for item in items),
status="processing"
)
await order.insert(session=session)
# Update inventory for each product
for item in items:
product = await Product.get(
ObjectId(item['product_id']),
session=session
)
if product.inventory < item['quantity']:
raise ValueError(f"Insufficient inventory for product {product.name}")
product.inventory -= item['quantity']
await product.save(session=session)
# Update order status
order.status = "confirmed"
await order.save(session=session)
return order

Event-Driven Architecture
from typing import Dict, List, Callable
from datetime import datetime
from abc import ABC, abstractmethod
import asyncio
import uuid
class DomainEvent(ABC):
"""Base class for domain events"""
def __init__(self):
self.occurred_at = datetime.utcnow()
self.event_id = str(uuid.uuid4())
class UserCreatedEvent(DomainEvent):
def __init__(self, user: User):
super().__init__()
self.user = user
self.user_id = str(user.id)
class OrderPlacedEvent(DomainEvent):
def __init__(self, order: Order):
super().__init__()
self.order = order
self.order_id = str(order.id)
self.user_id = str(order.user.id)
class EventBus:
"""Simple in-memory event bus"""
def __init__(self):
self._handlers: Dict[type, List[Callable]] = {}
def subscribe(self, event_type: type, handler: Callable):
"""Subscribe to an event type"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def publish(self, event: DomainEvent):
"""Publish an event to all subscribers"""
event_type = type(event)
if event_type in self._handlers:
tasks = []
for handler in self._handlers[event_type]:
tasks.append(asyncio.create_task(handler(event)))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
class EventDrivenUserService(UserService):
"""User service with event publishing"""
def __init__(self, user_repo: UserRepository, auth_service: AuthService, event_bus: EventBus):
super().__init__(user_repo, auth_service)
self.event_bus = event_bus
async def create_user(self, request: UserCreateRequest) -> User:
"""Create user and publish event"""
user = await super().create_user(request)
# Publish event
event = UserCreatedEvent(user)
await self.event_bus.publish(event)
return user
# Event handlers
class EmailNotificationHandler:
"""Handle email notifications"""
def __init__(self, email_service: 'EmailService'):
self.email_service = email_service
async def handle_user_created(self, event: UserCreatedEvent):
"""Send welcome email when user is created"""
await self.email_service.send_welcome_email(event.user.email, event.user.name)
async def handle_order_placed(self, event: OrderPlacedEvent):
"""Send order confirmation email"""
user = await User.get(ObjectId(event.user_id))
await self.email_service.send_order_confirmation(user.email, event.order_id)
class AnalyticsHandler:
"""Handle analytics events"""
def __init__(self, analytics_service: 'AnalyticsService'):
self.analytics_service = analytics_service
async def handle_user_created(self, event: UserCreatedEvent):
"""Track user registration"""
await self.analytics_service.track_event("user_registered", {
"user_id": event.user_id,
"timestamp": event.occurred_at.isoformat()
})
async def handle_order_placed(self, event: OrderPlacedEvent):
"""Track order placement"""
await self.analytics_service.track_event("order_placed", {
"order_id": event.order_id,
"user_id": event.user_id,
"order_total": event.order.total,
"timestamp": event.occurred_at.isoformat()
})
# Setup event bus
def setup_event_bus(email_service, analytics_service) -> EventBus:
event_bus = EventBus()
email_handler = EmailNotificationHandler(email_service)
analytics_handler = AnalyticsHandler(analytics_service)
# Subscribe handlers
event_bus.subscribe(UserCreatedEvent, email_handler.handle_user_created)
event_bus.subscribe(UserCreatedEvent, analytics_handler.handle_user_created)
event_bus.subscribe(OrderPlacedEvent, email_handler.handle_order_placed)
event_bus.subscribe(OrderPlacedEvent, analytics_handler.handle_order_placed)
return event_bus

Advanced Query Patterns
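The fluent builder defined in this section relies on one mechanical trick: every setter returns `self`, so calls chain into a single expression while the builder accumulates state for a deferred execution step. In miniature (illustrative names, no Beanie dependency):

```python
class Chain:
    """Each setter appends an operation and returns self,
    so calls compose left-to-right into one pipeline."""
    def __init__(self):
        self.ops = []

    def where(self, condition) -> "Chain":
        self.ops.append(("where", condition))
        return self

    def limit(self, count: int) -> "Chain":
        self.ops.append(("limit", count))
        return self

# nothing executes here; the chain merely records what to run later
query = Chain().where("price > 10").limit(5)
```

The full `QueryBuilder` below does the same, then translates the recorded state into a Beanie `find()` call in `to_list()`.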
from typing import TypeVar, Generic, List, Optional, Union
from beanie.operators import In, GTE, LTE
from beanie.operators import RegEx as Regex  # Beanie spells the operator "RegEx"
T = TypeVar('T', bound=Document)
class QueryBuilder(Generic[T]):
"""Fluent query builder for complex queries"""
def __init__(self, model: type[T]):
self.model = model
self._filters = []
self._sort_criteria = []
self._skip_count = 0
self._limit_count = None
self._populate_fields = []
def where(self, condition) -> 'QueryBuilder[T]':
"""Add a filter condition"""
self._filters.append(condition)
return self
def where_in(self, field, values: List) -> 'QueryBuilder[T]':
"""Add IN condition"""
self._filters.append(In(field, values))
return self
def where_regex(self, field, pattern: str, options: str = "i") -> 'QueryBuilder[T]':
"""Add regex condition"""
self._filters.append(Regex(field, pattern, options))
return self
def where_range(self, field, min_value, max_value) -> 'QueryBuilder[T]':
"""Add range condition"""
if min_value is not None:
self._filters.append(GTE(field, min_value))
if max_value is not None:
self._filters.append(LTE(field, max_value))
return self
def order_by(self, field, descending: bool = False) -> 'QueryBuilder[T]':
"""Add sort criteria"""
direction = -1 if descending else 1
self._sort_criteria.append((field, direction))
return self
def skip(self, count: int) -> 'QueryBuilder[T]':
"""Set skip count"""
self._skip_count = count
return self
def limit(self, count: int) -> 'QueryBuilder[T]':
"""Set limit count"""
self._limit_count = count
return self
def populate(self, *fields) -> 'QueryBuilder[T]':
"""Set fields to populate"""
self._populate_fields.extend(fields)
return self
async def to_list(self) -> List[T]:
"""Execute query and return list"""
# Beanie resolves Link fields via find(..., fetch_links=True) rather
# than a populate() call on the query object, so translate the flag
# here; note fetch_links fetches every linked field, not a subset
query = self.model.find(*self._filters, fetch_links=bool(self._populate_fields))
for field, direction in self._sort_criteria:
query = query.sort((field, direction))
if self._skip_count:
query = query.skip(self._skip_count)
if self._limit_count is not None:
query = query.limit(self._limit_count)
return await query.to_list()
async def first(self) -> Optional[T]:
"""Get first result"""
results = await self.limit(1).to_list()
return results[0] if results else None
async def count(self) -> int:
"""Count matching documents"""
return await self.model.find(*self._filters).count()
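The fluent style works because every builder method returns `self`. The same mechanics can be demonstrated without a database; this stand-alone sketch uses a hypothetical `MiniQuery` class over in-memory dicts, not Beanie API:

```python
class MiniQuery:
    """In-memory stand-in for the fluent QueryBuilder (illustration only)."""

    def __init__(self, rows):
        self._rows = list(rows)

    def where(self, predicate):
        self._rows = [r for r in self._rows if predicate(r)]
        return self  # returning self is what enables chaining

    def order_by(self, key, descending=False):
        self._rows.sort(key=key, reverse=descending)
        return self

    def skip(self, n):
        self._rows = self._rows[n:]
        return self

    def limit(self, n):
        self._rows = self._rows[:n]
        return self

    def to_list(self):
        return self._rows


users = [{"name": "ann", "age": 31}, {"name": "bob", "age": 25}, {"name": "cy", "age": 40}]
adults_by_age = (
    MiniQuery(users)
    .where(lambda u: u["age"] >= 30)
    .order_by(lambda u: u["age"], descending=True)
    .to_list()
)
# adults_by_age == [{"name": "cy", "age": 40}, {"name": "ann", "age": 31}]
```

Each step narrows or reorders the working set, exactly like the Beanie version narrows a `find()` cursor.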
# Usage examples
async def advanced_user_search(
    name_pattern: Optional[str] = None,
    age_range: Optional[tuple] = None,
    roles: Optional[List[str]] = None,
    is_active: Optional[bool] = None,
    page: int = 1,
    page_size: int = 20
) -> List[User]:
    """Advanced user search with multiple filters"""
    query = QueryBuilder(User)
    if name_pattern:
        query = query.where_regex(User.name, name_pattern)
    if age_range:
        min_age, max_age = age_range
        query = query.where_range(User.age, min_age, max_age)
    if roles:
        query = query.where_in(User.roles, roles)
    if is_active is not None:
        query = query.where(User.is_active == is_active)
    skip = (page - 1) * page_size
    return await query.order_by(User.created_at, descending=True)\
        .skip(skip)\
        .limit(page_size)\
        .to_list()
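The `skip = (page - 1) * page_size` line is the standard offset-pagination formula. As a hypothetical helper (not part of the guide's code):

```python
def page_bounds(page: int, page_size: int) -> tuple:
    """Translate a 1-based page number into MongoDB skip/limit values."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    return (page - 1) * page_size, page_size

# page 3 at 20 per page skips the first 40 documents
assert page_bounds(3, 20) == (40, 20)
assert page_bounds(1, 20) == (0, 20)
```

For large collections, cursor-based pagination (filtering on the last seen `_id` or timestamp) scales better than large skips, as noted in the conclusion.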
async def get_recent_orders_with_users(days: int = 30) -> List[Order]:
    """Get recent orders with user information"""
    cutoff_date = datetime.utcnow() - timedelta(days=days)
    return await QueryBuilder(Order)\
        .where(Order.created_at >= cutoff_date)\
        .where(Order.status == "completed")\
        .order_by(Order.created_at, descending=True)\
        .populate(Order.user)\
        .to_list()

Chapter 14: Production Deployment
Configuration Management
graph TD
A[Configuration] --> B[Environment Variables]
A --> C[Config Files]
A --> D[Secret Management]
B --> B1[Development]
B --> B2[Staging]
B --> B3[Production]
C --> C1[YAML/JSON]
C --> C2[TOML]
D --> D1[HashiCorp Vault]
D --> D2[AWS Secrets Manager]
D --> D3[Azure Key Vault]

from pydantic import BaseSettings, Field  # pydantic v1; on v2 use `from pydantic_settings import BaseSettings`
from typing import List, Optional
import os

class DatabaseSettings(BaseSettings):
    """Database configuration"""
    mongodb_url: str = Field(..., env="MONGODB_URL")
    database_name: str = Field("myapp", env="DATABASE_NAME")
    max_connections: int = Field(100, env="MONGODB_MAX_CONNECTIONS")
    min_connections: int = Field(10, env="MONGODB_MIN_CONNECTIONS")

class RedisSettings(BaseSettings):
    """Redis configuration"""
    redis_url: str = Field("redis://localhost:6379", env="REDIS_URL")
    redis_db: int = Field(0, env="REDIS_DB")

class SecuritySettings(BaseSettings):
    """Security configuration"""
    secret_key: str = Field(..., env="SECRET_KEY")
    jwt_algorithm: str = Field("HS256", env="JWT_ALGORITHM")
    access_token_expire_minutes: int = Field(30, env="ACCESS_TOKEN_EXPIRE_MINUTES")
    refresh_token_expire_days: int = Field(30, env="REFRESH_TOKEN_EXPIRE_DAYS")
    allowed_hosts: List[str] = Field(["*"], env="ALLOWED_HOSTS")

class LoggingSettings(BaseSettings):
    """Logging configuration"""
    log_level: str = Field("INFO", env="LOG_LEVEL")
    log_format: str = Field("json", env="LOG_FORMAT")  # json or text
    log_file: Optional[str] = Field(None, env="LOG_FILE")

class AppSettings(BaseSettings):
    """Main application settings"""
    app_name: str = Field("FastAPI Beanie App", env="APP_NAME")
    debug: bool = Field(False, env="DEBUG")
    environment: str = Field("production", env="ENVIRONMENT")

    # Sub-configurations
    database: DatabaseSettings = DatabaseSettings()
    redis: RedisSettings = RedisSettings()
    security: SecuritySettings = SecuritySettings()
    logging: LoggingSettings = LoggingSettings()

    class Config:
        case_sensitive = False
        env_file = ".env"
        env_file_encoding = "utf-8"

# Global settings instance
settings = AppSettings()
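pydantic's `BaseSettings` resolves each field in order: explicit constructor argument, then environment variable, then `.env` file, then the declared default. That precedence chain can be sketched with plain `os.environ` (a simplified stand-in, not pydantic's actual implementation):

```python
import os

def resolve_setting(name: str, env_file: dict, default: str) -> str:
    """Resolution order used by BaseSettings: env var > .env file > default."""
    if name in os.environ:
        return os.environ[name]
    if name in env_file:
        return env_file[name]
    return default

dotenv = {"DATABASE_NAME": "myapp_dev"}

os.environ.pop("DATABASE_NAME", None)       # no env var set: .env file wins
assert resolve_setting("DATABASE_NAME", dotenv, "myapp") == "myapp_dev"

os.environ["DATABASE_NAME"] = "myapp_prod"  # env var set: it wins over .env
assert resolve_setting("DATABASE_NAME", dotenv, "myapp") == "myapp_prod"
```

This is why production can run with environment variables only while development layers a `.env.dev` file on top.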
# Environment-specific configuration loading
def load_settings() -> AppSettings:
    """Load settings based on environment"""
    env = os.getenv("ENVIRONMENT", "production")
    if env == "development":
        return AppSettings(_env_file=".env.dev")
    elif env == "testing":
        return AppSettings(_env_file=".env.test")
    elif env == "staging":
        return AppSettings(_env_file=".env.staging")
    else:
        return AppSettings()  # Production uses environment variables

Logging Configuration
import logging
import logging.config
import sys
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Custom JSON formatter for structured logging"""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        # Add exception info if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # Add fields passed via `extra`, skipping standard LogRecord attributes
        standard_attrs = logging.makeLogRecord({}).__dict__.keys()
        for key, value in record.__dict__.items():
            if key not in standard_attrs and key not in log_data:
                log_data[key] = value
        return json.dumps(log_data, default=str)

def setup_logging(settings: LoggingSettings):
    """Setup application logging"""
    # Determine formatter
    if settings.log_format == "json":
        formatter = JSONFormatter()
    else:
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
    # Setup handlers
    handlers = []
    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    handlers.append(console_handler)
    # File handler if specified
    if settings.log_file:
        file_handler = logging.FileHandler(settings.log_file)
        file_handler.setFormatter(formatter)
        handlers.append(file_handler)
    # Configure root logger
    logging.basicConfig(
        level=getattr(logging, settings.log_level.upper()),
        handlers=handlers,
        force=True
    )
    # Set specific loggers
    logging.getLogger("beanie").setLevel(logging.INFO)
    logging.getLogger("motor").setLevel(logging.WARNING)
    logging.getLogger("pymongo").setLevel(logging.WARNING)

# Application logger
logger = logging.getLogger("myapp")
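To see what the structured output looks like without wiring up handlers, a record can be formatted by hand. This stdlib-only check uses a trimmed-down formatter (a `MiniJSONFormatter` stand-in, not the full class above):

```python
import json
import logging

class MiniJSONFormatter(logging.Formatter):
    """Trimmed-down version of JSONFormatter for a quick sanity check."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

# Build a LogRecord directly instead of going through a logger
record = logging.LogRecord(
    name="myapp", level=logging.WARNING, pathname="app.py",
    lineno=1, msg="disk %s%% full", args=(93,), exc_info=None,
)
line = MiniJSONFormatter().format(record)
parsed = json.loads(line)
# parsed == {"level": "WARNING", "logger": "myapp", "message": "disk 93% full"}
```

One JSON object per line is what log aggregators (ELK, Loki, CloudWatch) expect, which is the motivation for the `json` log format in production.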
# Logging decorators
from functools import wraps
import time

def log_async_function(func):
    """Decorator to log async function calls"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        logger.info(
            f"Starting {func.__name__}",
            extra={"function": func.__name__, "args_count": len(args)}
        )
        try:
            result = await func(*args, **kwargs)
            execution_time = time.time() - start_time
            logger.info(
                f"Completed {func.__name__}",
                extra={
                    "function": func.__name__,
                    "execution_time": execution_time,
                    "success": True
                }
            )
            return result
        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(
                f"Failed {func.__name__}: {str(e)}",
                extra={
                    "function": func.__name__,
                    "execution_time": execution_time,
                    "success": False,
                    "error": str(e)
                },
                exc_info=True
            )
            raise
    return wrapper

Health Checks and Monitoring
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from datetime import datetime
from typing import Dict, Any
import asyncio
import time

import psutil
import redis.asyncio as redis
from motor.motor_asyncio import AsyncIOMotorClient

class HealthChecker:
    """Application health checker"""

    def __init__(self, db_client: AsyncIOMotorClient, redis_client: redis.Redis):
        self.db_client = db_client
        self.redis_client = redis_client
    async def check_database(self) -> Dict[str, Any]:
        """Check database connectivity"""
        try:
            start_time = time.time()
            # Simple ping to database
            await self.db_client.admin.command('ping')
            response_time = time.time() - start_time
            return {
                "status": "healthy",
                "response_time_ms": round(response_time * 1000, 2)
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }

    async def check_redis(self) -> Dict[str, Any]:
        """Check Redis connectivity"""
        try:
            start_time = time.time()
            await self.redis_client.ping()
            response_time = time.time() - start_time
            return {
                "status": "healthy",
                "response_time_ms": round(response_time * 1000, 2)
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }

    def check_system_resources(self) -> Dict[str, Any]:
        """Check system resource usage"""
        # Note: interval=1 blocks for a second; in hot paths run this
        # in a thread (e.g. asyncio.to_thread)
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        return {
            "cpu_percent": cpu_percent,
            "memory_percent": memory.percent,
            "memory_available_gb": round(memory.available / (1024**3), 2),
            "disk_percent": disk.percent,
            "disk_free_gb": round(disk.free / (1024**3), 2)
        }
    async def check_all(self) -> Dict[str, Any]:
        """Perform all health checks"""
        checks = {
            "timestamp": datetime.utcnow().isoformat(),
            "status": "healthy",
            "checks": {}
        }
        # Run checks in parallel
        db_check, redis_check = await asyncio.gather(
            self.check_database(),
            self.check_redis(),
            return_exceptions=True
        )
        checks["checks"]["database"] = db_check if not isinstance(db_check, Exception) else {
            "status": "unhealthy",
            "error": str(db_check)
        }
        checks["checks"]["redis"] = redis_check if not isinstance(redis_check, Exception) else {
            "status": "unhealthy",
            "error": str(redis_check)
        }
        checks["checks"]["system"] = self.check_system_resources()
        # Determine overall status
        if any(check.get("status") == "unhealthy" for check in checks["checks"].values()):
            checks["status"] = "unhealthy"
        return checks
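`check_all` reports "unhealthy" as soon as any dependency fails; the system-resources entry has no "status" key, so it never flips the overall state. That aggregation rule, isolated as a hypothetical helper:

```python
def overall_status(checks: dict) -> str:
    """Overall health is the worst of the individual dependency checks."""
    unhealthy = any(c.get("status") == "unhealthy" for c in checks.values())
    return "unhealthy" if unhealthy else "healthy"

assert overall_status({
    "database": {"status": "healthy", "response_time_ms": 2.1},
    "redis": {"status": "healthy", "response_time_ms": 0.4},
}) == "healthy"

# one failing dependency flips the whole report
assert overall_status({
    "database": {"status": "healthy"},
    "redis": {"status": "unhealthy", "error": "connection refused"},
}) == "unhealthy"
```

Checks without a "status" key (like the system-metrics dict) are informational only and never mark the service unhealthy.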
# Health check endpoints
def add_health_endpoints(app: FastAPI, health_checker: HealthChecker):
    """Add health check endpoints to FastAPI app"""

    @app.get("/health", tags=["health"])
    async def health_check():
        """Basic health check"""
        return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}

    @app.get("/health/detailed", tags=["health"])
    async def detailed_health_check():
        """Detailed health check with dependencies"""
        health_status = await health_checker.check_all()
        status_code = status.HTTP_200_OK if health_status["status"] == "healthy" \
            else status.HTTP_503_SERVICE_UNAVAILABLE
        return JSONResponse(content=health_status, status_code=status_code)

    @app.get("/health/ready", tags=["health"])
    async def readiness_check():
        """Readiness check for Kubernetes"""
        # check_database() catches its own exceptions, so inspect the
        # returned status instead of wrapping it in try/except
        db_status = await health_checker.check_database()
        if db_status.get("status") == "healthy":
            return {"status": "ready"}
        return JSONResponse(
            content={"status": "not ready"},
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE
        )

    @app.get("/health/live", tags=["health"])
    async def liveness_check():
        """Liveness check for Kubernetes"""
        return {"status": "alive"}

Docker Configuration
# Multi-stage Dockerfile for production deployment
FROM python:3.11-slim AS builder
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Production stage
FROM python:3.11-slim
# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy Python packages from builder stage
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy application code
COPY . .
# Change ownership to appuser
RUN chown -R appuser:appuser /app
# Switch to non-root user
USER appuser
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Kubernetes Deployment
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: beanie-app
  labels:
    app: beanie-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: beanie-app
  template:
    metadata:
      labels:
        app: beanie-app
    spec:
      containers:
        - name: beanie-app
          image: myregistry/beanie-app:latest
          ports:
            - containerPort: 8000
          env:
            - name: MONGODB_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: mongodb-url
            - name: SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: secret-key
            - name: ENVIRONMENT
              value: "production"
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health/live
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 15"]
---
apiVersion: v1
kind: Service
metadata:
  name: beanie-app-service
spec:
  selector:
    app: beanie-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: beanie-app-ingress
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  ingressClassName: nginx  # replaces the deprecated kubernetes.io/ingress.class annotation
  tls:
    - hosts:
        - api.myapp.com
      secretName: beanie-app-tls
  rules:
    - host: api.myapp.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: beanie-app-service
                port:
                  number: 80

Performance Optimization
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

def create_optimized_app() -> FastAPI:
    """Create FastAPI app with performance optimizations"""
    app = FastAPI(
        title="Beanie FastAPI App",
        description="Production-ready FastAPI app with Beanie ODM",
        version="1.0.0",
        docs_url="/docs" if settings.debug else None,  # Disable docs in production
        redoc_url="/redoc" if settings.debug else None
    )
    # Add middleware
    app.add_middleware(GZipMiddleware, minimum_size=1000)
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.security.allowed_hosts,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    return app
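The `minimum_size=1000` threshold on `GZipMiddleware` exists because gzip carries fixed header and trailer overhead, so compressing tiny or incompressible bodies can actually grow them. A quick stdlib check of that tradeoff:

```python
import gzip
import os

# gzip adds roughly 18 bytes of header/trailer, so small incompressible
# payloads come back larger, while repetitive text shrinks dramatically
tiny_random = os.urandom(100)            # incompressible 100-byte body
big_text = b"<p>hello world</p>" * 500   # repetitive ~9 KB body

assert len(gzip.compress(tiny_random)) > len(tiny_random)
assert len(gzip.compress(big_text)) < len(big_text)
```

Responses below the threshold are sent uncompressed, which avoids wasting CPU on bodies that would not benefit.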
# Uvicorn configuration for production
def run_production_server():
    """Run server with production configuration"""
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        workers=4,           # Number of worker processes
        loop="uvloop",       # Use uvloop for better performance
        http="httptools",    # Use httptools for better HTTP parsing
        access_log=False,    # Disable access logs for performance
        log_config=None,     # Use custom logging configuration
    )

Conclusion
This comprehensive guide covered Python Beanie from basic concepts to advanced production deployment strategies. Key takeaways include:
Best Practices Summary:
- Model Design: Use proper inheritance, validation, and indexes
- Query Optimization: Leverage indexes, aggregation, and efficient pagination
- Architecture: Implement repository pattern, service layer, and event-driven design
- Testing: Comprehensive test coverage with proper fixtures and performance tests
- Production: Proper configuration management, logging, monitoring, and deployment
Performance Tips:
- Use indexes strategically
- Implement proper caching strategies
- Optimize aggregation pipelines
- Use cursor-based pagination for large datasets
- Monitor query performance regularly
Security Considerations:
- Implement proper authentication and authorization
- Use environment variables for sensitive configuration
- Validate all input data
- Implement rate limiting and request validation
- Regular security audits
This guide provides a solid foundation for building production-ready applications with Python Beanie and MongoDB.