A Comprehensive Book on Building Modern APIs
Table of Contents
- Introduction to FastAPI
- Getting Started
- Request and Response Handling
- Data Validation with Pydantic
- Authentication and Authorization
- Database Integration
- Advanced Features
- Testing
- Deployment and Production
- Best Practices and Patterns
Chapter 1: Introduction to FastAPI
What is FastAPI?
FastAPI is a modern, fast (high-performance) web framework for building APIs with Python 3.7+ based on standard Python type hints. It’s designed to be easy to use, fast to code, and production-ready.
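To make "based on standard Python type hints" concrete, here is a framework-free sketch of the underlying idea: read a function's annotations and use them to convert raw string inputs before calling it. The helper `call_with_coercion` is hypothetical and much simpler than what FastAPI and Pydantic actually do.

```python
import inspect
from typing import get_type_hints


def call_with_coercion(func, raw_params: dict):
    """Convert raw string parameters using the function's type hints, then call it."""
    hints = get_type_hints(func)
    converted = {}
    for name in inspect.signature(func).parameters:
        target = hints.get(name, str)  # unannotated parameters stay strings
        try:
            converted[name] = target(raw_params[name])
        except (KeyError, ValueError) as exc:
            raise ValueError(f"invalid value for parameter '{name}'") from exc
    return func(**converted)


def get_user(user_id: int, name: str):
    return {"user_id": user_id, "name": name}


# "42" arrives as text (as it would from a URL) and is converted to an int
result = call_with_coercion(get_user, {"user_id": "42", "name": "ada"})
print(result)
```

The point of the sketch is only that the annotation `user_id: int` carries enough information to both convert and reject input, without any extra validation code in the function body.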
Key Features
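- Fast: Very high performance, on par with NodeJS and Go, thanks to Starlette and Pydantic
- Fast to code: Increase the speed to develop features by about 200% to 300% (an estimate the FastAPI documentation attributes to tests on an internal development team)
- Fewer bugs: Reduce about 40% of human (developer) induced errors (same internal estimate)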
- Intuitive: Great editor support with auto-completion everywhere
- Easy: Designed to be easy to use and learn
- Short: Minimize code duplication
- Robust: Get production-ready code with automatic interactive documentation
- Standards-based: Based on open standards for APIs: OpenAPI and JSON Schema
Architecture Overview
graph TD
A[Client Request] --> B[FastAPI Application]
B --> C[Path Operation]
C --> D[Dependency Injection]
D --> E[Request Validation]
E --> F[Business Logic]
F --> G[Response Serialization]
G --> H[Client Response]
I[Pydantic Models] --> E
I --> G
J[Database] --> F
K[Authentication] --> D
Chapter 2: Getting Started
Installation
# Basic installation
pip install fastapi
# With all optional dependencies
pip install "fastapi[all]"
# ASGI server for development
pip install uvicorn
# For production
pip install gunicorn
Your First FastAPI Application
from fastapi import FastAPI

app = FastAPI(
    title="My First API",
    description="A simple FastAPI example",
    version="1.0.0"
)

@app.get("/")
async def root():
    """Root endpoint returning a welcome message"""
    return {"message": "Hello World"}

@app.get("/hello/{name}")
async def say_hello(name: str):
    """Greet a specific person"""
    return {"message": f"Hello {name}"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Running the Application
# Development server
uvicorn main:app --reload
# Custom host and port
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
Interactive Documentation
FastAPI automatically generates interactive API documentation:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
- OpenAPI Schema: http://localhost:8000/openapi.json
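The schema served at `/openapi.json` is an ordinary JSON document, so it can be inspected with plain Python. The sketch below lists the operations in such a document. The `sample_schema` dictionary is a hand-written, heavily trimmed stand-in for what the first application might produce; the real document carries far more detail, and the helper `list_operations` is hypothetical.

```python
def list_operations(schema: dict) -> list:
    """Return sorted (METHOD, path, summary) tuples from an OpenAPI-style document."""
    operations = []
    for path, methods in schema.get("paths", {}).items():
        for method, operation in methods.items():
            operations.append((method.upper(), path, operation.get("summary", "")))
    return sorted(operations)


# Hand-written sample, not real server output
sample_schema = {
    "openapi": "3.0.2",
    "info": {"title": "My First API", "version": "1.0.0"},
    "paths": {
        "/": {"get": {"summary": "Root"}},
        "/hello/{name}": {"get": {"summary": "Say Hello"}},
    },
}

for method, path, summary in list_operations(sample_schema):
    print(method, path, summary)
```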
Chapter 3: Request and Response Handling
Path Parameters
from fastapi import FastAPI, Path
from typing import Optional
app = FastAPI()
@app.get("/users/{user_id}")
async def get_user(
user_id: int = Path(..., title="The ID of the user", ge=1)
):
"""Get user by ID with validation"""
return {"user_id": user_id}
@app.get("/files/{file_path:path}")
async def read_file(file_path: str):
"""Handle file paths with forward slashes"""
return {"file_path": file_path}
# Enum for path parameters
from enum import Enum
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
"""Path parameter with predefined values"""
if model_name == ModelName.alexnet:
return {"model_name": model_name, "message": "Deep Learning FTW!"}
if model_name.value == "lenet":
return {"model_name": model_name, "message": "LeCNN all the images"}
return {"model_name": model_name, "message": "Have some residuals"}
Query Parameters
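Query parameters arrive as text, and a repeated key such as `?tags=red&tags=blue` is what fills a `List[str]` parameter. The following standard-library sketch mimics that conversion for a few parameters like the ones in the listing that follows. The accepted boolean spellings are an assumption modeled on a subset of what Pydantic accepts, and `parse_search_query` is a hypothetical helper.

```python
from urllib.parse import parse_qs

# Assumed subset of accepted boolean spellings
TRUE_VALUES = {"1", "true", "on", "yes"}
FALSE_VALUES = {"0", "false", "off", "no"}


def parse_bool(raw: str) -> bool:
    """Convert a query-string value to a bool, rejecting anything unrecognized."""
    value = raw.strip().lower()
    if value in TRUE_VALUES:
        return True
    if value in FALSE_VALUES:
        return False
    raise ValueError(f"not a boolean: {raw!r}")


def parse_search_query(query_string: str) -> dict:
    """Turn a raw query string into typed values with defaults."""
    params = parse_qs(query_string)  # every key maps to a list of values
    return {
        "tags": params.get("tags", []),
        "skip": int(params.get("skip", ["0"])[0]),
        "featured": parse_bool(params.get("featured", ["false"])[0]),
    }


parsed = parse_search_query("tags=red&tags=blue&skip=20&featured=yes")
print(parsed)
```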
# filepath: query_parameters.py
from fastapi import FastAPI, Query
from typing import Optional, List
app = FastAPI()
@app.get("/items/")
async def read_items(
skip: int = 0,
limit: int = Query(10, le=100),
q: Optional[str] = Query(None, min_length=3, max_length=50)
):
"""Get items with pagination and search"""
items = [{"item_id": i} for i in range(skip, skip + limit)]
if q:
items = [item for item in items if q in str(item["item_id"])]
return {"items": items, "skip": skip, "limit": limit, "q": q}
@app.get("/items/search/")
async def search_items(
tags: List[str] = Query([]),
category: Optional[str] = None
):
"""Search items with multiple tags"""
return {"tags": tags, "category": category}
# Boolean parameters
@app.get("/items/featured/")
async def get_featured_items(
featured: bool = False,
in_stock: Optional[bool] = None
):
"""Get featured items with boolean filters"""
return {"featured": featured, "in_stock": in_stock}
Request Body
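When a path operation declares more than one body parameter, as `update_item` does in the listing that follows, FastAPI expects a single JSON object keyed by parameter name. The sketch below builds such a payload and checks its top-level keys. The field values are made up, and `missing_body_keys` is a hypothetical helper.

```python
import json

# One top-level key per body parameter: item, user, importance
payload = {
    "item": {"name": "Keyboard", "price": 49.9, "tags": ["hardware"]},
    "user": {"username": "ada", "email": "ada@example.com"},
    "importance": 3,
}
body = json.dumps(payload)


def missing_body_keys(raw_body: str, expected=("item", "user", "importance")) -> list:
    """Return the expected top-level keys that are absent from a JSON body."""
    data = json.loads(raw_body)
    return [key for key in expected if key not in data]


print(missing_body_keys(body))
```

A client that sends only the item fields at the top level, without the `item` wrapper, would be rejected with a validation error for each missing key.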
from fastapi import FastAPI, Body
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
app = FastAPI()
class Item(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
description: Optional[str] = Field(None, max_length=500)
price: float = Field(..., gt=0, description="Price must be greater than zero")
tax: Optional[float] = Field(None, ge=0)
tags: List[str] = []
created_at: datetime = Field(default_factory=datetime.now)
class User(BaseModel):
username: str
full_name: Optional[str] = None
email: str
@app.post("/items/")
async def create_item(item: Item):
"""Create a new item"""
return {"message": "Item created", "item": item}
@app.post("/items/{item_id}")
async def update_item(
item_id: int,
item: Item,
user: User,
importance: int = Body(..., gt=0)
):
"""Update item with multiple body parameters"""
return {
"item_id": item_id,
"item": item,
"user": user,
"importance": importance
}
# Nested models
class Image(BaseModel):
url: str
name: str
class ItemWithImages(BaseModel):
name: str
description: Optional[str] = None
price: float
images: List[Image] = []
@app.post("/items/with-images/")
async def create_item_with_images(item: ItemWithImages):
"""Create item with nested image models"""
return item
Response Models
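A `response_model` restricts the output to the fields declared on that model, so undeclared keys such as `password` are dropped from the response even if the handler returns them. The sketch below imitates that filtering with dataclasses. `filter_to_model` is a hypothetical helper, not FastAPI's implementation.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class UserResponse:
    id: int
    username: str
    email: str
    is_active: bool
    full_name: Optional[str] = None


def filter_to_model(model, data: dict) -> dict:
    """Keep only the keys declared on the model and fill in its defaults."""
    allowed = {f.name for f in fields(model)}
    instance = model(**{key: value for key, value in data.items() if key in allowed})
    return {f.name: getattr(instance, f.name) for f in fields(model)}


stored_user = {
    "id": 1,
    "username": "john",
    "email": "john@example.com",
    "is_active": True,
    "password": "hunter2",  # must never reach the client
}
public_view = filter_to_model(UserResponse, stored_user)
print(public_view)
```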
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
class UserBase(BaseModel):
username: str
email: str
full_name: Optional[str] = None
class UserCreate(UserBase):
password: str
class UserResponse(UserBase):
id: int
is_active: bool
class Config:
orm_mode = True
class ItemResponse(BaseModel):
id: int
name: str
description: Optional[str] = None
owner_id: int
# Response model usage
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
"""Create user with password excluded from response"""
# Simulate user creation
user_dict = user.dict()
user_dict.update({"id": 1, "is_active": True})
del user_dict["password"] # Remove password from response
return user_dict
@app.get("/users/", response_model=List[UserResponse])
async def list_users():
"""List all users"""
return [
{"id": 1, "username": "john", "email": "john@example.com", "is_active": True},
{"id": 2, "username": "jane", "email": "jane@example.com", "is_active": False}
]
# Multiple response models
from fastapi.responses import JSONResponse
@app.get("/items/{item_id}")
async def get_item(item_id: int):
"""Get item with custom response handling"""
if item_id == 0:
return JSONResponse(
status_code=404,
content={"message": "Item not found"}
)
return {"id": item_id, "name": f"Item {item_id}"}
Chapter 4: Data Validation with Pydantic
Advanced Validation
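The phone pattern used by `validate_phone` in the listing that follows targets North American style numbers. A quick standard-library check shows which inputs it accepts. The sample numbers are made up for illustration.

```python
import re

# Same pattern as in validate_phone
PHONE_PATTERN = re.compile(r'^\+?1?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}$')

samples = [
    "555-123-4567",
    "+1 (555) 123-4567",
    "5551234567",
    "+44 20 7946 0958",  # non-NANP format
    "12345",             # too short
]
results = {sample: bool(PHONE_PATTERN.match(sample)) for sample in samples}

for sample, accepted in results.items():
    print(f"{sample!r}: {'accepted' if accepted else 'rejected'}")
```

Because the pattern rejects most international formats, an API with users outside North America would likely need a different rule or a dedicated phone-number library.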
from fastapi import FastAPI
from pydantic import BaseModel, validator, Field, EmailStr
from typing import Optional, List
import re
from datetime import datetime, date
app = FastAPI()
class User(BaseModel):
username: str = Field(..., min_length=3, max_length=20)
email: EmailStr
age: int = Field(..., ge=18, le=99)
password: str = Field(..., min_length=8)
confirm_password: str
phone: Optional[str] = None
birth_date: Optional[date] = None
@validator('username')
def username_alphanumeric(cls, v):
"""Username may contain only letters, digits, and underscores"""
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Username may contain only letters, digits, and underscores')
return v
@validator('password')
def validate_password(cls, v):
"""Password must contain at least one uppercase, lowercase, and digit"""
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain at least one uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain at least one lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain at least one digit')
return v
@validator('confirm_password')
def passwords_match(cls, v, values):
"""Confirm password must match password"""
if 'password' in values and v != values['password']:
raise ValueError('Passwords do not match')
return v
@validator('phone')
def validate_phone(cls, v):
"""Validate phone number format"""
if v and not re.match(r'^\+?1?[-.\s]?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}$', v):
raise ValueError('Invalid phone number format')
return v
class Product(BaseModel):
name: str
price: float = Field(..., gt=0)
category: str
tags: List[str] = []
@validator('tags')
def validate_tags(cls, v):
"""Validate tags"""
if len(v) > 5:
raise ValueError('Maximum 5 tags allowed')
return [tag.lower().strip() for tag in v]
@app.post("/users/")
async def create_user(user: User):
"""Create user with advanced validation"""
return {"message": "User created successfully", "username": user.username}
@app.post("/products/")
async def create_product(product: Product):
"""Create product with validation"""
return product
Custom Validators and Field Types
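The cross-field rules in the listing that follows (purchase events need certain data keys and a user) can also be read as a plain function. The sketch below expresses the same rules without Pydantic and collects every problem instead of stopping at the first. `check_event` is a hypothetical helper.

```python
ALLOWED_EVENT_TYPES = {"click", "view", "purchase", "signup"}
PURCHASE_FIELDS = ("amount", "currency", "product_id")


def check_event(event: dict) -> list:
    """Return a list of human-readable validation errors (empty when valid)."""
    errors = []
    event_type = event.get("event_type")
    if event_type not in ALLOWED_EVENT_TYPES:
        errors.append(f"unknown event type: {event_type!r}")
    if event_type == "purchase":
        data = event.get("data", {})
        errors.extend(
            f"purchase events must include {name}"
            for name in PURCHASE_FIELDS
            if name not in data
        )
        if not event.get("user_id"):
            errors.append("purchase events require user_id")
    return errors


print(check_event({"event_type": "purchase", "data": {"amount": 10, "currency": "EUR"}}))
```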
from fastapi import FastAPI
from pydantic import BaseModel, validator, root_validator, Field
from typing import Optional, Dict, Any
from datetime import datetime
import json
app = FastAPI()
class EventData(BaseModel):
event_type: str
timestamp: datetime
data: Dict[str, Any]
user_id: Optional[int] = None
@validator('event_type')
def validate_event_type(cls, v):
"""Validate event type"""
allowed_types = ['click', 'view', 'purchase', 'signup']
if v not in allowed_types:
raise ValueError(f'Event type must be one of: {allowed_types}')
return v
@validator('data')
def validate_data(cls, v, values):
"""Validate data based on event type"""
event_type = values.get('event_type')
if event_type == 'purchase':
required_fields = ['amount', 'currency', 'product_id']
for field in required_fields:
if field not in v:
raise ValueError(f'Purchase events must include {field}')
return v
@root_validator
def validate_user_context(cls, values):
"""Root validator for complex validation logic"""
event_type = values.get('event_type')
user_id = values.get('user_id')
# Some events require user authentication
if event_type in ['purchase'] and not user_id:
raise ValueError(f'{event_type} events require user_id')
return values
# Custom field types
class JSONField(str):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if isinstance(v, str):
try:
return json.loads(v)
except json.JSONDecodeError:
raise ValueError('Invalid JSON')
return v
class Configuration(BaseModel):
name: str
settings: JSONField
enabled: bool = True
@app.post("/events/")
async def create_event(event: EventData):
"""Create event with custom validation"""
return {"message": "Event created", "event": event}
@app.post("/configurations/")
async def create_configuration(config: Configuration):
"""Create configuration with JSON field"""
return config
Chapter 5: Authentication and Authorization
Basic Authentication
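HTTP Basic authentication sends `username:password` base64-encoded in the `Authorization` header. The sketch below decodes such a header by hand and compares the credentials in constant time, which is roughly the work `HTTPBasic` and `secrets.compare_digest` do in the listing that follows. The helper names are hypothetical and the credentials are the same demo values.

```python
import base64
import secrets


def parse_basic_header(header_value: str):
    """Split an 'Authorization: Basic ...' value into (username, password)."""
    scheme, _, encoded = header_value.partition(" ")
    if scheme.lower() != "basic" or not encoded:
        raise ValueError("not a Basic authorization header")
    decoded = base64.b64decode(encoded).decode("utf-8")
    username, separator, password = decoded.partition(":")
    if not separator:
        raise ValueError("missing ':' separator")
    return username, password


def credentials_match(username, password, expected_username, expected_password) -> bool:
    """Compare as bytes: compare_digest() rejects non-ASCII str arguments."""
    user_ok = secrets.compare_digest(username.encode("utf-8"), expected_username.encode("utf-8"))
    pass_ok = secrets.compare_digest(password.encode("utf-8"), expected_password.encode("utf-8"))
    return user_ok and pass_ok


header = "Basic " + base64.b64encode(b"admin:secret").decode("ascii")
username, password = parse_basic_header(header)
print(username, credentials_match(username, password, "admin", "secret"))
```

Base64 is an encoding, not encryption, so Basic authentication is only reasonable over HTTPS.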
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets

app = FastAPI()
security = HTTPBasic()

def get_current_username(credentials: HTTPBasicCredentials = Depends(security)):
    """Validate basic authentication credentials"""
    # compare_digest() raises TypeError for non-ASCII str input, so compare UTF-8 bytes
    correct_username = secrets.compare_digest(
        credentials.username.encode("utf8"), b"admin"
    )
    correct_password = secrets.compare_digest(
        credentials.password.encode("utf8"), b"secret"
    )
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username

@app.get("/protected")
async def protected_route(username: str = Depends(get_current_username)):
    """Protected route requiring basic authentication"""
    return {"message": f"Hello {username}, this is a protected route"}

JWT Authentication
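An HS256 token is three base64url parts, `header.payload.signature`, where the signature is an HMAC-SHA256 over the first two. The standard-library sketch below reproduces that structure to show what is actually signed and checked. It is for illustration only and is not a replacement for a maintained library such as the one used in the listing that follows. The function names and the demo secret are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(claims: dict, secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_token(token: str, secret: str) -> dict:
    """Check the signature and expiry, then return the claims."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if claims.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return claims


token = sign_token({"sub": "testuser", "exp": int(time.time()) + 1800}, "demo-secret")
claims = verify_token(token, "demo-secret")
print(claims["sub"])
```

The payload is only encoded, not encrypted: anyone holding the token can read the claims, so secrets do not belong in it.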
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from datetime import datetime, timedelta
import jwt
from passlib.context import CryptContext
from typing import Optional
app = FastAPI()
# Configuration
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Security
security = HTTPBearer()
# Models
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class UserLogin(BaseModel):
username: str
password: str
# Fake database
fake_users_db = {
"testuser": {
"username": "testuser",
"full_name": "Test User",
"email": "testuser@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # secret
"disabled": False,
}
}
# Utility functions
def verify_password(plain_password, hashed_password):
"""Verify a password against its hash"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
"""Hash a password"""
return pwd_context.hash(password)
def get_user(db, username: str):
"""Get user from database"""
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(username: str, password: str):
"""Authenticate user credentials"""
user = get_user(fake_users_db, username)
if not user or not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Get current user from JWT token"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except jwt.PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=username)
if user is None:
raise credentials_exception
return user
def get_current_active_user(current_user: User = Depends(get_current_user)):
"""Get current active user"""
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# Routes
@app.post("/token", response_model=Token)
async def login(user_credentials: UserLogin):
"""Login endpoint to get access token"""
user = authenticate_user(user_credentials.username, user_credentials.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
"""Get current user information"""
return current_user
@app.get("/protected")
async def protected_route(current_user: User = Depends(get_current_active_user)):
"""Protected route requiring JWT authentication"""
return {"message": f"Hello {current_user.username}, this is a protected route"}
Role-Based Access Control
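The listing that follows stores `permissions` on the user next to `roles`. One alternative, sketched here as a design option rather than a required change, derives the permissions from the roles at check time so the two cannot drift apart. The enums and mapping mirror the listing, while `effective_permissions` and `is_allowed` are hypothetical helpers.

```python
from enum import Enum


class Role(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"


class Permission(str, Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"


ROLE_PERMISSIONS = {
    Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
    Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
    Role.USER: [Permission.READ, Permission.WRITE],
}


def effective_permissions(roles) -> set:
    """Union of the permissions granted by each role."""
    granted = set()
    for role in roles:
        granted.update(ROLE_PERMISSIONS.get(role, []))
    return granted


def is_allowed(roles, permission) -> bool:
    return permission in effective_permissions(roles)


print(is_allowed([Role.USER], Permission.DELETE))
print(is_allowed([Role.USER, Role.MODERATOR], Permission.DELETE))
```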
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer
from pydantic import BaseModel
from enum import Enum
from typing import List, Optional
app = FastAPI()
security = HTTPBearer()
class Role(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class Permission(str, Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
class User(BaseModel):
username: str
email: str
roles: List[Role]
permissions: List[Permission]
# Role-permission mapping
ROLE_PERMISSIONS = {
Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
Role.USER: [Permission.READ, Permission.WRITE]
}
def get_current_user() -> User:
"""Mock function to get current user"""
# In real implementation, this would decode JWT and fetch user
return User(
username="testuser",
email="test@example.com",
roles=[Role.USER],
permissions=ROLE_PERMISSIONS[Role.USER]
)
def require_permission(permission: Permission):
"""Dependency to require specific permission"""
def check_permission(current_user: User = Depends(get_current_user)):
if permission not in current_user.permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission '{permission}' required"
)
return current_user
return check_permission
def require_role(role: Role):
"""Dependency to require specific role"""
def check_role(current_user: User = Depends(get_current_user)):
if role not in current_user.roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role '{role}' required"
)
return current_user
return check_role
def require_any_role(roles: List[Role]):
"""Dependency to require any of the specified roles"""
def check_roles(current_user: User = Depends(get_current_user)):
if not any(role in current_user.roles for role in roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"One of roles {roles} required"
)
return current_user
return check_roles
# Protected routes
@app.get("/public")
async def public_endpoint():
"""Public endpoint - no authentication required"""
return {"message": "This is a public endpoint"}
@app.get("/user-data")
async def get_user_data(user: User = Depends(require_permission(Permission.READ))):
"""Endpoint requiring read permission"""
return {"message": "User data", "user": user.username}
@app.post("/create-item")
async def create_item(user: User = Depends(require_permission(Permission.WRITE))):
"""Endpoint requiring write permission"""
return {"message": "Item created", "created_by": user.username}
@app.delete("/delete-item/{item_id}")
async def delete_item(
item_id: int,
user: User = Depends(require_permission(Permission.DELETE))
):
"""Endpoint requiring delete permission"""
return {"message": f"Item {item_id} deleted", "deleted_by": user.username}
@app.get("/admin-panel")
async def admin_panel(user: User = Depends(require_role(Role.ADMIN))):
"""Admin-only endpoint"""
return {"message": "Welcome to admin panel", "admin": user.username}
@app.get("/moderation")
async def moderation_panel(
user: User = Depends(require_any_role([Role.ADMIN, Role.MODERATOR]))
):
"""Endpoint accessible by admins and moderators"""
return {"message": "Moderation panel", "user": user.username, "roles": user.roles}
Chapter 6: Database Integration
SQLAlchemy Setup
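The `get_db` dependency in the listing that follows is a generator: code before `yield` runs when the request starts, and the `finally` block runs when it ends, even if the handler raised. The sketch below shows the same pattern with the standard-library `sqlite3` module standing in for a SQLAlchemy session, and `contextlib.contextmanager` standing in for the framework that drives the generator.

```python
import sqlite3
from contextlib import contextmanager

closed_connections = []  # records cleanup so it can be observed


def get_db():
    """Yield a connection and always close it afterwards."""
    conn = sqlite3.connect(":memory:")
    try:
        yield conn
    finally:
        conn.close()
        closed_connections.append(conn)


# contextmanager() drives the generator roughly the way a framework would
db_session = contextmanager(get_db)

try:
    with db_session() as conn:
        conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
        conn.execute("INSERT INTO users (username) VALUES (?)", ("ada",))
        row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        raise RuntimeError("simulated failure inside a request")
except RuntimeError:
    pass

print(row_count, len(closed_connections))
```

The connection is closed even though the block raised, which is the guarantee the `try`/`finally` around `yield` provides.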
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from datetime import datetime
# Database URL
SQLALCHEMY_DATABASE_URL = "sqlite:///./app.db"
# For PostgreSQL: "postgresql://user:password@localhost/dbname"
# For MySQL: "mysql://user:password@localhost/dbname"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False} # Only for SQLite
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency to get database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Database models
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String)
owner_id = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime, default=datetime.utcnow)
owner = relationship("User", back_populates="items")
# Create tables
Base.metadata.create_all(bind=engine)
CRUD Operations
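The `update_item` function in the listing that follows relies on `exclude_unset=True`, so only fields the client actually sent are written. The distinction matters because "not sent" and "sent as null" mean different things. The sketch below shows the same idea with a sentinel value. `apply_partial_update` is a hypothetical helper.

```python
_UNSET = object()  # marks "the caller did not provide this field"


def apply_partial_update(record: dict, *, title=_UNSET, description=_UNSET) -> dict:
    """Return a copy of record with only the provided fields replaced."""
    updated = dict(record)
    for key, value in (("title", title), ("description", description)):
        if value is not _UNSET:
            updated[key] = value
    return updated


item = {"id": 1, "title": "Draft", "description": "First version"}

renamed = apply_partial_update(item, title="Final")       # description untouched
cleared = apply_partial_update(item, description=None)    # explicitly set to None

print(renamed)
print(cleared)
```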
# filepath: crud.py
from sqlalchemy.orm import Session
from . import models, schemas
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password):
return pwd_context.hash(password)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# User CRUD operations
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_user_by_username(db: Session, username: str):
return db.query(models.User).filter(models.User.username == username).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
hashed_password = get_password_hash(user.password)
db_user = models.User(
username=user.username,
email=user.email,
hashed_password=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
def authenticate_user(db: Session, username: str, password: str):
user = get_user_by_username(db, username)
if not user or not verify_password(password, user.hashed_password):
return False
return user
# Item CRUD operations
def get_items(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
def get_item(db: Session, item_id: int):
return db.query(models.Item).filter(models.Item.id == item_id).first()
def get_items_by_user(db: Session, user_id: int, skip: int = 0, limit: int = 100):
return db.query(models.Item).filter(
models.Item.owner_id == user_id
).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
def update_item(db: Session, item_id: int, item: schemas.ItemUpdate):
db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
if db_item:
for key, value in item.dict(exclude_unset=True).items():
setattr(db_item, key, value)
db.commit()
db.refresh(db_item)
return db_item
def delete_item(db: Session, item_id: int):
db_item = db.query(models.Item).filter(models.Item.id == item_id).first()
if db_item:
db.delete(db_item)
db.commit()
return db_item
Pydantic Schemas
from pydantic import BaseModel, EmailStr
from typing import List, Optional
from datetime import datetime
# User schemas
class UserBase(BaseModel):
username: str
email: EmailStr
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class User(UserBase):
id: int
is_active: bool
created_at: datetime
items: List["Item"] = []
class Config:
orm_mode = True
# Item schemas
class ItemBase(BaseModel):
title: str
description: Optional[str] = None
class ItemCreate(ItemBase):
pass
class ItemUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
class Item(ItemBase):
id: int
owner_id: int
created_at: datetime
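# owner is intentionally omitted: nesting User here (which itself nests Item) would recurse without end when serializing ORM objects; owner_id identifies the owner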
class Config:
orm_mode = True
# Update forward references
User.update_forward_refs()
API Routes with Database
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from . import crud, models, schemas
from .database import SessionLocal, engine, get_db
models.Base.metadata.create_all(bind=engine)
app = FastAPI(title="FastAPI with Database", version="1.0.0")
# User endpoints
@app.post("/users/", response_model=schemas.User)
async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
"""Create a new user"""
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
db_user = crud.get_user_by_username(db, username=user.username)
if db_user:
raise HTTPException(status_code=400, detail="Username already taken")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=List[schemas.User])
async def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""Get all users"""
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.User)
async def read_user(user_id: int, db: Session = Depends(get_db)):
"""Get user by ID"""
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
# Item endpoints
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
async def create_item_for_user(
user_id: int,
item: schemas.ItemCreate,
db: Session = Depends(get_db)
):
"""Create item for user"""
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=List[schemas.Item])
async def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
"""Get all items"""
items = crud.get_items(db, skip=skip, limit=limit)
return items
@app.get("/items/{item_id}", response_model=schemas.Item)
async def read_item(item_id: int, db: Session = Depends(get_db)):
"""Get item by ID"""
db_item = crud.get_item(db, item_id=item_id)
if db_item is None:
raise HTTPException(status_code=404, detail="Item not found")
return db_item
@app.put("/items/{item_id}", response_model=schemas.Item)
async def update_item_endpoint(
item_id: int,
item: schemas.ItemUpdate,
db: Session = Depends(get_db)
):
"""Update item"""
db_item = crud.get_item(db, item_id=item_id)
if db_item is None:
raise HTTPException(status_code=404, detail="Item not found")
return crud.update_item(db=db, item_id=item_id, item=item)
@app.delete("/items/{item_id}")
async def delete_item_endpoint(item_id: int, db: Session = Depends(get_db)):
"""Delete item"""
db_item = crud.get_item(db, item_id=item_id)
if db_item is None:
raise HTTPException(status_code=404, detail="Item not found")
crud.delete_item(db=db, item_id=item_id)
return {"message": "Item deleted successfully"}
Chapter 7: Advanced Features
Dependency Injection System
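The `RateLimiter` dependency in the listing that follows counts requests per client but never resets the counters, so a client that exceeds the limit stays blocked for the life of the process. One common alternative is a fixed time window, sketched below with an injectable clock so it can be exercised without waiting. The class name, parameters, and IP addresses are hypothetical.

```python
import time


class FixedWindowRateLimiter:
    """Allow at most max_requests per client within each window."""

    def __init__(self, max_requests: int, window_seconds: float, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock
        self._windows = {}  # client id -> (window start, request count)

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        start, count = self._windows.get(client_id, (now, 0))
        if now - start >= self.window_seconds:
            start, count = now, 0  # window elapsed: start a fresh one
        if count >= self.max_requests:
            self._windows[client_id] = (start, count)
            return False
        self._windows[client_id] = (start, count + 1)
        return True


# A fake clock makes the window boundary easy to demonstrate
fake_now = [0.0]
limiter = FixedWindowRateLimiter(max_requests=2, window_seconds=60, clock=lambda: fake_now[0])

first_window = [limiter.allow("10.0.0.1") for _ in range(3)]
fake_now[0] = 61.0
after_reset = limiter.allow("10.0.0.1")

print(first_window, after_reset)
```

Like the original, this keeps state in process memory, so it does not coordinate across multiple worker processes; a shared store would be needed for that.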
from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Optional
import asyncio
app = FastAPI()
# Simple dependency
async def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 100):
"""Common query parameters for pagination and search"""
return {"q": q, "skip": skip, "limit": limit}
# Database dependency
class DatabaseConnection:
def __init__(self):
self.connection = None
async def connect(self):
"""Simulate database connection"""
await asyncio.sleep(0.1)
self.connection = "connected"
return self
async def disconnect(self):
"""Simulate database disconnection"""
self.connection = None
async def get_database():
"""Database dependency with connection management"""
db = DatabaseConnection()
try:
await db.connect()
yield db
finally:
await db.disconnect()
# Authentication dependency
async def get_api_key(x_api_key: str = Header(...)):
"""API key authentication dependency"""
valid_api_keys = ["secret-key-1", "secret-key-2"]
if x_api_key not in valid_api_keys:
raise HTTPException(status_code=403, detail="Invalid API Key")
return x_api_key
# Rate limiting dependency
class RateLimiter:
def __init__(self, max_requests: int = 10):
self.max_requests = max_requests
self.requests = {}
async def __call__(self, client_ip: str = Header(None, alias="x-forwarded-for")):
"""Rate limiting dependency"""
if not client_ip:
client_ip = "unknown"
if client_ip not in self.requests:
self.requests[client_ip] = 0
self.requests[client_ip] += 1
if self.requests[client_ip] > self.max_requests:
raise HTTPException(
status_code=429,
detail="Rate limit exceeded"
)
return client_ip
rate_limiter = RateLimiter(max_requests=5)
# Subdependencies
async def get_user_from_token(api_key: str = Depends(get_api_key)):
"""Get user information from API key"""
# Simulate user lookup
user_map = {
"secret-key-1": {"id": 1, "username": "user1"},
"secret-key-2": {"id": 2, "username": "user2"}
}
return user_map.get(api_key, {"id": 0, "username": "guest"})
# Routes using dependencies
@app.get("/items/")
async def read_items(
commons: dict = Depends(common_parameters),
db: DatabaseConnection = Depends(get_database),
user: dict = Depends(get_user_from_token),
client_ip: str = Depends(rate_limiter)
):
"""Get items with multiple dependencies"""
return {
"items": [f"item-{i}" for i in range(commons["skip"], commons["skip"] + commons["limit"])],
"search": commons["q"],
"database": db.connection,
"user": user,
"client_ip": client_ip
}
# Dependency with parameters
def pagination_params(max_limit: int = 100):
"""Dependency factory with parameters"""
async def paginate(skip: int = 0, limit: int = 50):
if limit > max_limit:
limit = max_limit
return {"skip": skip, "limit": limit}
return paginate
@app.get("/users/")
async def read_users(pagination: dict = Depends(pagination_params(max_limit=50))):
"""Get users with custom pagination"""
return {
"users": [f"user-{i}" for i in range(pagination["skip"], pagination["skip"] + pagination["limit"])],
"pagination": pagination
}
# Class-based dependencies
class UserService:
def __init__(self, db: DatabaseConnection = Depends(get_database)):
self.db = db
async def get_user(self, user_id: int):
"""Get user by ID"""
return {"id": user_id, "username": f"user-{user_id}", "db": self.db.connection}
@app.get("/users/{user_id}")
async def get_user(user_id: int, user_service: UserService = Depends()):
"""Get user using class-based dependency"""
return await user_service.get_user(user_id)
Background Tasks
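Background tasks are queued during the request and run after the response has been produced. The sketch below is a simplified model of that behavior, assuming tasks run one after another in the order they were added, with coroutine functions awaited and plain functions run in a worker thread. `MiniBackgroundTasks` is a hypothetical stand-in, not the framework class.

```python
import asyncio
import functools
import inspect


class MiniBackgroundTasks:
    """Collect callables now, run them later in insertion order."""

    def __init__(self):
        self._tasks = []

    def add_task(self, func, *args, **kwargs):
        self._tasks.append((func, args, kwargs))

    async def run_all(self):
        loop = asyncio.get_running_loop()
        for func, args, kwargs in self._tasks:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)
            else:
                # Blocking callables run in a thread so the event loop stays free
                await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


events = []


async def send_email(to: str):
    await asyncio.sleep(0)  # stands in for network I/O
    events.append(f"email:{to}")


def write_log(message: str):
    events.append(f"log:{message}")


async def handle_request():
    tasks = MiniBackgroundTasks()
    tasks.add_task(send_email, "ada@example.com")
    tasks.add_task(write_log, "queued")
    events.append("response returned")  # the client gets its answer first
    await tasks.run_all()


asyncio.run(handle_request())
print(events)
```

Because the tasks run inside the same process as the application, work that must survive a restart or be retried is usually better handed to a dedicated task queue.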
from fastapi import FastAPI, BackgroundTasks, Depends
from pydantic import BaseModel, EmailStr
import asyncio
import logging
from typing import List
import smtplib
from email.mime.text import MIMEText

app = FastAPI()

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Models
class EmailSchema(BaseModel):
    to: List[EmailStr]
    subject: str
    body: str

class NotificationSchema(BaseModel):
    user_id: int
    message: str
    type: str = "info"

# Background task functions
async def send_email(email: EmailSchema):
    """Send email in background"""
    logger.info(f"Sending email to {email.to}")
    # Simulate email sending delay
    await asyncio.sleep(2)
    # In real implementation, you would use an email service
    try:
        # Example with SMTP (configure according to your email provider)
        # msg = MIMEText(email.body)
        # msg['Subject'] = email.subject
        # msg['From'] = "noreply@example.com"
        # msg['To'] = ", ".join(email.to)
        # with smtplib.SMTP('localhost') as server:
        #     server.send_message(msg)
        logger.info(f"Email sent successfully to {email.to}")
    except Exception as e:
        logger.error(f"Failed to send email: {e}")

async def process_notification(notification: NotificationSchema):
    """Process notification in background"""
    logger.info(f"Processing notification for user {notification.user_id}")
    # Simulate processing time
    await asyncio.sleep(1)
    # Here you might save to database, send push notification, etc.
    logger.info(f"Notification processed: {notification.message}")

def write_log(message: str):
    """Write log entry (non-async background task)"""
    with open("app.log", "a") as log_file:
        log_file.write(f"{message}\n")

async def cleanup_temp_files():
    """Cleanup temporary files"""
    logger.info("Starting cleanup of temporary files")
    await asyncio.sleep(3)  # Simulate cleanup time
    logger.info("Temporary files cleaned up")
# Routes with background tasks
@app.post("/send-email/")
async def send_email_endpoint(
    email: EmailSchema,
    background_tasks: BackgroundTasks
):
    """Send email endpoint with background task"""
    background_tasks.add_task(send_email, email)
    background_tasks.add_task(write_log, f"Email queued for {email.to}")
    return {"message": "Email queued for sending"}

@app.post("/notify/")
async def send_notification(
    notification: NotificationSchema,
    background_tasks: BackgroundTasks
):
    """Send notification with background processing"""
    background_tasks.add_task(process_notification, notification)
    return {"message": "Notification queued for processing"}

@app.post("/users/")
async def create_user(
    user_data: dict,
    background_tasks: BackgroundTasks
):
    """Create user with welcome email and cleanup"""
    # Process user creation (simulate)
    user_id = 123
    # Add multiple background tasks
    welcome_email = EmailSchema(
        to=[user_data["email"]],
        subject="Welcome!",
        body=f"Welcome {user_data['name']} to our service!"
    )
    background_tasks.add_task(send_email, welcome_email)
    background_tasks.add_task(cleanup_temp_files)
    background_tasks.add_task(write_log, f"User created: {user_id}")
    return {"user_id": user_id, "message": "User created successfully"}

# Background task with dependency injection
async def get_user_service():
    """Mock user service dependency"""
    return {"service": "user_service"}

@app.post("/users/{user_id}/welcome")
async def send_welcome_message(
    user_id: int,
    background_tasks: BackgroundTasks,
    user_service: dict = Depends(get_user_service)
):
    """Send welcome message with dependency injection"""
    def send_welcome_with_service(user_id: int, service: dict):
        """Background task that uses dependency"""
        logger.info(f"Sending welcome message to user {user_id} using {service}")
        # Process with service

    background_tasks.add_task(send_welcome_with_service, user_id, user_service)
    return {"message": "Welcome message queued"}

File Uploads and Downloads
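Upload and download handlers build file paths from a client-supplied name, so that name should be reduced to a plain file name before it is joined to the upload directory. A minimal sketch of one way to do this, using only `pathlib`: the helper name `safe_upload_path` is hypothetical, and the `uploads` directory matches the configuration used in this section.

```python
from pathlib import Path

UPLOAD_DIR = Path("uploads")


def safe_upload_path(filename: str, base_dir: Path = UPLOAD_DIR) -> Path:
    """Return a path inside base_dir for filename, or raise ValueError."""
    # Normalise Windows separators, then keep only the final component,
    # so "../../etc/passwd" becomes "passwd".
    name = Path(filename.replace("\\", "/")).name
    if not name or name in {".", ".."}:
        raise ValueError(f"Unusable filename: {filename!r}")
    base = base_dir.resolve()
    candidate = (base / name).resolve()
    # Defence in depth: the resolved path must still live under base_dir
    # (this also rejects a symlink that points outside the directory).
    if base not in candidate.parents:
        raise ValueError(f"Path escapes upload directory: {filename!r}")
    return candidate
```

In a route handler, a `ValueError` from this helper would typically be translated into an `HTTPException` with status 400 before any file is opened.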
from fastapi import FastAPI, File, UploadFile, HTTPException, Response
from fastapi.responses import FileResponse, StreamingResponse
from typing import List
import shutil
import os
import aiofiles
from pathlib import Path
import mimetypes
import zipfile
import io

app = FastAPI()

# Configuration
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".pdf", ".txt", ".csv"}

# Utility functions
def validate_file(file: UploadFile):
    """Validate uploaded file"""
    # Check file extension (filename can be missing, so fall back to an empty string)
    file_extension = Path(file.filename or "").suffix.lower()
    if file_extension not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"File type {file_extension} not allowed. Allowed types: {sorted(ALLOWED_EXTENSIONS)}"
        )
    # Check file size (this is approximate, actual size check happens during upload)
    if hasattr(file.file, 'seek') and hasattr(file.file, 'tell'):
        file.file.seek(0, 2)  # Seek to end
        size = file.file.tell()
        file.file.seek(0)  # Seek back to beginning
        if size > MAX_FILE_SIZE:
            raise HTTPException(
                status_code=400,
                detail=f"File too large. Maximum size: {MAX_FILE_SIZE} bytes"
            )
# Single file upload
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    """Upload a single file"""
    validate_file(file)
    # Keep only the final path component so a crafted filename cannot escape UPLOAD_DIR
    safe_name = Path(file.filename).name
    file_path = UPLOAD_DIR / safe_name
    try:
        async with aiofiles.open(file_path, 'wb') as f:
            content = await file.read()
            await f.write(content)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error saving file: {e}")
    return {
        "filename": safe_name,
        "size": len(content),
        "content_type": file.content_type,
        "path": str(file_path)
    }

# Multiple file upload
@app.post("/upload-multiple/")
async def upload_multiple_files(files: List[UploadFile] = File(...)):
    """Upload multiple files"""
    if len(files) > 10:
        raise HTTPException(status_code=400, detail="Maximum 10 files allowed")
    uploaded_files = []
    for file in files:
        validate_file(file)
        # Keep only the final path component of the client-supplied name
        safe_name = Path(file.filename).name
        file_path = UPLOAD_DIR / safe_name
        try:
            async with aiofiles.open(file_path, 'wb') as f:
                content = await file.read()
                await f.write(content)
            uploaded_files.append({
                "filename": safe_name,
                "size": len(content),
                "content_type": file.content_type,
                "path": str(file_path)
            })
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error saving file {safe_name}: {e}")
    return {"uploaded_files": uploaded_files}
# File download
@app.get("/download/{filename}")
async def download_file(filename: str):
    """Download a file"""
    # Path(filename).name drops any directory components from the requested name
    file_path = UPLOAD_DIR / Path(filename).name
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    # Get MIME type
    mime_type, _ = mimetypes.guess_type(str(file_path))
    return FileResponse(
        path=file_path,
        filename=file_path.name,
        media_type=mime_type
    )

# Stream large file download
@app.get("/stream/{filename}")
async def stream_file(filename: str):
    """Stream a large file"""
    file_path = UPLOAD_DIR / Path(filename).name
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    async def file_generator():
        async with aiofiles.open(file_path, 'rb') as f:
            while chunk := await f.read(8192):  # Read in 8KB chunks
                yield chunk

    mime_type, _ = mimetypes.guess_type(str(file_path))
    return StreamingResponse(
        file_generator(),
        media_type=mime_type,
        headers={"Content-Disposition": f'attachment; filename="{file_path.name}"'}
    )

# Create and download ZIP file
@app.get("/download-zip/")
async def download_zip():
    """Create and download a ZIP file of all uploads"""
    # The archive is built in memory, which suits small upload directories only
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for file_path in UPLOAD_DIR.iterdir():
            if file_path.is_file():
                zip_file.write(file_path, file_path.name)
    zip_buffer.seek(0)  # Rewind so the response starts at the first byte
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=uploads.zip"}
    )
# File information
@app.get("/files/")
async def list_files():
    """List all uploaded files"""
    files = []
    for file_path in UPLOAD_DIR.iterdir():
        if file_path.is_file():
            stat = file_path.stat()
            mime_type, _ = mimetypes.guess_type(str(file_path))
            files.append({
                "filename": file_path.name,
                "size": stat.st_size,
                "created": stat.st_ctime,
                "mime_type": mime_type
            })
    return {"files": files}

# Delete file
@app.delete("/files/{filename}")
async def delete_file(filename: str):
    """Delete a file"""
    # Path(filename).name drops any directory components from the requested name
    file_path = UPLOAD_DIR / Path(filename).name
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    try:
        file_path.unlink()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting file: {e}")
    return {"message": f"File {file_path.name} deleted successfully"}

# Upload with metadata
# Note: declared as plain defaults, description and tags are read from the query string.
# To receive them as multipart form fields, declare them with Form(...) from fastapi.
@app.post("/upload-with-metadata/")
async def upload_with_metadata(
    file: UploadFile = File(...),
    description: str = None,
    tags: str = None
):
    """Upload file with additional metadata"""
    validate_file(file)
    # Keep only the final path component of the client-supplied name
    safe_name = Path(file.filename).name
    file_path = UPLOAD_DIR / safe_name
    try:
        async with aiofiles.open(file_path, 'wb') as f:
            content = await file.read()
            await f.write(content)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error saving file: {e}")
    # Save metadata (in real app, you'd save to database)
    metadata = {
        "filename": safe_name,
        "size": len(content),
        "content_type": file.content_type,
        "description": description,
        "tags": tags.split(",") if tags else []
    }
    return metadata

WebSocket Support
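The chat-room endpoint in this section accepts two kinds of JSON frames, `message` and `typing`, and falls back to treating anything else as plain text. That protocol can be isolated in a small pure function, which makes it easy to test without a socket. The sketch below is one possible shape; `build_room_event` is a hypothetical helper, and dropping unknown event types is an assumption made here. It also covers a frame such as `42`, which is valid JSON but not an object.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def build_room_event(raw: str, user_id: str) -> Optional[dict]:
    """Turn one incoming text frame into the event to relay, or None to drop it."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        payload = None
    # Anything that is not a JSON object is treated as a plain chat message.
    if not isinstance(payload, dict):
        payload = {"type": "message", "message": raw}

    kind = payload.get("type", "message")
    if kind == "message":
        return {
            "type": "message",
            "user_id": user_id,
            "message": str(payload.get("message", "")),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    if kind == "typing":
        return {
            "type": "typing",
            "user_id": user_id,
            "typing": bool(payload.get("typing", False)),
        }
    return None  # unknown event types are ignored in this sketch
```

Inside a WebSocket loop, the returned dict would be passed through `json.dumps` and sent to the room, and a `None` result would simply be skipped.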
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
from fastapi.responses import HTMLResponse
from typing import List, Dict
import json
import asyncio
from datetime import datetime

app = FastAPI()

# Connection manager for WebSocket connections
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.user_connections: Dict[str, WebSocket] = {}
        self.room_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str = None):
        """Connect a new WebSocket"""
        await websocket.accept()
        self.active_connections.append(websocket)
        if user_id:
            self.user_connections[user_id] = websocket

    def disconnect(self, websocket: WebSocket, user_id: str = None):
        """Disconnect a WebSocket"""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        if user_id and user_id in self.user_connections:
            del self.user_connections[user_id]
        # Remove from all rooms
        for connections in self.room_connections.values():
            if websocket in connections:
                connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send message to specific WebSocket"""
        await websocket.send_text(message)

    async def send_personal_message_to_user(self, message: str, user_id: str):
        """Send message to specific user"""
        if user_id in self.user_connections:
            websocket = self.user_connections[user_id]
            await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Broadcast message to all connected clients"""
        # Iterate over a copy: removing from the list being iterated would skip entries
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Remove broken connections
                if connection in self.active_connections:
                    self.active_connections.remove(connection)

    async def join_room(self, websocket: WebSocket, room_id: str):
        """Join a room"""
        if room_id not in self.room_connections:
            self.room_connections[room_id] = []
        self.room_connections[room_id].append(websocket)

    async def leave_room(self, websocket: WebSocket, room_id: str):
        """Leave a room"""
        if room_id in self.room_connections:
            if websocket in self.room_connections[room_id]:
                self.room_connections[room_id].remove(websocket)

    async def send_to_room(self, message: str, room_id: str):
        """Send message to all users in a room"""
        if room_id in self.room_connections:
            # Iterate over a copy so that removals do not skip entries
            for connection in list(self.room_connections[room_id]):
                try:
                    await connection.send_text(message)
                except Exception:
                    # Remove broken connections
                    if connection in self.room_connections[room_id]:
                        self.room_connections[room_id].remove(connection)

manager = ConnectionManager()
# Simple WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Basic WebSocket endpoint"""
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_personal_message(f"You wrote: {data}", websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)

# WebSocket with user identification
@app.websocket("/ws/{user_id}")
async def websocket_user_endpoint(websocket: WebSocket, user_id: str):
    """WebSocket endpoint with user identification"""
    await manager.connect(websocket, user_id)
    # Notify others about new user
    await manager.broadcast(f"User {user_id} joined the chat")
    try:
        while True:
            data = await websocket.receive_text()
            message = f"User {user_id}: {data}"
            await manager.broadcast(message)
    except WebSocketDisconnect:
        manager.disconnect(websocket, user_id)
        await manager.broadcast(f"User {user_id} left the chat")
# WebSocket chat room
@app.websocket("/ws/room/{room_id}/{user_id}")
async def websocket_room_endpoint(websocket: WebSocket, room_id: str, user_id: str):
    """WebSocket endpoint for chat rooms"""
    await manager.connect(websocket, user_id)
    await manager.join_room(websocket, room_id)
    # Notify room about new user
    join_message = json.dumps({
        "type": "user_joined",
        "user_id": user_id,
        "message": f"{user_id} joined room {room_id}",
        "timestamp": datetime.now().isoformat()
    })
    await manager.send_to_room(join_message, room_id)
    try:
        while True:
            data = await websocket.receive_text()
            try:
                # Try to parse JSON message
                message_data = json.loads(data)
            except json.JSONDecodeError:
                message_data = None
            if not isinstance(message_data, dict):
                # Plain text (or JSON that is not an object) is handled as a chat message
                message_data = {"type": "message", "message": data}
            message_type = message_data.get("type", "message")
            if message_type == "message":
                # Regular chat message
                response = json.dumps({
                    "type": "message",
                    "user_id": user_id,
                    "message": message_data.get("message", ""),
                    "timestamp": datetime.now().isoformat()
                })
                await manager.send_to_room(response, room_id)
            elif message_type == "typing":
                # Typing indicator
                response = json.dumps({
                    "type": "typing",
                    "user_id": user_id,
                    "typing": message_data.get("typing", False)
                })
                await manager.send_to_room(response, room_id)
    except WebSocketDisconnect:
        manager.disconnect(websocket, user_id)
        await manager.leave_room(websocket, room_id)
        # Notify room about user leaving
        leave_message = json.dumps({
            "type": "user_left",
            "user_id": user_id,
            "message": f"{user_id} left room {room_id}",
            "timestamp": datetime.now().isoformat()
        })
        await manager.send_to_room(leave_message, room_id)
# REST endpoint to send message to user
@app.post("/send-message/{user_id}")
async def send_message_to_user(user_id: str, message: dict):
    """Send message to specific user via REST"""
    await manager.send_personal_message_to_user(
        json.dumps(message),
        user_id
    )
    return {"message": f"Message sent to user {user_id}"}

# REST endpoint to broadcast message
@app.post("/broadcast/")
async def broadcast_message(message: dict):
    """Broadcast message to all connected users"""
    await manager.broadcast(json.dumps(message))
    return {"message": "Message broadcasted"}

# REST endpoint to send message to room
@app.post("/room/{room_id}/message/")
async def send_message_to_room(room_id: str, message: dict):
    """Send message to specific room"""
    await manager.send_to_room(json.dumps(message), room_id)
    return {"message": f"Message sent to room {room_id}"}

# WebSocket with authentication
async def get_user_from_token(token: str):
    """Mock function to get user from token"""
    # In real implementation, validate JWT token
    valid_tokens = {"user1": "token1", "user2": "token2"}
    for user, valid_token in valid_tokens.items():
        if token == valid_token:
            return user
    return None

@app.websocket("/ws/secure/{token}")
async def secure_websocket_endpoint(websocket: WebSocket, token: str):
    """Secure WebSocket endpoint with authentication"""
    user_id = await get_user_from_token(token)
    if not user_id:
        await websocket.close(code=4001, reason="Invalid token")
        return
    await manager.connect(websocket, user_id)
    try:
        while True:
            data = await websocket.receive_text()
            response = f"Authenticated user {user_id}: {data}"
            await manager.send_personal_message(response, websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket, user_id)
# HTML client for testing
html = """
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Test</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
#messages { border: 1px solid #ccc; height: 300px; overflow-y: scroll; padding: 10px; margin: 10px 0; }
input, button { padding: 5px; margin: 5px; }
.message { margin: 5px 0; }
.user-joined { color: green; }
.user-left { color: red; }
</style>
</head>
<body>
<h1>WebSocket Chat Test</h1>
<div>
<input type="text" id="userIdInput" placeholder="Enter your user ID" value="user1">
<input type="text" id="roomIdInput" placeholder="Enter room ID" value="room1">
<button onclick="connect()">Connect</button>
<button onclick="disconnect()">Disconnect</button>
</div>
<div id="messages"></div>
<div>
<input type="text" id="messageInput" placeholder="Type a message..." onkeypress="handleKeyPress(event)">
<button onclick="sendMessage()">Send</button>
</div>
<script>
let ws = null;
let userId = '';
let roomId = '';
function connect() {
userId = document.getElementById('userIdInput').value;
roomId = document.getElementById('roomIdInput').value;
if (!userId || !roomId) {
alert('Please enter both user ID and room ID');
return;
}
ws = new WebSocket(`ws://localhost:8000/ws/room/${roomId}/${userId}`);
ws.onopen = function(event) {
addMessage('Connected to chat room', 'system');
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
addMessage(`${data.user_id}: ${data.message}`, data.type);
};
ws.onclose = function(event) {
addMessage('Disconnected from chat room', 'system');
};
ws.onerror = function(error) {
addMessage('Error: ' + error, 'error');
};
}
function disconnect() {
if (ws) {
ws.close();
ws = null;
}
}
function sendMessage() {
const input = document.getElementById('messageInput');
const message = input.value.trim();
if (message && ws && ws.readyState === WebSocket.OPEN) {
const messageData = {
type: 'message',
message: message
};
ws.send(JSON.stringify(messageData));
input.value = '';
}
}
function handleKeyPress(event) {
if (event.key === 'Enter') {
sendMessage();
}
}
function addMessage(message, type = 'message') {
const messagesDiv = document.getElementById('messages');
const messageDiv = document.createElement('div');
messageDiv.className = `message ${type}`;
messageDiv.textContent = `${new Date().toLocaleTimeString()}: ${message}`;
messagesDiv.appendChild(messageDiv);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
</script>
</body>
</html>"""

@app.get("/")
async def get_websocket_client():
    """Serve WebSocket test client"""
    return HTMLResponse(html)

Chapter 8: Testing
Test Setup and Configuration
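The setup in this section points the tests at a SQLite file, so rows created by one test are still there when the next one runs. A fixed payload can then collide with an earlier test's user. One way around this, sketched here with a hypothetical `make_user_payload` helper that is not part of the original test suite, is to generate a unique payload per call.

```python
import uuid


def make_user_payload(prefix: str = "testuser") -> dict:
    """Build a user payload whose username and email are unique per call."""
    suffix = uuid.uuid4().hex[:8]  # short random tag, e.g. for one test run
    return {
        "username": f"{prefix}-{suffix}",
        "email": f"{prefix}-{suffix}@example.com",
        "password": "testpass123",
    }
```

A pytest fixture could return `make_user_payload()` instead of a literal dict. The alternative is to drop and recreate the tables between tests, which keeps the payloads fixed at the cost of slower runs.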
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from .main import app, get_db
from .database import Base

# Test database setup
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)

def override_get_db():
    # Create the session before the try block so that finally never sees an unbound name
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()

app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
# Test fixtures
@pytest.fixture
def test_user():
    return {
        "username": "testuser",
        "email": "test@example.com",
        "password": "testpass123"
    }

@pytest.fixture
def test_item():
    return {
        "title": "Test Item",
        "description": "A test item"
    }

@pytest.fixture
def authenticated_user(test_user):
    # Create user
    response = client.post("/users/", json=test_user)
    assert response.status_code == 200
    user = response.json()
    # Login and get token
    login_data = {
        "username": test_user["username"],
        "password": test_user["password"]
    }
    # If /token is built on OAuth2PasswordRequestForm, it expects form fields:
    # pass data=login_data instead of json=login_data
    response = client.post("/token", json=login_data)
    assert response.status_code == 200
    token = response.json()["access_token"]
    # Return the created user (not the token response) so tests can read user["id"]
    return {"token": token, "user": user}
# Test classes
class TestUsers:
    def test_create_user(self, test_user):
        """Test user creation"""
        response = client.post("/users/", json=test_user)
        assert response.status_code == 200
        data = response.json()
        assert data["username"] == test_user["username"]
        assert data["email"] == test_user["email"]
        assert "id" in data
        assert "password" not in data  # Password should not be returned

    def test_create_duplicate_user(self, test_user):
        """Test creating duplicate user fails"""
        # Create first user
        response = client.post("/users/", json=test_user)
        assert response.status_code == 200
        # Try to create duplicate
        response = client.post("/users/", json=test_user)
        assert response.status_code == 400
        assert "already registered" in response.json()["detail"]

    def test_get_users(self, test_user):
        """Test getting users list"""
        # Create a user first
        client.post("/users/", json=test_user)
        response = client.get("/users/")
        assert response.status_code == 200
        data = response.json()
        assert isinstance(data, list)
        assert len(data) > 0

    def test_get_user_by_id(self, test_user):
        """Test getting user by ID"""
        # Create user
        create_response = client.post("/users/", json=test_user)
        user_id = create_response.json()["id"]
        # Get user by ID
        response = client.get(f"/users/{user_id}")
        assert response.status_code == 200
        data = response.json()
        assert data["id"] == user_id
        assert data["username"] == test_user["username"]

    def test_get_nonexistent_user(self):
        """Test getting non-existent user returns 404"""
        response = client.get("/users/999")
        assert response.status_code == 404
class TestAuthentication:
    def test_login_success(self, test_user):
        """Test successful login"""
        # Create user first
        client.post("/users/", json=test_user)
        # Login
        login_data = {
            "username": test_user["username"],
            "password": test_user["password"]
        }
        response = client.post("/token", json=login_data)
        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert data["token_type"] == "bearer"

    def test_login_wrong_password(self, test_user):
        """Test login with wrong password"""
        # Create user first
        client.post("/users/", json=test_user)
        # Try login with wrong password
        login_data = {
            "username": test_user["username"],
            "password": "wrongpassword"
        }
        response = client.post("/token", json=login_data)
        assert response.status_code == 401

    def test_login_nonexistent_user(self):
        """Test login with non-existent user"""
        login_data = {
            "username": "nonexistent",
            "password": "password"
        }
        response = client.post("/token", json=login_data)
        assert response.status_code == 401

    def test_protected_endpoint_without_token(self):
        """Test accessing protected endpoint without token"""
        response = client.get("/users/me")
        assert response.status_code == 401

    def test_protected_endpoint_with_token(self, authenticated_user):
        """Test accessing protected endpoint with valid token"""
        headers = {"Authorization": f"Bearer {authenticated_user['token']}"}
        response = client.get("/users/me", headers=headers)
        assert response.status_code == 200
class TestItems:
    def test_create_item(self, authenticated_user, test_item):
        """Test creating an item"""
        headers = {"Authorization": f"Bearer {authenticated_user['token']}"}
        user_id = authenticated_user['user']['id']
        response = client.post(f"/users/{user_id}/items/", json=test_item, headers=headers)
        assert response.status_code == 200
        data = response.json()
        assert data["title"] == test_item["title"]
        assert data["description"] == test_item["description"]
        assert data["owner_id"] == user_id

    def test_get_items(self, authenticated_user, test_item):
        """Test getting items"""
        headers = {"Authorization": f"Bearer {authenticated_user['token']}"}
        user_id = authenticated_user['user']['id']
        # Create an item first
        client.post(f"/users/{user_id}/items/", json=test_item, headers=headers)
        response = client.get("/items/")
        assert response.status_code == 200
        data = response.json()
        assert isinstance(data, list)
        assert len(data) > 0

    def test_get_item_by_id(self, authenticated_user, test_item):
        """Test getting item by ID"""
        headers = {"Authorization": f"Bearer {authenticated_user['token']}"}
        user_id = authenticated_user['user']['id']
        # Create item
        create_response = client.post(f"/users/{user_id}/items/", json=test_item, headers=headers)
        item_id = create_response.json()["id"]
        # Get item by ID
        response = client.get(f"/items/{item_id}")
        assert response.status_code == 200
        data = response.json()
        assert data["id"] == item_id
        assert data["title"] == test_item["title"]
class TestValidation:
    def test_invalid_email_format(self):
        """Test user creation with invalid email"""
        invalid_user = {
            "username": "testuser",
            "email": "invalid-email",
            "password": "password123"
        }
        response = client.post("/users/", json=invalid_user)
        assert response.status_code == 422

    def test_missing_required_fields(self):
        """Test user creation with missing fields"""
        incomplete_user = {
            "username": "testuser"
            # Missing email and password
        }
        response = client.post("/users/", json=incomplete_user)
        assert response.status_code == 422

    def test_invalid_data_types(self):
        """Test with invalid data types"""
        invalid_item = {
            "title": 123,  # Should be string
            "description": ["not", "a", "string"]  # Should be string
        }
        response = client.post("/users/1/items/", json=invalid_item)
        assert response.status_code == 422
# Async tests
# Requires the pytest-asyncio plugin. TestClient itself is synchronous,
# so its calls are not awaited even inside an async test.
@pytest.mark.asyncio
class TestAsyncEndpoints:
    async def test_async_endpoint(self):
        """Test async endpoint functionality"""
        with TestClient(app) as client:
            response = client.get("/async-endpoint")
            assert response.status_code == 200

# Database integration tests
class TestDatabase:
    def test_database_connection(self):
        """Test database connection"""
        response = client.get("/health")
        assert response.status_code == 200

    def test_database_transaction_rollback(self, test_user):
        """Test database transaction rollback on error"""
        # This would test that database transactions are properly rolled back
        # when an error occurs during request processing
        pass
# Performance tests
class TestPerformance:
    def test_response_time(self, test_user):
        """Test response time is acceptable"""
        import time
        start_time = time.time()
        response = client.post("/users/", json=test_user)
        end_time = time.time()
        assert response.status_code == 200
        assert (end_time - start_time) < 1.0  # Should respond within 1 second

    def test_concurrent_requests(self, test_user):
        """Test handling concurrent requests"""
        import threading
        results = []

        def make_request():
            response = client.get("/users/")
            results.append(response.status_code)

        threads = []
        for i in range(10):
            thread = threading.Thread(target=make_request)
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()
        assert all(status == 200 for status in results)

WebSocket Testing
import pytest
from fastapi.testclient import TestClient
from .main import app
import json
def test_websocket_connection():
    """Test basic WebSocket connection"""
    client = TestClient(app)
    with client.websocket_connect("/ws") as websocket:
        # Send a message
        websocket.send_text("Hello WebSocket")
        # Receive response
        data = websocket.receive_text()
        assert "You wrote: Hello WebSocket" in data

def test_websocket_user_connection():
    """Test WebSocket connection with user ID"""
    client = TestClient(app)
    with client.websocket_connect("/ws/testuser") as websocket:
        # Should receive join notification
        data = websocket.receive_text()
        assert "testuser joined the chat" in data
        # Send a message
        websocket.send_text("Hello from testuser")
        # Receive echoed message
        data = websocket.receive_text()
        assert "User testuser: Hello from testuser" in data
def test_websocket_room_functionality():
    """Test WebSocket room functionality"""
    client = TestClient(app)
    # Connect two users to the same room
    with client.websocket_connect("/ws/room/room1/user1") as ws1:
        with client.websocket_connect("/ws/room/room1/user2") as ws2:
            # User1 should receive user2 join notification
            data = ws1.receive_text()
            message = json.loads(data)
            assert message["type"] == "user_joined"
            assert message["user_id"] == "user2"
            # Send message from user1
            message_data = {
                "type": "message",
                "message": "Hello from user1"
            }
            ws1.send_text(json.dumps(message_data))
            # User2 should receive the message
            data = ws2.receive_text()
            received_message = json.loads(data)
            assert received_message["user_id"] == "user1"
            assert received_message["message"] == "Hello from user1"
def test_websocket_authentication():
    """Test WebSocket with authentication"""
    client = TestClient(app)
    # Test with invalid token
    with pytest.raises(Exception):  # Connection should be rejected
        with client.websocket_connect("/ws/secure/invalid-token"):
            pass
    # Test with valid token
    with client.websocket_connect("/ws/secure/token1") as websocket:
        websocket.send_text("Authenticated message")
        data = websocket.receive_text()
        assert "Authenticated user user1" in data

def test_websocket_disconnect_handling():
    """Test WebSocket disconnect handling"""
    client = TestClient(app)
    with client.websocket_connect("/ws/room/room1/user1") as ws1:
        # Connect second user
        with client.websocket_connect("/ws/room/room1/user2") as ws2:
            # User1 receives user2 join notification
            ws1.receive_text()
        # User2 disconnects, user1 should receive leave notification
        # This happens automatically when the context manager exits
        # Note: In actual implementation, you might need to handle
        # the disconnect event explicitly in your WebSocket handler
Chapter 9: Deployment and Production
Environment Configuration
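The `get_settings()` helper in this section builds a fresh settings object on every call, so the environment is read again each time. One common refinement is to cache the result with `functools.lru_cache`. The sketch below illustrates the idea with a plain dataclass standing in for the Pydantic classes; `DemoSettings` and `get_demo_settings` are hypothetical names used only for this illustration.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for the Settings classes in this section."""
    debug: bool
    log_level: str


@lru_cache(maxsize=None)
def get_demo_settings() -> DemoSettings:
    """Pick settings from ENVIRONMENT once, then reuse the cached object."""
    env = os.getenv("ENVIRONMENT", "development").lower()
    if env == "production":
        return DemoSettings(debug=False, log_level="WARNING")
    return DemoSettings(debug=True, log_level="DEBUG")
```

Because the result is cached, a test that changes `ENVIRONMENT` needs to call `get_demo_settings.cache_clear()` before the next lookup.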
# Pydantic v1 import; in Pydantic v2, BaseSettings lives in the separate
# pydantic-settings package
from pydantic import BaseSettings, Field
from typing import Optional
import os

class Settings(BaseSettings):
    """Application settings with environment variable support"""
    # Application settings
    app_name: str = Field(default="FastAPI Application", env="APP_NAME")
    app_version: str = Field(default="1.0.0", env="APP_VERSION")
    debug: bool = Field(default=False, env="DEBUG")
    # Database settings (required: no default)
    database_url: str = Field(..., env="DATABASE_URL")
    database_pool_size: int = Field(default=10, env="DATABASE_POOL_SIZE")
    database_max_overflow: int = Field(default=20, env="DATABASE_MAX_OVERFLOW")
    # Security settings (secret_key is required: no default)
    secret_key: str = Field(..., env="SECRET_KEY")
    algorithm: str = Field(default="HS256", env="JWT_ALGORITHM")
    access_token_expire_minutes: int = Field(default=30, env="ACCESS_TOKEN_EXPIRE_MINUTES")
    # Redis settings (for caching/sessions)
    redis_url: Optional[str] = Field(default=None, env="REDIS_URL")
    # Email settings
    smtp_server: Optional[str] = Field(default=None, env="SMTP_SERVER")
    smtp_port: int = Field(default=587, env="SMTP_PORT")
    smtp_username: Optional[str] = Field(default=None, env="SMTP_USERNAME")
    smtp_password: Optional[str] = Field(default=None, env="SMTP_PASSWORD")
    # File upload settings
    max_file_size: int = Field(default=10485760, env="MAX_FILE_SIZE")  # 10MB
    upload_directory: str = Field(default="uploads", env="UPLOAD_DIRECTORY")
    # API settings
    api_rate_limit: int = Field(default=100, env="API_RATE_LIMIT")
    cors_origins: str = Field(default="*", env="CORS_ORIGINS")
    # Monitoring
    sentry_dsn: Optional[str] = Field(default=None, env="SENTRY_DSN")
    log_level: str = Field(default="INFO", env="LOG_LEVEL")

    class Config:
        env_file = ".env"
        case_sensitive = False
# Global settings instance
settings = Settings()

# Environment-specific configurations
class DevelopmentSettings(Settings):
    debug: bool = True
    log_level: str = "DEBUG"

class ProductionSettings(Settings):
    debug: bool = False
    log_level: str = "WARNING"

class TestingSettings(Settings):
    database_url: str = "sqlite:///./test.db"
    secret_key: str = "test-secret-key"

def get_settings() -> Settings:
    """Get settings based on environment"""
    env = os.getenv("ENVIRONMENT", "development").lower()
    if env == "production":
        return ProductionSettings()
    elif env == "testing":
        return TestingSettings()
    else:
        return DevelopmentSettings()
Production Application Structure
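The CORS setup in this section derives the allowed origins from a comma-separated `CORS_ORIGINS` string. A small helper can make that parsing tolerant of stray whitespace and also decide whether credentials may be enabled, since browsers do not accept a wildcard origin on credentialed requests. This is a sketch under those assumptions; `parse_cors_origins` is a hypothetical helper, not part of FastAPI.

```python
from typing import List, Tuple


def parse_cors_origins(raw: str) -> Tuple[List[str], bool]:
    """Return (origins, allow_credentials) for a comma-separated origin string."""
    # Drop surrounding whitespace and empty entries such as a trailing comma
    origins = [part.strip() for part in raw.split(",") if part.strip()]
    if "*" in origins:
        # A wildcard cannot be combined with credentials
        return ["*"], False
    # Credentials only make sense when at least one explicit origin is listed
    return origins, bool(origins)
```

The two returned values could then be passed as `allow_origins` and `allow_credentials` when adding `CORSMiddleware`.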
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
import logging
import time
import uvicorn
from .config import get_settings
from .database import engine, Base
from .routers import users, items, auth
from .middleware import rate_limit_middleware, logging_middleware
# Initialize settings
settings = get_settings()
# Create database tables
Base.metadata.create_all(bind=engine)
# Initialize FastAPI app
app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    debug=settings.debug,
    docs_url="/docs" if settings.debug else None,
    redoc_url="/redoc" if settings.debug else None
)
# Configure logging
logging.basicConfig(
    level=getattr(logging, settings.log_level),
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Security middleware
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["localhost", "127.0.0.1", "*.yourdomain.com"]
)
# CORS middleware
# Strip whitespace so a value like "https://a.com, https://b.com" parses cleanly
origins = [origin.strip() for origin in settings.cors_origins.split(",") if origin.strip()]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    # Browsers reject credentialed requests when the allowed origin is "*",
    # so enable credentials only for an explicit origin list
    allow_credentials="*" not in origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Compression middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Custom middleware
app.middleware("http")(rate_limit_middleware)
app.middleware("http")(logging_middleware)
# Include routers
app.include_router(auth.router, prefix="/auth", tags=["Authentication"])
app.include_router(users.router, prefix="/users", tags=["Users"])
app.include_router(items.router, prefix="/items", tags=["Items"])
# Global exception handler
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Global HTTP exception handler"""
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "code": exc.status_code,
                "message": exc.detail,
                "timestamp": time.time()
            }
        }
    )

@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Global exception handler for unhandled exceptions"""
    logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={
            "error": {
                "code": 500,
                "message": "Internal server error" if not settings.debug else str(exc),
                "timestamp": time.time()
            }
        }
    )
# Health check endpoints
@app.get("/health")
async def health_check():
    """Application health check"""
    return {
        "status": "healthy",
        "version": settings.app_version,
        "timestamp": time.time()
    }

@app.get("/health/db")
async def database_health_check():
    """Database health check"""
    from sqlalchemy import text
    from .database import SessionLocal
    try:
        db = SessionLocal()
        try:
            # Simple database query to check connection;
            # SQLAlchemy 2.0 requires raw SQL to be wrapped in text()
            db.execute(text("SELECT 1"))
        finally:
            # Always return the connection, even if the query fails
            db.close()
        return {"status": "healthy", "database": "connected"}
    except Exception as e:
        logger.error(f"Database health check failed: {e}")
        raise HTTPException(status_code=503, detail="Database unavailable")
# Startup and shutdown events
@app.on_event("startup")
async def startup_event():
    """Application startup event"""
    logger.info(f"Starting {settings.app_name} v{settings.app_version}")
    # Initialize external services (Redis, etc.)
    if settings.redis_url:
        # Initialize Redis connection
        pass
    # Initialize monitoring (Sentry, etc.)
    if settings.sentry_dsn:
        import os
        import sentry_sdk
        from sentry_sdk.integrations.fastapi import FastApiIntegration
        sentry_sdk.init(
            dsn=settings.sentry_dsn,
            integrations=[FastApiIntegration()],
            # Settings defines no "environment" field, so read the same
            # ENVIRONMENT variable that get_settings() uses
            environment=os.getenv("ENVIRONMENT", "development")
        )

@app.on_event("shutdown")
async def shutdown_event():
    """Application shutdown event"""
    logger.info("Shutting down application")
    # Cleanup resources
    # Close database connections, Redis connections, etc.
if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=settings.debug,
        workers=1 if settings.debug else 4
    )
Docker Configuration
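The `HEALTHCHECK` instruction in this section's Dockerfile shells out to `curl`, which is one reason the image installs it. An alternative is a small standard-library probe that the health check could run with `python` instead. The following is a sketch only: the function name `probe` and the idea of shipping it as a script inside the image are assumptions, while the `/health` path comes from the application shown earlier.

```python
import urllib.error
import urllib.request


def probe(url: str, timeout: float = 5.0) -> bool:
    """Return True if the URL answers with a 2xx status within the timeout."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, timeout, HTTP error status, or malformed URL
        return False
```

A wrapper script could call `probe("http://localhost:8000/health")` and exit with status 0 or 1, which is the convention `HEALTHCHECK` relies on.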
FROM python:3.11-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    POETRY_NO_INTERACTION=1 \
    POETRY_VIRTUALENVS_IN_PROJECT=1 \
    POETRY_CACHE_DIR=/tmp/poetry_cache
# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Poetry
RUN pip install poetry
# Set work directory
WORKDIR /app
# Copy dependency files
COPY pyproject.toml poetry.lock* ./
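# Install dependencies only; the application code is copied in a later step,
# so --no-root skips installing the project itself
RUN poetry install --only=main --no-root && rm -rf $POETRY_CACHE_DIR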
# Copy application code
COPY . .
# Create non-root user
RUN adduser --disabled-password --gecos '' appuser && \
chown -R appuser:appuser /app
USER appuser
# Create uploads directory
RUN mkdir -p /app/uploads
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
# Run the application
CMD ["poetry", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/fastapi_db
      - REDIS_URL=redis://redis:6379
      - SECRET_KEY=your-super-secret-key-here
      - ENVIRONMENT=production
    depends_on:
      - db
      - redis
    volumes:
      - ./uploads:/app/uploads
    networks:
      - app-network
  db:
    image: postgres:15
    environment:
      - POSTGRES_DB=fastapi_db
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - app-network
  redis:
    image: redis:7-alpine
    networks:
      - app-network
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - app
    networks:
      - app-network
volumes:
  postgres_data:
networks:
  app-network:
    driver: bridge
Chapter 10: Best Practices and Patterns
API Design Best Practices
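Most handlers in the listing for this section are stubs. As one illustration, the metadata fields of `PaginatedResponse` (tip 9) can be derived from the total row count and the requested page. The sketch below uses a plain dataclass and an in-memory list; `PageMeta` and `paginate` are hypothetical names, and a real handler would apply the same arithmetic to a database query.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass
class PageMeta:
    """Hypothetical mirror of the PaginatedResponse fields."""
    items: List[dict]
    total: int
    page: int
    page_size: int
    has_next: bool
    has_prev: bool


def paginate(rows: Sequence[dict], page: int = 1, page_size: int = 10) -> PageMeta:
    """Slice rows for a 1-based page number and compute navigation flags."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    total = len(rows)
    start = (page - 1) * page_size
    end = start + page_size
    return PageMeta(
        items=list(rows[start:end]),
        total=total,
        page=page,
        page_size=page_size,
        has_next=end < total,  # rows remain beyond this page
        has_prev=page > 1,     # any page after the first has a predecessor
    )
```

Deriving `has_next` from the slice end rather than from a page count avoids a separate rounding step for the last, partially filled page.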
from fastapi import FastAPI, HTTPException, Query, Path, Depends
from fastapi.responses import JSONResponse  # used by the error handler in tip 5
from pydantic import BaseModel, Field
from typing import Optional, List
from enum import Enum
import re
app = FastAPI()
# 1. Use meaningful HTTP status codes
class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    created_at: str

# Note: regex= is the Pydantic v1 spelling; Pydantic v2 renames it to pattern=
class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=20, regex=r'^[a-zA-Z0-9_]+$')
    email: str = Field(..., regex=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    password: str = Field(..., min_length=8)
# 2. Use consistent naming conventions
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """Create a new user - use plural nouns for collections"""
    # Implementation here
    pass

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int = Path(..., description="The ID of the user to retrieve", gt=0)
):
    """Get user by ID - use descriptive path parameters"""
    # Implementation here
    pass

# 3. Use proper query parameters with validation
@app.get("/users", response_model=List[UserResponse])
async def list_users(
    skip: int = Query(0, ge=0, description="Number of records to skip"),
    limit: int = Query(10, ge=1, le=100, description="Maximum number of records to return"),
    search: Optional[str] = Query(None, min_length=1, max_length=100),
    sort_by: Optional[str] = Query("created_at", regex="^(username|email|created_at)$"),
    order: Optional[str] = Query("desc", regex="^(asc|desc)$")
):
    """List users with proper pagination and filtering"""
    # Implementation here
    pass
# 4. Use enums for fixed sets of values
class UserStatus(str, Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    SUSPENDED = "suspended"

class UserUpdate(BaseModel):
    username: Optional[str] = Field(None, min_length=3, max_length=20)
    email: Optional[str] = None
    status: Optional[UserStatus] = None

@app.patch("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate):
    """Update user - use PATCH for partial updates"""
    # Implementation here
    pass
# 5. Implement proper error handling
class ErrorResponse(BaseModel):
    error: str
    message: str
    details: Optional[dict] = None

@app.exception_handler(ValueError)
async def value_error_handler(request, exc):
    return JSONResponse(
        status_code=400,
        content=ErrorResponse(
            error="VALIDATION_ERROR",
            message=str(exc)
        ).dict()
    )
# 6. Use dependency injection for common functionality
async def get_current_user():
    """Dependency to get current authenticated user"""
    # Implementation here
    pass

async def require_admin_role(current_user = Depends(get_current_user)):
    """Dependency to require admin role"""
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin role required")
    return current_user

@app.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    admin_user = Depends(require_admin_role)
):
    """Delete user - protected endpoint requiring admin role"""
    # Implementation here
    pass
# 7. Use response models to control data exposure
class PublicUserResponse(BaseModel):
    id: int
    username: str
    # Note: email is excluded from public response

@app.get("/users/{user_id}/public", response_model=PublicUserResponse)
async def get_public_user_profile(user_id: int):
    """Get public user profile with limited information"""
    # Implementation here
    pass

# 8. Implement versioning strategy
@app.get("/v1/users/{user_id}", response_model=UserResponse)
async def get_user_v1(user_id: int):
    """Version 1 of get user endpoint"""
    # Implementation here
    pass
# 9. Use meaningful response structures
class PaginatedResponse(BaseModel):
    items: List[dict]
    total: int
    page: int
    page_size: int
    has_next: bool
    has_prev: bool

# Note: routes match in declaration order, so in a real app this route must be
# registered before "/users/{user_id}"; otherwise "paginated" is treated as a
# user_id and rejected with a 422
@app.get("/users/paginated", response_model=PaginatedResponse)
async def get_users_paginated(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100)
):
    """Get users with proper pagination metadata"""
    # Implementation here
    pass
# 10. Document your API thoroughly
# Shown as a separate function for illustration; in a real app, add these
# options to the existing POST /users route, because a second registration
# of the same path and method is never reached
@app.post(
    "/users",
    response_model=UserResponse,
    status_code=201,
    summary="Create a new user",
    description="Create a new user account with username, email, and password",
    response_description="The created user information",
    responses={
        201: {"description": "User created successfully"},
        400: {"description": "Validation error"},
        409: {"description": "User already exists"}
    }
)
async def create_user_documented(user: UserCreate):
    """
    Create a new user with comprehensive documentation.

    - **username**: Must be 3-20 characters, alphanumeric and underscores only
    - **email**: Must be a valid email address
    - **password**: Must be at least 8 characters long
    """
    # Implementation here
    pass
This comprehensive FastAPI book covers all the essential aspects of building modern APIs with FastAPI, from basic concepts to advanced production deployment strategies. Each chapter includes practical examples, best practices, and real-world patterns that you can apply in your projects.
The book demonstrates how to:
- Build robust APIs with proper validation and error handling
- Implement authentication and authorization
- Integrate with databases using SQLAlchemy
- Test your applications thoroughly
- Deploy to production with Docker and proper configuration management
- Follow best practices for API design and code organization
