A Comprehensive Guide to Data Validation and Serialization in Python
Table of Contents
- Introduction
- Getting Started
- Basic Models
- Field Types and Validation
- Advanced Validation
- Custom Validators
- Model Configuration
- Serialization and Deserialization
- Working with Complex Data Structures
- Error Handling
- Performance and Optimization
- Integration with FastAPI
- Migration from v1 to v2
- Advanced Patterns and Best Practices
- Real-World Applications
Introduction
What is Pydantic?
Pydantic is a Python library for data validation and serialization driven by standard Python type hints. Because it enforces those hints at runtime, with type conversion and structured error reporting, it is an essential tool for building robust Python applications.
```mermaid
graph TD
    A[Raw Data] --> B[Pydantic Model]
    B --> C[Validated Data]
    B --> D[Type Conversion]
    B --> E[Error Reporting]
    C --> F[Serialization]
    F --> G[JSON/Dict Output]
```
Why Pydantic v2?
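That pipeline fits in a few lines. The `Point` model below is illustrative, but the behavior shown (lax coercion of `"1"` to `1`, and a structured `ValidationError` when conversion is impossible) is standard Pydantic:

```python
from pydantic import BaseModel, ValidationError

class Point(BaseModel):
    x: int
    y: int

# A numeric string is coerced to int during validation
p = Point(x="1", y=2)
assert p.x == 1

# Input that cannot be converted produces a structured error
try:
    Point(x="not a number", y=2)
except ValidationError as e:
    assert e.error_count() == 1
```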
Pydantic v2 represents a complete rewrite with significant performance improvements and new features:
- Performance: 5-50x faster than v1
- Better Error Messages: More detailed and user-friendly
- Improved Type Support: Better handling of complex types
- New Features: Computed fields, serialization aliases, and more
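One of those new features, computed fields, is worth a quick look: a `@computed_field` property is included in serialized output alongside regular fields. The `Rectangle` model here is just an illustration:

```python
from pydantic import BaseModel, computed_field

class Rectangle(BaseModel):
    width: float
    height: float

    # Computed fields appear in model_dump() / model_dump_json()
    @computed_field
    @property
    def area(self) -> float:
        return self.width * self.height

r = Rectangle(width=3, height=4)
assert r.model_dump() == {"width": 3.0, "height": 4.0, "area": 12.0}
```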
```mermaid
graph LR
    A[Pydantic v1] --> B[Performance Issues]
    A --> C[Limited Features]
    D[Pydantic v2] --> E[5-50x Faster]
    D --> F[Better Error Messages]
    D --> G[Enhanced Features]
    D --> H[Rust Core]
```
Getting Started
Installation
```bash
pip install pydantic
```
For additional features:
```bash
pip install "pydantic[email]"   # Email validation (EmailStr)
pip install pydantic-settings   # Settings management and .env file support
```
Note that in v2 the old `pydantic[dotenv]` extra is gone: .env support now lives in the separate `pydantic-settings` package.
Your First Model
```python
from pydantic import BaseModel
from typing import Optional

class User(BaseModel):
    id: int
    name: str
    email: str
    age: Optional[int] = None

# Create an instance
user = User(id=1, name="John Doe", email="john@example.com", age=30)
print(user)
# Output: id=1 name='John Doe' email='john@example.com' age=30
```
```mermaid
classDiagram
    class BaseModel {
        +model_validate()
        +model_dump()
        +model_fields
        +model_config
    }
    class User {
        +int id
        +str name
        +str email
        +Optional[int] age
    }
    BaseModel <|-- User
```
Basic Models
Defining Models
Models in Pydantic are Python classes that inherit from BaseModel. Each attribute represents a field with its type annotation.
```python
from pydantic import BaseModel
from datetime import datetime
from typing import List, Optional

class Address(BaseModel):
    street: str
    city: str
    country: str
    postal_code: str

class User(BaseModel):
    id: int
    username: str
    email: str
    full_name: Optional[str] = None
    created_at: datetime
    addresses: List[Address] = []
    is_active: bool = True
```
Model Instantiation
```python
# From keyword arguments
user = User(
    id=1,
    username="johndoe",
    email="john@example.com",
    created_at=datetime.now()
)

# From dictionary
user_data = {
    "id": 1,
    "username": "johndoe",
    "email": "john@example.com",
    "created_at": "2023-01-01T00:00:00"
}
user = User.model_validate(user_data)

# From JSON
json_str = '{"id": 1, "username": "johndoe", "email": "john@example.com", "created_at": "2023-01-01T00:00:00"}'
user = User.model_validate_json(json_str)
```
```mermaid
flowchart TD
    A[Input Data] --> B{Data Source}
    B -->|Dict| C[model_validate]
    B -->|JSON| D[model_validate_json]
    B -->|Keyword Args| E[Direct Instantiation]
    C --> F[Validated Model]
    D --> F
    E --> F
    F --> G[Type Conversion]
    F --> H[Validation]
```
Field Types and Validation
Built-in Types
Pydantic supports all Python built-in types and many additional types:
```python
from pydantic import BaseModel
from typing import List, Dict, Set, Tuple, Union, Optional
from datetime import datetime, date, time
from decimal import Decimal
from uuid import UUID

class DataTypes(BaseModel):
    # Basic types
    integer: int
    floating: float
    string: str
    boolean: bool
    # Collections
    list_items: List[str]
    dict_items: Dict[str, int]
    set_items: Set[str]
    tuple_items: Tuple[str, int, bool]
    # Advanced types
    datetime_field: datetime
    date_field: date
    time_field: time
    decimal_field: Decimal
    uuid_field: UUID
    # Union types
    union_field: Union[str, int]
    optional_field: Optional[str] = None
```
Field Constraints
```python
from pydantic import BaseModel, Field
from typing import Annotated, List

class Product(BaseModel):
    name: Annotated[str, Field(min_length=1, max_length=100)]
    price: Annotated[float, Field(gt=0, le=10000)]
    quantity: Annotated[int, Field(ge=0)]
    description: Annotated[str, Field(max_length=500)] = ""
    tags: Annotated[List[str], Field(max_length=10)]
    # Alternative syntax
    rating: float = Field(ge=1, le=5, description="Product rating from 1 to 5")
```
```mermaid
graph TD
    A[Field Definition] --> B[Type Annotation]
    A --> C[Constraints]
    C --> D[String Constraints]
    C --> E[Numeric Constraints]
    C --> F[Collection Constraints]
    D --> D1[min_length, max_length]
    D --> D2[pattern]
    E --> E1[gt, ge, lt, le]
    F --> F1[min_length, max_length]
    F --> F2[Set types for uniqueness]
```
Custom Field Types
```python
from pydantic import BaseModel, field_validator, Field
from typing import Annotated
import re

def validate_phone(v: str) -> str:
    pattern = r'^\+?1?\d{9,15}$'
    if not re.match(pattern, v):
        raise ValueError('Invalid phone number format')
    return v

PhoneNumber = Annotated[str, Field(description="Phone number")]

class Contact(BaseModel):
    name: str
    phone: PhoneNumber

    @field_validator('phone')
    @classmethod
    def validate_phone_number(cls, v):
        return validate_phone(v)
```
Advanced Validation
Field Validators
```python
from pydantic import BaseModel, field_validator
from typing import List
import re

class User(BaseModel):
    username: str
    email: str
    password: str
    tags: List[str]

    @field_validator('username')
    @classmethod
    def username_must_be_alphanumeric(cls, v):
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v.lower()

    @field_validator('email')
    @classmethod
    def validate_email(cls, v):
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(pattern, v):
            raise ValueError('Invalid email format')
        return v.lower()

    @field_validator('password')
    @classmethod
    def validate_password(cls, v):
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        if not re.search(r'[A-Z]', v):
            raise ValueError('Password must contain at least one uppercase letter')
        if not re.search(r'[a-z]', v):
            raise ValueError('Password must contain at least one lowercase letter')
        if not re.search(r'\d', v):
            raise ValueError('Password must contain at least one digit')
        return v

    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v):
        if len(v) > 5:
            raise ValueError('Maximum 5 tags allowed')
        return [tag.lower().strip() for tag in v]
```
Model Validators
```python
from pydantic import BaseModel, model_validator
from datetime import date

class Event(BaseModel):
    name: str
    start_date: date
    end_date: date
    start_time: str
    end_time: str

    @model_validator(mode='after')
    def validate_dates_and_times(self):
        if self.end_date < self.start_date:
            raise ValueError('End date must be after start date')
        if self.start_date == self.end_date:
            start_hour = int(self.start_time.split(':')[0])
            end_hour = int(self.end_time.split(':')[0])
            if end_hour <= start_hour:
                raise ValueError('End time must be after start time on the same day')
        return self

class UserProfile(BaseModel):
    username: str
    email: str
    confirm_email: str
    password: str
    confirm_password: str

    @model_validator(mode='after')
    def validate_confirmations(self):
        if self.email != self.confirm_email:
            raise ValueError('Emails do not match')
        if self.password != self.confirm_password:
            raise ValueError('Passwords do not match')
        return self
```
```mermaid
sequenceDiagram
    participant Input
    participant FieldValidator
    participant ModelValidator
    participant Output
    Input->>FieldValidator: Raw field values
    FieldValidator->>FieldValidator: Validate individual fields
    FieldValidator->>ModelValidator: Validated fields
    ModelValidator->>ModelValidator: Cross-field validation
    ModelValidator->>Output: Final validated model
```
Custom Validators
Creating Reusable Validators
```python
from pydantic import BaseModel, field_validator
import re

def create_regex_validator(pattern: str, error_msg: str):
    """Factory function to create regex validators"""
    def validator(v: str) -> str:
        if not re.match(pattern, v):
            raise ValueError(error_msg)
        return v
    return validator

# Create specific validators
email_validator = create_regex_validator(
    r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
    'Invalid email format'
)
phone_validator = create_regex_validator(
    r'^\+?1?\d{9,15}$',
    'Invalid phone number format'
)

class Contact(BaseModel):
    name: str
    email: str
    phone: str

    @field_validator('email')
    @classmethod
    def validate_email(cls, v):
        return email_validator(v)

    @field_validator('phone')
    @classmethod
    def validate_phone(cls, v):
        return phone_validator(v)
```
Conditional Validation
```python
from pydantic import BaseModel, model_validator
from typing import Optional, Literal

class PaymentMethod(BaseModel):
    type: Literal['credit_card', 'bank_transfer', 'paypal']
    # Credit card fields
    card_number: Optional[str] = None
    expiry_month: Optional[int] = None
    expiry_year: Optional[int] = None
    cvv: Optional[str] = None
    # Bank transfer fields
    account_number: Optional[str] = None
    routing_number: Optional[str] = None
    # PayPal fields
    paypal_email: Optional[str] = None

    @model_validator(mode='after')
    def validate_payment_fields(self):
        if self.type == 'credit_card':
            required_fields = [self.card_number, self.expiry_month, self.expiry_year, self.cvv]
            if any(field is None for field in required_fields):
                raise ValueError('Credit card details are required')
        elif self.type == 'bank_transfer':
            if not self.account_number or not self.routing_number:
                raise ValueError('Bank account details are required')
        elif self.type == 'paypal':
            if not self.paypal_email:
                raise ValueError('PayPal email is required')
        return self
```
Model Configuration
ConfigDict
```python
from pydantic import BaseModel, ConfigDict

class StrictUser(BaseModel):
    model_config = ConfigDict(
        str_strip_whitespace=True,
        validate_default=True,
        validate_assignment=True,
        use_enum_values=True,
        frozen=False,
        extra='forbid'
    )

    name: str
    age: int
    email: str

class FlexibleUser(BaseModel):
    model_config = ConfigDict(
        extra='allow',
        str_to_lower=True,
        validate_default=False
    )

    name: str
    age: int

# Usage
strict_user = StrictUser(name="  John Doe  ", age=30, email="john@example.com")
print(strict_user.name)  # "John Doe" (whitespace stripped)

flexible_user = FlexibleUser(name="Jane", age=25, extra_field="allowed")
print(flexible_user.model_dump())  # Includes extra_field
```
Aliases and Serialization
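Besides per-model configuration, v2 also lets you tighten validation per call: passing `strict=True` to `model_validate` disables lax coercion for that invocation. A minimal sketch (the `Point` model is illustrative):

```python
from pydantic import BaseModel, ValidationError

class Point(BaseModel):
    x: int

# Lax mode (the default) coerces the numeric string
assert Point.model_validate({"x": "1"}).x == 1

# Strict mode rejects str -> int coercion
raised = False
try:
    Point.model_validate({"x": "1"}, strict=True)
except ValidationError:
    raised = True
assert raised
```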
Note that in v2, `AliasChoices` and `AliasPath` are passed via `validation_alias`, and the inner `class Config` is replaced by `model_config`:
```python
from pydantic import BaseModel, ConfigDict, Field, AliasChoices, AliasPath

class APIResponse(BaseModel):
    # Allow both the field name and the alias as input keys
    model_config = ConfigDict(populate_by_name=True)

    user_id: int = Field(validation_alias='userId')
    full_name: str = Field(validation_alias=AliasChoices('fullName', 'full_name', 'name'))
    email_address: str = Field(validation_alias='email')
    profile_picture: str = Field(validation_alias=AliasPath('profile', 'picture', 'url'))

# Input data with different formats
data1 = {
    "userId": 1,
    "fullName": "John Doe",
    "email": "john@example.com",
    "profile": {"picture": {"url": "http://example.com/pic.jpg"}}
}
data2 = {
    "user_id": 1,
    "full_name": "John Doe",
    "email_address": "john@example.com",
    "profile_picture": "http://example.com/pic.jpg"
}
response1 = APIResponse.model_validate(data1)
response2 = APIResponse.model_validate(data2)
```
```mermaid
graph TD
    A[Raw Input Data] --> B{Alias Resolution}
    B --> C[Field Name]
    B --> D[Primary Alias]
    B --> E[Alternative Aliases]
    C --> F[Model Field]
    D --> F
    E --> F
    F --> G[Validation]
    G --> H[Final Model]
```
Serialization and Deserialization
Model Serialization
```python
from pydantic import BaseModel, field_serializer
from datetime import datetime
from typing import Optional, List

class User(BaseModel):
    id: int
    username: str
    email: str
    created_at: datetime
    last_login: Optional[datetime] = None
    tags: List[str] = []

    @field_serializer('created_at', 'last_login')
    def serialize_datetime(self, value: Optional[datetime]) -> Optional[str]:
        if value is None:
            return None
        return value.isoformat()

    @field_serializer('tags')
    def serialize_tags(self, value: List[str]) -> str:
        return ','.join(value)

user = User(
    id=1,
    username="johndoe",
    email="john@example.com",
    created_at=datetime.now(),
    tags=["python", "programming"]
)

# Different serialization formats
print(user.model_dump())                            # Python dict
print(user.model_dump_json())                       # JSON string
print(user.model_dump(include={'id', 'username'}))  # Specific fields
print(user.model_dump(exclude={'email'}))           # Exclude fields
print(user.model_dump(by_alias=True))               # Use aliases
```
Custom Serializers
```python
from pydantic import BaseModel, field_serializer, model_serializer
from decimal import Decimal
from typing import Dict, Any

class Product(BaseModel):
    name: str
    price: Decimal
    discount_percentage: float

    @field_serializer('price')
    def serialize_price(self, value: Decimal) -> str:
        return f"${value:.2f}"

    @model_serializer
    def serialize_model(self) -> Dict[str, Any]:
        # Custom model serialization.
        # Decimal and float don't mix in arithmetic, so convert the percentage first.
        discount = Decimal(str(self.discount_percentage)) / 100
        final_price = self.price * (1 - discount)
        return {
            'product_name': self.name,
            'original_price': f"${self.price:.2f}",
            'discount': f"{self.discount_percentage}%",
            'final_price': f"${final_price:.2f}",
            'savings': f"${self.price - final_price:.2f}"
        }

product = Product(name="Laptop", price=Decimal("999.99"), discount_percentage=10)
print(product.model_dump())
```
Deserialization with Custom Logic
```python
from pydantic import BaseModel, field_validator
from typing import List
import json

class FlexibleData(BaseModel):
    numbers: List[int]
    metadata: dict

    @field_validator('numbers', mode='before')
    @classmethod
    def parse_numbers(cls, v):
        if isinstance(v, str):
            # Handle comma-separated string
            return [int(x.strip()) for x in v.split(',')]
        elif isinstance(v, (int, float)):
            # Handle single number
            return [int(v)]
        return v

    @field_validator('metadata', mode='before')
    @classmethod
    def parse_metadata(cls, v):
        if isinstance(v, str):
            try:
                return json.loads(v)
            except json.JSONDecodeError:
                raise ValueError('Invalid JSON string')
        return v

# Different input formats
data1 = FlexibleData(numbers="1,2,3,4", metadata='{"key": "value"}')
data2 = FlexibleData(numbers=42, metadata={"another": "dict"})
data3 = FlexibleData(numbers=[1, 2, 3], metadata={"direct": "dict"})
```
Working with Complex Data Structures
Nested Models
```python
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from datetime import datetime
from enum import Enum

class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"

class Address(BaseModel):
    street: str
    city: str
    state: str
    zip_code: str = Field(alias='zipCode')
    country: str = "USA"

class ContactInfo(BaseModel):
    email: str
    phone: Optional[str] = None
    addresses: List[Address] = []

class User(BaseModel):
    id: int
    username: str
    role: UserRole
    contact: ContactInfo
    preferences: Dict[str, bool] = {}
    created_at: datetime

class Organization(BaseModel):
    name: str
    users: List[User]
    admin: User
    settings: Dict[str, str] = {}

# Create a complex nested structure
org_data = {
    "name": "Tech Corp",
    "admin": {
        "id": 1,
        "username": "admin",
        "role": "admin",
        "contact": {
            "email": "admin@techcorp.com",
            "phone": "+1234567890",
            "addresses": [
                {
                    "street": "123 Tech St",
                    "city": "San Francisco",
                    "state": "CA",
                    "zipCode": "94105"
                }
            ]
        },
        "created_at": "2023-01-01T00:00:00"
    },
    "users": [
        {
            "id": 2,
            "username": "john_doe",
            "role": "user",
            "contact": {
                "email": "john@techcorp.com"
            },
            "created_at": "2023-01-15T00:00:00"
        }
    ]
}
organization = Organization.model_validate(org_data)
```
Recursive Models
```python
from pydantic import BaseModel
from typing import List, Optional

class Category(BaseModel):
    id: int
    name: str
    parent_id: Optional[int] = None
    children: List['Category'] = []

# Resolve forward references
Category.model_rebuild()

class Comment(BaseModel):
    id: int
    content: str
    author: str
    replies: List['Comment'] = []

Comment.model_rebuild()

# Tree structure example
category_data = {
    "id": 1,
    "name": "Electronics",
    "children": [
        {
            "id": 2,
            "name": "Computers",
            "parent_id": 1,
            "children": [
                {"id": 3, "name": "Laptops", "parent_id": 2},
                {"id": 4, "name": "Desktops", "parent_id": 2}
            ]
        },
        {
            "id": 5,
            "name": "Phones",
            "parent_id": 1
        }
    ]
}
root_category = Category.model_validate(category_data)
```
Generic Models
```python
from pydantic import BaseModel, Field
from typing import TypeVar, Generic, List, Optional
from datetime import datetime

T = TypeVar('T')

class APIResponse(BaseModel, Generic[T]):
    success: bool
    data: Optional[T] = None
    message: str = ""
    timestamp: datetime = Field(default_factory=datetime.now)

class PaginatedResponse(BaseModel, Generic[T]):
    items: List[T]
    total: int
    page: int
    page_size: int

    @property
    def total_pages(self) -> int:
        return (self.total + self.page_size - 1) // self.page_size

class User(BaseModel):
    id: int
    name: str
    email: str

# Usage with generics
UserResponse = APIResponse[User]
UserListResponse = APIResponse[PaginatedResponse[User]]

user_response = UserResponse(
    success=True,
    data=User(id=1, name="John", email="john@example.com"),
    message="User retrieved successfully"
)

users_response = UserListResponse(
    success=True,
    data=PaginatedResponse[User](
        items=[
            User(id=1, name="John", email="john@example.com"),
            User(id=2, name="Jane", email="jane@example.com")
        ],
        total=50,
        page=1,
        page_size=10
    )
)
```
```mermaid
classDiagram
    class BaseModel {
        +model_validate()
        +model_dump()
    }
    class APIResponse~T~ {
        +bool success
        +T data
        +str message
        +datetime timestamp
    }
    class PaginatedResponse~T~ {
        +List~T~ items
        +int total
        +int page
        +int page_size
        +total_pages()
    }
    class User {
        +int id
        +str name
        +str email
    }
    BaseModel <|-- APIResponse
    BaseModel <|-- PaginatedResponse
    BaseModel <|-- User
    APIResponse --> PaginatedResponse
    PaginatedResponse --> User
```
Error Handling
Understanding Validation Errors
```python
from pydantic import BaseModel, ValidationError, Field
from typing import List

class Product(BaseModel):
    name: str = Field(min_length=1, max_length=100)
    price: float = Field(gt=0)
    tags: List[str] = Field(max_length=5)

try:
    product = Product(
        name="",     # Too short
        price=-10,   # Negative
        tags=["tag1", "tag2", "tag3", "tag4", "tag5", "tag6"]  # Too many
    )
except ValidationError as e:
    print("Validation failed:")
    print(f"Error count: {e.error_count()}")
    for error in e.errors():
        print(f"Field: {error['loc']}")
        print(f"Error: {error['msg']}")
        print(f"Type: {error['type']}")
        print(f"Input: {error['input']}")
        print("---")
    # JSON representation
    print("\nJSON representation:")
    print(e.json(indent=2))
```
Custom Error Messages
```python
from pydantic import BaseModel, Field, field_validator, ValidationError

class User(BaseModel):
    username: str = Field(
        min_length=3,
        max_length=20,
        description="Username must be 3-20 characters"
    )
    age: int = Field(
        ge=13,
        le=120,
        description="Age must be between 13 and 120"
    )

    @field_validator('username')
    @classmethod
    def validate_username(cls, v):
        if not v.isalnum():
            raise ValueError('Username must contain only letters and numbers')
        if v.lower() in ['admin', 'root', 'user']:
            raise ValueError('Username cannot be a reserved word')
        return v

class CustomErrorHandler:
    @staticmethod
    def format_errors(e: ValidationError) -> dict:
        formatted_errors = {}
        for error in e.errors():
            field = '.'.join(str(loc) for loc in error['loc'])
            formatted_errors[field] = {
                'message': error['msg'],
                'value': error['input'],
                'type': error['type']
            }
        return formatted_errors

try:
    user = User(username="ad", age=150)
except ValidationError as e:
    errors = CustomErrorHandler.format_errors(e)
    print(errors)
```
Error Context and Debugging
```python
from pydantic import BaseModel, ConfigDict, ValidationError
from typing import List, Dict, Any

class DebugModel(BaseModel):
    model_config = ConfigDict(validate_assignment=True)

    def __init__(self, **data):
        try:
            super().__init__(**data)
        except ValidationError as e:
            self._debug_validation_error(e, data)
            raise

    def _debug_validation_error(self, error: ValidationError, original_data: Dict[str, Any]):
        print("=== VALIDATION DEBUG INFO ===")
        print(f"Original data: {original_data}")
        print(f"Model: {self.__class__.__name__}")
        print("\nDetailed errors:")
        for err in error.errors():
            field_path = " -> ".join(str(loc) for loc in err['loc'])
            print(f"\nField: {field_path}")
            print(f"Error Type: {err['type']}")
            print(f"Message: {err['msg']}")
            print(f"Input Value: {err['input']}")
            if 'ctx' in err:
                print(f"Context: {err['ctx']}")

class DebuggableUser(DebugModel):
    name: str
    age: int
    emails: List[str]

# This will trigger debug output
try:
    user = DebuggableUser(
        name=123,            # Wrong type
        age="not_a_number",  # Wrong type
        emails="not_a_list"  # Wrong type
    )
except ValidationError:
    print("Validation failed as expected")
```
Performance and Optimization
Performance Best Practices
```python
from pydantic import BaseModel, Field, ConfigDict
from typing import List
import time
from functools import wraps

def time_operation(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.4f} seconds")
        return result
    return wrapper

# Optimized model configuration
class OptimizedUser(BaseModel):
    model_config = ConfigDict(
        # Performance optimizations
        validate_assignment=False,      # Only validate on creation
        use_enum_values=True,           # Use enum values directly
        arbitrary_types_allowed=False,  # Restrict to known types
        str_strip_whitespace=True,      # Built-in normalization
    )

    id: int
    name: str
    email: str
    tags: List[str] = Field(default_factory=list)

# Batch processing
class UserProcessor:
    @staticmethod
    @time_operation
    def process_users_individually(user_data_list: List[dict]) -> List[OptimizedUser]:
        """Process users one by one; raises on the first invalid record"""
        return [OptimizedUser.model_validate(data) for data in user_data_list]

    @staticmethod
    @time_operation
    def process_users_batch(user_data_list: List[dict]) -> List[OptimizedUser]:
        """Process users while tolerating invalid records"""
        users = []
        for data in user_data_list:
            try:
                users.append(OptimizedUser.model_validate(data))
            except Exception as e:
                print(f"Failed to process user {data.get('id', 'unknown')}: {e}")
        return users

# Memory-efficient streaming
class StreamingProcessor:
    @staticmethod
    def process_large_dataset(data_iterator):
        """Process large datasets without loading everything into memory"""
        for data_chunk in data_iterator:
            try:
                yield OptimizedUser.model_validate(data_chunk)
            except Exception as e:
                print(f"Skipping invalid record: {e}")
                continue

# Example usage
sample_data = [
    {"id": i, "name": f"User {i}", "email": f"user{i}@example.com", "tags": [f"tag{i}"]}
    for i in range(1000)
]
processor = UserProcessor()
users1 = processor.process_users_individually(sample_data)
users2 = processor.process_users_batch(sample_data)
```
Memory Management
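For validating many records of the same shape, v2 also provides `TypeAdapter`, which compiles a validator for an arbitrary type once and reuses it; validating a whole `List[User]` in one call pushes the loop into the Rust core. A minimal sketch (the `User` model is illustrative):

```python
from pydantic import BaseModel, TypeAdapter
from typing import List

class User(BaseModel):
    id: int
    name: str

# Build the validator once, reuse it for every batch
adapter = TypeAdapter(List[User])

users = adapter.validate_python([
    {"id": 1, "name": "A"},
    {"id": "2", "name": "B"},  # "2" is coerced to 2
])
assert len(users) == 2 and users[1].id == 2

# Serialization works through the same adapter
assert adapter.dump_python(users)[0] == {"id": 1, "name": "A"}
```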
```python
from pydantic import BaseModel, ConfigDict
from typing import List, Iterator
import sys
from weakref import WeakSet

class MemoryOptimizedModel(BaseModel):
    """Model with memory optimization techniques"""
    model_config = ConfigDict(
        extra='forbid',  # Prevent extra fields
        frozen=True,     # Immutable (and hashable) instances
    )

class ResourceManager:
    """Track model instances without keeping them alive"""
    def __init__(self):
        self._instances: WeakSet = WeakSet()

    def create_model(self, model_class: type, data: dict):
        instance = model_class.model_validate(data)
        self._instances.add(instance)
        return instance

    def get_active_instances(self) -> int:
        return len(self._instances)

    def get_memory_usage(self) -> int:
        """Get approximate memory usage in bytes"""
        total_size = 0
        for instance in self._instances:
            total_size += sys.getsizeof(instance)
        return total_size

class LargeDataProcessor:
    """Process large datasets with memory management"""
    def __init__(self, batch_size: int = 1000):
        self.batch_size = batch_size
        self.resource_manager = ResourceManager()

    def process_data_stream(self, data_stream: Iterator[dict], model_class: type):
        """Process data in batches to bound memory use"""
        batch = []
        for data in data_stream:
            batch.append(data)
            if len(batch) >= self.batch_size:
                yield from self._process_batch(batch, model_class)
                batch.clear()
                # Report memory usage
                print(f"Active instances: {self.resource_manager.get_active_instances()}")
                print(f"Memory usage: {self.resource_manager.get_memory_usage()} bytes")
        # Process remaining items
        if batch:
            yield from self._process_batch(batch, model_class)

    def _process_batch(self, batch: List[dict], model_class: type):
        for data in batch:
            try:
                instance = self.resource_manager.create_model(model_class, data)
                yield instance
            except Exception as e:
                print(f"Failed to process record: {e}")

# Usage example
class OptimizedUser(MemoryOptimizedModel):
    id: int
    name: str
    email: str

def data_generator(count: int):
    """Generate data lazily instead of building a full list"""
    for i in range(count):
        yield {
            "id": i,
            "name": f"User {i}",
            "email": f"user{i}@example.com"
        }

processor = LargeDataProcessor(batch_size=100)
data_stream = data_generator(10000)

# Process data efficiently
processed_count = 0
for user in processor.process_data_stream(data_stream, OptimizedUser):
    processed_count += 1
    if processed_count % 1000 == 0:
        print(f"Processed {processed_count} users")
```
```mermaid
graph TD
    A[Large Dataset] --> B[Batch Processing]
    B --> C[Memory Management]
    C --> D[Validation]
    D --> E[Model Creation]
    E --> F[Weak References]
    F --> G[Garbage Collection]
    H[Performance Optimizations] --> I[ConfigDict Settings]
    H --> J[Batch Validation]
    H --> K[Streaming Processing]
    H --> L[Memory Monitoring]
```
Integration with FastAPI
Basic Integration
```python
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, ValidationError
from typing import List, Optional
from datetime import datetime
import uvicorn

app = FastAPI(title="Pydantic FastAPI Integration", version="1.0.0")

# Request/Response Models
class UserCreate(BaseModel):
    username: str = Field(min_length=3, max_length=20, description="Username")
    email: str = Field(description="Valid email address")
    full_name: Optional[str] = Field(None, max_length=100)
    age: int = Field(ge=13, le=120, description="Age must be between 13 and 120")

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    full_name: Optional[str]
    age: int
    created_at: datetime
    is_active: bool = True

class UserUpdate(BaseModel):
    username: Optional[str] = Field(None, min_length=3, max_length=20)
    email: Optional[str] = None
    full_name: Optional[str] = Field(None, max_length=100)
    age: Optional[int] = Field(None, ge=13, le=120)

class PaginationParams(BaseModel):
    page: int = Field(1, ge=1, description="Page number")
    page_size: int = Field(10, ge=1, le=100, description="Items per page")

# In-memory storage (use a database in production)
users_db: List[UserResponse] = []
next_id = 1

@app.post("/users/", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """Create a new user"""
    global next_id
    # Check if username already exists
    if any(u.username == user.username for u in users_db):
        raise HTTPException(status_code=400, detail="Username already exists")
    new_user = UserResponse(
        id=next_id,
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        age=user.age,
        created_at=datetime.now()
    )
    users_db.append(new_user)
    next_id += 1
    return new_user

@app.get("/users/", response_model=List[UserResponse])
async def list_users(
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(10, ge=1, le=100, description="Items per page"),
    username: Optional[str] = Query(None, description="Filter by username")
):
    """List users with pagination and filtering"""
    filtered_users = users_db
    if username:
        filtered_users = [u for u in users_db if username.lower() in u.username.lower()]
    start_idx = (page - 1) * page_size
    end_idx = start_idx + page_size
    return filtered_users[start_idx:end_idx]

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """Get user by ID"""
    user = next((u for u in users_db if u.id == user_id), None)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate):
    """Update user"""
    user = next((u for u in users_db if u.id == user_id), None)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # Update only provided fields
    update_data = user_update.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(user, field, value)
    return user

@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
    """Delete user"""
    user_index = next((i for i, u in enumerate(users_db) if u.id == user_id), None)
    if user_index is None:
        raise HTTPException(status_code=404, detail="User not found")
    users_db.pop(user_index)
    return {"message": "User deleted successfully"}

# Error handling: an exception handler must return a Response object
@app.exception_handler(ValidationError)
async def validation_exception_handler(request, exc):
    return JSONResponse(
        status_code=422,
        content={"error": "Validation failed", "details": exc.errors()}
    )

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
Advanced FastAPI Integration
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Generic, TypeVar
from sqlalchemy.orm import Session
import asyncio
T = TypeVar('T')
class APIResponse(BaseModel, Generic[T]):
"""Generic API response wrapper"""
success: bool = True
data: Optional[T] = None
message: str = ""
errors: Optional[List[str]] = None
class PaginatedResponse(BaseModel, Generic[T]):
"""Paginated response wrapper"""
items: List[T]
total: int
page: int
page_size: int
has_next: bool
has_prev: bool
# Advanced request validation
class BulkUserCreate(BaseModel):
users: List[UserCreate] = Field(min_length=1, max_length=100)
@validator('users')
def validate_unique_usernames(cls, v):
usernames = [user.username for user in v]
if len(usernames) != len(set(usernames)):
raise ValueError('Usernames must be unique within the batch')
return v
class UserSearchParams(BaseModel):
q: Optional[str] = Field(None, min_length=2, description="Search query")
age_min: Optional[int] = Field(None, ge=13)
age_max: Optional[int] = Field(None, le=120)
is_active: Optional[bool] = None
sort_by: str = Field("created_at", regex="^(username|created_at|age)$")
sort_order: str = Field("desc", regex="^(asc|desc)$")
# Dependency injection with Pydantic
def get_search_params(
q: Optional[str] = None,
age_min: Optional[int] = None,
age_max: Optional[int] = None,
is_active: Optional[bool] = None,
sort_by: str = "created_at",
sort_order: str = "desc"
) -> UserSearchParams:
return UserSearchParams(
q=q,
age_min=age_min,
age_max=age_max,
is_active=is_active,
sort_by=sort_by,
sort_order=sort_order
)
@app.post("/users/bulk", response_model=APIResponse[List[UserResponse]])
async def create_users_bulk(bulk_request: BulkUserCreate, background_tasks: BackgroundTasks):
"""Create multiple users at once"""
created_users = []
errors = []
for user_data in bulk_request.users:
try:
# Check for existing username
if any(u.username == user_data.username for u in users_db):
errors.append(f"Username '{user_data.username}' already exists")
continue
new_user = UserResponse(
id=next_id,
username=user_data.username,
email=user_data.email,
full_name=user_data.full_name,
age=user_data.age,
created_at=datetime.now()
)
users_db.append(new_user)
created_users.append(new_user)
except Exception as e:
errors.append(f"Failed to create user '{user_data.username}': {str(e)}")
# Background task for logging
background_tasks.add_task(log_bulk_operation, len(created_users), len(errors))
return APIResponse(
success=len(errors) == 0,
data=created_users,
message=f"Created {len(created_users)} users",
errors=errors if errors else None
)
@app.get("/users/search", response_model=PaginatedResponse[UserResponse])
async def search_users(
search_params: UserSearchParams = Depends(get_search_params),
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=100)
):
"""Advanced user search with filtering and sorting"""
filtered_users = users_db.copy()
# Apply filters
if search_params.q:
filtered_users = [
u for u in filtered_users
if search_params.q.lower() in u.username.lower()
or (u.full_name and search_params.q.lower() in u.full_name.lower())
]
    if search_params.age_min is not None:
        filtered_users = [u for u in filtered_users if u.age is not None and u.age >= search_params.age_min]
    if search_params.age_max is not None:
        filtered_users = [u for u in filtered_users if u.age is not None and u.age <= search_params.age_max]
if search_params.is_active is not None:
filtered_users = [u for u in filtered_users if u.is_active == search_params.is_active]
    # Apply sorting (None values sort last in ascending order so Optional fields don't raise TypeError)
    reverse = search_params.sort_order == "desc"
    filtered_users.sort(
        key=lambda u: (getattr(u, search_params.sort_by) is None, getattr(u, search_params.sort_by)),
        reverse=reverse
    )
# Pagination
total = len(filtered_users)
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
items = filtered_users[start_idx:end_idx]
return PaginatedResponse(
items=items,
total=total,
page=page,
page_size=page_size,
has_next=end_idx < total,
has_prev=page > 1
)
async def log_bulk_operation(created_count: int, error_count: int):
"""Background task for logging"""
await asyncio.sleep(1) # Simulate async operation
    print(f"Bulk operation completed: {created_count} created, {error_count} errors")
sequenceDiagram
participant Client
participant FastAPI
participant Pydantic
participant BusinessLogic
participant Database
Client->>FastAPI: HTTP Request
FastAPI->>Pydantic: Validate Request Body
Pydantic->>Pydantic: Type Conversion & Validation
Pydantic-->>FastAPI: Validated Model
FastAPI->>BusinessLogic: Process Request
BusinessLogic->>Database: Query/Update
Database-->>BusinessLogic: Result
BusinessLogic-->>FastAPI: Response Data
FastAPI->>Pydantic: Serialize Response
Pydantic-->>FastAPI: JSON Response
FastAPI-->>Client: HTTP Response
Migration from v1 to v2
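Beyond validator and config syntax, the model methods themselves were renamed in v2; a minimal sketch of the most common renames:

```python
from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int

u = User(name="Ada", age=36)
# v1 spelling (removed/deprecated):  u.dict(), u.json(), User.parse_obj(data)
# v2 spelling:                       u.model_dump(), u.model_dump_json(), User.model_validate(data)
print(u.model_dump())  # {'name': 'Ada', 'age': 36}
```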
Key Differences
# Pydantic v1 vs v2 comparison
# V1 Style (deprecated)
"""
from pydantic import BaseModel, validator, Field
class UserV1(BaseModel):
name: str
age: int
@validator('name')
def name_must_not_be_empty(cls, v):
if not v.strip():
raise ValueError('Name cannot be empty')
return v.title()
class Config:
validate_assignment = True
allow_population_by_field_name = True
"""
# V2 Style (current)
from pydantic import BaseModel, field_validator, ConfigDict, Field
class UserV2(BaseModel):
model_config = ConfigDict(
validate_assignment=True,
populate_by_name=True
)
name: str
age: int
@field_validator('name')
@classmethod
def name_must_not_be_empty(cls, v):
if not v.strip():
raise ValueError('Name cannot be empty')
        return v.title()
Migration Utilities
from pydantic import BaseModel, ConfigDict, field_validator, ValidationError
from typing import Dict, Any, Type, Optional
import warnings
class MigrationHelper:
"""Helper class for migrating from v1 to v2"""
@staticmethod
def create_v2_config(v1_config: Dict[str, Any]) -> ConfigDict:
"""Convert v1 Config to v2 ConfigDict"""
mapping = {
'allow_population_by_field_name': 'populate_by_name',
'allow_mutation': 'frozen', # Note: inverted logic
'use_enum_values': 'use_enum_values',
'validate_assignment': 'validate_assignment',
'extra': 'extra',
'schema_extra': 'json_schema_extra'
}
v2_config = {}
for v1_key, v2_key in mapping.items():
if v1_key in v1_config:
value = v1_config[v1_key]
if v1_key == 'allow_mutation':
# Invert logic for frozen
v2_config['frozen'] = not value
else:
v2_config[v2_key] = value
return ConfigDict(**v2_config)
@staticmethod
def migrate_validator_decorator(func):
"""Decorator to help migrate v1 validators"""
def wrapper(cls, v, values=None, **kwargs):
# v2 validators don't receive 'values' parameter
warnings.warn(
"Using v1 style validator. Consider updating to v2 syntax.",
DeprecationWarning
)
if 'values' in func.__code__.co_varnames:
return func(cls, v, values, **kwargs)
else:
return func(cls, v, **kwargs)
return wrapper
class LegacyModel(BaseModel):
"""Example of a model that supports both v1 and v2 patterns"""
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
str_strip_whitespace=True
)
name: str
email: str
age: Optional[int] = None
@field_validator('email')
@classmethod
def validate_email(cls, v):
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(pattern, v):
raise ValueError('Invalid email format')
return v.lower()
# Migration script example
def migrate_model_definition(v1_model_code: str) -> str:
"""Convert v1 model code to v2 syntax"""
replacements = [
# Config class to ConfigDict
('class Config:', 'model_config = ConfigDict('),
        # Validator decorator
        ('@validator(', '@field_validator('),
# Config attributes
('allow_population_by_field_name', 'populate_by_name'),
('allow_mutation = False', 'frozen = True'),
('allow_mutation = True', 'frozen = False'),
]
migrated_code = v1_model_code
for old, new in replacements:
migrated_code = migrated_code.replace(old, new)
return migrated_code
# Example migration
v1_code = '''
class User(BaseModel):
name: str
email: str
@validator('email')
def validate_email(cls, v, values):
return v.lower()
class Config:
allow_population_by_field_name = True
validate_assignment = True
'''
v2_code = migrate_model_definition(v1_code)
print("Migrated code:")
print(v2_code)
Compatibility Layer
from pydantic import BaseModel, ConfigDict, field_validator
from typing import Dict, Any, Callable, Optional
import functools
import warnings
class V1CompatibilityMixin:
"""Mixin to provide v1 compatibility for v2 models"""
@classmethod
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
# Check for v1 style Config class
if hasattr(cls, 'Config'):
warnings.warn(
f"{cls.__name__} uses v1 style Config. Consider migrating to model_config.",
DeprecationWarning
)
cls._migrate_config()
@classmethod
def _migrate_config(cls):
"""Migrate v1 Config to v2 model_config"""
if not hasattr(cls, 'Config'):
return
config_attrs = {
attr: getattr(cls.Config, attr)
for attr in dir(cls.Config)
if not attr.startswith('_')
}
# Map v1 config to v2
v2_config = {}
mapping = {
'allow_population_by_field_name': 'populate_by_name',
'allow_mutation': ('frozen', lambda x: not x), # Inverted
'validate_assignment': 'validate_assignment',
'extra': 'extra',
'use_enum_values': 'use_enum_values'
}
for v1_attr, v2_mapping in mapping.items():
if v1_attr in config_attrs:
if isinstance(v2_mapping, tuple):
v2_attr, transform = v2_mapping
v2_config[v2_attr] = transform(config_attrs[v1_attr])
else:
v2_config[v2_mapping] = config_attrs[v1_attr]
cls.model_config = ConfigDict(**v2_config)
def v1_validator(field_name: str, **kwargs):
"""Decorator that mimics v1 validator behavior"""
def decorator(func: Callable) -> Callable:
@field_validator(field_name, **kwargs)
@classmethod
@functools.wraps(func)
def wrapper(cls, v):
# Call original function without 'values' parameter
return func(cls, v)
return wrapper
return decorator
# Example usage of compatibility layer
class CompatibleUser(V1CompatibilityMixin, BaseModel):
name: str
email: str
age: int
# V1 style config (will be migrated automatically)
class Config:
allow_population_by_field_name = True
validate_assignment = True
extra = 'forbid'
@v1_validator('email')
def validate_email(cls, v):
import re
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
raise ValueError('Invalid email')
return v.lower()
# Test the compatibility
user = CompatibleUser(name="John", email="JOHN@EXAMPLE.COM", age=30)
print(f"Config migrated: {user.model_config}")
print(f"Email normalized: {user.email}")
Advanced Patterns and Best Practices
Factory Pattern with Pydantic
from pydantic import BaseModel, Field, ConfigDict
from typing import Dict, Type, Union, Literal, Any
from abc import ABC, abstractmethod
from enum import Enum
class NotificationType(str, Enum):
EMAIL = "email"
SMS = "sms"
PUSH = "push"
WEBHOOK = "webhook"
class BaseNotification(BaseModel, ABC):
model_config = ConfigDict(extra='forbid')
type: NotificationType
recipient: str
message: str
@abstractmethod
def send(self) -> bool:
pass
class EmailNotification(BaseNotification):
type: Literal[NotificationType.EMAIL] = NotificationType.EMAIL
subject: str
html_content: bool = False
def send(self) -> bool:
print(f"Sending email to {self.recipient}: {self.subject}")
return True
class SMSNotification(BaseNotification):
type: Literal[NotificationType.SMS] = NotificationType.SMS
phone_number: str = Field(alias='recipient')
def send(self) -> bool:
print(f"Sending SMS to {self.phone_number}: {self.message}")
return True
class PushNotification(BaseNotification):
type: Literal[NotificationType.PUSH] = NotificationType.PUSH
device_token: str = Field(alias='recipient')
badge_count: int = 1
def send(self) -> bool:
print(f"Sending push to {self.device_token}: {self.message}")
return True
class NotificationFactory:
"""Factory for creating notification instances"""
_notification_types: Dict[NotificationType, Type[BaseNotification]] = {
NotificationType.EMAIL: EmailNotification,
NotificationType.SMS: SMSNotification,
NotificationType.PUSH: PushNotification,
}
@classmethod
def create(cls, notification_data: Dict[str, Any]) -> BaseNotification:
"""Create notification instance based on type"""
notification_type = NotificationType(notification_data.get('type'))
if notification_type not in cls._notification_types:
raise ValueError(f"Unsupported notification type: {notification_type}")
notification_class = cls._notification_types[notification_type]
return notification_class.model_validate(notification_data)
@classmethod
def register_type(cls, notification_type: NotificationType,
notification_class: Type[BaseNotification]):
"""Register new notification type"""
cls._notification_types[notification_type] = notification_class
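`register_type` is what keeps the factory open for extension. The `WEBHOOK` member declared above never gets a class; a self-contained sketch (with a hypothetical `WebhookNotification`) shows how a new type plugs in without touching `create`:

```python
from enum import Enum
from typing import Any, Dict, Type
from pydantic import BaseModel

class NotificationType(str, Enum):
    EMAIL = "email"
    WEBHOOK = "webhook"

class BaseNotification(BaseModel):
    type: NotificationType
    recipient: str
    message: str

    def send(self) -> bool:  # overridden by concrete notifications
        raise NotImplementedError

class WebhookNotification(BaseNotification):
    # Hypothetical handler for the otherwise-unused WEBHOOK member
    url: str
    retries: int = 3

    def send(self) -> bool:
        print(f"POST {self.url}: {self.message}")
        return True

class NotificationFactory:
    _notification_types: Dict[NotificationType, Type[BaseNotification]] = {}

    @classmethod
    def register_type(cls, notification_type: NotificationType,
                      notification_class: Type[BaseNotification]) -> None:
        cls._notification_types[notification_type] = notification_class

    @classmethod
    def create(cls, data: Dict[str, Any]) -> BaseNotification:
        return cls._notification_types[NotificationType(data["type"])].model_validate(data)

# Plugging in the new type requires no change to create()
NotificationFactory.register_type(NotificationType.WEBHOOK, WebhookNotification)
hook = NotificationFactory.create({
    "type": "webhook",
    "recipient": "ops-team",
    "url": "https://example.com/hooks/orders",
    "message": "order placed",
})
hook.send()
```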
# Usage
notifications_data = [
{
"type": "email",
"recipient": "user@example.com",
"message": "Hello World",
"subject": "Test Email"
},
{
"type": "sms",
"recipient": "+1234567890",
"message": "Hello SMS"
},
{
"type": "push",
"recipient": "device_token_123",
"message": "Hello Push",
"badge_count": 5
}
]
for data in notifications_data:
notification = NotificationFactory.create(data)
    notification.send()
Repository Pattern with Pydantic
from pydantic import BaseModel, Field, ConfigDict
from typing import List, Optional, Dict, Type, Generic, TypeVar, Protocol
from abc import ABC, abstractmethod
from datetime import datetime
import json
T = TypeVar('T', bound=BaseModel)
class Repository(Protocol, Generic[T]):
"""Repository protocol for data access"""
def save(self, entity: T) -> T:
...
def find_by_id(self, id: int) -> Optional[T]:
...
def find_all(self) -> List[T]:
...
def delete(self, id: int) -> bool:
...
class User(BaseModel):
model_config = ConfigDict(
validate_assignment=True,
frozen=False
)
id: Optional[int] = None
username: str = Field(min_length=3, max_length=20)
email: str
created_at: datetime = Field(default_factory=datetime.now)
updated_at: Optional[datetime] = None
class InMemoryRepository(Generic[T]):
"""In-memory implementation of repository"""
def __init__(self, model_class: type[T]):
self.model_class = model_class
self._data: Dict[int, T] = {}
self._next_id = 1
def save(self, entity: T) -> T:
if entity.id is None:
# Create new entity
entity.id = self._next_id
self._next_id += 1
entity.created_at = datetime.now()
else:
# Update existing entity
entity.updated_at = datetime.now()
self._data[entity.id] = entity
return entity
def find_by_id(self, id: int) -> Optional[T]:
return self._data.get(id)
def find_all(self) -> List[T]:
return list(self._data.values())
def delete(self, id: int) -> bool:
if id in self._data:
del self._data[id]
return True
return False
def find_by_criteria(self, **criteria) -> List[T]:
"""Find entities matching criteria"""
results = []
for entity in self._data.values():
match = True
for key, value in criteria.items():
if not hasattr(entity, key) or getattr(entity, key) != value:
match = False
break
if match:
results.append(entity)
return results
class FileBasedRepository(Generic[T]):
"""File-based repository implementation"""
def __init__(self, model_class: Type[T], filename: str):
self.model_class = model_class
self.filename = filename
self._load_data()
def _load_data(self):
try:
with open(self.filename, 'r') as f:
data = json.load(f)
self._data = {
item['id']: self.model_class.model_validate(item)
for item in data
}
self._next_id = max(self._data.keys(), default=0) + 1
except FileNotFoundError:
self._data = {}
self._next_id = 1
def _save_data(self):
with open(self.filename, 'w') as f:
data = [entity.model_dump() for entity in self._data.values()]
json.dump(data, f, indent=2, default=str)
def save(self, entity: T) -> T:
if entity.id is None:
entity.id = self._next_id
self._next_id += 1
entity.created_at = datetime.now()
else:
entity.updated_at = datetime.now()
self._data[entity.id] = entity
self._save_data()
return entity
def find_by_id(self, id: int) -> Optional[T]:
return self._data.get(id)
def find_all(self) -> List[T]:
return list(self._data.values())
def delete(self, id: int) -> bool:
if id in self._data:
del self._data[id]
self._save_data()
return True
return False
# Usage example
user_repo = InMemoryRepository(User)
file_repo = FileBasedRepository(User, "users.json")
# Create users
user1 = User(username="john_doe", email="john@example.com")
user2 = User(username="jane_doe", email="jane@example.com")
saved_user1 = user_repo.save(user1)
saved_user2 = file_repo.save(user2)
print(f"Created user: {saved_user1}")
print(f"All users: {user_repo.find_all()}")
classDiagram
class Repository~T~ {
<<interface>>
+save(entity: T) T
+find_by_id(id: int) Optional[T]
+find_all() List[T]
+delete(id: int) bool
}
class InMemoryRepository~T~ {
-_data: Dict[int, T]
-_next_id: int
+save(entity: T) T
+find_by_id(id: int) Optional[T]
+find_all() List[T]
+delete(id: int) bool
}
class FileBasedRepository~T~ {
-filename: str
-_data: Dict[int, T]
-_load_data()
-_save_data()
+save(entity: T) T
+find_by_id(id: int) Optional[T]
+find_all() List[T]
+delete(id: int) bool
}
Repository <|.. InMemoryRepository
Repository <|.. FileBasedRepository
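`find_by_criteria` is not exercised in the usage example; a condensed, self-contained sketch of the same matching logic:

```python
from typing import Dict, Generic, List, Optional, TypeVar
from pydantic import BaseModel

T = TypeVar('T', bound=BaseModel)

class User(BaseModel):
    id: Optional[int] = None
    username: str
    email: str

class InMemoryRepository(Generic[T]):
    def __init__(self) -> None:
        self._data: Dict[int, T] = {}
        self._next_id = 1

    def save(self, entity: T) -> T:
        if entity.id is None:
            entity.id = self._next_id
            self._next_id += 1
        self._data[entity.id] = entity
        return entity

    def find_by_criteria(self, **criteria) -> List[T]:
        # An entity matches when every keyword equals the attribute of the same name
        return [
            e for e in self._data.values()
            if all(getattr(e, k, None) == v for k, v in criteria.items())
        ]

repo: InMemoryRepository[User] = InMemoryRepository()
repo.save(User(username="john_doe", email="john@example.com"))
repo.save(User(username="jane_doe", email="jane@example.com"))
print([u.email for u in repo.find_by_criteria(username="jane_doe")])
```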
Builder Pattern with Validation
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List, Dict, Any
from datetime import datetime, date
class UserBuilder:
"""Builder pattern for creating complex User objects with validation"""
def __init__(self):
self.reset()
def reset(self):
self._data = {}
return self
def with_basic_info(self, username: str, email: str, full_name: Optional[str] = None):
self._data.update({
'username': username,
'email': email,
'full_name': full_name
})
return self
def with_personal_info(self, age: int, birth_date: Optional[date] = None):
self._data.update({
'age': age,
'birth_date': birth_date
})
return self
def with_preferences(self, **preferences):
if 'preferences' not in self._data:
self._data['preferences'] = {}
self._data['preferences'].update(preferences)
return self
def with_roles(self, *roles: str):
self._data['roles'] = list(roles)
return self
def with_metadata(self, **metadata):
if 'metadata' not in self._data:
self._data['metadata'] = {}
self._data['metadata'].update(metadata)
return self
def build(self) -> 'ComplexUser':
"""Build and validate the user object"""
return ComplexUser.model_validate(self._data)
class ComplexUser(BaseModel):
username: str = Field(min_length=3, max_length=20)
email: str
full_name: Optional[str] = None
age: Optional[int] = Field(None, ge=13, le=120)
birth_date: Optional[date] = None
roles: List[str] = Field(default_factory=list)
preferences: Dict[str, Any] = Field(default_factory=dict)
metadata: Dict[str, Any] = Field(default_factory=dict)
created_at: datetime = Field(default_factory=datetime.now)
@field_validator('email')
@classmethod
def validate_email(cls, v):
import re
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
raise ValueError('Invalid email format')
return v.lower()
@field_validator('roles')
@classmethod
def validate_roles(cls, v):
valid_roles = {'admin', 'user', 'moderator', 'editor'}
for role in v:
if role not in valid_roles:
raise ValueError(f'Invalid role: {role}')
return v
# Usage
builder = UserBuilder()
user = (builder
.with_basic_info("john_doe", "JOHN@EXAMPLE.COM", "John Doe")
.with_personal_info(25, date(1998, 5, 15))
.with_roles("user", "editor")
.with_preferences(theme="dark", notifications=True)
.with_metadata(source="web_signup", campaign="spring2023")
.build())
print(user.model_dump_json(indent=2))
Event-Driven Architecture
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Protocol, runtime_checkable
from datetime import datetime
from abc import ABC, abstractmethod
import asyncio
from enum import Enum
class EventType(str, Enum):
USER_CREATED = "user.created"
USER_UPDATED = "user.updated"
USER_DELETED = "user.deleted"
ORDER_PLACED = "order.placed"
PAYMENT_PROCESSED = "payment.processed"
class BaseEvent(BaseModel):
"""Base class for all domain events"""
id: str = Field(default_factory=lambda: str(datetime.now().timestamp()))
event_type: EventType
timestamp: datetime = Field(default_factory=datetime.now)
source: str = "system"
version: str = "1.0"
metadata: Dict[str, Any] = Field(default_factory=dict)
class UserCreatedEvent(BaseEvent):
event_type: EventType = EventType.USER_CREATED
user_id: int
username: str
email: str
class OrderPlacedEvent(BaseEvent):
event_type: EventType = EventType.ORDER_PLACED
order_id: int
user_id: int
total_amount: float
items: List[Dict[str, Any]]
@runtime_checkable
class EventHandler(Protocol):
"""Protocol for event handlers"""
async def handle(self, event: BaseEvent) -> None:
...
def can_handle(self, event_type: EventType) -> bool:
...
class EmailNotificationHandler:
"""Handler for sending email notifications"""
def can_handle(self, event_type: EventType) -> bool:
return event_type in [EventType.USER_CREATED, EventType.ORDER_PLACED]
async def handle(self, event: BaseEvent) -> None:
if isinstance(event, UserCreatedEvent):
await self._send_welcome_email(event)
elif isinstance(event, OrderPlacedEvent):
await self._send_order_confirmation(event)
async def _send_welcome_email(self, event: UserCreatedEvent):
print(f"Sending welcome email to {event.email}")
await asyncio.sleep(0.1) # Simulate async operation
async def _send_order_confirmation(self, event: OrderPlacedEvent):
print(f"Sending order confirmation for order {event.order_id}")
await asyncio.sleep(0.1)
class AnalyticsHandler:
"""Handler for analytics tracking"""
def can_handle(self, event_type: EventType) -> bool:
return True # Track all events
async def handle(self, event: BaseEvent) -> None:
print(f"Analytics: {event.event_type} at {event.timestamp}")
await asyncio.sleep(0.05)
class EventBus:
"""Simple event bus implementation"""
def __init__(self):
self._handlers: List[EventHandler] = []
def subscribe(self, handler: EventHandler):
"""Subscribe a handler to the event bus"""
self._handlers.append(handler)
async def publish(self, event: BaseEvent):
"""Publish an event to all interested handlers"""
tasks = []
for handler in self._handlers:
if handler.can_handle(event.event_type):
tasks.append(handler.handle(event))
if tasks:
await asyncio.gather(*tasks)
# Usage example
async def main():
# Setup event bus and handlers
event_bus = EventBus()
event_bus.subscribe(EmailNotificationHandler())
event_bus.subscribe(AnalyticsHandler())
# Publish events
user_event = UserCreatedEvent(
user_id=1,
username="john_doe",
email="john@example.com",
metadata={"signup_source": "web"}
)
order_event = OrderPlacedEvent(
order_id=101,
user_id=1,
total_amount=99.99,
items=[{"product": "Laptop", "quantity": 1, "price": 99.99}]
)
await event_bus.publish(user_event)
await event_bus.publish(order_event)
# Run the example
# asyncio.run(main())
graph TD
A[Domain Event] --> B[Event Bus]
B --> C[Email Handler]
B --> D[Analytics Handler]
B --> E[Audit Handler]
C --> F[Send Email]
D --> G[Track Analytics]
E --> H[Log Event]
I[User Created] --> B
J[Order Placed] --> B
K[Payment Processed] --> B
Real-World Applications
E-commerce API Models
from pydantic import BaseModel, Field, field_validator, computed_field
from typing import List, Optional, Dict, Any
from datetime import datetime
from decimal import Decimal
from enum import Enum
class ProductStatus(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
OUT_OF_STOCK = "out_of_stock"
DISCONTINUED = "discontinued"
class OrderStatus(str, Enum):
PENDING = "pending"
CONFIRMED = "confirmed"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
REFUNDED = "refunded"
class Category(BaseModel):
id: int
name: str
slug: str
description: Optional[str] = None
parent_id: Optional[int] = None
class Product(BaseModel):
id: int
name: str = Field(min_length=1, max_length=200)
description: str = Field(max_length=2000)
sku: str = Field(min_length=3, max_length=50)
price: Decimal = Field(gt=0, decimal_places=2)
compare_price: Optional[Decimal] = Field(None, gt=0, decimal_places=2)
cost: Optional[Decimal] = Field(None, ge=0, decimal_places=2)
weight: Optional[float] = Field(None, gt=0)
dimensions: Optional[Dict[str, float]] = None
inventory_quantity: int = Field(ge=0)
status: ProductStatus = ProductStatus.ACTIVE
category_id: int
category: Optional[Category] = None
tags: List[str] = Field(default_factory=list)
images: List[str] = Field(default_factory=list)
attributes: Dict[str, Any] = Field(default_factory=dict)
created_at: datetime = Field(default_factory=datetime.now)
updated_at: Optional[datetime] = None
@field_validator('sku')
@classmethod
def validate_sku(cls, v):
if not v.replace('-', '').replace('_', '').isalnum():
raise ValueError('SKU must contain only letters, numbers, hyphens, and underscores')
return v.upper()
@field_validator('tags')
@classmethod
def validate_tags(cls, v):
return [tag.lower().strip() for tag in v if tag.strip()]
@computed_field
@property
def discount_percentage(self) -> Optional[float]:
if self.compare_price and self.compare_price > self.price:
return float((self.compare_price - self.price) / self.compare_price * 100)
return None
@computed_field
@property
def is_on_sale(self) -> bool:
return self.compare_price is not None and self.compare_price > self.price
@computed_field
@property
def profit_margin(self) -> Optional[float]:
if self.cost:
return float((self.price - self.cost) / self.price * 100)
return None
class CartItem(BaseModel):
product_id: int
product: Optional[Product] = None
quantity: int = Field(gt=0, le=100)
unit_price: Decimal = Field(gt=0, decimal_places=2)
@computed_field
@property
def total_price(self) -> Decimal:
return self.unit_price * self.quantity
class ShippingAddress(BaseModel):
first_name: str = Field(min_length=1, max_length=50)
last_name: str = Field(min_length=1, max_length=50)
company: Optional[str] = Field(None, max_length=100)
address_line_1: str = Field(min_length=5, max_length=100)
address_line_2: Optional[str] = Field(None, max_length=100)
city: str = Field(min_length=2, max_length=50)
state: str = Field(min_length=2, max_length=50)
postal_code: str = Field(min_length=3, max_length=20)
country: str = Field(min_length=2, max_length=3)
phone: Optional[str] = None
class Order(BaseModel):
id: Optional[int] = None
order_number: str = Field(min_length=5, max_length=20)
customer_id: int
status: OrderStatus = OrderStatus.PENDING
items: List[CartItem] = Field(min_length=1)
shipping_address: ShippingAddress
billing_address: Optional[ShippingAddress] = None
subtotal: Decimal = Field(gt=0, decimal_places=2)
tax_amount: Decimal = Field(ge=0, decimal_places=2)
shipping_amount: Decimal = Field(ge=0, decimal_places=2)
discount_amount: Decimal = Field(ge=0, decimal_places=2)
notes: Optional[str] = Field(None, max_length=500)
created_at: datetime = Field(default_factory=datetime.now)
updated_at: Optional[datetime] = None
shipped_at: Optional[datetime] = None
delivered_at: Optional[datetime] = None
@computed_field
@property
def total_amount(self) -> Decimal:
return self.subtotal + self.tax_amount + self.shipping_amount - self.discount_amount
@computed_field
@property
def item_count(self) -> int:
return sum(item.quantity for item in self.items)
@field_validator('billing_address', mode='before')
@classmethod
def set_billing_address(cls, v, info):
# Use shipping address as billing address if not provided
if v is None and 'shipping_address' in info.data:
return info.data['shipping_address']
return v
# API Request/Response Models
class ProductCreateRequest(BaseModel):
name: str = Field(min_length=1, max_length=200)
description: str = Field(max_length=2000)
sku: str = Field(min_length=3, max_length=50)
price: Decimal = Field(gt=0, decimal_places=2)
compare_price: Optional[Decimal] = Field(None, gt=0, decimal_places=2)
cost: Optional[Decimal] = Field(None, ge=0, decimal_places=2)
weight: Optional[float] = Field(None, gt=0)
inventory_quantity: int = Field(ge=0)
category_id: int
tags: List[str] = Field(default_factory=list)
attributes: Dict[str, Any] = Field(default_factory=dict)
class OrderCreateRequest(BaseModel):
customer_id: int
items: List[Dict[str, Any]] = Field(min_length=1)
shipping_address: ShippingAddress
billing_address: Optional[ShippingAddress] = None
notes: Optional[str] = Field(None, max_length=500)
class OrderUpdateRequest(BaseModel):
status: Optional[OrderStatus] = None
notes: Optional[str] = Field(None, max_length=500)
shipping_address: Optional[ShippingAddress] = None
# Usage example
product_data = {
"id": 1,
"name": "Gaming Laptop",
"description": "High-performance gaming laptop with RTX graphics",
"sku": "LAPTOP-GAMING-001",
"price": "1299.99",
"compare_price": "1499.99",
"cost": "900.00",
"weight": 2.5,
"inventory_quantity": 10,
"status": "active",
"category_id": 1,
"tags": ["gaming", "laptop", "electronics"],
"images": ["image1.jpg", "image2.jpg"],
"attributes": {
"brand": "TechBrand",
"processor": "Intel i7",
"ram": "16GB",
"storage": "512GB SSD"
}
}
product = Product.model_validate(product_data)
print(f"Product: {product.name}")
print(f"Discount: {product.discount_percentage}%")
print(f"Profit margin: {product.profit_margin}%")
Data Pipeline Models
from pydantic import BaseModel, Field, field_validator, computed_field, ConfigDict
from typing import List, Dict, Any, Optional, Union, Literal
from datetime import datetime, timedelta
from enum import Enum
import json
class DataSourceType(str, Enum):
DATABASE = "database"
API = "api"
FILE = "file"
STREAM = "stream"
class DataFormat(str, Enum):
JSON = "json"
CSV = "csv"
XML = "xml"
PARQUET = "parquet"
AVRO = "avro"
class TransformationType(str, Enum):
FILTER = "filter"
MAP = "map"
AGGREGATE = "aggregate"
JOIN = "join"
VALIDATE = "validate"
class PipelineStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class DataSource(BaseModel):
model_config = ConfigDict(extra='forbid')
id: str
name: str
type: DataSourceType
format: DataFormat
connection_config: Dict[str, Any]
schema_definition: Optional[Dict[str, Any]] = None
@field_validator('connection_config')
@classmethod
def validate_connection_config(cls, v, info):
source_type = info.data.get('type')
if source_type == DataSourceType.DATABASE:
required_fields = ['host', 'port', 'database', 'username']
for field in required_fields:
if field not in v:
raise ValueError(f'Database source requires {field}')
elif source_type == DataSourceType.API:
required_fields = ['url']
for field in required_fields:
if field not in v:
raise ValueError(f'API source requires {field}')
elif source_type == DataSourceType.FILE:
required_fields = ['path']
for field in required_fields:
if field not in v:
raise ValueError(f'File source requires {field}')
return v
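Trimmed down to just `type` and `connection_config`, the cross-field check surfaces as a normal `ValidationError`:

```python
from enum import Enum
from typing import Any, Dict
from pydantic import BaseModel, ValidationError, field_validator

class DataSourceType(str, Enum):
    DATABASE = "database"
    FILE = "file"

class DataSource(BaseModel):
    type: DataSourceType
    connection_config: Dict[str, Any]

    @field_validator('connection_config')
    @classmethod
    def validate_connection_config(cls, v, info):
        # 'type' is declared first, so it is already validated in info.data
        if info.data.get('type') == DataSourceType.DATABASE and 'host' not in v:
            raise ValueError('Database source requires host')
        return v

try:
    DataSource(type="database", connection_config={"port": 5432})
except ValidationError as exc:
    print(exc.errors()[0]['msg'])
```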
class TransformationRule(BaseModel):
id: str
name: str
type: TransformationType
config: Dict[str, Any]
order: int = 0
@field_validator('config')
@classmethod
def validate_config(cls, v, info):
transform_type = info.data.get('type')
if transform_type == TransformationType.FILTER:
if 'condition' not in v:
raise ValueError('Filter transformation requires condition')
elif transform_type == TransformationType.MAP:
if 'mapping' not in v:
raise ValueError('Map transformation requires mapping')
elif transform_type == TransformationType.AGGREGATE:
if 'group_by' not in v or 'aggregations' not in v:
raise ValueError('Aggregate transformation requires group_by and aggregations')
return v
class DataValidationRule(BaseModel):
field: str
rule_type: Literal['required', 'type', 'range', 'pattern', 'custom']
config: Dict[str, Any]
severity: Literal['error', 'warning'] = 'error'
class DataQualityConfig(BaseModel):
enabled: bool = True
validation_rules: List[DataValidationRule] = Field(default_factory=list)
null_threshold: float = Field(0.1, ge=0, le=1)
duplicate_threshold: float = Field(0.05, ge=0, le=1)
completeness_threshold: float = Field(0.95, ge=0, le=1)
class PipelineConfig(BaseModel):
id: str
name: str = Field(min_length=1, max_length=100)
description: Optional[str] = Field(None, max_length=500)
source: DataSource
transformations: List[TransformationRule] = Field(default_factory=list)
destination: Dict[str, Any]
data_quality: DataQualityConfig = Field(default_factory=DataQualityConfig)
schedule: Optional[str] = None # Cron expression
timeout_minutes: int = Field(60, gt=0, le=1440)
retry_attempts: int = Field(3, ge=0, le=10)
tags: List[str] = Field(default_factory=list)
created_at: datetime = Field(default_factory=datetime.now)
created_by: str
@field_validator('schedule')
@classmethod
def validate_schedule(cls, v):
if v is not None:
# Basic cron validation (simplified)
parts = v.split()
if len(parts) != 5:
raise ValueError('Schedule must be a valid cron expression (5 parts)')
return v
class PipelineExecution(BaseModel):
id: str
pipeline_id: str
status: PipelineStatus = PipelineStatus.PENDING
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
duration_seconds: Optional[int] = None
records_processed: int = 0
records_failed: int = 0
error_message: Optional[str] = None
logs: List[str] = Field(default_factory=list)
metrics: Dict[str, Any] = Field(default_factory=dict)
@computed_field
@property
def success_rate(self) -> float:
total = self.records_processed + self.records_failed
if total == 0:
return 0.0
return self.records_processed / total
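    # Because success_rate is a @computed_field, it is also included in
    # serialized output: model_dump() on an execution with 90 processed and
    # 10 failed records yields success_rate == 0.9.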
class DataQualityReport(BaseModel):
execution_id: str
total_records: int
valid_records: int
invalid_records: int
null_count: Dict[str, int] = Field(default_factory=dict)
duplicate_count: int = 0
validation_errors: List[Dict[str, Any]] = Field(default_factory=list)
completeness_score: float = Field(ge=0, le=1)
quality_score: float = Field(ge=0, le=1)
generated_at: datetime = Field(default_factory=datetime.now)
# Pipeline Management Service
class PipelineManager:
def __init__(self):
self.pipelines: Dict[str, PipelineConfig] = {}
self.executions: Dict[str, PipelineExecution] = {}
def create_pipeline(self, config: PipelineConfig) -> PipelineConfig:
"""Create a new data pipeline"""
if config.id in self.pipelines:
raise ValueError(f"Pipeline {config.id} already exists")
self.pipelines[config.id] = config
return config
def execute_pipeline(self, pipeline_id: str) -> PipelineExecution:
"""Execute a data pipeline"""
if pipeline_id not in self.pipelines:
raise ValueError(f"Pipeline {pipeline_id} not found")
execution = PipelineExecution(
id=f"{pipeline_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
pipeline_id=pipeline_id,
started_at=datetime.now()
)
self.executions[execution.id] = execution
return execution
def get_pipeline_status(self, pipeline_id: str) -> List[PipelineExecution]:
"""Get execution history for a pipeline"""
return [
execution for execution in self.executions.values()
if execution.pipeline_id == pipeline_id
]
# Usage example
data_source = DataSource(
id="user_database",
name="User Database",
type=DataSourceType.DATABASE,
format=DataFormat.JSON,
connection_config={
"host": "localhost",
"port": 5432,
"database": "users",
"username": "admin",
"password": "secret"
}
)
transformations = [
TransformationRule(
id="filter_active",
name="Filter Active Users",
type=TransformationType.FILTER,
config={"condition": "status == 'active'"},
order=1
),
TransformationRule(
id="map_fields",
name="Map User Fields",
type=TransformationType.MAP,
config={
"mapping": {
"user_name": "username",
"email_address": "email",
"created_date": "created_at"
}
},
order=2
)
]
pipeline_config = PipelineConfig(
id="user_etl_pipeline",
name="User ETL Pipeline",
description="Extract, transform, and load user data",
source=data_source,
transformations=transformations,
destination={
"type": "warehouse",
"table": "dim_users",
"connection": "data_warehouse"
},
schedule="0 2 * * *", # Daily at 2 AM
created_by="data_engineer"
)
manager = PipelineManager()
created_pipeline = manager.create_pipeline(pipeline_config)
execution = manager.execute_pipeline(pipeline_config.id)
print(f"Pipeline created: {created_pipeline.name}")
print(f"Execution started: {execution.id}")
graph TD
A[Data Source] --> B[Extract]
B --> C[Transform]
C --> D[Validate]
D --> E[Load]
E --> F[Destination]
G[Pipeline Config] --> H[Transformation Rules]
G --> I[Data Quality Rules]
G --> J[Schedule]
K[Execution] --> L[Metrics]
K --> M[Logs]
K --> N[Quality Report]
Configuration Management
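The settings models below rely on `BaseSettings`, which in Pydantic v2 lives in the separate `pydantic-settings` package; the loader also reads YAML files, so PyYAML is needed as well:

```shell
pip install pydantic-settings pyyaml
```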
from pydantic import BaseModel, Field, field_validator, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List, Dict, Optional, Union, Literal
from pathlib import Path
import os
class DatabaseConfig(BaseModel):
host: str = Field(description="Database host")
port: int = Field(default=5432, ge=1, le=65535)
database: str = Field(description="Database name")
username: str = Field(description="Database username")
password: SecretStr = Field(description="Database password")
pool_size: int = Field(default=10, ge=1, le=100)
max_overflow: int = Field(default=20, ge=0, le=100)
echo: bool = Field(default=False, description="Enable SQL logging")
    @property
    def url(self) -> str:
        """Generate database URL (password is URL-escaped so special characters are safe)"""
        from urllib.parse import quote_plus
        return f"postgresql://{self.username}:{quote_plus(self.password.get_secret_value())}@{self.host}:{self.port}/{self.database}"
class RedisConfig(BaseModel):
host: str = Field(default="localhost")
port: int = Field(default=6379, ge=1, le=65535)
password: Optional[SecretStr] = None
db: int = Field(default=0, ge=0, le=15)
socket_timeout: float = Field(default=5.0, gt=0)
max_connections: int = Field(default=50, ge=1, le=1000)
class LoggingConfig(BaseModel):
level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
file_path: Optional[Path] = None
max_file_size: str = "10MB"
backup_count: int = Field(default=5, ge=1, le=10)
json_format: bool = False
@field_validator('file_path')
@classmethod
def validate_file_path(cls, v):
if v is not None:
# Ensure directory exists
v.parent.mkdir(parents=True, exist_ok=True)
return v
class SecurityConfig(BaseModel):
secret_key: SecretStr = Field(description="Secret key for JWT tokens")
algorithm: str = Field(default="HS256")
access_token_expire_minutes: int = Field(default=30, ge=1, le=1440)
refresh_token_expire_days: int = Field(default=7, ge=1, le=30)
password_min_length: int = Field(default=8, ge=6, le=128)
max_login_attempts: int = Field(default=5, ge=1, le=10)
lockout_duration_minutes: int = Field(default=15, ge=1, le=1440)
class EmailConfig(BaseModel):
smtp_host: str
smtp_port: int = Field(default=587, ge=1, le=65535)
username: str
password: SecretStr
use_tls: bool = True
use_ssl: bool = False
from_email: str
from_name: str = "Application"
@field_validator('from_email')
@classmethod
def validate_email(cls, v):
import re
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
raise ValueError('Invalid email format')
return v
class CacheConfig(BaseModel):
enabled: bool = True
default_ttl: int = Field(default=3600, ge=60, le=86400) # 1 hour to 1 day
max_size: int = Field(default=1000, ge=10, le=100000)
key_prefix: str = "app"
class APIConfig(BaseModel):
title: str = "My API"
description: str = ""
version: str = "1.0.0"
docs_url: str = "/docs"
redoc_url: str = "/redoc"
cors_origins: List[str] = Field(default_factory=lambda: ["*"])
cors_methods: List[str] = Field(default_factory=lambda: ["GET", "POST", "PUT", "DELETE"])
rate_limit: str = "100/minute"
max_request_size: int = Field(default=10485760, gt=0) # 10MB
class MonitoringConfig(BaseModel):
enabled: bool = True
prometheus_metrics: bool = True
health_check_interval: int = Field(default=30, ge=10, le=300)
alerts_enabled: bool = True
webhook_url: Optional[str] = None
class ApplicationSettings(BaseSettings):
"""Main application settings"""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
env_nested_delimiter="__",
case_sensitive=False,
extra="ignore"
)
# Environment
environment: Literal["development", "testing", "staging", "production"] = "development"
debug: bool = Field(default=False)
# Database
database: DatabaseConfig
# Redis
redis: RedisConfig = Field(default_factory=RedisConfig)
# Logging
logging: LoggingConfig = Field(default_factory=LoggingConfig)
# Security
security: SecurityConfig
# Email
email: Optional[EmailConfig] = None
# Cache
cache: CacheConfig = Field(default_factory=CacheConfig)
# API
api: APIConfig = Field(default_factory=APIConfig)
# Monitoring
monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig)
# Feature flags
feature_flags: Dict[str, bool] = Field(default_factory=dict)
@field_validator('environment')
@classmethod
def validate_environment(cls, v):
if v == "production":
# Additional validation for production
pass
return v
@property
def is_production(self) -> bool:
return self.environment == "production"
@property
def is_development(self) -> bool:
return self.environment == "development"
# Configuration loader with validation
class ConfigLoader:
def __init__(self, config_file: Optional[Path] = None):
self.config_file = config_file
def load_settings(self) -> ApplicationSettings:
"""Load and validate application settings"""
try:
if self.config_file and self.config_file.exists():
# Load from file if provided
import yaml
with open(self.config_file) as f:
config_data = yaml.safe_load(f)
return ApplicationSettings(**config_data)
else:
# Load from environment variables
return ApplicationSettings()
except Exception as e:
raise ValueError(f"Failed to load configuration: {e}")
def validate_production_config(self, settings: ApplicationSettings):
"""Additional validation for production environment"""
if not settings.is_production:
return
# Ensure sensitive settings are properly configured
if settings.debug:
raise ValueError("Debug mode must be disabled in production")
if settings.security.secret_key.get_secret_value() == "change-me":
raise ValueError("Secret key must be changed in production")
if not settings.database.password.get_secret_value():
raise ValueError("Database password is required in production")
# Usage example
def main():
# Load configuration
loader = ConfigLoader()
settings = loader.load_settings()
# Validate production settings
if settings.is_production:
loader.validate_production_config(settings)
print(f"Environment: {settings.environment}")
print(f"Database URL: {settings.database.url}")
print(f"Debug mode: {settings.debug}")
print(f"Feature flags: {settings.feature_flags}")
return settings
# Example .env file content:
"""
ENVIRONMENT=development
DEBUG=true
DATABASE__HOST=localhost
DATABASE__PORT=5432
DATABASE__DATABASE=myapp
DATABASE__USERNAME=admin
DATABASE__PASSWORD=secret123
REDIS__HOST=localhost
REDIS__PORT=6379
SECURITY__SECRET_KEY=your-secret-key-here
SECURITY__ACCESS_TOKEN_EXPIRE_MINUTES=60
LOGGING__LEVEL=DEBUG
LOGGING__FILE_PATH=logs/app.log
API__TITLE=My Amazing API
API__VERSION=2.0.0
FEATURE_FLAGS__NEW_DASHBOARD=true
FEATURE_FLAGS__BETA_FEATURES=false
"""
if __name__ == "__main__":
    settings = main()
graph TD
A[Environment Variables] --> B[Settings Loader]
C[Config File] --> B
B --> D[Validation]
D --> E[Application Settings]
E --> F[Database Config]
E --> G[Security Config]
E --> H[API Config]
E --> I[Logging Config]
J[Production Validation] --> K{Is Production?}
K -->|Yes| L[Additional Checks]
K -->|No| M[Standard Validation]
Conclusion
This comprehensive guide has covered Pydantic v2 from basic concepts to advanced real-world applications. Here are the key takeaways:
What You’ve Learned
- Foundation: Understanding Pydantic’s core concepts and how it provides runtime type validation
- Advanced Validation: Custom validators, conditional validation, and cross-field validation
- Performance: Optimization techniques and memory management strategies
- Integration: Seamless integration with FastAPI and other frameworks
- Real-World Patterns: Factory patterns, repository patterns, and event-driven architectures
- Migration: Smooth transition from v1 to v2 with compatibility layers
Best Practices Summary
- Use Type Hints Effectively: Leverage Python’s type system for better code clarity
- Validate Early: Catch data issues at the boundary of your application
- Configure Appropriately: Use ConfigDict to optimize for your specific use case
- Handle Errors Gracefully: Provide meaningful error messages for better user experience
- Optimize for Performance: Use appropriate settings for production environments
- Test Thoroughly: Validate your models with comprehensive test cases
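To make the ConfigDict point above concrete, here is a minimal sketch (the model and field names are illustrative):

```python
from pydantic import BaseModel, ConfigDict, ValidationError

class StrictUser(BaseModel):
    # frozen=True makes instances immutable (and hashable);
    # extra="forbid" rejects unexpected keys instead of silently ignoring them;
    # str_strip_whitespace=True trims incoming strings before validation.
    model_config = ConfigDict(frozen=True, extra="forbid", str_strip_whitespace=True)

    id: int
    name: str

user = StrictUser(id=1, name="  Ada  ")
print(user.name)  # whitespace stripped on input

try:
    StrictUser(id=2, name="Bob", role="admin")  # unknown key is rejected
except ValidationError as e:
    print(f"{e.error_count()} validation error")
```

Tightening these settings at the boundary of your application catches malformed input early, while looser defaults (e.g. `extra="ignore"`) suit configuration loading.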
Next Steps
- Explore the official Pydantic documentation for the latest features
- Integrate Pydantic into your existing projects
- Contribute to the Pydantic community
- Stay updated with new releases and improvements
Pydantic v2 is a powerful tool that can significantly improve the reliability and maintainability of your Python applications. By following the patterns and practices outlined in this guide, you’ll be well-equipped to build robust, type-safe applications.
Happy coding with Pydantic v2! 🐍✨