FastAPI: The Complete Guide

    A Comprehensive Book on Building Modern APIs

    Table of Contents

    1. Introduction to FastAPI
    2. Getting Started
    3. Request and Response Handling
    4. Data Validation with Pydantic
    5. Authentication and Authorization
    6. Database Integration
    7. Advanced Features
    8. Testing
    9. Deployment and Production
    10. Best Practices and Patterns

    Chapter 1: Introduction to FastAPI

    What is FastAPI?

    FastAPI is a modern, fast (high-performance) web framework for building APIs with Python 3.7+ based on standard Python type hints. It’s designed to be easy to use, fast to code, and production-ready.

    Key Features

    • Fast: Very high performance, on par with NodeJS and Go
    • Fast to code: Increase the speed to develop features by about 200% to 300% (an estimate from the FastAPI team's internal tests, not a benchmark)
    • Fewer bugs: Reduce about 40% of human (developer) induced errors (same internal estimate)
    • Intuitive: Great editor support with auto-completion everywhere
    • Easy: Designed to be easy to use and learn
    • Short: Minimize code duplication
    • Robust: Get production-ready code with automatic interactive documentation
    • Standards-based: Based on open standards for APIs: OpenAPI and JSON Schema

    Architecture Overview

    graph TD
        A[Client Request] --> B[FastAPI Application]
        B --> C[Path Operation]
        C --> D[Dependency Injection]
        D --> E[Request Validation]
        E --> F[Business Logic]
        F --> G[Response Serialization]
        G --> H[Client Response]
    
        I[Pydantic Models] --> E
        I --> G
        J[Database] --> F
        K[Authentication] --> D
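
The flow in the diagram can be imitated in plain Python to make the ordering of the stages concrete. This is a toy sketch under stated assumptions, not how FastAPI is implemented internally: `handle_request`, `authenticate`, `validate`, `ItemIn`, and the `demo-token` value are hypothetical names invented for illustration.

```python
from dataclasses import dataclass, asdict


@dataclass
class ItemIn:
    """Stand-in for a Pydantic request model (hypothetical)."""
    name: str
    price: float


def authenticate(headers: dict) -> str:
    """Dependency step: resolve the caller before anything else runs."""
    if headers.get("authorization") != "Bearer demo-token":
        raise PermissionError("not authenticated")
    return "demo-user"


def validate(payload: dict) -> ItemIn:
    """Validation step: coerce raw input into a typed object or fail."""
    item = ItemIn(name=str(payload["name"]), price=float(payload["price"]))
    if item.price <= 0:
        raise ValueError("price must be greater than zero")
    return item


def create_item(item: ItemIn, user: str) -> dict:
    """Business logic: only ever sees validated data."""
    return {**asdict(item), "owner": user}


def handle_request(headers: dict, payload: dict) -> tuple[int, dict]:
    """Run the stages in the order shown in the diagram."""
    try:
        user = authenticate(headers)
        item = validate(payload)
    except PermissionError as exc:
        return 401, {"detail": str(exc)}
    except (KeyError, ValueError) as exc:
        return 422, {"detail": str(exc)}
    # Serialization step: hand back a status code and a JSON-ready dict.
    return 200, create_item(item, user)
```

The point of the ordering is that the business logic never has to re-check authentication or input types; a failure in an earlier stage short-circuits the request with an error response.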

    Chapter 2: Getting Started

    Installation

    # Basic installation
    pip install fastapi
    
    # With all optional dependencies
    pip install "fastapi[all]"
    
    # ASGI server for development
    pip install uvicorn
    
    # For production
    pip install gunicorn

    Your First FastAPI Application

    from fastapi import FastAPI
    
    app = FastAPI(
        title="My First API",
        description="A simple FastAPI example",
        version="1.0.0"
    )
    
    @app.get("/")
    async def root():
        """Root endpoint returning a welcome message"""
        return {"message": "Hello World"}
    
    @app.get("/hello/{name}")
    async def say_hello(name: str):
        """Greet a specific person"""
        return {"message": f"Hello {name}"}
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000)

    Running the Application

    # Development server
    uvicorn main:app --reload
    
    # Custom host and port
    uvicorn main:app --host 0.0.0.0 --port 8000 --reload

    Interactive Documentation

    FastAPI automatically generates interactive API documentation:

    • Swagger UI: http://localhost:8000/docs
    • ReDoc: http://localhost:8000/redoc
    • OpenAPI Schema: http://localhost:8000/openapi.json

    Chapter 3: Request and Response Handling

    Path Parameters

    from fastapi import FastAPI, Path
    from typing import Optional
    
    app = FastAPI()
    
    @app.get("/users/{user_id}")
    async def get_user(
        user_id: int = Path(..., title="The ID of the user", ge=1)
    ):
        """Get user by ID with validation"""
        return {"user_id": user_id}
    
    @app.get("/files/{file_path:path}")
    async def read_file(file_path: str):
        """Handle file paths with forward slashes"""
        return {"file_path": file_path}
    
    # Enum for path parameters
    from enum import Enum
    
    class ModelName(str, Enum):
        alexnet = "alexnet"
        resnet = "resnet"
        lenet = "lenet"
    
    @app.get("/models/{model_name}")
    async def get_model(model_name: ModelName):
        """Path parameter with predefined values"""
        if model_name == ModelName.alexnet:
            return {"model_name": model_name, "message": "Deep Learning FTW!"}
    
        if model_name.value == "lenet":
            return {"model_name": model_name, "message": "LeCNN all the images"}
    
        return {"model_name": model_name, "message": "Have some residuals"}
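
The two comparison styles in `get_model` (`model_name == ModelName.alexnet` and `model_name.value == "lenet"`) both work because `ModelName` subclasses `str`. The sketch below shows that behavior with the standard library alone; `parse_model_name` is a hypothetical helper that only mimics the lookup step, whereas FastAPI performs the conversion itself and rejects unknown values with a validation error.

```python
from enum import Enum
from typing import Optional


class ModelName(str, Enum):
    alexnet = "alexnet"
    resnet = "resnet"
    lenet = "lenet"


def parse_model_name(raw: str) -> Optional[ModelName]:
    """Return the matching member, or None for values outside the enum."""
    try:
        return ModelName(raw)  # lookup by value
    except ValueError:
        return None


member = parse_model_name("lenet")
# A str-based member compares equal to its plain string value.
same_as_string = member == "lenet"
unknown = parse_model_name("vgg")  # not a member, so None
```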

    Query Parameters

    # filepath: query_parameters.py
    from fastapi import FastAPI, Query
    from typing import Optional, List
    
    app = FastAPI()
    
    @app.get("/items/")
    async def read_items(
        skip: int = 0,
        limit: int = Query(10, le=100),
        q: Optional[str] = Query(None, min_length=3, max_length=50)
    ):
        """Get items with pagination and search"""
        items = [{"item_id": i} for i in range(skip, skip + limit)]
        if q:
            items = [item for item in items if q in str(item["item_id"])]
        return {"items": items, "skip": skip, "limit": limit, "q": q}
    
    @app.get("/items/search/")
    async def search_items(
        tags: List[str] = Query([]),
        category: Optional[str] = None
    ):
        """Search items with multiple tags"""
        return {"tags": tags, "category": category}
    
    # Boolean parameters
    @app.get("/items/featured/")
    async def get_featured_items(
        featured: bool = False,
        in_stock: Optional[bool] = None
    ):
        """Get featured items with boolean filters"""
        return {"featured": featured, "in_stock": in_stock}
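
To see what `skip`, `limit`, and `q` do together, the body of `read_items` can be run as an ordinary function. This is a sketch that drops the FastAPI wiring (and therefore the `Query` constraints such as `le=100` and `min_length=3`); the sample arguments are assumptions chosen for illustration.

```python
from typing import Optional


def read_items(skip: int = 0, limit: int = 10, q: Optional[str] = None) -> dict:
    """Same body as the endpoint above, without the framework validation."""
    items = [{"item_id": i} for i in range(skip, skip + limit)]
    if q:
        # Substring match against the ID rendered as text.
        items = [item for item in items if q in str(item["item_id"])]
    return {"items": items, "skip": skip, "limit": limit, "q": q}


# Equivalent to requesting /items/?skip=95&limit=10&q=100
page = read_items(skip=95, limit=10, q="100")
matching_ids = [item["item_id"] for item in page["items"]]
```

Because the filter runs after the page is built, `q` only narrows the current page; it does not search across all items.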

    Request Body

    from fastapi import FastAPI, Body
    from pydantic import BaseModel, Field
    from typing import Optional, List
    from datetime import datetime
    
    app = FastAPI()
    
    class Item(BaseModel):
        name: str = Field(..., min_length=1, max_length=100)
        description: Optional[str] = Field(None, max_length=500)
        price: float = Field(..., gt=0, description="Price must be greater than zero")
        tax: Optional[float] = Field(None, ge=0)
        tags: List[str] = []
        created_at: datetime = Field(default_factory=datetime.now)
    
    class User(BaseModel):
        username: str
        full_name: Optional[str] = None
        email: str
    
    @app.post("/items/")
    async def create_item(item: Item):
        """Create a new item"""
        return {"message": "Item created", "item": item}
    
    @app.put("/items/{item_id}")
    async def update_item(
        item_id: int,
        item: Item,
        user: User,
        importance: int = Body(..., gt=0)
    ):
        """Update item with multiple body parameters"""
        return {
            "item_id": item_id,
            "item": item,
            "user": user,
            "importance": importance
        }
    
    # Nested models
    class Image(BaseModel):
        url: str
        name: str
    
    class ItemWithImages(BaseModel):
        name: str
        description: Optional[str] = None
        price: float
        images: List[Image] = []
    
    @app.post("/items/with-images/")
    async def create_item_with_images(item: ItemWithImages):
        """Create item with nested image models"""
        return item
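
When an endpoint declares more than one body parameter, as `update_item` does, FastAPI expects a JSON object with one key per parameter name, and `Body(...)` places `importance` alongside the two models. The payload below is a sketch of that shape; the field values are made up for illustration.

```python
import json

# One top-level key per body parameter of update_item.
body = {
    "item": {"name": "Keyboard", "price": 49.9, "tags": ["hardware"]},
    "user": {"username": "alice", "email": "alice@example.com"},
    "importance": 3,
}

# This string is what a client would send as the request body.
encoded = json.dumps(body)
```

With a single body parameter (as in `create_item`), the model's fields sit at the top level instead of under an `item` key.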

    Response Models

    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel
    from typing import List, Optional
    
    app = FastAPI()
    
    class UserBase(BaseModel):
        username: str
        email: str
        full_name: Optional[str] = None
    
    class UserCreate(UserBase):
        password: str
    
    class UserResponse(UserBase):
        id: int
        is_active: bool
    
        class Config:
            orm_mode = True  # Pydantic v1 name; Pydantic v2 renames this to from_attributes
    
    class ItemResponse(BaseModel):
        id: int
        name: str
        description: Optional[str] = None
        owner_id: int
    
    # Response model usage
    @app.post("/users/", response_model=UserResponse)
    async def create_user(user: UserCreate):
        """Create user with password excluded from response"""
        # Simulate user creation
        user_dict = user.dict()
        user_dict.update({"id": 1, "is_active": True})
        del user_dict["password"]  # Remove password from response
        return user_dict
    
    @app.get("/users/", response_model=List[UserResponse])
    async def list_users():
        """List all users"""
        return [
            {"id": 1, "username": "john", "email": "john@example.com", "is_active": True},
            {"id": 2, "username": "jane", "email": "jane@example.com", "is_active": False}
        ]
    
    # Returning a custom status code with JSONResponse
    from fastapi.responses import JSONResponse
    
    @app.get("/items/{item_id}")
    async def get_item(item_id: int):
        """Get item with custom response handling"""
        if item_id == 0:
            return JSONResponse(
                status_code=404,
                content={"message": "Item not found"}
            )
    
        return {"id": item_id, "name": f"Item {item_id}"}

    Chapter 4: Data Validation with Pydantic

    Advanced Validation

    from fastapi import FastAPI
    from pydantic import BaseModel, validator, Field, EmailStr
    from typing import Optional, List
    import re
    from datetime import datetime, date
    
    app = FastAPI()
    
    class User(BaseModel):
        username: str = Field(..., min_length=3, max_length=20)
        email: EmailStr
        age: int = Field(..., ge=18, le=99)
        password: str = Field(..., min_length=8)
        confirm_password: str
        phone: Optional[str] = None
        birth_date: Optional[date] = None
    
        @validator('username')
        def username_alphanumeric(cls, v):
            """Username may contain only letters, digits, and underscores"""
            if not re.match(r'^[a-zA-Z0-9_]+$', v):
                raise ValueError('Username may contain only letters, digits, and underscores')
            return v
    
        @validator('password')
        def validate_password(cls, v):
            """Password must contain at least one uppercase, lowercase, and digit"""
            if not re.search(r'[A-Z]', v):
                raise ValueError('Password must contain at least one uppercase letter')
            if not re.search(r'[a-z]', v):
                raise ValueError('Password must contain at least one lowercase letter')
            if not re.search(r'\d', v):
                raise ValueError('Password must contain at least one digit')
            return v
    
        @validator('confirm_password')
        def passwords_match(cls, v, values):
            """Confirm password must match password"""
            if 'password' in values and v != values['password']:
                raise ValueError('Passwords do not match')
            return v
    
        @validator('phone')
        def validate_phone(cls, v):
            """Validate phone number format"""
            if v and not re.match(r'^\+?1?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}$', v):
                raise ValueError('Invalid phone number format')
            return v
    
    class Product(BaseModel):
        name: str
        price: float = Field(..., gt=0)
        category: str
        tags: List[str] = []
    
        @validator('tags')
        def validate_tags(cls, v):
            """Validate tags"""
            if len(v) > 5:
                raise ValueError('Maximum 5 tags allowed')
            return [tag.lower().strip() for tag in v]
    
    @app.post("/users/")
    async def create_user(user: User):
        """Create user with advanced validation"""
        return {"message": "User created successfully", "username": user.username}
    
    @app.post("/products/")
    async def create_product(product: Product):
        """Create product with validation"""
        return product
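
It helps to probe a validation regex in isolation before wiring it into a model. The sketch below reuses the pattern from `validate_phone` unchanged; `is_valid_phone` and the sample numbers are hypothetical. It shows that the pattern accepts North American style numbers only.

```python
import re

# Pattern copied from validate_phone above.
PHONE_PATTERN = re.compile(r'^\+?1?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}$')


def is_valid_phone(value: str) -> bool:
    """True when the value matches the 3-3-4 digit layout."""
    return PHONE_PATTERN.match(value) is not None


accepted = [n for n in ("+1 (555) 123-4567", "555.123.4567", "5551234567")
            if is_valid_phone(n)]
rejected = [n for n in ("+44 20 7946 0958", "123-45-6789")
            if not is_valid_phone(n)]
```

If the API needs to accept international numbers, this pattern would have to be replaced rather than extended.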

    Custom Validators and Field Types

    from fastapi import FastAPI
    from pydantic import BaseModel, validator, root_validator, Field
    from typing import Optional, Dict, Any
    from datetime import datetime
    import json
    
    app = FastAPI()
    
    class EventData(BaseModel):
        event_type: str
        timestamp: datetime
        data: Dict[str, Any]
        user_id: Optional[int] = None
    
        @validator('event_type')
        def validate_event_type(cls, v):
            """Validate event type"""
            allowed_types = ['click', 'view', 'purchase', 'signup']
            if v not in allowed_types:
                raise ValueError(f'Event type must be one of: {allowed_types}')
            return v
    
        @validator('data')
        def validate_data(cls, v, values):
            """Validate data based on event type"""
            event_type = values.get('event_type')
    
            if event_type == 'purchase':
                required_fields = ['amount', 'currency', 'product_id']
                for field in required_fields:
                    if field not in v:
                        raise ValueError(f'Purchase events must include {field}')
    
            return v
    
        @root_validator
        def validate_user_context(cls, values):
            """Root validator for complex validation logic"""
            event_type = values.get('event_type')
            user_id = values.get('user_id')
    
            # Some events require user authentication
            if event_type in ['purchase'] and not user_id:
                raise ValueError(f'{event_type} events require user_id')
    
            return values
    
    # Custom field types
    class JSONField(str):
        @classmethod
        def __get_validators__(cls):
            yield cls.validate
    
        @classmethod
        def validate(cls, v):
            if isinstance(v, str):
                try:
                    return json.loads(v)
                except json.JSONDecodeError:
                    raise ValueError('Invalid JSON')
            return v
    
    class Configuration(BaseModel):
        name: str
        settings: JSONField
        enabled: bool = True
    
    @app.post("/events/")
    async def create_event(event: EventData):
        """Create event with custom validation"""
        return {"message": "Event created", "event": event}
    
    @app.post("/configurations/")
    async def create_configuration(config: Configuration):
        """Create configuration with JSON field"""
        return config

    Chapter 5: Authentication and Authorization

    Basic Authentication

    from fastapi import FastAPI, Depends, HTTPException, status
    from fastapi.security import HTTPBasic, HTTPBasicCredentials
    import secrets
    
    app = FastAPI()
    security = HTTPBasic()
    
    def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
        """Validate basic authentication credentials"""
        # Compare as bytes: compare_digest raises TypeError for non-ASCII str input
        correct_username = secrets.compare_digest(credentials.username.encode("utf8"), b"admin")
        correct_password = secrets.compare_digest(credentials.password.encode("utf8"), b"secret")
    
        if not (correct_username and correct_password):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect username or password",
                headers={"WWW-Authenticate": "Basic"},
            )
        return credentials.username
    
    @app.get("/protected")
    async def protected_route(username: str = Depends(get_current_username)):
        """Protected route requiring basic authentication"""
        return {"message": f"Hello {username}, this is a protected route"}
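
For testing the endpoint by hand, it is useful to know what the client actually sends: HTTP Basic puts `username:password`, Base64-encoded, in the `Authorization` header. The following standard-library sketch builds and parses that header and repeats the constant-time check; `basic_auth_header`, `parse_basic_auth`, and `credentials_ok` are hypothetical helpers, and `HTTPBasic` does the parsing for you inside FastAPI.

```python
import base64
import secrets


def basic_auth_header(username: str, password: str) -> str:
    """Build the Authorization header value a client sends."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def parse_basic_auth(header: str) -> tuple[str, str]:
    """Recover (username, password) from the header value."""
    scheme, _, encoded = header.partition(" ")
    if scheme.lower() != "basic":
        raise ValueError("not a Basic authorization header")
    decoded = base64.b64decode(encoded).decode("utf-8")
    username, _, password = decoded.partition(":")
    return username, password


def credentials_ok(username: str, password: str) -> bool:
    """Constant-time comparison on bytes, so any Unicode input is safe."""
    user_ok = secrets.compare_digest(username.encode("utf-8"), b"admin")
    pass_ok = secrets.compare_digest(password.encode("utf-8"), b"secret")
    return user_ok and pass_ok


header = basic_auth_header("admin", "secret")
```

Base64 is an encoding, not encryption, so Basic authentication should only be used over HTTPS.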

    JWT Authentication

    from fastapi import FastAPI, Depends, HTTPException, status
    from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
    from pydantic import BaseModel
    from datetime import datetime, timedelta
    import jwt
    from passlib.context import CryptContext
    from typing import Optional
    
    app = FastAPI()
    
    # Configuration
    SECRET_KEY = "your-secret-key-here"  # placeholder; load a real secret from the environment
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    
    # Password hashing
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    # Security
    security = HTTPBearer()
    
    # Models
    class User(BaseModel):
        username: str
        email: Optional[str] = None
        full_name: Optional[str] = None
        disabled: Optional[bool] = None
    
    class UserInDB(User):
        hashed_password: str
    
    class Token(BaseModel):
        access_token: str
        token_type: str
    
    class UserLogin(BaseModel):
        username: str
        password: str
    
    # Fake database
    fake_users_db = {
        "testuser": {
            "username": "testuser",
            "full_name": "Test User",
            "email": "testuser@example.com",
            "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # secret
            "disabled": False,
        }
    }
    
    # Utility functions
    def verify_password(plain_password, hashed_password):
        """Verify a password against its hash"""
        return pwd_context.verify(plain_password, hashed_password)
    
    def get_password_hash(password):
        """Hash a password"""
        return pwd_context.hash(password)
    
    def get_user(db, username: str):
        """Get user from database"""
        if username in db:
            user_dict = db[username]
            return UserInDB(**user_dict)
    
    def authenticate_user(username: str, password: str):
        """Authenticate user credentials"""
        user = get_user(fake_users_db, username)
        if not user or not verify_password(password, user.hashed_password):
            return False
        return user
    
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
        """Create JWT access token"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=15)
    
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
        """Get current user from JWT token"""
        credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
        try:
            payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                raise credentials_exception
        except jwt.PyJWTError:
            raise credentials_exception
    
        user = get_user(fake_users_db, username=username)
        if user is None:
            raise credentials_exception
        return user
    
    def get_current_active_user(current_user: User = Depends(get_current_user)):
        """Get current active user"""
        if current_user.disabled:
            raise HTTPException(status_code=400, detail="Inactive user")
        return current_user
    
    # Routes
    @app.post("/token", response_model=Token)
    async def login(user_credentials: UserLogin):
        """Login endpoint to get access token"""
        user = authenticate_user(user_credentials.username, user_credentials.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect username or password",
                headers={"WWW-Authenticate": "Bearer"},
            )
    
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            data={"sub": user.username}, expires_delta=access_token_expires
        )
        return {"access_token": access_token, "token_type": "bearer"}
    
    @app.get("/users/me", response_model=User)
    async def read_users_me(current_user: User = Depends(get_current_active_user)):
        """Get current user information"""
        return current_user
    
    @app.get("/protected")
    async def protected_route(current_user: User = Depends(get_current_active_user)):
        """Protected route requiring JWT authentication"""
        return {"message": f"Hello {current_user.username}, this is a protected route"}
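
To make the token format less opaque, here is a standard-library sketch of what an HS256 token consists of: a Base64URL header, a Base64URL payload, and an HMAC-SHA256 signature over the first two parts. It is for understanding only and is not a replacement for the `jwt` library used above; `sign_hs256` and `verify_hs256` are hypothetical helpers, and this version deliberately skips the `exp` check that `jwt.decode` performs.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64URL without padding, as used in JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"),
                    hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Return header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str) -> dict:
    """Check the signature and return the claims (no expiry check here)."""
    signing_input, _, signature = token.rpartition(".")
    expected = _signature(signing_input, secret)
    if not hmac.compare_digest(b64url_decode(signature), expected):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(signing_input.split(".")[1]))


token = sign_hs256({"sub": "testuser"}, "your-secret-key-here")
claims = verify_hs256(token, "your-secret-key-here")
```

The payload is only encoded, not encrypted: anyone holding the token can read the claims, so the `sub` claim and similar fields should never carry secrets.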

    Role-Based Access Control

    from fastapi import FastAPI, Depends, HTTPException, status
    from fastapi.security import HTTPBearer
    from pydantic import BaseModel
    from enum import Enum
    from typing import List, Optional
    
    app = FastAPI()
    security = HTTPBearer()
    
    class Role(str, Enum):
        ADMIN = "admin"
        USER = "user"
        MODERATOR = "moderator"
    
    class Permission(str, Enum):
        READ = "read"
        WRITE = "write"
        DELETE = "delete"
        ADMIN = "admin"
    
    class User(BaseModel):
        username: str
        email: str
        roles: List[Role]
        permissions: List[Permission]
    
    # Role-permission mapping
    ROLE_PERMISSIONS = {
        Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
        Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
        Role.USER: [Permission.READ, Permission.WRITE]
    }
    
    def get_current_user() -> User:
        """Mock function to get current user"""
        # In real implementation, this would decode JWT and fetch user
        return User(
            username="testuser",
            email="test@example.com",
            roles=[Role.USER],
            permissions=ROLE_PERMISSIONS[Role.USER]
        )
    
    def require_permission(permission: Permission):
        """Dependency to require specific permission"""
        def check_permission(current_user: User = Depends(get_current_user)):
            if permission not in current_user.permissions:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission '{permission.value}' required"
                )
            return current_user
        return check_permission
    
    def require_role(role: Role):
        """Dependency to require specific role"""
        def check_role(current_user: User = Depends(get_current_user)):
            if role not in current_user.roles:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Role '{role.value}' required"
                )
            return current_user
        return check_role
    
    def require_any_role(roles: List[Role]):
        """Dependency to require any of the specified roles"""
        def check_roles(current_user: User = Depends(get_current_user)):
            if not any(role in current_user.roles for role in roles):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"One of roles {[r.value for r in roles]} required"
                )
            return current_user
        return check_roles
    
    # Protected routes
    @app.get("/public")
    async def public_endpoint():
        """Public endpoint - no authentication required"""
        return {"message": "This is a public endpoint"}
    
    @app.get("/user-data")
    async def get_user_data(user: User = Depends(require_permission(Permission.READ))):
        """Endpoint requiring read permission"""
        return {"message": "User data", "user": user.username}
    
    @app.post("/create-item")
    async def create_item(user: User = Depends(require_permission(Permission.WRITE))):
        """Endpoint requiring write permission"""
        return {"message": "Item created", "created_by": user.username}
    
    @app.delete("/delete-item/{item_id}")
    async def delete_item(
        item_id: int,
        user: User = Depends(require_permission(Permission.DELETE))
    ):
        """Endpoint requiring delete permission"""
        return {"message": f"Item {item_id} deleted", "deleted_by": user.username}
    
    @app.get("/admin-panel")
    async def admin_panel(user: User = Depends(require_role(Role.ADMIN))):
        """Admin-only endpoint"""
        return {"message": "Welcome to admin panel", "admin": user.username}
    
    @app.get("/moderation")
    async def moderation_panel(
        user: User = Depends(require_any_role([Role.ADMIN, Role.MODERATOR]))
    ):
        """Endpoint accessible by admins and moderators"""
        return {"message": "Moderation panel", "user": user.username, "roles": user.roles}
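
The `User` model above stores `roles` and `permissions` side by side, which lets them drift apart. One possible alternative, sketched here as an assumption rather than as part of the original design, is to derive permissions from roles through `ROLE_PERMISSIONS` whenever they are needed; `permissions_for` is a hypothetical helper.

```python
from enum import Enum
from typing import Iterable, Set


class Role(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"


class Permission(str, Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"


ROLE_PERMISSIONS = {
    Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
    Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
    Role.USER: [Permission.READ, Permission.WRITE],
}


def permissions_for(roles: Iterable[Role]) -> Set[Permission]:
    """Union of the permissions granted by each role."""
    granted: Set[Permission] = set()
    for role in roles:
        granted.update(ROLE_PERMISSIONS.get(role, []))
    return granted


user_permissions = permissions_for([Role.USER])
combined = permissions_for([Role.USER, Role.MODERATOR])
```

A dependency such as `require_permission` could then test `permission in permissions_for(current_user.roles)` instead of reading a stored list.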

    Chapter 6: Database Integration

    SQLAlchemy Setup

    from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey
    # declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
    from sqlalchemy.orm import declarative_base, sessionmaker, Session, relationship
    from datetime import datetime
    
    # Database URL
    SQLALCHEMY_DATABASE_URL = "sqlite:///./app.db"
    # For PostgreSQL: "postgresql://user:password@localhost/dbname"
    # For MySQL: "mysql://user:password@localhost/dbname"
    
    engine = create_engine(
        SQLALCHEMY_DATABASE_URL,
        connect_args={"check_same_thread": False}  # Only for SQLite
    )
    
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    Base = declarative_base()
    
    # Dependency to get database session
    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()
    
    # Database models
    class User(Base):
        __tablename__ = "users"
    
        id = Column(Integer, primary_key=True, index=True)
        username = Column(String, unique=True, index=True)
        email = Column(String, unique=True, index=True)
        hashed_password = Column(String)
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, default=datetime.utcnow)
    
        items = relationship("Item", back_populates="owner")
    
    class Item(Base):
        __tablename__ = "items"
    
        id = Column(Integer, primary_key=True, index=True)
        title = Column(String, index=True)
        description = Column(String)
        owner_id = Column(Integer, ForeignKey("users.id"))
        created_at = Column(DateTime, default=datetime.utcnow)
    
        owner = relationship("User", back_populates="items")
    
    # Create tables
    Base.metadata.create_all(bind=engine)
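
The `get_db` dependency relies on generator semantics: everything before `yield` runs before the request handler, and the `finally` block runs afterwards even if the handler raises. The sketch below demonstrates that guarantee without a database. `FakeSession` and `run_with_dependency` are hypothetical stand-ins, and driving the generator by hand is a simplification of what the framework does.

```python
class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def get_db():
    """Same shape as the dependency above."""
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()


def run_with_dependency(handler):
    """Open the dependency, call the handler, always clean up."""
    gen = get_db()
    db = next(gen)  # runs the code up to the yield
    try:
        return handler(db)
    finally:
        gen.close()  # resumes the generator so its finally block runs


sessions = []


def handler(db: FakeSession) -> bool:
    sessions.append(db)
    return db.closed  # still open while the handler runs


closed_during_request = run_with_dependency(handler)
closed_after_request = sessions[0].closed
```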

    CRUD Operations

    # filepath: crud.py
    from sqlalchemy.orm import Session
    from . import models, schemas
    from passlib.context import CryptContext
    
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    def get_password_hash(password):
        return pwd_context.hash(password)
    
    def verify_password(plain_password, hashed_password):
        return pwd_context.verify(plain_password, hashed_password)
    
    # User CRUD operations
    def get_user(db: Session, user_id: int):
        return db.query(models.User).filter(models.User.id == user_id).first()
    
    def get_user_by_email(db: Session, email: str):
        return db.query(models.User).filter(models.User.email == email).first()
    
    def get_user_by_username(db: Session, username: str):
        return db.query(models.User).filter(models.User.username == username).first()
    
    def get_users(db: Session, skip: int = 0, limit: int = 100):
        return db.query(models.User).offset(skip).limit(limit).all()
    
    def create_user(db: Session, user: schemas.UserCreate):
        hashed_password = get_password_hash(user.password)
        db_user = models.User(
            username=user.username,
            email=user.email,
            hashed_password=hashed_password
        )
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        return db_user
    
    def authenticate_user(db: Session, username: str, password: str):
        user = get_user_by_username(db, username)
        if not user or not verify_password(password, user.hashed_password):
            return False
        return user
    
    # Item CRUD operations
    def get_items(db: Session, skip: int = 0, limit: int = 100):
        return db.query(models.Item).offset(skip).limit(limit).all()
    
    def get_item(db: Session, item_id: int):
        return db.query(models.Item).filter(models.Item.id == item_id).first()
    
    def get_items_by_user(db: Session, user_id: int, skip: int = 0, limit: int = 100):
        return db.query(models.Item).filter(
            models.Item.owner_id == user_id
        ).offset(skip).limit(limit).all()
    
    def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
        db_item = models.Item(**item.dict(), owner_id=user_id)
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        return db_item
    
    def update_item(db: Session, item_id: int, item: schemas.ItemUpdate):
        db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
        if db_item:
            for key, value in item.dict(exclude_unset=True).items():
                setattr(db_item, key, value)
            db.commit()
            db.refresh(db_item)
        return db_item
    
    def delete_item(db: Session, item_id: int):
        db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
        if db_item:
            db.delete(db_item)
            db.commit()
        return db_item
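
The `exclude_unset=True` call in `update_item` is what makes partial updates work: it yields only the fields the client actually sent, so an omitted field is left alone while a field sent as `null` is cleared. The sketch below reproduces the `setattr` loop with a plain dataclass; `ItemRow` and `apply_partial_update` are hypothetical, and the `sent` dict plays the role of `item.dict(exclude_unset=True)`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ItemRow:
    """Stand-in for the SQLAlchemy Item row."""
    title: str
    description: Optional[str] = None


def apply_partial_update(row: ItemRow, sent: dict) -> ItemRow:
    """Apply only the keys the client supplied."""
    for key, value in sent.items():
        setattr(row, key, value)
    return row


row = ItemRow(title="Desk", description="Oak, 120 cm")

# Client sent only a title: the description is untouched.
apply_partial_update(row, {"title": "Standing desk"})
after_rename = (row.title, row.description)

# Client sent description explicitly as null: it is cleared.
apply_partial_update(row, {"description": None})
after_clear = (row.title, row.description)
```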

    Pydantic Schemas

    from pydantic import BaseModel, EmailStr
    from typing import List, Optional
    from datetime import datetime
    
    # User schemas
    class UserBase(BaseModel):
        username: str
        email: EmailStr
    
    class UserCreate(UserBase):
        password: str
    
    class UserUpdate(BaseModel):
        username: Optional[str] = None
        email: Optional[EmailStr] = None
        is_active: Optional[bool] = None
    
    class User(UserBase):
        id: int
        is_active: bool
        created_at: datetime
        items: List["Item"] = []
    
        class Config:
            orm_mode = True
    
    # Item schemas
    class ItemBase(BaseModel):
        title: str
        description: Optional[str] = None
    
    class ItemCreate(ItemBase):
        pass
    
    class ItemUpdate(BaseModel):
        title: Optional[str] = None
        description: Optional[str] = None
    
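    class Item(ItemBase):
        id: int
        owner_id: int
        created_at: datetime
        # No nested `owner: User` here: User already embeds its items, and a
        # two-way nesting would recurse without end when reading ORM objects.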
    
        class Config:
            orm_mode = True
    
    # Update forward references
    User.update_forward_refs()

    API Routes with Database

    from fastapi import FastAPI, Depends, HTTPException, status
    from sqlalchemy.orm import Session
    from typing import List
    
    from . import crud, models, schemas
    from .database import SessionLocal, engine, get_db
    
    models.Base.metadata.create_all(bind=engine)
    
    app = FastAPI(title="FastAPI with Database", version="1.0.0")
    
    # User endpoints
    @app.post("/users/", response_model=schemas.User)
    async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
        """Create a new user"""
        db_user = crud.get_user_by_email(db, email=user.email)
        if db_user:
            raise HTTPException(status_code=400, detail="Email already registered")
    
        db_user = crud.get_user_by_username(db, username=user.username)
        if db_user:
            raise HTTPException(status_code=400, detail="Username already taken")
    
        return crud.create_user(db=db, user=user)
    
    @app.get("/users/", response_model=List[schemas.User])
    async def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
        """Get all users"""
        users = crud.get_users(db, skip=skip, limit=limit)
        return users
    
    @app.get("/users/{user_id}", response_model=schemas.User)
    async def read_user(user_id: int, db: Session = Depends(get_db)):
        """Get user by ID"""
        db_user = crud.get_user(db, user_id=user_id)
        if db_user is None:
            raise HTTPException(status_code=404, detail="User not found")
        return db_user
    
    # Item endpoints
    @app.post("/users/{user_id}/items/", response_model=schemas.Item)
    async def create_item_for_user(
        user_id: int,
        item: schemas.ItemCreate,
        db: Session = Depends(get_db)
    ):
        """Create item for user"""
        db_user = crud.get_user(db, user_id=user_id)
        if db_user is None:
            raise HTTPException(status_code=404, detail="User not found")
    
        return crud.create_user_item(db=db, item=item, user_id=user_id)
    
    @app.get("/items/", response_model=List[schemas.Item])
    async def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
        """Get all items"""
        items = crud.get_items(db, skip=skip, limit=limit)
        return items
    
    @app.get("/items/{item_id}", response_model=schemas.Item)
    async def read_item(item_id: int, db: Session = Depends(get_db)):
        """Get item by ID"""
        db_item = crud.get_item(db, item_id=item_id)
        if db_item is None:
            raise HTTPException(status_code=404, detail="Item not found")
        return db_item
    
    @app.put("/items/{item_id}", response_model=schemas.Item)
    async def update_item_endpoint(
        item_id: int,
        item: schemas.ItemUpdate,
        db: Session = Depends(get_db)
    ):
        """Update item"""
        db_item = crud.get_item(db, item_id=item_id)
        if db_item is None:
            raise HTTPException(status_code=404, detail="Item not found")
    
        return crud.update_item(db=db, item_id=item_id, item=item)
    
    @app.delete("/items/{item_id}")
    async def delete_item_endpoint(item_id: int, db: Session = Depends(get_db)):
        """Delete item"""
        db_item = crud.get_item(db, item_id=item_id)
        if db_item is None:
            raise HTTPException(status_code=404, detail="Item not found")
    
        crud.delete_item(db=db, item_id=item_id)
        return {"message": "Item deleted successfully"}
    Python

    Chapter 7: Advanced Features

    Dependency Injection System

    from fastapi import FastAPI, Depends, HTTPException, Header
    from typing import Optional
    import asyncio
    
    app = FastAPI()
    
    # Simple dependency
    async def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 100):
        """Common query parameters for pagination and search"""
        return {"q": q, "skip": skip, "limit": limit}
    
    # Database dependency
    class DatabaseConnection:
        def __init__(self):
            self.connection = None
    
        async def connect(self):
            """Simulate database connection"""
            await asyncio.sleep(0.1)
            self.connection = "connected"
            return self
    
        async def disconnect(self):
            """Simulate database disconnection"""
            self.connection = None
    
    async def get_database():
        """Database dependency with connection management"""
        db = DatabaseConnection()
        try:
            await db.connect()
            yield db
        finally:
            await db.disconnect()
    
    # Authentication dependency
    async def get_api_key(x_api_key: str = Header(...)):
        """API key authentication dependency"""
        valid_api_keys = ["secret-key-1", "secret-key-2"]
        if x_api_key not in valid_api_keys:
            raise HTTPException(status_code=403, detail="Invalid API Key")
        return x_api_key
    
    # Rate limiting dependency
    class RateLimiter:
        def __init__(self, max_requests: int = 10):
            self.max_requests = max_requests
            self.requests = {}
    
        async def __call__(self, client_ip: Optional[str] = Header(None, alias="x-forwarded-for")):
            """Rate limiting dependency"""
            if not client_ip:
                client_ip = "unknown"
    
            if client_ip not in self.requests:
                self.requests[client_ip] = 0
    
            self.requests[client_ip] += 1
    
            if self.requests[client_ip] > self.max_requests:
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded"
                )
    
            return client_ip
    
    rate_limiter = RateLimiter(max_requests=5)
    
    # Subdependencies
    async def get_user_from_token(api_key: str = Depends(get_api_key)):
        """Get user information from API key"""
        # Simulate user lookup
        user_map = {
            "secret-key-1": {"id": 1, "username": "user1"},
            "secret-key-2": {"id": 2, "username": "user2"}
        }
        return user_map.get(api_key, {"id": 0, "username": "guest"})
    
    # Routes using dependencies
    @app.get("/items/")
    async def read_items(
        commons: dict = Depends(common_parameters),
        db: DatabaseConnection = Depends(get_database),
        user: dict = Depends(get_user_from_token),
        client_ip: str = Depends(rate_limiter)
    ):
        """Get items with multiple dependencies"""
        return {
            "items": [f"item-{i}" for i in range(commons["skip"], commons["skip"] + commons["limit"])],
            "search": commons["q"],
            "database": db.connection,
            "user": user,
            "client_ip": client_ip
        }
    
    # Dependency with parameters
    def pagination_params(max_limit: int = 100):
        """Dependency factory with parameters"""
        async def paginate(skip: int = 0, limit: int = 50):
            if limit > max_limit:
                limit = max_limit
            return {"skip": skip, "limit": limit}
        return paginate
    
    @app.get("/users/")
    async def read_users(pagination: dict = Depends(pagination_params(max_limit=50))):
        """Get users with custom pagination"""
        return {
            "users": [f"user-{i}" for i in range(pagination["skip"], pagination["skip"] + pagination["limit"])],
            "pagination": pagination
        }
    
    # Class-based dependencies
    class UserService:
        def __init__(self, db: DatabaseConnection = Depends(get_database)):
            self.db = db
    
        async def get_user(self, user_id: int):
            """Get user by ID"""
            return {"id": user_id, "username": f"user-{user_id}", "db": self.db.connection}
    
    @app.get("/users/{user_id}")
    async def get_user(user_id: int, user_service: UserService = Depends()):
        """Get user using class-based dependency"""
        return await user_service.get_user(user_id)
    Python
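
    The `RateLimiter` dependency above counts requests per client for the life of the process and never resets, so a client that passes `max_requests` stays blocked until a restart. A minimal sketch of a fixed-window variant follows, using only the standard library. The names `FixedWindowLimiter`, `window_seconds`, and the injectable `clock` are illustrative assumptions, not part of FastAPI.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """Allow at most max_requests per client within each time window."""

    def __init__(
        self,
        max_requests: int = 5,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        # client key -> (window start time, requests seen in that window)
        self._windows: Dict[str, Tuple[float, int]] = {}

    def allow(self, client: str) -> bool:
        """Record one request and report whether it is within the limit."""
        now = self.clock()
        start, count = self._windows.get(client, (now, 0))
        if now - start >= self.window_seconds:
            # The previous window has expired: start a fresh one.
            start, count = now, 0
        count += 1
        self._windows[client] = (start, count)
        return count <= self.max_requests
```

    Inside a dependency's `__call__`, a `False` result from `allow()` would translate into raising `HTTPException(status_code=429)`. State kept in a dictionary like this is per process, so several workers would each enforce their own limit.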

    Background Tasks

    from fastapi import FastAPI, BackgroundTasks, Depends
    from pydantic import BaseModel, EmailStr
    import asyncio
    import logging
    from typing import List
    import smtplib
    from email.mime.text import MIMEText
    
    app = FastAPI()
    
    # Setup logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Models
    class EmailSchema(BaseModel):
        to: List[EmailStr]
        subject: str
        body: str
    
    class NotificationSchema(BaseModel):
        user_id: int
        message: str
        type: str = "info"
    
    # Background task functions
    async def send_email(email: EmailSchema):
        """Send email in background"""
        logger.info(f"Sending email to {email.to}")
    
        # Simulate email sending delay
        await asyncio.sleep(2)
    
        # In real implementation, you would use an email service
        try:
            # Example with SMTP (configure according to your email provider)
            # msg = MIMEText(email.body)
            # msg['Subject'] = email.subject
            # msg['From'] = "noreply@example.com"
            # msg['To'] = ", ".join(email.to)
    
            # with smtplib.SMTP('localhost') as server:
            #     server.send_message(msg)
    
            logger.info(f"Email sent successfully to {email.to}")
        except Exception as e:
            logger.error(f"Failed to send email: {e}")
    
    async def process_notification(notification: NotificationSchema):
        """Process notification in background"""
        logger.info(f"Processing notification for user {notification.user_id}")
    
        # Simulate processing time
        await asyncio.sleep(1)
    
        # Here you might save to database, send push notification, etc.
        logger.info(f"Notification processed: {notification.message}")
    
    def write_log(message: str):
        """Write log entry (non-async background task)"""
        with open("app.log", "a") as log_file:
            log_file.write(f"{message}\n")
    
    async def cleanup_temp_files():
        """Cleanup temporary files"""
        logger.info("Starting cleanup of temporary files")
        await asyncio.sleep(3)  # Simulate cleanup time
        logger.info("Temporary files cleaned up")
    
    # Routes with background tasks
    @app.post("/send-email/")
    async def send_email_endpoint(
        email: EmailSchema,
        background_tasks: BackgroundTasks
    ):
        """Send email endpoint with background task"""
        background_tasks.add_task(send_email, email)
        background_tasks.add_task(write_log, f"Email queued for {email.to}")
    
        return {"message": "Email queued for sending"}
    
    @app.post("/notify/")
    async def send_notification(
        notification: NotificationSchema,
        background_tasks: BackgroundTasks
    ):
        """Send notification with background processing"""
        background_tasks.add_task(process_notification, notification)
    
        return {"message": "Notification queued for processing"}
    
    @app.post("/users/")
    async def create_user(
        user_data: dict,
        background_tasks: BackgroundTasks
    ):
        """Create user with welcome email and cleanup"""
        # Process user creation (simulate)
        user_id = 123
    
        # Add multiple background tasks
        welcome_email = EmailSchema(
            to=[user_data["email"]],
            subject="Welcome!",
            body=f"Welcome {user_data['name']} to our service!"
        )
    
        background_tasks.add_task(send_email, welcome_email)
        background_tasks.add_task(cleanup_temp_files)
        background_tasks.add_task(write_log, f"User created: {user_id}")
    
        return {"user_id": user_id, "message": "User created successfully"}
    
    # Background task with dependency injection
    async def get_user_service():
        """Mock user service dependency"""
        return {"service": "user_service"}
    
    @app.post("/users/{user_id}/welcome")
    async def send_welcome_message(
        user_id: int,
        background_tasks: BackgroundTasks,
        user_service: dict = Depends(get_user_service)
    ):
        """Send welcome message with dependency injection"""
    
        def send_welcome_with_service(user_id: int, service: dict):
            """Background task that uses dependency"""
            logger.info(f"Sending welcome message to user {user_id} using {service}")
            # Process with service
    
        background_tasks.add_task(send_welcome_with_service, user_id, user_service)
    
        return {"message": "Welcome message queued"}
    Python
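
    In the endpoints above, `add_task` only records a call; the work itself runs after the response has been handed back, one task after another in the order they were added. The sketch below is a simplified standard-library model of that behaviour, not Starlette's actual implementation. `MiniBackgroundTasks`, `run_all`, and `handle_request` are hypothetical names used for illustration.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, List, Tuple


class MiniBackgroundTasks:
    """Simplified model of a background task queue."""

    def __init__(self) -> None:
        self._tasks: List[Tuple[Callable[..., Any], tuple, dict]] = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Only records the call; nothing runs yet.
        self._tasks.append((func, args, kwargs))

    async def run_all(self) -> None:
        # Tasks run sequentially, in the order they were added.
        for func, args, kwargs in self._tasks:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)
            else:
                # Blocking callables go to a worker thread so the event loop stays free.
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


async def handle_request(log: List[str]) -> str:
    tasks = MiniBackgroundTasks()

    async def send_email(to: str) -> None:
        await asyncio.sleep(0)
        log.append(f"email:{to}")

    def write_log(message: str) -> None:
        log.append(f"log:{message}")

    tasks.add_task(send_email, "a@example.com")
    tasks.add_task(write_log, "queued")
    log.append("response sent")  # stands in for returning the HTTP response
    await tasks.run_all()        # the framework performs this step after responding
    return "done"
```

    Because the tasks run in the same process as the server, they are lost if the process stops before they finish. Work that must survive restarts is usually handed to a separate job queue instead.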

    File Uploads and Downloads

    from fastapi import FastAPI, File, UploadFile, HTTPException, Response
    from fastapi.responses import FileResponse, StreamingResponse
    from typing import List
    import aiofiles
    from pathlib import Path
    import mimetypes
    import zipfile
    import io
    
    app = FastAPI()
    
    # Configuration
    UPLOAD_DIR = Path("uploads")
    UPLOAD_DIR.mkdir(exist_ok=True)
    
    MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB
    ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".pdf", ".txt", ".csv"}
    
    # Utility functions
    def validate_file(file: UploadFile):
        """Validate uploaded file"""
        # Check file extension (the filename can be missing, so fall back to an empty string)
        file_extension = Path(file.filename or "").suffix.lower()
        if file_extension not in ALLOWED_EXTENSIONS:
            raise HTTPException(
                status_code=400,
                detail=f"File type {file_extension!r} not allowed. Allowed types: {sorted(ALLOWED_EXTENSIONS)}"
            )
    
        # Check file size. The upload has already been received and spooled at this point,
        # so this measures the real size; it does not stop an oversized body from being sent.
        if hasattr(file.file, 'seek') and hasattr(file.file, 'tell'):
            file.file.seek(0, 2)  # Seek to end
            size = file.file.tell()
            file.file.seek(0)  # Seek back to beginning
    
            if size > MAX_FILE_SIZE:
                raise HTTPException(
                    status_code=413,
                    detail=f"File too large. Maximum size: {MAX_FILE_SIZE} bytes"
                )
    
    # Single file upload
    @app.post("/upload/")
    async def upload_file(file: UploadFile = File(...)):
        """Upload a single file"""
        validate_file(file)
    
        # Keep only the final path component so the client cannot write outside UPLOAD_DIR
        file_path = UPLOAD_DIR / Path(file.filename).name
    
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                content = await file.read()
                await f.write(content)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error saving file: {e}")
    
        return {
            "filename": file.filename,
            "size": len(content),
            "content_type": file.content_type,
            "path": str(file_path)
        }
    
    # Multiple file upload
    @app.post("/upload-multiple/")
    async def upload_multiple_files(files: List[UploadFile] = File(...)):
        """Upload multiple files"""
        if len(files) > 10:
            raise HTTPException(status_code=400, detail="Maximum 10 files allowed")
    
        uploaded_files = []
    
        for file in files:
            validate_file(file)
    
            # Keep only the final path component so the client cannot write outside UPLOAD_DIR
            file_path = UPLOAD_DIR / Path(file.filename).name
    
            try:
                async with aiofiles.open(file_path, 'wb') as f:
                    content = await file.read()
                    await f.write(content)
    
                uploaded_files.append({
                    "filename": file.filename,
                    "size": len(content),
                    "content_type": file.content_type,
                    "path": str(file_path)
                })
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"Error saving file {file.filename}: {e}")
    
        return {"uploaded_files": uploaded_files}
    
    # File download
    @app.get("/download/{filename}")
    async def download_file(filename: str):
        """Download a file"""
        file_path = UPLOAD_DIR / filename
    
        if not file_path.is_file():
            raise HTTPException(status_code=404, detail="File not found")
    
        # Get MIME type
        mime_type, _ = mimetypes.guess_type(str(file_path))
    
        return FileResponse(
            path=file_path,
            filename=filename,
            media_type=mime_type
        )
    
    # Stream large file download
    @app.get("/stream/{filename}")
    async def stream_file(filename: str):
        """Stream a large file"""
        file_path = UPLOAD_DIR / filename
    
        if not file_path.is_file():
            raise HTTPException(status_code=404, detail="File not found")
    
        async def file_generator():
            async with aiofiles.open(file_path, 'rb') as f:
                while chunk := await f.read(8192):  # Read in 8KB chunks
                    yield chunk
    
        mime_type, _ = mimetypes.guess_type(str(file_path))
    
        return StreamingResponse(
            file_generator(),
            media_type=mime_type,
            headers={"Content-Disposition": f'attachment; filename="{filename}"'}
        )
    
    # Create and download ZIP file
    @app.get("/download-zip/")
    async def download_zip():
        """Create and download a ZIP file of all uploads"""
        zip_buffer = io.BytesIO()
    
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for file_path in UPLOAD_DIR.iterdir():
                if file_path.is_file():
                    zip_file.write(file_path, file_path.name)
    
        # The archive is already fully in memory, so return it as a regular response
        return Response(
            content=zip_buffer.getvalue(),
            media_type="application/zip",
            headers={"Content-Disposition": "attachment; filename=uploads.zip"}
        )
    
    # File information
    @app.get("/files/")
    async def list_files():
        """List all uploaded files"""
        files = []
    
        for file_path in UPLOAD_DIR.iterdir():
            if file_path.is_file():
                stat = file_path.stat()
                mime_type, _ = mimetypes.guess_type(str(file_path))
    
                files.append({
                    "filename": file_path.name,
                    "size": stat.st_size,
                    "created": stat.st_ctime,
                    "mime_type": mime_type
                })
    
        return {"files": files}
    
    # Delete file
    @app.delete("/files/{filename}")
    async def delete_file(filename: str):
        """Delete a file"""
        file_path = UPLOAD_DIR / filename
    
        if not file_path.is_file():
            raise HTTPException(status_code=404, detail="File not found")
    
        try:
            file_path.unlink()
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error deleting file: {e}")
    
        return {"message": f"File {filename} deleted successfully"}
    
    # Upload with form data
    from fastapi import Form  # Form marks the fields below as multipart form fields
    
    @app.post("/upload-with-metadata/")
    async def upload_with_metadata(
        file: UploadFile = File(...),
        description: str = Form(None),
        tags: str = Form(None)
    ):
        """Upload file with additional metadata"""
        validate_file(file)
    
        # Keep only the final path component so the client cannot write outside UPLOAD_DIR
        file_path = UPLOAD_DIR / Path(file.filename).name
    
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                content = await file.read()
                await f.write(content)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error saving file: {e}")
    
        # Save metadata (in real app, you'd save to database)
        metadata = {
            "filename": file.filename,
            "size": len(content),
            "content_type": file.content_type,
            "description": description,
            "tags": tags.split(",") if tags else []
        }
    
        return metadata
    Python
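
    A client-supplied filename and an unbounded request body are the two inputs an upload handler has to distrust. The following is a minimal standard-library sketch of both checks: reducing a filename to a safe path inside the upload directory, and copying a stream in chunks while enforcing a size limit. `safe_upload_path`, `copy_with_limit`, and `FileTooLarge` are hypothetical helper names, not FastAPI functions.

```python
from pathlib import Path
from typing import BinaryIO


class FileTooLarge(Exception):
    """Raised when a stream exceeds the configured size limit."""


def safe_upload_path(upload_dir: Path, client_filename: str) -> Path:
    """Return a path inside upload_dir, discarding directory parts sent by the client."""
    # Normalise Windows separators, then keep only the final path component.
    name = Path(client_filename.replace("\\", "/")).name
    if name in {"", ".", ".."}:
        raise ValueError(f"unusable filename: {client_filename!r}")
    return upload_dir / name


def copy_with_limit(
    src: BinaryIO, dst: BinaryIO, max_bytes: int, chunk_size: int = 64 * 1024
) -> int:
    """Copy src to dst in chunks, stopping as soon as max_bytes is exceeded."""
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            return total
        total += len(chunk)
        if total > max_bytes:
            raise FileTooLarge(f"more than {max_bytes} bytes received")
        dst.write(chunk)
```

    Copying in chunks keeps memory use bounded by `chunk_size` instead of the file size, which is the main difference from calling `await file.read()` once. In an endpoint, `FileTooLarge` would typically map to a 413 response and the partially written file would be removed.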

    WebSocket Support

    from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
    from fastapi.responses import HTMLResponse
    from typing import List, Dict
    import json
    import asyncio
    from datetime import datetime
    
    app = FastAPI()
    
    # Connection manager for WebSocket connections
    class ConnectionManager:
        def __init__(self):
            self.active_connections: List[WebSocket] = []
            self.user_connections: Dict[str, WebSocket] = {}
            self.room_connections: Dict[str, List[WebSocket]] = {}
    
        async def connect(self, websocket: WebSocket, user_id: str = None):
            """Connect a new WebSocket"""
            await websocket.accept()
            self.active_connections.append(websocket)
    
            if user_id:
                self.user_connections[user_id] = websocket
    
        def disconnect(self, websocket: WebSocket, user_id: str = None):
            """Disconnect a WebSocket"""
            if websocket in self.active_connections:
                self.active_connections.remove(websocket)
    
            if user_id and user_id in self.user_connections:
                del self.user_connections[user_id]
    
            # Remove from all rooms
            for room_id, connections in self.room_connections.items():
                if websocket in connections:
                    connections.remove(websocket)
    
        async def send_personal_message(self, message: str, websocket: WebSocket):
            """Send message to specific WebSocket"""
            await websocket.send_text(message)
    
        async def send_personal_message_to_user(self, message: str, user_id: str):
            """Send message to specific user"""
            if user_id in self.user_connections:
                websocket = self.user_connections[user_id]
                await websocket.send_text(message)
    
        async def broadcast(self, message: str):
            """Broadcast message to all connected clients"""
            # Iterate over a copy so broken connections can be removed safely
            for connection in list(self.active_connections):
                try:
                    await connection.send_text(message)
                except Exception:
                    # Remove broken connections
                    if connection in self.active_connections:
                        self.active_connections.remove(connection)
    
        async def join_room(self, websocket: WebSocket, room_id: str):
            """Join a room"""
            if room_id not in self.room_connections:
                self.room_connections[room_id] = []
            self.room_connections[room_id].append(websocket)
    
        async def leave_room(self, websocket: WebSocket, room_id: str):
            """Leave a room"""
            if room_id in self.room_connections:
                if websocket in self.room_connections[room_id]:
                    self.room_connections[room_id].remove(websocket)
    
        async def send_to_room(self, message: str, room_id: str):
            """Send message to all users in a room"""
            # Iterate over a copy so broken connections can be removed safely
            for connection in list(self.room_connections.get(room_id, [])):
                try:
                    await connection.send_text(message)
                except Exception:
                    # Remove broken connections
                    if connection in self.room_connections.get(room_id, []):
                        self.room_connections[room_id].remove(connection)
    
    manager = ConnectionManager()
    
    # Simple WebSocket endpoint
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        """Basic WebSocket endpoint"""
        await manager.connect(websocket)
        try:
            while True:
                data = await websocket.receive_text()
                await manager.send_personal_message(f"You wrote: {data}", websocket)
        except WebSocketDisconnect:
            manager.disconnect(websocket)
    
    # WebSocket with user identification
    @app.websocket("/ws/{user_id}")
    async def websocket_user_endpoint(websocket: WebSocket, user_id: str):
        """WebSocket endpoint with user identification"""
        await manager.connect(websocket, user_id)
    
        # Notify others about new user
        await manager.broadcast(f"User {user_id} joined the chat")
    
        try:
            while True:
                data = await websocket.receive_text()
                message = f"User {user_id}: {data}"
                await manager.broadcast(message)
        except WebSocketDisconnect:
            manager.disconnect(websocket, user_id)
            await manager.broadcast(f"User {user_id} left the chat")
    
    # WebSocket chat room
    @app.websocket("/ws/room/{room_id}/{user_id}")
    async def websocket_room_endpoint(websocket: WebSocket, room_id: str, user_id: str):
        """WebSocket endpoint for chat rooms"""
        await manager.connect(websocket, user_id)
        await manager.join_room(websocket, room_id)
    
        # Notify room about new user
        join_message = json.dumps({
            "type": "user_joined",
            "user_id": user_id,
            "message": f"{user_id} joined room {room_id}",
            "timestamp": datetime.now().isoformat()
        })
        await manager.send_to_room(join_message, room_id)
    
        try:
            while True:
                data = await websocket.receive_text()
    
                try:
                    # Try to parse JSON message
                    message_data = json.loads(data)
                    message_type = message_data.get("type", "message")
    
                    if message_type == "message":
                        # Regular chat message
                        response = json.dumps({
                            "type": "message",
                            "user_id": user_id,
                            "message": message_data.get("message", ""),
                            "timestamp": datetime.now().isoformat()
                        })
                        await manager.send_to_room(response, room_id)
    
                    elif message_type == "typing":
                        # Typing indicator
                        response = json.dumps({
                            "type": "typing",
                            "user_id": user_id,
                            "typing": message_data.get("typing", False)
                        })
                        await manager.send_to_room(response, room_id)
    
                except json.JSONDecodeError:
                    # Handle plain text messages
                    response = json.dumps({
                        "type": "message",
                        "user_id": user_id,
                        "message": data,
                        "timestamp": datetime.now().isoformat()
                    })
                    await manager.send_to_room(response, room_id)
    
        except WebSocketDisconnect:
            manager.disconnect(websocket, user_id)
            await manager.leave_room(websocket, room_id)
    
            # Notify room about user leaving
            leave_message = json.dumps({
                "type": "user_left",
                "user_id": user_id,
                "message": f"{user_id} left room {room_id}",
                "timestamp": datetime.now().isoformat()
            })
            await manager.send_to_room(leave_message, room_id)
    
    # REST endpoint to send message to user
    @app.post("/send-message/{user_id}")
    async def send_message_to_user(user_id: str, message: dict):
        """Send message to specific user via REST"""
        await manager.send_personal_message_to_user(
            json.dumps(message),
            user_id
        )
        return {"message": f"Message sent to user {user_id}"}
    
    # REST endpoint to broadcast message
    @app.post("/broadcast/")
    async def broadcast_message(message: dict):
        """Broadcast message to all connected users"""
        await manager.broadcast(json.dumps(message))
        return {"message": "Message broadcasted"}
    
    # REST endpoint to send message to room
    @app.post("/room/{room_id}/message/")
    async def send_message_to_room(room_id: str, message: dict):
        """Send message to specific room"""
        await manager.send_to_room(json.dumps(message), room_id)
        return {"message": f"Message sent to room {room_id}"}
    
    # WebSocket with authentication
    async def get_user_from_token(token: str):
        """Mock function to get user from token"""
        # In a real implementation, validate a JWT here. Tokens placed in the
        # URL path can end up in access logs, so treat this route as a demo only.
        valid_tokens = {"user1": "token1", "user2": "token2"}
        for user, valid_token in valid_tokens.items():
            if token == valid_token:
                return user
        return None
    
    @app.websocket("/ws/secure/{token}")
    async def secure_websocket_endpoint(websocket: WebSocket, token: str):
        """Secure WebSocket endpoint with authentication"""
        user_id = await get_user_from_token(token)
    
        if not user_id:
            await websocket.close(code=4001, reason="Invalid token")
            return
    
        await manager.connect(websocket, user_id)
    
        try:
            while True:
                data = await websocket.receive_text()
                response = f"Authenticated user {user_id}: {data}"
                await manager.send_personal_message(response, websocket)
        except WebSocketDisconnect:
            manager.disconnect(websocket, user_id)
    
    # HTML client for testing
    html = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>WebSocket Test</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 20px; }
            #messages { border: 1px solid #ccc; height: 300px; overflow-y: scroll; padding: 10px; margin: 10px 0; }
            input, button { padding: 5px; margin: 5px; }
            .message { margin: 5px 0; }
            .user_joined { color: green; }
            .user_left { color: red; }
        </style>
    </head>
    <body>
        <h1>WebSocket Chat Test</h1>
        <div>
            <input type="text" id="userIdInput" placeholder="Enter your user ID" value="user1">
            <input type="text" id="roomIdInput" placeholder="Enter room ID" value="room1">
            <button onclick="connect()">Connect</button>
            <button onclick="disconnect()">Disconnect</button>
        </div>
        <div id="messages"></div>
        <div>
            <input type="text" id="messageInput" placeholder="Type a message..." onkeypress="handleKeyPress(event)">
            <button onclick="sendMessage()">Send</button>
        </div>
    
        <script>
            let ws = null;
            let userId = '';
            let roomId = '';
    
            function connect() {
                userId = document.getElementById('userIdInput').value;
                roomId = document.getElementById('roomIdInput').value;
    
                if (!userId || !roomId) {
                    alert('Please enter both user ID and room ID');
                    return;
                }
    
                ws = new WebSocket(`ws://localhost:8000/ws/room/${roomId}/${userId}`);
    
                ws.onopen = function(event) {
                    addMessage('Connected to chat room', 'system');
                };
    
                ws.onmessage = function(event) {
                    const data = JSON.parse(event.data);
                    if (data.type === 'typing') {
                        return;  // Typing indicators carry no message text
                    }
                    addMessage(`${data.user_id}: ${data.message}`, data.type);
                };
    
                ws.onclose = function(event) {
                    addMessage('Disconnected from chat room', 'system');
                };
    
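                ws.onerror = function(error) {
                    // The error event has no readable text, so log the object itself
                    console.error(error);
                    addMessage('WebSocket error (see browser console)', 'error');
                };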
            }
    
            function disconnect() {
                if (ws) {
                    ws.close();
                    ws = null;
                }
            }
    
            function sendMessage() {
                const input = document.getElementById('messageInput');
                const message = input.value.trim();
    
                if (message && ws && ws.readyState === WebSocket.OPEN) {
                    const messageData = {
                        type: 'message',
                        message: message
                    };
                    ws.send(JSON.stringify(messageData));
                    input.value = '';
                }
            }
    
            function handleKeyPress(event) {
                if (event.key === 'Enter') {
                    sendMessage();
                }
            }
    
            function addMessage(message, type = 'message') {
                const messagesDiv = document.getElementById('messages');
                const messageDiv = document.createElement('div');
                messageDiv.className = `message ${type}`;
                messageDiv.textContent = `${new Date().toLocaleTimeString()}: ${message}`;
                messagesDiv.appendChild(messageDiv);
                messagesDiv.scrollTop = messagesDiv.scrollHeight;
            }
        </script>
    </body>
    </html>"""
    @app.get("/")
    async def get_websocket_client():
        """Serve WebSocket test client"""
        return HTMLResponse(html)
    Python
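
    The connection manager above removes sockets whose send fails. The sketch below isolates that pruning step so it can be run without a server. `FakeSocket`, `broadcast_and_prune`, and `demo` are hypothetical names, not part of FastAPI, and the only assumption is that each connection exposes an async `send_text` method. Failed sockets are collected during the loop and removed afterwards, so the list is never mutated while it is being iterated.

```python
import asyncio
from typing import List


class FakeSocket:
    """Hypothetical stand-in for a WebSocket; only send_text is modelled."""

    def __init__(self, name: str, broken: bool = False):
        self.name = name
        self.broken = broken
        self.received: List[str] = []

    async def send_text(self, message: str) -> None:
        if self.broken:
            raise RuntimeError(f"{self.name} is disconnected")
        self.received.append(message)


async def broadcast_and_prune(connections: List[FakeSocket], message: str) -> List[FakeSocket]:
    """Send to every connection, then drop the ones that failed."""
    failed = []
    for connection in connections:
        try:
            await connection.send_text(message)
        except Exception:
            failed.append(connection)
    # Remove after the loop so iteration never sees a changing list
    for connection in failed:
        connections.remove(connection)
    return failed


def demo() -> List[str]:
    """Broadcast to three sockets, one of them broken, and list the survivors."""
    sockets = [FakeSocket("a"), FakeSocket("b", broken=True), FakeSocket("c")]
    asyncio.run(broadcast_and_prune(sockets, "hello"))
    return [socket.name for socket in sockets]
```

    Calling `demo()` leaves only the healthy sockets `a` and `c` in the list.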

    Chapter 8: Testing

    Test Setup and Configuration

    import pytest
    from fastapi.testclient import TestClient
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.pool import StaticPool
    
    from .main import app, get_db
    from .database import Base
    
    # Test database setup
    SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
    
    engine = create_engine(
        SQLALCHEMY_DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
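    @pytest.fixture(autouse=True)
    def reset_database():
        """Recreate all tables before each test so tests stay independent"""
        Base.metadata.drop_all(bind=engine)
        Base.metadata.create_all(bind=engine)
        yield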
    
    def override_get_db():
        # Create the session before the try block so `db` always exists in finally
        db = TestingSessionLocal()
        try:
            yield db
        finally:
            db.close()
    
    app.dependency_overrides[get_db] = override_get_db
    
    client = TestClient(app)
    
    # Test fixtures
    @pytest.fixture
    def test_user():
        return {
            "username": "testuser",
            "email": "test@example.com",
            "password": "testpass123"
        }
    
    @pytest.fixture
    def test_item():
        return {
            "title": "Test Item",
            "description": "A test item"
        }
    
    @pytest.fixture
    def authenticated_user(test_user):
        # Create user and keep the created record (it carries the user ID)
        response = client.post("/users/", json=test_user)
        assert response.status_code == 200
        user = response.json()
    
        # Login and get token. Sent as form fields on the assumption that /token
        # uses OAuth2PasswordRequestForm; use json= if it accepts a JSON body.
        login_data = {
            "username": test_user["username"],
            "password": test_user["password"]
        }
        response = client.post("/token", data=login_data)
        assert response.status_code == 200
        token = response.json()["access_token"]
    
        return {"token": token, "user": user}
    
    # Test classes
    class TestUsers:
        def test_create_user(self, test_user):
            """Test user creation"""
            response = client.post("/users/", json=test_user)
            assert response.status_code == 200
            data = response.json()
            assert data["username"] == test_user["username"]
            assert data["email"] == test_user["email"]
            assert "id" in data
            assert "password" not in data  # Password should not be returned
    
        def test_create_duplicate_user(self, test_user):
            """Test creating duplicate user fails"""
            # Create first user
            response = client.post("/users/", json=test_user)
            assert response.status_code == 200
    
            # Try to create duplicate
            response = client.post("/users/", json=test_user)
            assert response.status_code == 400
            assert "already registered" in response.json()["detail"]
    
        def test_get_users(self, test_user):
            """Test getting users list"""
            # Create a user first
            client.post("/users/", json=test_user)
    
            response = client.get("/users/")
            assert response.status_code == 200
            data = response.json()
            assert isinstance(data, list)
            assert len(data) > 0
    
        def test_get_user_by_id(self, test_user):
            """Test getting user by ID"""
            # Create user
            create_response = client.post("/users/", json=test_user)
            user_id = create_response.json()["id"]
    
            # Get user by ID
            response = client.get(f"/users/{user_id}")
            assert response.status_code == 200
            data = response.json()
            assert data["id"] == user_id
            assert data["username"] == test_user["username"]
    
        def test_get_nonexistent_user(self):
            """Test getting non-existent user returns 404"""
            response = client.get("/users/999")
            assert response.status_code == 404
    
    class TestAuthentication:
        def test_login_success(self, test_user):
            """Test successful login"""
            # Create user first
            client.post("/users/", json=test_user)
    
            # Login
            login_data = {
                "username": test_user["username"],
                "password": test_user["password"]
            }
            # Form fields, assuming /token uses OAuth2PasswordRequestForm
            response = client.post("/token", data=login_data)
            assert response.status_code == 200
            data = response.json()
            assert "access_token" in data
            assert data["token_type"] == "bearer"
    
        def test_login_wrong_password(self, test_user):
            """Test login with wrong password"""
            # Create user first
            client.post("/users/", json=test_user)
    
            # Try login with wrong password
            login_data = {
                "username": test_user["username"],
                "password": "wrongpassword"
            }
            # Form fields, assuming /token uses OAuth2PasswordRequestForm
            response = client.post("/token", data=login_data)
            assert response.status_code == 401
    
        def test_login_nonexistent_user(self):
            """Test login with non-existent user"""
            login_data = {
                "username": "nonexistent",
                "password": "password"
            }
            # Form fields, assuming /token uses OAuth2PasswordRequestForm
            response = client.post("/token", data=login_data)
            assert response.status_code == 401
    
        def test_protected_endpoint_without_token(self):
            """Test accessing protected endpoint without token"""
            response = client.get("/users/me")
            assert response.status_code == 401
    
        def test_protected_endpoint_with_token(self, authenticated_user):
            """Test accessing protected endpoint with valid token"""
            headers = {"Authorization": f"Bearer {authenticated_user['token']}"}
            response = client.get("/users/me", headers=headers)
            assert response.status_code == 200
    
    class TestItems:
        def test_create_item(self, authenticated_user, test_item):
            """Test creating an item"""
            headers = {"Authorization": f"Bearer {authenticated_user['token']}"}
            user_id = authenticated_user['user']['id']
    
            response = client.post(f"/users/{user_id}/items/", json=test_item, headers=headers)
            assert response.status_code == 200
            data = response.json()
            assert data["title"] == test_item["title"]
            assert data["description"] == test_item["description"]
            assert data["owner_id"] == user_id
    
        def test_get_items(self, authenticated_user, test_item):
            """Test getting items"""
            headers = {"Authorization": f"Bearer {authenticated_user['token']}"}
            user_id = authenticated_user['user']['id']
    
            # Create an item first
            client.post(f"/users/{user_id}/items/", json=test_item, headers=headers)
    
            response = client.get("/items/")
            assert response.status_code == 200
            data = response.json()
            assert isinstance(data, list)
            assert len(data) > 0
    
        def test_get_item_by_id(self, authenticated_user, test_item):
            """Test getting item by ID"""
            headers = {"Authorization": f"Bearer {authenticated_user['token']}"}
            user_id = authenticated_user['user']['id']
    
            # Create item
            create_response = client.post(f"/users/{user_id}/items/", json=test_item, headers=headers)
            item_id = create_response.json()["id"]
    
            # Get item by ID
            response = client.get(f"/items/{item_id}")
            assert response.status_code == 200
            data = response.json()
            assert data["id"] == item_id
            assert data["title"] == test_item["title"]
    
    class TestValidation:
        def test_invalid_email_format(self):
            """Test user creation with invalid email"""
            invalid_user = {
                "username": "testuser",
                "email": "invalid-email",
                "password": "password123"
            }
            response = client.post("/users/", json=invalid_user)
            assert response.status_code == 422
    
        def test_missing_required_fields(self):
            """Test user creation with missing fields"""
            incomplete_user = {
                "username": "testuser"
                # Missing email and password
            }
            response = client.post("/users/", json=incomplete_user)
            assert response.status_code == 422
    
        def test_invalid_data_types(self):
            """Test with invalid data types"""
            invalid_item = {
                "title": 123,  # Should be string
                "description": ["not", "a", "string"]  # Should be string
            }
            response = client.post("/users/1/items/", json=invalid_item)
            assert response.status_code == 422
    
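    # Async tests (require pytest-asyncio; httpx provides the async client)
    @pytest.mark.asyncio
    class TestAsyncEndpoints:
        async def test_async_endpoint(self):
            """Test async endpoint functionality"""
            from httpx import ASGITransport, AsyncClient
    
            # Call the app in-process through an async client instead of the sync TestClient
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as async_client:
                response = await async_client.get("/async-endpoint")
                assert response.status_code == 200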
    
    # Database integration tests
    class TestDatabase:
        def test_database_connection(self):
            """Test database connection"""
            response = client.get("/health")
            assert response.status_code == 200
    
        def test_database_transaction_rollback(self, test_user):
            """Test database transaction rollback on error"""
            # This would test that database transactions are properly rolled back
            # when an error occurs during request processing
            pass
    
    # Performance tests
    class TestPerformance:
        def test_response_time(self, test_user):
            """Test response time is acceptable"""
            import time
    
            # perf_counter is a monotonic clock intended for measuring intervals
            start_time = time.perf_counter()
            response = client.post("/users/", json=test_user)
            elapsed = time.perf_counter() - start_time
    
            assert response.status_code == 200
            assert elapsed < 1.0  # Should respond within 1 second
    
        def test_concurrent_requests(self, test_user):
            """Test handling concurrent requests"""
            import threading
    
            results = []
    
            def make_request():
                response = client.get("/users/")
                results.append(response.status_code)
    
            threads = []
            for i in range(10):
                thread = threading.Thread(target=make_request)
                threads.append(thread)
                thread.start()
    
            for thread in threads:
                thread.join()
    
            # A thread that raised would leave no entry, so check the count too
            assert len(results) == 10
            assert all(status == 200 for status in results)
    Python
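
    The concurrent request test above starts threads by hand. An exception raised inside such a thread is not re-raised in the test, so a failed request can simply go missing from `results`. A possible alternative, sketched with the standard library's `ThreadPoolExecutor`, is shown below. `run_concurrently` and `fake_request` are hypothetical names; in a real test the callable would wrap something like `client.get("/users/")` and return the status code.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_concurrently(call: Callable[[], int], count: int) -> List[int]:
    """Run `call` `count` times on a thread pool and collect the results.

    future.result() re-raises any exception from the worker thread,
    so a failing call fails the test instead of being silently lost.
    """
    with ThreadPoolExecutor(max_workers=count) as pool:
        futures = [pool.submit(call) for _ in range(count)]
        return [future.result() for future in futures]


def fake_request() -> int:
    """Hypothetical stand-in for a request; always reports HTTP 200."""
    return 200


statuses = run_concurrently(fake_request, 10)
assert len(statuses) == 10
assert all(status == 200 for status in statuses)
```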

    WebSocket Testing

    import pytest
    from fastapi.testclient import TestClient
    from .main import app
    import json
    
    def test_websocket_connection():
        """Test basic WebSocket connection"""
        client = TestClient(app)
    
        with client.websocket_connect("/ws") as websocket:
            # Send a message
            websocket.send_text("Hello WebSocket")
    
            # Receive response
            data = websocket.receive_text()
            assert "You wrote: Hello WebSocket" in data
    
    def test_websocket_user_connection():
        """Test WebSocket connection with user ID"""
        client = TestClient(app)
    
        with client.websocket_connect("/ws/testuser") as websocket:
            # Should receive join notification
            data = websocket.receive_text()
            assert "testuser joined the chat" in data
    
            # Send a message
            websocket.send_text("Hello from testuser")
    
            # Receive echoed message
            data = websocket.receive_text()
            assert "User testuser: Hello from testuser" in data
    
    def test_websocket_room_functionality():
        """Test WebSocket room functionality"""
        client = TestClient(app)
    
        # Connect two users to the same room
        with client.websocket_connect("/ws/room/room1/user1") as ws1:
            # Each user also receives their own join notification first
            assert json.loads(ws1.receive_text())["user_id"] == "user1"
    
            with client.websocket_connect("/ws/room/room1/user2") as ws2:
                assert json.loads(ws2.receive_text())["user_id"] == "user2"
    
                # User1 should receive user2 join notification
                data = ws1.receive_text()
                message = json.loads(data)
                assert message["type"] == "user_joined"
                assert message["user_id"] == "user2"
    
                # Send message from user1
                message_data = {
                    "type": "message",
                    "message": "Hello from user1"
                }
                ws1.send_text(json.dumps(message_data))
    
                # User2 should receive the message
                data = ws2.receive_text()
                received_message = json.loads(data)
                assert received_message["user_id"] == "user1"
                assert received_message["message"] == "Hello from user1"
    
    def test_websocket_authentication():
        """Test WebSocket with authentication"""
        client = TestClient(app)
    
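        # Test with invalid token: the server closes before accepting,
        # which the test client surfaces as WebSocketDisconnect
        from starlette.websockets import WebSocketDisconnect
    
        with pytest.raises(WebSocketDisconnect):
            with client.websocket_connect("/ws/secure/invalid-token"):
                pass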
    
        # Test with valid token
        with client.websocket_connect("/ws/secure/token1") as websocket:
            websocket.send_text("Authenticated message")
            data = websocket.receive_text()
            assert "Authenticated user user1" in data
    
    def test_websocket_disconnect_handling():
        """Test WebSocket disconnect handling"""
        client = TestClient(app)
    
        with client.websocket_connect("/ws/room/room1/user1") as ws1:
            ws1.receive_text()  # User1's own join notification
    
            # Connect second user
            with client.websocket_connect("/ws/room/room1/user2") as ws2:
                # User1 receives user2 join notification
                ws1.receive_text()
    
            # User2 disconnected when the inner context manager exited,
            # so user1 should now receive the leave notification
            message = json.loads(ws1.receive_text())
            assert message["type"] == "user_left"
            assert message["user_id"] == "user2"
    Python
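
    Because every client in a room also receives join notifications, a test often has to skip messages it is not interested in. The helper below is a sketch of one way to do that. `receive_until` and `ScriptedSocket` are hypothetical names, and the only assumption is an object with a blocking `receive_text()` method, which is the shape of the session returned by `client.websocket_connect(...)`.

```python
import json
from typing import Any, Dict, List


def receive_until(websocket: Any, message_type: str, max_messages: int = 10) -> Dict[str, Any]:
    """Read JSON messages until one has the wanted "type", then return it.

    max_messages bounds how many unrelated messages are skipped before
    the helper gives up with an AssertionError.
    """
    for _ in range(max_messages):
        message = json.loads(websocket.receive_text())
        if message.get("type") == message_type:
            return message
    raise AssertionError(f"no {message_type!r} message within {max_messages} reads")


class ScriptedSocket:
    """Hypothetical stand-in that replays a fixed list of messages."""

    def __init__(self, messages: List[Dict[str, Any]]):
        self._messages = [json.dumps(m) for m in messages]

    def receive_text(self) -> str:
        return self._messages.pop(0)


socket = ScriptedSocket([
    {"type": "user_joined", "user_id": "user1"},
    {"type": "user_joined", "user_id": "user2"},
    {"type": "message", "user_id": "user1", "message": "Hello"},
])
chat = receive_until(socket, "message")
assert chat["message"] == "Hello"
```

    In the room tests above, the same helper could replace the explicit reads that discard join notifications.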

    Chapter 9: Deployment and Production

    Environment Configuration

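    # Pydantic v1 API. In Pydantic v2, BaseSettings moved to the separate
    # pydantic-settings package (from pydantic_settings import BaseSettings).
    from pydantic import BaseSettings, Field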
    from typing import Optional
    import os
    
    class Settings(BaseSettings):
        """Application settings with environment variable support"""
    
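        # Application settings
        app_name: str = Field(default="FastAPI Application", env="APP_NAME")
        app_version: str = Field(default="1.0.0", env="APP_VERSION")
        debug: bool = Field(default=False, env="DEBUG")
        # Read at startup (settings.environment) when configuring Sentry
        environment: str = Field(default="development", env="ENVIRONMENT")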
    
        # Database settings
        database_url: str = Field(env="DATABASE_URL")
        database_pool_size: int = Field(default=10, env="DATABASE_POOL_SIZE")
        database_max_overflow: int = Field(default=20, env="DATABASE_MAX_OVERFLOW")
    
        # Security settings
        secret_key: str = Field(env="SECRET_KEY")
        algorithm: str = Field(default="HS256", env="JWT_ALGORITHM")
        access_token_expire_minutes: int = Field(default=30, env="ACCESS_TOKEN_EXPIRE_MINUTES")
    
        # Redis settings (for caching/sessions)
        redis_url: Optional[str] = Field(default=None, env="REDIS_URL")
    
        # Email settings
        smtp_server: Optional[str] = Field(default=None, env="SMTP_SERVER")
        smtp_port: int = Field(default=587, env="SMTP_PORT")
        smtp_username: Optional[str] = Field(default=None, env="SMTP_USERNAME")
        smtp_password: Optional[str] = Field(default=None, env="SMTP_PASSWORD")
    
        # File upload settings
        max_file_size: int = Field(default=10485760, env="MAX_FILE_SIZE")  # 10MB
        upload_directory: str = Field(default="uploads", env="UPLOAD_DIRECTORY")
    
        # API settings
        api_rate_limit: int = Field(default=100, env="API_RATE_LIMIT")
        cors_origins: str = Field(default="*", env="CORS_ORIGINS")
    
        # Monitoring
        sentry_dsn: Optional[str] = Field(default=None, env="SENTRY_DSN")
        log_level: str = Field(default="INFO", env="LOG_LEVEL")
    
        class Config:
            env_file = ".env"
            case_sensitive = False
    
    # Global settings instance
    settings = Settings()
    
    # Environment-specific configurations
    class DevelopmentSettings(Settings):
        debug: bool = True
        log_level: str = "DEBUG"
    
    class ProductionSettings(Settings):
        debug: bool = False
        log_level: str = "WARNING"
    
    class TestingSettings(Settings):
        database_url: str = "sqlite:///./test.db"
        secret_key: str = "test-secret-key"
    
    def get_settings() -> Settings:
        """Get settings based on environment"""
        env = os.getenv("ENVIRONMENT", "development").lower()
    
        if env == "production":
            return ProductionSettings()
        elif env == "testing":
            return TestingSettings()
        else:
            return DevelopmentSettings()
    Python
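
    `get_settings()` above builds a new settings object on every call, which re-reads the environment and the `.env` file each time. A common pattern is to cache the result with `functools.lru_cache`. The sketch below uses a plain dataclass (`DemoSettings`, a hypothetical stand-in for the `Settings` classes above) so that it runs without Pydantic; the decorator would be applied to the real function in the same way.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for the Settings classes above."""
    environment: str
    debug: bool
    log_level: str


@lru_cache(maxsize=None)
def get_demo_settings() -> DemoSettings:
    """Build settings once; later calls return the cached instance."""
    env = os.getenv("ENVIRONMENT", "development").lower()
    if env == "production":
        return DemoSettings(environment=env, debug=False, log_level="WARNING")
    return DemoSettings(environment=env, debug=True, log_level="DEBUG")


first = get_demo_settings()
second = get_demo_settings()
assert first is second  # Same object: the environment was read only once

# Tests that change the environment can reset the cache explicitly
get_demo_settings.cache_clear()
```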

    Production Application Structure

    from fastapi import FastAPI, Request, HTTPException
    from fastapi.middleware.cors import CORSMiddleware
    from fastapi.middleware.trustedhost import TrustedHostMiddleware
    from fastapi.middleware.gzip import GZipMiddleware
    from fastapi.responses import JSONResponse
    import logging
    import time
    import uvicorn
    
    from .config import get_settings
    from .database import engine, Base
    from .routers import users, items, auth
    from .middleware import rate_limit_middleware, logging_middleware
    
    # Initialize settings
    settings = get_settings()
    
    # Create database tables
    Base.metadata.create_all(bind=engine)
    
    # Initialize FastAPI app
    app = FastAPI(
        title=settings.app_name,
        version=settings.app_version,
        debug=settings.debug,
        docs_url="/docs" if settings.debug else None,
        redoc_url="/redoc" if settings.debug else None
    )
    
    # Configure logging
    logging.basicConfig(
        level=getattr(logging, settings.log_level),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    logger = logging.getLogger(__name__)
    
    # Security middleware
    app.add_middleware(
        TrustedHostMiddleware,
        allowed_hosts=["localhost", "127.0.0.1", "*.yourdomain.com"]
    )
    
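    # CORS middleware
    # Browsers reject a wildcard origin on credentialed requests, so only
    # allow credentials when explicit origins are configured
    origins = [origin.strip() for origin in settings.cors_origins.split(",")]
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=origins != ["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )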
    
    # Compression middleware
    app.add_middleware(GZipMiddleware, minimum_size=1000)
    
    # Custom middleware
    app.middleware("http")(rate_limit_middleware)
    app.middleware("http")(logging_middleware)
    
    # Include routers
    app.include_router(auth.router, prefix="/auth", tags=["Authentication"])
    app.include_router(users.router, prefix="/users", tags=["Users"])
    app.include_router(items.router, prefix="/items", tags=["Items"])
    
    # Global exception handler
    @app.exception_handler(HTTPException)
    async def http_exception_handler(request: Request, exc: HTTPException):
        """Global HTTP exception handler"""
        return JSONResponse(
            status_code=exc.status_code,
            content={
                "error": {
                    "code": exc.status_code,
                    "message": exc.detail,
                    "timestamp": time.time()
                }
            }
        )
    
    @app.exception_handler(Exception)
    async def general_exception_handler(request: Request, exc: Exception):
        """Global exception handler for unhandled exceptions"""
        logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
    
        return JSONResponse(
            status_code=500,
            content={
                "error": {
                    "code": 500,
                    "message": "Internal server error" if not settings.debug else str(exc),
                    "timestamp": time.time()
                }
            }
        )
    
    # Health check endpoints
    @app.get("/health")
    async def health_check():
        """Application health check"""
        return {
            "status": "healthy",
            "version": settings.app_version,
            "timestamp": time.time()
        }
    
    @app.get("/health/db")
    async def database_health_check():
        """Database health check"""
        try:
            # Simple database query to check connection
            from sqlalchemy import text
            from .database import SessionLocal
    
            db = SessionLocal()
            try:
                # SQLAlchemy 2.0 requires raw SQL to be wrapped in text()
                db.execute(text("SELECT 1"))
            finally:
                db.close()
            return {"status": "healthy", "database": "connected"}
        except Exception as e:
            logger.error(f"Database health check failed: {e}")
            raise HTTPException(status_code=503, detail="Database unavailable")
    
    # Startup and shutdown events
    # (recent FastAPI releases deprecate on_event in favor of a lifespan handler)
    @app.on_event("startup")
    async def startup_event():
        """Application startup event"""
        logger.info(f"Starting {settings.app_name} v{settings.app_version}")
    
        # Initialize external services (Redis, etc.)
        if settings.redis_url:
            # Initialize Redis connection
            pass
    
        # Initialize monitoring (Sentry, etc.)
        if settings.sentry_dsn:
            import sentry_sdk
            from sentry_sdk.integrations.fastapi import FastApiIntegration
    
            sentry_sdk.init(
                dsn=settings.sentry_dsn,
                integrations=[FastApiIntegration()],
                environment=settings.environment
            )
    
    @app.on_event("shutdown")
    async def shutdown_event():
        """Application shutdown event"""
        logger.info("Shutting down application")
    
        # Cleanup resources
        # Close database connections, Redis connections, etc.
    
    if __name__ == "__main__":
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=8000,
            reload=settings.debug,
            workers=1 if settings.debug else 4
        )
    Python
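
The `startup` and `shutdown` hooks above are registered with `@app.on_event`, which recent FastAPI releases mark as deprecated in favor of a single lifespan context manager passed as `FastAPI(lifespan=...)`. The following is a minimal sketch of that pattern using only the standard library so that it runs on its own. The `resources` dictionary, the `events` list, and the placeholder "connected" value are hypothetical names for illustration, not part of the application above.

```python
import asyncio
from contextlib import asynccontextmanager

# Hypothetical stand-ins for real clients (Redis, Sentry, a connection pool).
resources = {}
events = []


@asynccontextmanager
async def lifespan(app):
    """Run startup work before the yield and cleanup after it."""
    # Startup: everything before `yield` runs once, before requests are served.
    resources["redis"] = "connected"
    events.append("startup")
    try:
        yield
    finally:
        # Shutdown: runs once when the context exits, even after an error.
        resources.clear()
        events.append("shutdown")


async def demo() -> None:
    # With FastAPI you would write `app = FastAPI(lifespan=lifespan)` and the
    # server would drive this context manager; here it is driven by hand.
    async with lifespan(app=None):
        events.append("serving")


asyncio.run(demo())
```

Keeping startup and cleanup in one function means a resource opened before the `yield` is visibly paired with its release after it, which is harder to guarantee with two separate event handlers.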

    Docker Configuration

    FROM python:3.11-slim
    
    # Set environment variables
    ENV PYTHONDONTWRITEBYTECODE=1 \
        PYTHONUNBUFFERED=1 \
        POETRY_NO_INTERACTION=1 \
        POETRY_VIRTUALENVS_IN_PROJECT=1 \
        POETRY_CACHE_DIR=/tmp/poetry_cache
    
    # Install system dependencies
    RUN apt-get update && apt-get install -y \
        build-essential \
        curl \
        && rm -rf /var/lib/apt/lists/*
    
    # Install Poetry
    RUN pip install poetry
    
    # Set work directory
    WORKDIR /app
    
    # Copy dependency files
    COPY pyproject.toml poetry.lock* ./
    
    # Install dependencies only; the application code is copied in the next step
    RUN poetry install --only=main --no-root && rm -rf $POETRY_CACHE_DIR
    
    # Copy application code
    COPY . .
    
    # Create non-root user
    RUN adduser --disabled-password --gecos '' appuser && \
        chown -R appuser:appuser /app
    USER appuser
    
    # Create uploads directory
    RUN mkdir -p /app/uploads
    
    # Expose port
    EXPOSE 8000
    
    # Health check
    HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
        CMD curl -f http://localhost:8000/health || exit 1
    
    # Run the application
    CMD ["poetry", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
    Dockerfile

    version: '3.8'
    
    services:
      app:
        build: .
        ports:
          - "8000:8000"
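        environment:
          # Placeholder values for local use only; supply real credentials
          # through an env file or a secrets manager in production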
          - DATABASE_URL=postgresql://postgres:password@db:5432/fastapi_db
          - REDIS_URL=redis://redis:6379
          - SECRET_KEY=your-super-secret-key-here
          - ENVIRONMENT=production
        depends_on:
          - db
          - redis
        volumes:
          - ./uploads:/app/uploads
        networks:
          - app-network
    
      db:
        image: postgres:15
        environment:
          - POSTGRES_DB=fastapi_db
          - POSTGRES_USER=postgres
          - POSTGRES_PASSWORD=password
        volumes:
          - postgres_data:/var/lib/postgresql/data
        networks:
          - app-network
    
      redis:
        image: redis:7-alpine
        networks:
          - app-network
    
      nginx:
        image: nginx:alpine
        ports:
          - "80:80"
          - "443:443"
        volumes:
          - ./nginx.conf:/etc/nginx/nginx.conf
          - ./ssl:/etc/nginx/ssl
        depends_on:
          - app
        networks:
          - app-network
    
    volumes:
      postgres_data:
    
    networks:
      app-network:
        driver: bridge
    YAML
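
The `HEALTHCHECK` instruction in the Dockerfile above shells out to `curl`, which is one reason the image installs it. As an alternative sketch, the same probe can be written with Python's standard library, so a slim image would not need `curl` just for health checks. The names `is_healthy` and `exit_code` and the default timeout are assumptions for illustration; the `/health` path and port 8000 come from the configuration above.

```python
import urllib.error
import urllib.request


def is_healthy(url: str, timeout: float = 5.0) -> bool:
    """Return True if `url` answers with a 2xx status within `timeout` seconds."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, timeout, DNS failure, an HTTP error status
        # (HTTPError is a subclass of URLError), or a malformed URL.
        return False


def exit_code(url: str = "http://localhost:8000/health") -> int:
    """Map the probe result to the exit code Docker reads: 0 healthy, 1 unhealthy."""
    return 0 if is_healthy(url) else 1
```

If this were saved in a hypothetical `healthcheck.py` that ends with `sys.exit(exit_code())`, the Dockerfile line could become `HEALTHCHECK CMD python healthcheck.py`.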

    Chapter 10: Best Practices and Patterns

    API Design Best Practices

    from fastapi import FastAPI, HTTPException, Query, Path, Depends
    from fastapi.responses import JSONResponse  # used by the exception handler below
    from pydantic import BaseModel, Field
    from typing import Optional, List
    from enum import Enum
    
    app = FastAPI()
    
    # 1. Use meaningful HTTP status codes
    class UserResponse(BaseModel):
        id: int
        username: str
        email: str
        created_at: str
    
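    # Note: regex= is Pydantic v1 syntax; Pydantic v2 and newer FastAPI
    # releases use pattern= instead (this also applies to Query below)
    class UserCreate(BaseModel):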
        username: str = Field(..., min_length=3, max_length=20, regex=r'^[a-zA-Z0-9_]+$')
        email: str = Field(..., regex=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
        password: str = Field(..., min_length=8)
    
    # 2. Use consistent naming conventions
    @app.post("/users", response_model=UserResponse, status_code=201)
    async def create_user(user: UserCreate):
        """Create a new user - use plural nouns for collections"""
        # Implementation here
        pass
    
    @app.get("/users/{user_id}", response_model=UserResponse)
    async def get_user(
        user_id: int = Path(..., description="The ID of the user to retrieve", gt=0)
    ):
        """Get user by ID - use descriptive path parameters"""
        # Implementation here
        pass
    
    # 3. Use proper query parameters with validation
    @app.get("/users", response_model=List[UserResponse])
    async def list_users(
        skip: int = Query(0, ge=0, description="Number of records to skip"),
        limit: int = Query(10, ge=1, le=100, description="Maximum number of records to return"),
        search: Optional[str] = Query(None, min_length=1, max_length=100),
        sort_by: Optional[str] = Query("created_at", regex="^(username|email|created_at)$"),
        order: Optional[str] = Query("desc", regex="^(asc|desc)$")
    ):
        """List users with proper pagination and filtering"""
        # Implementation here
        pass
    
    # 4. Use enums for fixed sets of values
    class UserStatus(str, Enum):
        ACTIVE = "active"
        INACTIVE = "inactive"
        SUSPENDED = "suspended"
    
    class UserUpdate(BaseModel):
        username: Optional[str] = Field(None, min_length=3, max_length=20)
        email: Optional[str] = None
        status: Optional[UserStatus] = None
    
    @app.patch("/users/{user_id}", response_model=UserResponse)
    async def update_user(user_id: int, user_update: UserUpdate):
        """Update user - use PATCH for partial updates"""
        # Implementation here
        pass
    
    # 5. Implement proper error handling
    class ErrorResponse(BaseModel):
        error: str
        message: str
        details: Optional[dict] = None
    
    @app.exception_handler(ValueError)
    async def value_error_handler(request, exc):
        return JSONResponse(
            status_code=400,
            content=ErrorResponse(
                error="VALIDATION_ERROR",
                message=str(exc)
            ).dict()
        )
    
    # 6. Use dependency injection for common functionality
    async def get_current_user():
        """Dependency to get current authenticated user"""
        # Implementation here
        pass
    
    async def require_admin_role(current_user = Depends(get_current_user)):
        """Dependency to require admin role"""
        if not current_user.is_admin:
            raise HTTPException(status_code=403, detail="Admin role required")
        return current_user
    
    @app.delete("/users/{user_id}")
    async def delete_user(
        user_id: int,
        admin_user = Depends(require_admin_role)
    ):
        """Delete user - protected endpoint requiring admin role"""
        # Implementation here
        pass
    
    # 7. Use response models to control data exposure
    class PublicUserResponse(BaseModel):
        id: int
        username: str
        # Note: email is excluded from public response
    
    @app.get("/users/{user_id}/public", response_model=PublicUserResponse)
    async def get_public_user_profile(user_id: int):
        """Get public user profile with limited information"""
        # Implementation here
        pass
    
    # 8. Implement versioning strategy
    @app.get("/v1/users/{user_id}", response_model=UserResponse)
    async def get_user_v1(user_id: int):
        """Version 1 of get user endpoint"""
        # Implementation here
        pass
    
    # 9. Use meaningful response structures
    class PaginatedResponse(BaseModel):
        items: List[dict]
        total: int
        page: int
        page_size: int
        has_next: bool
        has_prev: bool
    
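    # Register this route before /users/{user_id}; otherwise "paginated" is
    # matched as a user_id and rejected with a 422 validation error
    @app.get("/users/paginated", response_model=PaginatedResponse)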
    async def get_users_paginated(
        page: int = Query(1, ge=1),
        page_size: int = Query(10, ge=1, le=100)
    ):
        """Get users with proper pagination metadata"""
        # Implementation here
        pass
    
    # 10. Document your API thoroughly
    # (repeats POST /users from item 2 for illustration; a real app defines the route once)
    @app.post(
        "/users",
        response_model=UserResponse,
        status_code=201,
        summary="Create a new user",
        description="Create a new user account with username, email, and password",
        response_description="The created user information",
        responses={
            201: {"description": "User created successfully"},
            400: {"description": "Validation error"},
            409: {"description": "User already exists"}
        }
    )
    async def create_user_documented(user: UserCreate):
        """
        Create a new user with comprehensive documentation.
    
        - **username**: Must be 3-20 characters, alphanumeric and underscores only
        - **email**: Must be a valid email address
        - **password**: Must be at least 8 characters long
        """
        # Implementation here
        pass
    Python
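
The `PaginatedResponse` model in item 9 declares `has_next` and `has_prev`, but the endpoint body is left as a placeholder. The sketch below shows one way those fields could be derived from `total`, `page`, and `page_size`. It uses a plain dataclass and an in-memory list so that it runs without FastAPI or a database; the `paginate` helper, the `Page` class, and the sample `users` list are hypothetical names, not part of the book's code.

```python
from dataclasses import dataclass
from typing import Any, List, Sequence


@dataclass
class Page:
    """Mirrors the fields of PaginatedResponse from the example above."""
    items: List[Any]
    total: int
    page: int
    page_size: int
    has_next: bool
    has_prev: bool


def paginate(rows: Sequence[Any], page: int = 1, page_size: int = 10) -> Page:
    """Slice `rows` into a 1-based page and compute the navigation flags."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    total = len(rows)
    start = (page - 1) * page_size
    end = start + page_size
    return Page(
        items=list(rows[start:end]),
        total=total,
        page=page,
        page_size=page_size,
        has_next=end < total,   # more rows exist past this page
        has_prev=page > 1,      # any page after the first has a predecessor
    )


# 25 sample rows, so page 3 with page_size=10 is the final, partial page.
users = [{"id": i, "username": f"user{i}"} for i in range(1, 26)]
last = paginate(users, page=3, page_size=10)
```

With a database, `total` would typically come from a `COUNT` query and `items` from an `OFFSET`/`LIMIT` query rather than from slicing a list.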

    This book has covered the essentials of building APIs with FastAPI, from basic concepts through production deployment. Each chapter includes practical examples, best practices, and patterns that you can apply in your own projects.

    The book demonstrates how to:

    • Build robust APIs with proper validation and error handling
    • Implement authentication and authorization
    • Integrate with databases using SQLAlchemy
    • Test your applications thoroughly
    • Deploy to production with Docker and proper configuration management
    • Follow best practices for API design and code organization

