Table of Contents
- Introduction to SQLAlchemy
- Installation and Setup
- Core vs ORM
- Database Connections and Engines
- SQLAlchemy Core
- SQLAlchemy ORM Fundamentals
- Model Definition and Relationships
- Sessions and Transactions
- Querying with ORM
- Advanced ORM Features
- Performance Optimization
- Migration with Alembic
- Testing with SQLAlchemy
- Best Practices and Patterns
- Real-world Applications
1. Introduction to SQLAlchemy
SQLAlchemy is Python’s most popular SQL toolkit and Object-Relational Mapping (ORM) library. It provides a full suite of well-known enterprise-level persistence patterns, designed for efficient and high-performing database access.
What is SQLAlchemy?
SQLAlchemy consists of two main components:
- SQLAlchemy Core: A schema-centric model for database interaction
- SQLAlchemy ORM: An object-relational mapper built on top of Core
graph TD
A[SQLAlchemy] --> B[Core]
A --> C[ORM]
B --> D[Schema Definition]
B --> E[SQL Expression Language]
B --> F[Engine & Connection Pool]
C --> G[Declarative Models]
C --> H[Session Management]
C --> I[Relationship Mapping]Why SQLAlchemy?
- Database Agnostic: Works with PostgreSQL, MySQL, SQLite, Oracle, and more
- Mature and Stable: Battle-tested in production environments
- Flexible: Choose between Core and ORM based on your needs
- Performance: Efficient query generation and connection pooling
- Pythonic: Clean, readable syntax that follows Python conventions
2. Installation and Setup
Installing SQLAlchemy
# Basic installation
pip install sqlalchemy
# With PostgreSQL support
pip install sqlalchemy psycopg2-binary
# With MySQL support
pip install sqlalchemy PyMySQL
# With all database drivers
pip install sqlalchemy[postgresql,mysql,oracle]BashProject Structure
graph LR
A[Project Root] --> B[models/]
A --> C[database/]
A --> D[migrations/]
A --> E[tests/]
B --> F[__init__.py]
B --> G[user.py]
B --> H[base.py]
C --> I[connection.py]
C --> J[session.py]
D --> K[alembic/]Basic Setup
# database/connection.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Database URL - SQLite for this example
DATABASE_URL = "sqlite:///./test.db"
# Create engine
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create Base class
Base = declarative_base()Python3. Core vs ORM
SQLAlchemy Architecture
graph TB
subgraph "SQLAlchemy ORM"
A[User Code] --> B[ORM]
B --> C[Core]
end
subgraph "SQLAlchemy Core"
C --> D[Schema/MetaData]
C --> E[SQL Expression Language]
C --> F[Engine]
end
F --> G[DBAPI]
G --> H[Database]When to Use Core vs ORM
| Feature | Core | ORM |
|---|---|---|
| Performance | Higher | Good |
| Learning Curve | Steeper | Gentler |
| SQL Control | Full | Abstracted |
| Object Mapping | Manual | Automatic |
| Complex Queries | Easier | More Complex |
| Rapid Development | Slower | Faster |
Core Example
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select
engine = create_engine('sqlite:///example.db')
metadata = MetaData()
users = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('email', String(120))
)
# Create tables
metadata.create_all(engine)
# Insert data
with engine.connect() as conn:
conn.execute(users.insert().values(name='John', email='john@example.com'))
conn.commit()
# Query data
with engine.connect() as conn:
result = conn.execute(select(users).where(users.c.name == 'John'))
print(result.fetchall())PythonORM Example
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
engine = create_engine('sqlite:///example.db')
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(120))
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
# Insert data
user = User(name='John', email='john@example.com')
session.add(user)
session.commit()
# Query data
users = session.query(User).filter(User.name == 'John').all()
print(users)Python4. Database Connections and Engines
Engine Creation
graph LR
A[create_engine] --> B[Engine]
B --> C[Connection Pool]
B --> D[Dialect]
B --> E[DBAPI]
C --> F[Connection 1]
C --> G[Connection 2]
C --> H[Connection N]Database URLs
# SQLite
engine = create_engine('sqlite:///path/to/database.db')
engine = create_engine('sqlite:///:memory:') # In-memory database
# PostgreSQL
engine = create_engine('postgresql://user:password@localhost/dbname')
engine = create_engine('postgresql+psycopg2://user:password@localhost/dbname')
# MySQL
engine = create_engine('mysql://user:password@localhost/dbname')
engine = create_engine('mysql+pymysql://user:password@localhost/dbname')
# Oracle
engine = create_engine('oracle://user:password@localhost:1521/dbname')
# SQL Server
engine = create_engine('mssql+pyodbc://user:password@server/database?driver=ODBC+Driver+17+for+SQL+Server')PythonEngine Configuration
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
engine = create_engine(
'sqlite:///example.db',
# Connection pool settings
pool_size=10, # Number of connections to maintain
max_overflow=20, # Additional connections beyond pool_size
pool_pre_ping=True, # Validate connections before use
pool_recycle=3600, # Recycle connections after 1 hour
# SQLite specific
connect_args={
"check_same_thread": False,
"timeout": 30
},
# Echo SQL statements
echo=True, # Print all SQL
echo_pool=True, # Print connection pool events
# Other options
isolation_level="AUTOCOMMIT"
)PythonConnection Management
# Method 1: Using engine.connect()
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM users"))
print(result.fetchall())
# Method 2: Using engine.begin() for transactions
with engine.begin() as conn:
conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
conn.execute(text("INSERT INTO users (name) VALUES ('Bob')"))
# Automatically commits on success, rolls back on exception
# Method 3: Manual transaction control
conn = engine.connect()
trans = conn.begin()
try:
conn.execute(text("INSERT INTO users (name) VALUES ('Charlie')"))
trans.commit()
except:
trans.rollback()
raise
finally:
conn.close()Python5. SQLAlchemy Core
Metadata and Tables
from sqlalchemy import MetaData, Table, Column, Integer, String, DateTime, ForeignKey
from datetime import datetime
metadata = MetaData()
# Define tables
users_table = Table(
'users',
metadata,
Column('id', Integer, primary_key=True),
Column('username', String(50), unique=True, nullable=False),
Column('email', String(120), unique=True, nullable=False),
Column('created_at', DateTime, default=datetime.utcnow),
Column('is_active', Boolean, default=True)
)
posts_table = Table(
'posts',
metadata,
Column('id', Integer, primary_key=True),
Column('title', String(200), nullable=False),
Column('content', Text),
Column('user_id', Integer, ForeignKey('users.id')),
Column('created_at', DateTime, default=datetime.utcnow),
Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
)
# Create all tables
metadata.create_all(engine)PythonCore Query Building
graph TD
A[SQL Expression] --> B[Select]
A --> C[Insert]
A --> D[Update]
A --> E[Delete]
B --> F[Where Clauses]
B --> G[Joins]
B --> H[Group By]
B --> I[Order By]
B --> J[Limit/Offset]CRUD Operations with Core
from sqlalchemy import select, insert, update, delete, and_, or_
# INSERT
insert_stmt = insert(users_table).values(
username='john_doe',
email='john@example.com'
)
with engine.connect() as conn:
result = conn.execute(insert_stmt)
user_id = result.inserted_primary_key[0]
conn.commit()
# INSERT multiple records
insert_stmt = insert(users_table)
data = [
{'username': 'alice', 'email': 'alice@example.com'},
{'username': 'bob', 'email': 'bob@example.com'},
{'username': 'charlie', 'email': 'charlie@example.com'}
]
with engine.connect() as conn:
conn.execute(insert_stmt, data)
conn.commit()
# SELECT
select_stmt = select(users_table).where(users_table.c.username == 'john_doe')
with engine.connect() as conn:
result = conn.execute(select_stmt)
user = result.fetchone()
print(user)
# SELECT with JOIN
join_stmt = select(
users_table.c.username,
posts_table.c.title
).select_from(
users_table.join(posts_table, users_table.c.id == posts_table.c.user_id)
).where(
users_table.c.is_active == True
)
# Complex WHERE clauses
complex_select = select(users_table).where(
and_(
users_table.c.is_active == True,
or_(
users_table.c.username.like('%john%'),
users_table.c.email.like('%@gmail.com')
)
)
)
# UPDATE
update_stmt = update(users_table).where(
users_table.c.username == 'john_doe'
).values(
email='john.doe@example.com'
)
with engine.connect() as conn:
result = conn.execute(update_stmt)
print(f"Updated {result.rowcount} rows")
conn.commit()
# DELETE
delete_stmt = delete(users_table).where(
users_table.c.is_active == False
)
with engine.connect() as conn:
result = conn.execute(delete_stmt)
print(f"Deleted {result.rowcount} rows")
conn.commit()PythonAdvanced Core Features
from sqlalchemy import func, case, cast, String
# Aggregate functions
count_stmt = select(func.count(users_table.c.id))
avg_stmt = select(func.avg(posts_table.c.id))
# CASE expressions
case_stmt = select(
users_table.c.username,
case(
(users_table.c.is_active == True, 'Active'),
else_='Inactive'
).label('status')
)
# Subqueries
subquery = select(func.count(posts_table.c.id)).where(
posts_table.c.user_id == users_table.c.id
).scalar_subquery()
select_with_subquery = select(
users_table.c.username,
subquery.label('post_count')
)
# Window functions
from sqlalchemy import text
window_stmt = select(
users_table.c.username,
func.row_number().over(order_by=users_table.c.created_at).label('row_num')
)
# Raw SQL with text()
raw_sql = text("""
SELECT u.username, COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
GROUP BY u.username
HAVING COUNT(p.id) > :min_posts
""")
with engine.connect() as conn:
result = conn.execute(raw_sql, {"min_posts": 5})
print(result.fetchall())Python6. SQLAlchemy ORM Fundamentals
Declarative Base
graph TD
A[declarative_base] --> B[Base Class]
B --> C[User Model]
B --> D[Post Model]
B --> E[Category Model]
C --> F[__tablename__]
C --> G[Columns]
C --> H[Relationships]
C --> I[Methods]Basic Model Definition
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
# Primary key
id = Column(Integer, primary_key=True, index=True)
# Basic columns
username = Column(String(50), unique=True, nullable=False, index=True)
email = Column(String(120), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Boolean fields
is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
# Relationships (will be defined later)
posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
def __str__(self):
return self.username
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
content = Column(Text)
# Foreign key
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
author = relationship("User", back_populates="posts")
def __repr__(self):
return f"<Post(id={self.id}, title='{self.title[:30]}...')>"PythonColumn Types and Constraints
from sqlalchemy import (
Integer, String, Text, DateTime, Date, Time, Boolean,
Float, Numeric, DECIMAL, BigInteger, SmallInteger,
JSON, ARRAY, Enum, LargeBinary, Unicode, UnicodeText
)
from sqlalchemy.dialects.postgresql import UUID, JSONB
from enum import Enum as PyEnum
import uuid
class UserStatus(PyEnum):
ACTIVE = "active"
INACTIVE = "inactive"
BANNED = "banned"
class User(Base):
__tablename__ = 'users'
# Numeric types
id = Column(Integer, primary_key=True)
age = Column(SmallInteger)
salary = Column(Numeric(10, 2)) # 10 digits, 2 decimal places
rating = Column(Float)
# String types
username = Column(String(50), nullable=False) # VARCHAR(50)
bio = Column(Text) # TEXT
profile_data = Column(JSON) # JSON
# Date/Time types
birth_date = Column(Date)
last_login = Column(DateTime)
daily_login_time = Column(Time)
# Special types
status = Column(Enum(UserStatus), default=UserStatus.ACTIVE)
profile_picture = Column(LargeBinary) # BLOB
external_id = Column(UUID(as_uuid=True), default=uuid.uuid4)
# Constraints
email = Column(
String(120),
unique=True,
nullable=False,
index=True
)
# Check constraints
__table_args__ = (
CheckConstraint('age >= 0', name='check_age_positive'),
CheckConstraint('salary >= 0', name='check_salary_positive'),
Index('idx_username_email', 'username', 'email'),
)PythonModel Methods and Properties
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import func
import hashlib
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
first_name = Column(String(50))
last_name = Column(String(50))
email = Column(String(120), unique=True)
_password_hash = Column('password_hash', String(255))
@hybrid_property
def full_name(self):
"""Hybrid property for full name"""
return f"{self.first_name} {self.last_name}"
@full_name.expression
def full_name(cls):
"""SQL expression for full_name"""
return func.concat(cls.first_name, ' ', cls.last_name)
@property
def password(self):
"""Prevent password from being read"""
raise AttributeError('Password is not readable')
@password.setter
def password(self, password):
"""Hash password when setting"""
self._password_hash = hashlib.sha256(password.encode()).hexdigest()
def verify_password(self, password):
"""Verify password against hash"""
return self._password_hash == hashlib.sha256(password.encode()).hexdigest()
@classmethod
def get_by_email(cls, session, email):
"""Class method to get user by email"""
return session.query(cls).filter(cls.email == email).first()
def to_dict(self):
"""Convert model to dictionary"""
return {
'id': self.id,
'first_name': self.first_name,
'last_name': self.last_name,
'full_name': self.full_name,
'email': self.email
}
@staticmethod
def hash_password(password):
"""Static method to hash password"""
return hashlib.sha256(password.encode()).hexdigest()Python7. Model Definition and Relationships
Relationship Types
graph TD
A[SQLAlchemy Relationships] --> B[One-to-Many]
A --> C[Many-to-One]
A --> D[One-to-One]
A --> E[Many-to-Many]
B --> F[User → Posts]
C --> G[Post → User]
D --> H[User → Profile]
E --> I[Posts ↔ Tags]One-to-Many Relationships
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
email = Column(String(120), unique=True)
# One-to-Many: One user can have many posts
posts = relationship(
"Post",
back_populates="author",
cascade="all, delete-orphan", # Delete posts when user is deleted
lazy="dynamic" # Return Query object instead of loading all posts
)
# One-to-Many: One user can have many orders
orders = relationship("Order", back_populates="customer")
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200))
content = Column(Text)
user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
# Many-to-One: Many posts belong to one user
author = relationship("User", back_populates="posts")PythonOne-to-One Relationships
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
# One-to-One: One user has one profile
profile = relationship(
"UserProfile",
back_populates="user",
uselist=False, # This makes it one-to-one
cascade="all, delete-orphan"
)
class UserProfile(Base):
__tablename__ = 'user_profiles'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'), unique=True) # unique=True enforces one-to-one
bio = Column(Text)
avatar_url = Column(String(255))
birth_date = Column(Date)
# One-to-One: One profile belongs to one user
user = relationship("User", back_populates="profile")PythonMany-to-Many Relationships
# Association table for many-to-many relationship
post_tags = Table(
'post_tags',
Base.metadata,
Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200))
content = Column(Text)
# Many-to-Many: Posts can have many tags, tags can belong to many posts
tags = relationship(
"Tag",
secondary=post_tags,
back_populates="posts",
lazy="dynamic"
)
class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True)
description = Column(Text)
# Many-to-Many: Tags can belong to many posts
posts = relationship(
"Post",
secondary=post_tags,
back_populates="tags",
lazy="dynamic"
)PythonAssociation Objects (Many-to-Many with Extra Data)
class UserRole(Base):
__tablename__ = 'user_roles'
user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
role_id = Column(Integer, ForeignKey('roles.id'), primary_key=True)
assigned_at = Column(DateTime, default=datetime.utcnow)
assigned_by = Column(Integer, ForeignKey('users.id'))
is_active = Column(Boolean, default=True)
# Relationships
user = relationship("User", foreign_keys=[user_id])
role = relationship("Role")
assigner = relationship("User", foreign_keys=[assigned_by])
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
# Many-to-Many with association object
role_associations = relationship("UserRole", foreign_keys="UserRole.user_id")
@property
def roles(self):
return [assoc.role for assoc in self.role_associations if assoc.is_active]
class Role(Base):
__tablename__ = 'roles'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True)
description = Column(Text)
user_associations = relationship("UserRole")PythonSelf-Referential Relationships
class Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
parent_id = Column(Integer, ForeignKey('categories.id'))
# Self-referential relationship
children = relationship(
"Category",
back_populates="parent",
cascade="all, delete-orphan"
)
parent = relationship("Category", back_populates="children", remote_side=[id])
def __repr__(self):
return f"<Category(id={self.id}, name='{self.name}')>"
# Usage example
with session:
# Create parent category
electronics = Category(name="Electronics")
session.add(electronics)
session.flush() # Get the ID
# Create child categories
phones = Category(name="Phones", parent_id=electronics.id)
laptops = Category(name="Laptops", parent_id=electronics.id)
session.add_all([phones, laptops])
session.commit()PythonRelationship Loading Strategies
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50))
# Different loading strategies
posts_lazy = relationship("Post", lazy="select") # Default: lazy loading
posts_eager = relationship("Post", lazy="joined") # JOIN eagerly
posts_subquery = relationship("Post", lazy="subquery") # Subquery eagerly
posts_dynamic = relationship("Post", lazy="dynamic") # Return Query object
posts_selectin = relationship("Post", lazy="selectin") # SELECT IN eagerly
# Example usage of different loading strategies
from sqlalchemy.orm import selectinload, joinedload, subqueryload
# Eager loading with query options
users = session.query(User).options(
joinedload(User.posts),
selectinload(User.profile)
).all()
# Lazy loading (default)
user = session.query(User).first()
posts = user.posts # This triggers a separate query
# Dynamic relationship
user = session.query(User).first()
recent_posts = user.posts.filter(Post.created_at > datetime(2023, 1, 1)).all()Python8. Sessions and Transactions
Session Lifecycle
graph TD
A[Session Created] --> B[Objects Added/Modified]
B --> C[Session.flush]
C --> D[SQL Sent to DB]
D --> E[Transaction Pending]
E --> F[Session.commit]
E --> G[Session.rollback]
F --> H[Transaction Committed]
G --> I[Changes Discarded]
H --> J[Session Cleared]
I --> J
J --> K[Session Closed]Session Configuration
from sqlalchemy.orm import sessionmaker, scoped_session
# Basic session configuration
SessionLocal = sessionmaker(
bind=engine,
autocommit=False, # Manual transaction control
autoflush=False, # Manual flushing
expire_on_commit=True # Expire objects after commit
)
# Scoped session for thread-safety
ScopedSession = scoped_session(SessionLocal)
# Session with custom configuration
CustomSession = sessionmaker(
bind=engine,
autocommit=False,
autoflush=True, # Auto-flush before queries
expire_on_commit=False, # Keep objects accessible after commit
class_=Session # Custom session class
)PythonSession Usage Patterns
# Pattern 1: Context manager (Recommended)
def create_user(username: str, email: str):
with SessionLocal() as session:
user = User(username=username, email=email)
session.add(user)
session.commit()
session.refresh(user) # Refresh to get database-generated values
return user
# Pattern 2: Try-except-finally
def update_user(user_id: int, **kwargs):
session = SessionLocal()
try:
user = session.query(User).filter(User.id == user_id).first()
if user:
for key, value in kwargs.items():
setattr(user, key, value)
session.commit()
return user
return None
except Exception as e:
session.rollback()
raise e
finally:
session.close()
# Pattern 3: Using scoped session
def get_user_by_id(user_id: int):
user = ScopedSession.query(User).filter(User.id == user_id).first()
ScopedSession.remove() # Clean up the session
return userPythonTransaction Management
# Automatic transaction management
with SessionLocal() as session:
# Transaction starts automatically
user = User(username="john", email="john@example.com")
session.add(user)
post = Post(title="My First Post", content="Hello World!", author=user)
session.add(post)
# Commit all changes
session.commit() # Transaction committed
# Manual transaction management
session = SessionLocal()
transaction = session.begin()
try:
# Multiple operations
user = User(username="alice", email="alice@example.com")
session.add(user)
session.flush() # Send to DB but don't commit
post = Post(title="Alice's Post", content="Content", user_id=user.id)
session.add(post)
# Commit transaction
transaction.commit()
except Exception as e:
# Rollback on error
transaction.rollback()
print(f"Error: {e}")
finally:
session.close()
# Nested transactions (Savepoints)
with SessionLocal() as session:
user = User(username="bob", email="bob@example.com")
session.add(user)
# Create savepoint
savepoint = session.begin_nested()
try:
# This might fail
duplicate_user = User(username="bob", email="duplicate@example.com")
session.add(duplicate_user)
session.flush() # This will raise an error due to unique constraint
savepoint.commit()
except Exception:
savepoint.rollback() # Rollback to savepoint
print("Duplicate username, continuing without second user")
# Main transaction continues
session.commit()PythonSession State Management
from sqlalchemy.orm import object_session
from sqlalchemy import inspect
# Object states
def demonstrate_object_states():
session = SessionLocal()
# Transient: Object not in session
user = User(username="temp", email="temp@example.com")
print(f"Transient: {user in session}") # False
# Pending: Object added to session but not flushed
session.add(user)
print(f"Pending: {user in session}") # True
print(f"User ID before flush: {user.id}") # None
# Persistent: Object flushed to database
session.flush()
print(f"Persistent: {user in session}") # True
print(f"User ID after flush: {user.id}") # Actual ID
# Detached: Object removed from session
session.expunge(user)
print(f"Detached: {user in session}") # False
# Deleted: Object marked for deletion
session.add(user) # Re-attach
session.delete(user)
print(f"Deleted: {user in session}") # True (still in session but marked for deletion)
session.commit()
session.close()
# Session utilities
def session_utilities():
session = SessionLocal()
user = User(username="test", email="test@example.com")
session.add(user)
session.flush()
# Check object state
state = inspect(user)
print(f"Persistent: {state.persistent}")
print(f"Pending: {state.pending}")
print(f"Transient: {state.transient}")
print(f"Detached: {state.detached}")
# Get session from object
obj_session = object_session(user)
print(f"Same session: {obj_session is session}")
# Session info
print(f"New objects: {session.new}")
print(f"Dirty objects: {session.dirty}")
print(f"Deleted objects: {session.deleted}")
# Merge detached object
session.expunge(user)
merged_user = session.merge(user)
print(f"Merged user: {merged_user}")
session.close()PythonSession Events
from sqlalchemy import event
# Session events
@event.listens_for(SessionLocal, 'before_commit')
def before_commit_handler(session):
print("About to commit session")
for obj in session.new:
print(f"New object: {obj}")
for obj in session.dirty:
print(f"Modified object: {obj}")
for obj in session.deleted:
print(f"Deleted object: {obj}")
@event.listens_for(SessionLocal, 'after_commit')
def after_commit_handler(session):
print("Session committed successfully")
@event.listens_for(SessionLocal, 'after_rollback')
def after_rollback_handler(session):
print("Session rolled back")
# Instance events
@event.listens_for(User, 'before_insert')
def before_insert_user(mapper, connection, target):
print(f"About to insert user: {target.username}")
target.created_at = datetime.utcnow()
@event.listens_for(User, 'after_insert')
def after_insert_user(mapper, connection, target):
print(f"Inserted user with ID: {target.id}")Python9. Querying with ORM
Basic Querying
graph TD
A[Query Object] --> B[Filtering]
A --> C[Ordering]
A --> D[Limiting]
A --> E[Grouping]
A --> F[Joining]
B --> G[filter]
B --> H[filter_by]
B --> I[where]
C --> J[order_by]
D --> K[limit]
D --> L[offset]
E --> M[group_by]
E --> N[having]
F --> O[join]
F --> P[outerjoin]Query Construction
# Basic queries
def basic_queries():
session = SessionLocal()
# Get all users
all_users = session.query(User).all()
# Get first user
first_user = session.query(User).first()
# Get user by primary key
user = session.query(User).get(1) # Deprecated in 2.0
user = session.get(User, 1) # New way
# Count
user_count = session.query(User).count()
# Check existence
exists = session.query(User).filter(User.username == 'john').exists()
has_user = session.query(exists).scalar()
session.close()
# Filtering
def filtering_examples():
session = SessionLocal()
# filter() - uses column expressions
users = session.query(User).filter(User.username == 'john').all()
users = session.query(User).filter(User.age > 18).all()
users = session.query(User).filter(User.email.like('%@gmail.com')).all()
# filter_by() - uses keyword arguments
users = session.query(User).filter_by(username='john').all()
users = session.query(User).filter_by(is_active=True).all()
# Multiple filters
users = session.query(User).filter(
User.age > 18,
User.is_active == True,
User.email.like('%@gmail.com')
).all()
# OR conditions
from sqlalchemy import or_
users = session.query(User).filter(
or_(
User.username == 'john',
User.username == 'jane'
)
).all()
# AND conditions (explicit)
from sqlalchemy import and_
users = session.query(User).filter(
and_(
User.age > 18,
User.is_active == True
)
).all()
# IN clause
users = session.query(User).filter(
User.username.in_(['john', 'jane', 'bob'])
).all()
# NOT conditions
from sqlalchemy import not_
users = session.query(User).filter(
not_(User.username == 'admin')
).all()
# IS NULL / IS NOT NULL
users = session.query(User).filter(User.last_login.is_(None)).all()
users = session.query(User).filter(User.last_login.isnot(None)).all()
session.close()PythonAdvanced Filtering
from sqlalchemy import func, case, extract, cast, String
def advanced_filtering():
session = SessionLocal()
# String operations
users = session.query(User).filter(
User.username.startswith('j')
).all()
users = session.query(User).filter(
User.username.endswith('son')
).all()
users = session.query(User).filter(
User.username.contains('oh')
).all()
# Case-insensitive operations
users = session.query(User).filter(
func.lower(User.username) == 'john'
).all()
# Date operations
from datetime import datetime, timedelta
recent_users = session.query(User).filter(
User.created_at > datetime.utcnow() - timedelta(days=30)
).all()
# Extract date parts
users_by_year = session.query(User).filter(
extract('year', User.created_at) == 2023
).all()
# Regex (PostgreSQL)
users = session.query(User).filter(
User.email.op('~')('.*@gmail\.com$')
).all()
# CASE expressions
user_status = session.query(
User.username,
case(
(User.is_active == True, 'Active'),
(User.is_active == False, 'Inactive'),
else_='Unknown'
).label('status')
).all()
# Subqueries
subquery = session.query(func.avg(User.age)).subquery()
above_avg_users = session.query(User).filter(
User.age > subquery.c.avg_1
).all()
session.close()PythonJoins and Relationships
def join_examples():
session = SessionLocal()
# Inner join using relationship
users_with_posts = session.query(User).join(User.posts).all()
# Explicit inner join
users_with_posts = session.query(User).join(Post, User.id == Post.user_id).all()
# Left outer join
all_users_with_posts = session.query(User).outerjoin(User.posts).all()
# Select specific columns from joined tables
user_post_data = session.query(
User.username,
Post.title,
Post.created_at
).join(Post).all()
# Multiple joins
user_post_tag_data = session.query(
User.username,
Post.title,
Tag.name
).join(Post).join(Post.tags).all()
# Self join (for hierarchical data)
parent_child = session.query(
Category.name.label('parent'),
Category2.name.label('child')
).join(
Category2,
Category.id == Category2.parent_id,
aliased=True
).all()
# Subquery joins
from sqlalchemy.orm import aliased
post_count_subq = session.query(
Post.user_id,
func.count(Post.id).label('post_count')
).group_by(Post.user_id).subquery()
users_with_post_count = session.query(
User,
post_count_subq.c.post_count
).outerjoin(
post_count_subq,
User.id == post_count_subq.c.user_id
).all()
session.close()PythonAggregation and Grouping
def aggregation_examples():
session = SessionLocal()
# Count by group
posts_per_user = session.query(
User.username,
func.count(Post.id).label('post_count')
).join(Post).group_by(User.username).all()
# Having clause
prolific_users = session.query(
User.username,
func.count(Post.id).label('post_count')
).join(Post).group_by(User.username).having(
func.count(Post.id) > 5
).all()
# Various aggregations
user_stats = session.query(
func.count(User.id).label('total_users'),
func.avg(User.age).label('avg_age'),
func.min(User.created_at).label('first_user'),
func.max(User.created_at).label('latest_user')
).first()
# Group by multiple columns
monthly_posts = session.query(
extract('year', Post.created_at).label('year'),
extract('month', Post.created_at).label('month'),
func.count(Post.id).label('post_count')
).group_by(
extract('year', Post.created_at),
extract('month', Post.created_at)
).order_by('year', 'month').all()
# Window functions
ranked_users = session.query(
User.username,
func.count(Post.id).label('post_count'),
func.rank().over(
order_by=func.count(Post.id).desc()
).label('rank')
).join(Post).group_by(User.username).all()
session.close()PythonQuery Optimization
def query_optimization():
session = SessionLocal()
# Eager loading with joinedload
from sqlalchemy.orm import joinedload, selectinload, subqueryload
# Load users with their posts in one query
users = session.query(User).options(
joinedload(User.posts)
).all()
# Load users with posts and tags
users = session.query(User).options(
joinedload(User.posts).joinedload(Post.tags)
).all()
# Select in loading (better for one-to-many)
users = session.query(User).options(
selectinload(User.posts)
).all()
# Load only specific columns
user_names = session.query(User.username).all()
# Load entities partially
from sqlalchemy.orm import load_only
users = session.query(User).options(
load_only(User.username, User.email)
).all()
# Defer loading of large columns
from sqlalchemy.orm import defer
users = session.query(User).options(
defer(User.profile_picture)
).all()
# Batch loading
users = session.query(User).filter(
User.id.in_([1, 2, 3, 4, 5])
).all()
# Raw SQL for complex queries
result = session.execute(text("""
SELECT u.username, COUNT(p.id) as post_count
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
GROUP BY u.username
ORDER BY post_count DESC
LIMIT 10
"""))
top_users = result.fetchall()
session.close()PythonQuery Chaining and Reusability
class UserRepository:
def __init__(self, session):
self.session = session
def base_query(self):
"""Base query for users"""
return self.session.query(User)
def active_users(self):
"""Get only active users"""
return self.base_query().filter(User.is_active == True)
def by_email_domain(self, domain):
"""Filter by email domain"""
return self.active_users().filter(
User.email.like(f'%@{domain}')
)
def with_posts(self):
"""Include posts in query"""
return self.active_users().options(
joinedload(User.posts)
)
def recent_users(self, days=30):
"""Users created in the last N days"""
cutoff = datetime.utcnow() - timedelta(days=days)
return self.active_users().filter(
User.created_at > cutoff
)
def paginate(self, query, page=1, per_page=20):
"""Add pagination to query"""
return query.offset(
(page - 1) * per_page
).limit(per_page)
# Usage
def repository_usage():
session = SessionLocal()
repo = UserRepository(session)
# Chain methods
gmail_users = repo.by_email_domain('gmail.com').all()
recent_gmail_users = repo.by_email_domain('gmail.com').filter(
User.created_at > datetime(2023, 1, 1)
).all()
# Pagination
page_1_users = repo.paginate(repo.active_users(), page=1, per_page=10).all()
session.close()Python10. Advanced ORM Features
Inheritance Mapping
graph TD
A[Table Inheritance] --> B[Single Table]
A --> C[Joined Table]
A --> D[Concrete Table]
B --> E[All classes in one table]
C --> F[Base table + child tables]
D --> G[Separate table per class]Single Table Inheritance
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Person(Base):
__tablename__ = 'people'
id = Column(Integer, primary_key=True)
name = Column(String(50))
type = Column(String(20))
__mapper_args__ = {
'polymorphic_identity': 'person',
'polymorphic_on': type
}
class Employee(Person):
employee_id = Column(String(20))
department = Column(String(50))
__mapper_args__ = {
'polymorphic_identity': 'employee'
}
class Customer(Person):
customer_code = Column(String(20))
credit_limit = Column(Integer)
__mapper_args__ = {
'polymorphic_identity': 'customer'
}
# Usage
session = SessionLocal()
# Create instances
emp = Employee(name='John Doe', employee_id='E001', department='IT')
cust = Customer(name='Jane Smith', customer_code='C001', credit_limit=5000)
session.add_all([emp, cust])
session.commit()
# Query polymorphically
all_people = session.query(Person).all() # Returns both employees and customers
employees_only = session.query(Employee).all() # Returns only employeesPythonJoined Table Inheritance
class Person(Base):
__tablename__ = 'people'
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(120))
type = Column(String(20))
__mapper_args__ = {
'polymorphic_identity': 'person',
'polymorphic_on': type,
'with_polymorphic': '*'
}
class Employee(Person):
__tablename__ = 'employees'
id = Column(Integer, ForeignKey('people.id'), primary_key=True)
employee_id = Column(String(20))
department = Column(String(50))
salary = Column(Integer)
__mapper_args__ = {
'polymorphic_identity': 'employee'
}
class Customer(Person):
__tablename__ = 'customers'
id = Column(Integer, ForeignKey('people.id'), primary_key=True)
customer_code = Column(String(20))
credit_limit = Column(Integer)
__mapper_args__ = {
'polymorphic_identity': 'customer'
}
# This creates three tables: people, employees, customers
# Queries automatically join the tables when neededPythonHybrid Properties and Methods
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
from sqlalchemy import func, case
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
first_name = Column(String(50))
last_name = Column(String(50))
email = Column(String(120))
birth_date = Column(Date)
@hybrid_property
def full_name(self):
"""Instance-level full name"""
return f"{self.first_name} {self.last_name}"
@full_name.expression
def full_name(cls):
"""Class-level expression for SQL"""
return func.concat(cls.first_name, ' ', cls.last_name)
@hybrid_property
def age(self):
"""Calculate age from birth date"""
if self.birth_date:
return (datetime.now().date() - self.birth_date).days // 365
return None
@age.expression
def age(cls):
"""SQL expression for age calculation"""
return func.floor(
func.extract('days', func.now() - cls.birth_date) / 365
)
@hybrid_method
def is_older_than(self, age_threshold):
"""Check if user is older than threshold"""
return self.age > age_threshold
@is_older_than.expression
def is_older_than(cls, age_threshold):
"""SQL expression for age comparison"""
return cls.age > age_threshold
# Usage
session = SessionLocal()
# Instance usage
user = session.query(User).first()
print(user.full_name) # "John Doe"
print(user.age) # 25
# Query usage (generates SQL)
adults = session.query(User).filter(User.age >= 18).all()
seniors = session.query(User).filter(User.is_older_than(65)).all()
# Can use hybrid properties in order_by, etc.
ordered_users = session.query(User).order_by(User.full_name).all()PythonAssociation Proxies
from sqlalchemy.ext.associationproxy import association_proxy
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50))
# Many-to-many with User-Role association
user_roles = relationship("UserRole", back_populates="user")
# Association proxy to access roles directly
roles = association_proxy(
'user_roles', 'role',
creator=lambda role: UserRole(role=role)
)
# Association proxy for role names
role_names = association_proxy('user_roles', 'role_name')
class Role(Base):
__tablename__ = 'roles'
id = Column(Integer, primary_key=True)
name = Column(String(50))
user_roles = relationship("UserRole", back_populates="role")
users = association_proxy('user_roles', 'user')
class UserRole(Base):
__tablename__ = 'user_roles'
user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
role_id = Column(Integer, ForeignKey('roles.id'), primary_key=True)
assigned_at = Column(DateTime, default=datetime.utcnow)
user = relationship("User", back_populates="user_roles")
role = relationship("Role", back_populates="user_roles")
@property
def role_name(self):
return self.role.name if self.role else None
# Usage
session = SessionLocal()
user = User(username='john')
admin_role = Role(name='admin')
user_role = Role(name='user')
# Direct assignment through association proxy
user.roles.append(admin_role)
user.roles.append(user_role)
session.add_all([user, admin_role, user_role])
session.commit()
# Access role names
print(user.role_names) # ['admin', 'user']PythonValidators and Events
from sqlalchemy import event
from sqlalchemy.orm import validates
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50))
email = Column(String(120))
age = Column(Integer)
@validates('email')
def validate_email(self, key, address):
"""Validate email format"""
import re
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', address):
raise ValueError('Invalid email address')
return address.lower()
@validates('age')
def validate_age(self, key, age):
"""Validate age range"""
if age < 0 or age > 150:
raise ValueError('Age must be between 0 and 150')
return age
@validates('username')
def validate_username(self, key, username):
"""Validate username format"""
if len(username) < 3:
raise ValueError('Username must be at least 3 characters')
if not username.isalnum():
raise ValueError('Username must be alphanumeric')
return username
# Model events
@event.listens_for(User, 'before_insert')
def set_created_timestamp(mapper, connection, target):
"""Set created timestamp before insert"""
if not hasattr(target, 'created_at'):
target.created_at = datetime.utcnow()
@event.listens_for(User, 'before_update')
def set_updated_timestamp(mapper, connection, target):
"""Set updated timestamp before update"""
target.updated_at = datetime.utcnow()
@event.listens_for(User.email, 'set')
def email_changed(target, value, oldvalue, initiator):
"""Log email changes"""
if oldvalue != value and oldvalue is not None:
print(f"Email changed from {oldvalue} to {value}")Python11. Performance Optimization
Query Performance
graph TD
A[Query Performance] --> B[Eager Loading]
A --> C[Query Optimization]
A --> D[Connection Pooling]
A --> E[Caching]
B --> F[joinedload]
B --> G[selectinload]
B --> H[subqueryload]
C --> I[Index Usage]
C --> J[Batch Operations]
C --> K[Raw SQL]
E --> L[Query Cache]
E --> M[Session Cache]Eager Loading Strategies
from sqlalchemy.orm import joinedload, selectinload, subqueryload, contains_eager
# Different loading strategies comparison
def loading_strategies():
session = SessionLocal()
# Lazy loading (default) - N+1 problem
users = session.query(User).all()
for user in users:
print(f"{user.username}: {len(user.posts)}") # Each access triggers a query
# Joined loading - single query with LEFT JOIN
users = session.query(User).options(
joinedload(User.posts)
).all()
# Select-in loading - two queries (better for one-to-many)
users = session.query(User).options(
selectinload(User.posts)
).all()
# Subquery loading - nested query
users = session.query(User).options(
subqueryload(User.posts)
).all()
# Contains eager - for explicit joins
users = session.query(User).join(User.posts).options(
contains_eager(User.posts)
).all()
# Mixed loading strategies
users = session.query(User).options(
joinedload(User.profile), # One-to-one: use joined
selectinload(User.posts), # One-to-many: use selectin
selectinload(User.posts).joinedload(Post.tags) # Chain loading
).all()
session.close()
# Optimize specific queries
def optimized_queries():
session = SessionLocal()
# Load only needed columns
from sqlalchemy.orm import load_only
users = session.query(User).options(
load_only(User.id, User.username, User.email)
).all()
# Defer large columns
from sqlalchemy.orm import defer
users = session.query(User).options(
defer(User.profile_picture),
defer(User.large_text_field)
).all()
# Batch queries with IN clause
user_ids = [1, 2, 3, 4, 5]
users = session.query(User).filter(User.id.in_(user_ids)).all()
# Use exists() for checking existence
has_active_user = session.query(
session.query(User).filter(User.is_active == True).exists()
).scalar()
session.close()PythonBulk Operations
def bulk_operations():
session = SessionLocal()
# Bulk insert (fastest for many records)
users_data = [
{'username': f'user_{i}', 'email': f'user_{i}@example.com'}
for i in range(1000)
]
session.bulk_insert_mappings(User, users_data)
# Bulk update
session.bulk_update_mappings(User, [
{'id': 1, 'username': 'updated_user_1'},
{'id': 2, 'username': 'updated_user_2'},
])
# Using Core for bulk operations (even faster)
from sqlalchemy import insert, update
# Bulk insert with Core
insert_stmt = insert(User.__table__)
session.execute(insert_stmt, users_data)
# Bulk update with Core
update_stmt = update(User.__table__).where(
User.id.in_([1, 2, 3])
).values(is_active=False)
session.execute(update_stmt)
session.commit()
session.close()PythonConnection Pool Optimization
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool, NullPool
# Optimized engine configuration
def create_optimized_engine(database_url):
return create_engine(
database_url,
# Pool settings
poolclass=QueuePool,
pool_size=20, # Number of permanent connections
max_overflow=30, # Additional connections
pool_pre_ping=True, # Test connections before use
pool_recycle=3600, # Recycle connections after 1 hour
# Connection settings
connect_args={
"application_name": "my_app",
"options": "-c default_transaction_isolation=read_committed"
},
# Performance settings
echo=False, # Disable SQL logging in production
future=True # Use SQLAlchemy 2.0 style
)
# For high-concurrency applications
def create_async_engine(database_url):
from sqlalchemy.ext.asyncio import create_async_engine
return create_async_engine(
database_url.replace('postgresql://', 'postgresql+asyncpg://'),
pool_size=50,
max_overflow=100,
pool_pre_ping=True
)PythonQuery Caching
from sqlalchemy.orm import Query
import hashlib
import pickle
class CachedQuery:
def __init__(self, cache_backend=None):
self.cache = cache_backend or {}
def _cache_key(self, query):
"""Generate cache key from query"""
query_str = str(query.statement.compile(compile_kwargs={"literal_binds": True}))
return hashlib.md5(query_str.encode()).hexdigest()
def get_or_execute(self, query, timeout=300):
"""Get result from cache or execute query"""
cache_key = self._cache_key(query)
# Try to get from cache
if cache_key in self.cache:
return pickle.loads(self.cache[cache_key])
# Execute query and cache result
result = query.all()
self.cache[cache_key] = pickle.dumps(result)
return result
# Usage with Redis cache
def setup_redis_cache():
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
class RedisCache:
def __init__(self, client):
self.client = client
def __contains__(self, key):
return self.client.exists(key)
def __getitem__(self, key):
return self.client.get(key)
def __setitem__(self, key, value):
self.client.setex(key, 300, value) # 5-minute expiry
return CachedQuery(RedisCache(redis_client))
# Application-level caching
def cached_user_by_email(email, cache=None):
if cache and email in cache:
return cache[email]
session = SessionLocal()
user = session.query(User).filter(User.email == email).first()
session.close()
if cache and user:
cache[email] = user
return userPythonDatabase Indexes
from sqlalchemy import Index, text
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
email = Column(String(120), unique=True)
first_name = Column(String(50))
last_name = Column(String(50))
created_at = Column(DateTime, default=datetime.utcnow)
is_active = Column(Boolean, default=True)
# Table-level indexes
__table_args__ = (
# Composite index for common query patterns
Index('idx_user_name', 'first_name', 'last_name'),
# Partial index for active users only
Index('idx_active_users', 'created_at', postgresql_where=text('is_active = true')),
# Functional index for case-insensitive searches
Index('idx_username_lower', text('lower(username)')),
# Covering index (includes additional columns)
Index('idx_email_covering', 'email', 'first_name', 'last_name', 'is_active'),
)
# Create indexes programmatically
def create_custom_indexes(engine):
# GIN index for full-text search (PostgreSQL)
gin_index = text("""
CREATE INDEX IF NOT EXISTS idx_user_fulltext
ON users USING gin(to_tsvector('english', first_name || ' ' || last_name))
""")
# Hash index for exact matches (PostgreSQL)
hash_index = text("""
CREATE INDEX IF NOT EXISTS idx_user_email_hash
ON users USING hash(email)
""")
with engine.connect() as conn:
conn.execute(gin_index)
conn.execute(hash_index)
conn.commit()Python12. Migration with Alembic
REF:- https://blog.altgr.in/?p=943
Alembic Setup
graph TD
A[Alembic] --> B[alembic init]
B --> C[alembic.ini]
B --> D[alembic/]
D --> E[env.py]
D --> F[script.py.mako]
D --> G[versions/]
A --> H[Commands]
H --> I[revision]
H --> J[upgrade]
H --> K[downgrade]
H --> L[history]Initial Setup
# Install Alembic
pip install alembic
# Initialize Alembic in your project
alembic init alembic
# Edit alembic.ini to set database URL
# sqlalchemy.url = postgresql://user:pass@localhost/dbnameBashAlembic Configuration
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import os
import sys
# Add your project to the path
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
# Import your models
from models.base import Base
from models.user import User
from models.post import Post
# This is the Alembic Config object
config = context.config
# Set up loggers
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Target metadata
target_metadata = Base.metadata
def get_url():
"""Get database URL from environment or config"""
return os.getenv("DATABASE_URL") or config.get_main_option("sqlalchemy.url")
def run_migrations_offline():
"""Run migrations in 'offline' mode."""
url = get_url()
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode."""
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = get_url()
connectable = engine_from_config(
configuration,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()PythonCreating Migrations
# Create a new migration
alembic revision --autogenerate -m "Create user table"
# Create empty migration
alembic revision -m "Custom migration"
# Check current revision
alembic current
# Show migration history
alembic history --verbose
# Upgrade to latest
alembic upgrade head
# Upgrade to specific revision
alembic upgrade +2 # Move forward 2 revisions
alembic upgrade ae1027a6acf # Upgrade to specific revision
# Downgrade
alembic downgrade -1 # Move back 1 revision
alembic downgrade base # Downgrade to beginningBashCustom Migration Scripts
# Example migration file: versions/001_create_user_table.py
"""Create user table
Revision ID: ae1027a6acf
Revises:
Create Date: 2023-01-01 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers
revision = 'ae1027a6acf'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# Create users table
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(length=50), nullable=False),
sa.Column('email', sa.String(length=120), nullable=False),
sa.Column('password_hash', sa.String(length=255), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('is_active', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
# Create indexes
op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
# Add check constraint
op.create_check_constraint(
'check_email_format',
'users',
sa.text("email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'")
)
def downgrade():
# Drop constraints and indexes
op.drop_constraint('check_email_format', 'users', type_='check')
op.drop_index(op.f('ix_users_username'), table_name='users')
op.drop_index(op.f('ix_users_email'), table_name='users')
# Drop table
op.drop_table('users')PythonComplex Migration Examples
# Data migration example
def upgrade():
# Create new column
op.add_column('users', sa.Column('full_name', sa.String(100), nullable=True))
# Update data using raw SQL
connection = op.get_bind()
connection.execute(
sa.text("""
UPDATE users
SET full_name = first_name || ' ' || last_name
WHERE first_name IS NOT NULL AND last_name IS NOT NULL
""")
)
# Make column non-nullable after populating data
op.alter_column('users', 'full_name', nullable=False)
# Table rename and restructure
def upgrade():
# Rename table
op.rename_table('old_posts', 'posts')
# Add new columns
op.add_column('posts', sa.Column('slug', sa.String(200), nullable=True))
# Rename column
op.alter_column('posts', 'content', new_column_name='body')
# Change column type
op.alter_column('posts', 'views',
type_=sa.BigInteger(),
existing_type=sa.Integer())
# Foreign key modifications
def upgrade():
# Drop foreign key
op.drop_constraint('fk_posts_user_id', 'posts', type_='foreignkey')
# Add new foreign key with different options
op.create_foreign_key(
'fk_posts_author_id', 'posts', 'users',
['author_id'], ['id'],
ondelete='CASCADE',
onupdate='RESTRICT'
)PythonEnvironment-Specific Migrations
# alembic/env.py - Environment-specific configuration
import os
def run_migrations_online():
"""Run migrations in 'online' mode."""
configuration = config.get_section(config.config_ini_section)
# Environment-specific settings
env = os.getenv('ENVIRONMENT', 'development')
if env == 'production':
# Production settings
configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")
configuration["sqlalchemy.pool_timeout"] = "60"
configuration["sqlalchemy.pool_recycle"] = "3600"
elif env == 'test':
# Test settings
configuration["sqlalchemy.url"] = "sqlite:///:memory:"
else:
# Development settings
configuration["sqlalchemy.url"] = "sqlite:///development.db"
connectable = engine_from_config(configuration, prefix="sqlalchemy.")
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True, # Compare column types
compare_server_default=True # Compare default values
)
with context.begin_transaction():
context.run_migrations()Python13. Testing with SQLAlchemy
Test Database Setup
graph TD
A[Test Setup] --> B[In-Memory DB]
A --> C[Test Fixtures]
A --> D[Factory Pattern]
A --> E[Transaction Rollback]
B --> F[SQLite :memory:]
C --> G[pytest fixtures]
D --> H[Factory Boy]
E --> I[Session Rollback]Basic Test Configuration
# tests/conftest.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models.base import Base
# Test database URL
TEST_DATABASE_URL = "sqlite:///:memory:"
@pytest.fixture(scope="session")
def engine():
"""Create test database engine"""
engine = create_engine(
TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
echo=False # Set to True for SQL debugging
)
Base.metadata.create_all(engine)
yield engine
engine.dispose()
@pytest.fixture(scope="function")
def session(engine):
"""Create database session for each test"""
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# Start a transaction
transaction = session.begin()
yield session
# Rollback transaction after test
transaction.rollback()
session.close()
@pytest.fixture
def sample_user(session):
"""Create a sample user for testing"""
from models.user import User
user = User(
username="testuser",
email="test@example.com",
password="testpass123"
)
session.add(user)
session.commit()
session.refresh(user)
return userPythonModel Testing
# tests/test_models.py
import pytest
from datetime import datetime
from models.user import User
from models.post import Post
class TestUser:
def test_create_user(self, session):
"""Test user creation"""
user = User(
username="newuser",
email="new@example.com",
password="password123"
)
session.add(user)
session.commit()
assert user.id is not None
assert user.username == "newuser"
assert user.email == "new@example.com"
assert user.created_at is not None
def test_user_validation(self, session):
"""Test user validation"""
# Test invalid email
with pytest.raises(ValueError, match="Invalid email"):
user = User(
username="user",
email="invalid-email",
password="pass"
)
session.add(user)
session.commit()
def test_user_relationships(self, session, sample_user):
"""Test user-post relationship"""
post = Post(
title="Test Post",
content="This is a test post",
author=sample_user
)
session.add(post)
session.commit()
assert len(sample_user.posts) == 1
assert sample_user.posts[0].title == "Test Post"
assert post.author == sample_user
def test_user_methods(self, session):
"""Test user custom methods"""
user = User(
username="john",
email="john@example.com",
password="secret123"
)
session.add(user)
session.commit()
# Test password verification
assert user.verify_password("secret123") is True
assert user.verify_password("wrong") is False
# Test class method
found_user = User.get_by_email(session, "john@example.com")
assert found_user == userPythonRepository Testing
# tests/test_repositories.py
import pytest
from repositories.user_repository import UserRepository
from models.user import User
class TestUserRepository:
@pytest.fixture
def user_repo(self, session):
return UserRepository(session)
def test_create_user(self, user_repo):
"""Test user creation through repository"""
user_data = {
'username': 'newuser',
'email': 'new@example.com',
'password': 'password123'
}
user = user_repo.create(user_data)
assert user.id is not None
assert user.username == 'newuser'
def test_get_by_id(self, user_repo, sample_user):
"""Test getting user by ID"""
user = user_repo.get_by_id(sample_user.id)
assert user == sample_user
def test_get_by_email(self, user_repo, sample_user):
"""Test getting user by email"""
user = user_repo.get_by_email(sample_user.email)
assert user == sample_user
def test_update_user(self, user_repo, sample_user):
"""Test user update"""
updated_user = user_repo.update(
sample_user.id,
{'username': 'updated_username'}
)
assert updated_user.username == 'updated_username'
def test_delete_user(self, user_repo, sample_user):
"""Test user deletion"""
user_id = sample_user.id
user_repo.delete(user_id)
deleted_user = user_repo.get_by_id(user_id)
assert deleted_user is None
def test_list_users_with_pagination(self, user_repo):
"""Test user listing with pagination"""
# Create multiple users
for i in range(15):
user_repo.create({
'username': f'user_{i}',
'email': f'user_{i}@example.com',
'password': 'password'
})
# Test pagination
page_1 = user_repo.list_users(page=1, per_page=10)
page_2 = user_repo.list_users(page=2, per_page=10)
assert len(page_1) == 10
assert len(page_2) == 5PythonTesting with Factory Boy
# tests/factories.py
import factory
from factory.alchemy import SQLAlchemyModelFactory
from models.user import User
from models.post import Post
from datetime import datetime
class UserFactory(SQLAlchemyModelFactory):
class Meta:
model = User
sqlalchemy_session_persistence = "commit"
username = factory.Sequence(lambda n: f"user_{n}")
email = factory.LazyAttribute(lambda obj: f"{obj.username}@example.com")
password = "defaultpassword"
first_name = factory.Faker('first_name')
last_name = factory.Faker('last_name')
is_active = True
created_at = factory.LazyFunction(datetime.utcnow)
class PostFactory(SQLAlchemyModelFactory):
class Meta:
model = Post
sqlalchemy_session_persistence = "commit"
title = factory.Faker('sentence', nb_words=4)
content = factory.Faker('text', max_nb_chars=500)
author = factory.SubFactory(UserFactory)
created_at = factory.LazyFunction(datetime.utcnow)
# Usage in tests
def test_with_factories(session):
# Set the session for factories
UserFactory._meta.sqlalchemy_session = session
PostFactory._meta.sqlalchemy_session = session
# Create test data
user = UserFactory()
posts = PostFactory.create_batch(5, author=user)
assert len(user.posts) == 5
assert all(post.author == user for post in posts)
# Advanced factory usage
class AdminUserFactory(UserFactory):
is_admin = True
username = factory.Sequence(lambda n: f"admin_{n}")
class PublishedPostFactory(PostFactory):
is_published = True
published_at = factory.LazyFunction(datetime.utcnow)PythonIntegration Testing
# tests/test_integration.py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models.base import Base
class TestIntegration:
@pytest.fixture(scope="class")
def integration_engine(self):
"""Use a real database for integration tests"""
engine = create_engine("postgresql://test:test@localhost/test_db")
Base.metadata.create_all(engine)
yield engine
Base.metadata.drop_all(engine)
@pytest.fixture
def integration_session(self, integration_engine):
SessionLocal = sessionmaker(bind=integration_engine)
session = SessionLocal()
yield session
session.close()
def test_full_user_workflow(self, integration_session):
"""Test complete user workflow"""
from services.user_service import UserService
user_service = UserService(integration_session)
# Create user
user = user_service.register_user(
username="integrationuser",
email="integration@example.com",
password="password123"
)
# Verify user exists
found_user = user_service.get_user_by_email("integration@example.com")
assert found_user == user
# Update user
updated_user = user_service.update_user_profile(
user.id,
first_name="John",
last_name="Doe"
)
assert updated_user.first_name == "John"
# Create posts
from services.post_service import PostService
post_service = PostService(integration_session)
post = post_service.create_post(
user_id=user.id,
title="Integration Test Post",
content="This is a test post for integration testing"
)
# Verify relationships
integration_session.refresh(user)
assert len(user.posts) == 1
assert user.posts[0] == post
def test_database_constraints(self, integration_session):
"""Test database-level constraints"""
from models.user import User
# Test unique constraint
user1 = User(username="unique", email="unique@example.com")
integration_session.add(user1)
integration_session.commit()
# This should fail due to unique constraint
user2 = User(username="unique", email="different@example.com")
integration_session.add(user2)
with pytest.raises(Exception): # Database integrity error
integration_session.commit()Python14. Best Practices and Patterns
Repository Pattern
# repositories/base_repository.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, List, Optional
from sqlalchemy.orm import Session
T = TypeVar('T')
class BaseRepository(Generic[T], ABC):
def __init__(self, session: Session, model_class: type):
self.session = session
self.model_class = model_class
def create(self, **kwargs) -> T:
"""Create new entity"""
entity = self.model_class(**kwargs)
self.session.add(entity)
self.session.commit()
self.session.refresh(entity)
return entity
def get_by_id(self, entity_id: int) -> Optional[T]:
"""Get entity by ID"""
return self.session.query(self.model_class).filter(
self.model_class.id == entity_id
).first()
def get_all(self, limit: int = 100, offset: int = 0) -> List[T]:
"""Get all entities with pagination"""
return self.session.query(self.model_class).offset(offset).limit(limit).all()
def update(self, entity_id: int, **kwargs) -> Optional[T]:
"""Update entity"""
entity = self.get_by_id(entity_id)
if entity:
for key, value in kwargs.items():
setattr(entity, key, value)
self.session.commit()
self.session.refresh(entity)
return entity
def delete(self, entity_id: int) -> bool:
"""Delete entity"""
entity = self.get_by_id(entity_id)
if entity:
self.session.delete(entity)
self.session.commit()
return True
return False
# repositories/user_repository.py
from models.user import User
from .base_repository import BaseRepository
class UserRepository(BaseRepository[User]):
def __init__(self, session: Session):
super().__init__(session, User)
def get_by_email(self, email: str) -> Optional[User]:
"""Get user by email"""
return self.session.query(User).filter(User.email == email).first()
def get_by_username(self, username: str) -> Optional[User]:
"""Get user by username"""
return self.session.query(User).filter(User.username == username).first()
def get_active_users(self) -> List[User]:
"""Get all active users"""
return self.session.query(User).filter(User.is_active == True).all()
def search_users(self, query: str) -> List[User]:
"""Search users by name or email"""
search_term = f"%{query}%"
return self.session.query(User).filter(
or_(
User.username.ilike(search_term),
User.email.ilike(search_term),
User.first_name.ilike(search_term),
User.last_name.ilike(search_term)
)
).all()PythonService Layer Pattern
# services/base_service.py
from abc import ABC
from sqlalchemy.orm import Session
class BaseService(ABC):
def __init__(self, session: Session):
self.session = session
# services/user_service.py
from typing import Optional, List
from repositories.user_repository import UserRepository
from .base_service import BaseService
import hashlib
class UserService(BaseService):
def __init__(self, session: Session):
super().__init__(session)
self.user_repo = UserRepository(session)
def register_user(self, username: str, email: str, password: str) -> User:
"""Register a new user"""
# Check if user already exists
if self.user_repo.get_by_email(email):
raise ValueError("User with this email already exists")
if self.user_repo.get_by_username(username):
raise ValueError("Username already taken")
# Hash password
password_hash = hashlib.sha256(password.encode()).hexdigest()
# Create user
user = self.user_repo.create(
username=username,
email=email,
password_hash=password_hash
)
return user
def authenticate_user(self, email: str, password: str) -> Optional[User]:
"""Authenticate user login"""
user = self.user_repo.get_by_email(email)
if user and user.verify_password(password):
return user
return None
def update_user_profile(self, user_id: int, **kwargs) -> Optional[User]:
"""Update user profile"""
# Remove sensitive fields
sensitive_fields = ['password_hash', 'id']
for field in sensitive_fields:
kwargs.pop(field, None)
return self.user_repo.update(user_id, **kwargs)
def deactivate_user(self, user_id: int) -> bool:
"""Deactivate user account"""
return self.user_repo.update(user_id, is_active=False) is not NonePythonUnit of Work Pattern
# patterns/unit_of_work.py
from contextlib import contextmanager
from sqlalchemy.orm import Session
from repositories.user_repository import UserRepository
from repositories.post_repository import PostRepository
class UnitOfWork:
def __init__(self, session: Session):
self.session = session
self._users = None
self._posts = None
@property
def users(self) -> UserRepository:
if self._users is None:
self._users = UserRepository(self.session)
return self._users
@property
def posts(self) -> PostRepository:
if self._posts is None:
self._posts = PostRepository(self.session)
return self._posts
def commit(self):
"""Commit all changes"""
self.session.commit()
def rollback(self):
"""Rollback all changes"""
self.session.rollback()
@contextmanager
def unit_of_work(session: Session):
"""Context manager for unit of work"""
uow = UnitOfWork(session)
try:
yield uow
uow.commit()
except Exception:
uow.rollback()
raise
# Usage
def transfer_posts(from_user_id: int, to_user_id: int, session: Session):
"""Transfer all posts from one user to another"""
with unit_of_work(session) as uow:
from_user = uow.users.get_by_id(from_user_id)
to_user = uow.users.get_by_id(to_user_id)
if not from_user or not to_user:
raise ValueError("One or both users not found")
# Transfer posts
posts = uow.posts.get_by_user_id(from_user_id)
for post in posts:
uow.posts.update(post.id, user_id=to_user_id)
# Update user stats
uow.users.update(from_user_id, post_count=0)
uow.users.update(to_user_id, post_count=len(posts))
# All changes committed together or rolled back on errorPythonConfiguration Management
# config/database.py
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
class DatabaseConfig:
"""Database configuration management"""
def __init__(self):
self.environment = os.getenv('ENVIRONMENT', 'development')
self.database_url = self._get_database_url()
self.engine_options = self._get_engine_options()
def _get_database_url(self) -> str:
"""Get database URL based on environment"""
if self.environment == 'production':
return os.getenv('DATABASE_URL')
elif self.environment == 'test':
return os.getenv('TEST_DATABASE_URL', 'sqlite:///:memory:')
else:
return os.getenv('DEV_DATABASE_URL', 'sqlite:///development.db')
def _get_engine_options(self) -> dict:
"""Get engine options based on environment"""
base_options = {
'poolclass': QueuePool,
'pool_pre_ping': True,
'echo': False
}
if self.environment == 'production':
base_options.update({
'pool_size': 20,
'max_overflow': 30,
'pool_recycle': 3600,
'connect_args': {
'connect_timeout': 10,
'application_name': 'myapp_prod'
}
})
elif self.environment == 'test':
base_options.update({
'pool_size': 5,
'max_overflow': 10,
'connect_args': {'check_same_thread': False}
})
else:
base_options.update({
'echo': True, # Enable SQL logging in development
'pool_size': 5,
'max_overflow': 10
})
return base_options
def create_engine(self):
"""Create configured database engine"""
return create_engine(self.database_url, **self.engine_options)
def create_session_factory(self, engine):
"""Create session factory"""
return sessionmaker(
bind=engine,
autocommit=False,
autoflush=False,
expire_on_commit=False
)
# Usage
config = DatabaseConfig()
engine = config.create_engine()
SessionLocal = config.create_session_factory(engine)PythonError Handling and Logging
# utils/exceptions.py
class DatabaseError(Exception):
"""Base database exception"""
pass
class ValidationError(DatabaseError):
"""Data validation error"""
pass
class NotFoundError(DatabaseError):
"""Entity not found error"""
pass
class DuplicateError(DatabaseError):
"""Duplicate entity error"""
pass
# utils/logging_config.py
import logging
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Log slow queries
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total = time.time() - context._query_start_time
if total > 0.5: # Log queries slower than 500ms
logger.warning(f"Slow query: {total:.2f}s - {statement[:100]}...")
# Enhanced repository with error handling
class EnhancedUserRepository(UserRepository):
def create(self, **kwargs) -> User:
"""Create user with error handling"""
try:
return super().create(**kwargs)
except IntegrityError as e:
self.session.rollback()
if 'email' in str(e):
raise DuplicateError("Email already exists")
elif 'username' in str(e):
raise DuplicateError("Username already exists")
else:
raise DatabaseError(f"Database error: {str(e)}")
except Exception as e:
self.session.rollback()
logger.error(f"Unexpected error creating user: {str(e)}")
raise DatabaseError("Failed to create user")
def get_by_id(self, user_id: int) -> User:
"""Get user by ID with error handling"""
user = super().get_by_id(user_id)
if not user:
raise NotFoundError(f"User with ID {user_id} not found")
return userPython15. Real-world Applications
E-commerce Application
# models/ecommerce.py
from sqlalchemy import Column, Integer, String, Text, DECIMAL, DateTime, Boolean, ForeignKey, Table
from sqlalchemy.orm import relationship
from datetime import datetime
from models.base import Base
# Association table for order items
order_items = Table(
'order_items',
Base.metadata,
Column('order_id', Integer, ForeignKey('orders.id'), primary_key=True),
Column('product_id', Integer, ForeignKey('products.id'), primary_key=True),
Column('quantity', Integer, nullable=False),
Column('price', DECIMAL(10, 2), nullable=False) # Price at time of order
)
class Customer(Base):
__tablename__ = 'customers'
id = Column(Integer, primary_key=True)
first_name = Column(String(50), nullable=False)
last_name = Column(String(50), nullable=False)
email = Column(String(120), unique=True, nullable=False)
phone = Column(String(20))
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
addresses = relationship("Address", back_populates="customer", cascade="all, delete-orphan")
orders = relationship("Order", back_populates="customer")
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
customer_id = Column(Integer, ForeignKey('customers.id'), nullable=False)
street_address = Column(String(200), nullable=False)
city = Column(String(100), nullable=False)
state = Column(String(50), nullable=False)
postal_code = Column(String(20), nullable=False)
country = Column(String(50), nullable=False)
is_default = Column(Boolean, default=False)
# Relationships
customer = relationship("Customer", back_populates="addresses")
class Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False, unique=True)
description = Column(Text)
parent_id = Column(Integer, ForeignKey('categories.id'))
# Self-referential relationship
children = relationship("Category", back_populates="parent", cascade="all, delete-orphan")
parent = relationship("Category", back_populates="children", remote_side=[id])
# Products in this category
products = relationship("Product", back_populates="category")
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String(200), nullable=False)
description = Column(Text)
price = Column(DECIMAL(10, 2), nullable=False)
stock_quantity = Column(Integer, default=0)
category_id = Column(Integer, ForeignKey('categories.id'))
sku = Column(String(50), unique=True, nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
category = relationship("Category", back_populates="products")
orders = relationship("Order", secondary=order_items, back_populates="products")
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
customer_id = Column(Integer, ForeignKey('customers.id'), nullable=False)
order_date = Column(DateTime, default=datetime.utcnow)
total_amount = Column(DECIMAL(10, 2), nullable=False)
status = Column(String(20), default='pending') # pending, processing, shipped, delivered, cancelled
shipping_address_id = Column(Integer, ForeignKey('addresses.id'))
# Relationships
customer = relationship("Customer", back_populates="orders")
products = relationship("Product", secondary=order_items, back_populates="orders")
shipping_address = relationship("Address")PythonBlog Application
# models/blog.py
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, ForeignKey, Table
from sqlalchemy.orm import relationship
from sqlalchemy.ext.hybrid import hybrid_property
from datetime import datetime
from models.base import Base
# Many-to-many relationship between posts and tags
post_tags = Table(
'post_tags',
Base.metadata,
Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)
class Author(Base):
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(120), unique=True, nullable=False)
first_name = Column(String(50))
last_name = Column(String(50))
bio = Column(Text)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Relationships
posts = relationship("Post", back_populates="author")
comments = relationship("Comment", back_populates="author")
@hybrid_property
def full_name(self):
if self.first_name and self.last_name:
return f"{self.first_name} {self.last_name}"
return self.username
class Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(100), unique=True, nullable=False)
slug = Column(String(100), unique=True, nullable=False)
description = Column(Text)
# Relationships
posts = relationship("Post", back_populates="category")
class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
slug = Column(String(50), unique=True, nullable=False)
# Relationships
posts = relationship("Post", secondary=post_tags, back_populates="tags")
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
slug = Column(String(200), unique=True, nullable=False)
content = Column(Text, nullable=False)
excerpt = Column(Text)
author_id = Column(Integer, ForeignKey('authors.id'), nullable=False)
category_id = Column(Integer, ForeignKey('categories.id'))
is_published = Column(Boolean, default=False)
published_at = Column(DateTime)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
view_count = Column(Integer, default=0)
# Relationships
author = relationship("Author", back_populates="posts")
category = relationship("Category", back_populates="posts")
tags = relationship("Tag", secondary=post_tags, back_populates="posts")
comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
@hybrid_property
def is_published_and_visible(self):
return self.is_published and self.published_at is not None
class Comment(Base):
__tablename__ = 'comments'
id = Column(Integer, primary_key=True)
post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
author_id = Column(Integer, ForeignKey('authors.id'), nullable=False)
content = Column(Text, nullable=False)
is_approved = Column(Boolean, default=False)
created_at = Column(DateTime, default=datetime.utcnow)
parent_id = Column(Integer, ForeignKey('comments.id')) # For threaded comments
# Relationships
post = relationship("Post", back_populates="comments")
author = relationship("Author", back_populates="comments")
replies = relationship("Comment", cascade="all, delete-orphan")
# services/blog_service.py
from sqlalchemy.orm import Session
from repositories.blog_repository import PostRepository, AuthorRepository
from utils.slug import generate_slug
class BlogService:
def __init__(self, session: Session):
self.session = session
self.post_repo = PostRepository(session)
self.author_repo = AuthorRepository(session)
def create_post(self, author_id: int, title: str, content: str, **kwargs) -> Post:
"""Create a new blog post"""
# Generate slug from title
slug = generate_slug(title)
# Ensure slug is unique
counter = 1
original_slug = slug
while self.post_repo.get_by_slug(slug):
slug = f"{original_slug}-{counter}"
counter += 1
post_data = {
'title': title,
'slug': slug,
'content': content,
'author_id': author_id,
**kwargs
}
return self.post_repo.create(**post_data)
def publish_post(self, post_id: int) -> Post:
"""Publish a post"""
return self.post_repo.update(post_id,
is_published=True,
published_at=datetime.utcnow())
def get_published_posts(self, limit: int = 10, offset: int = 0):
"""Get published posts with pagination"""
return self.post_repo.get_published_posts(limit=limit, offset=offset)
def search_posts(self, query: str):
"""Search posts by title and content"""
return self.post_repo.search_posts(query)PythonFastAPI Integration
# api/main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from database.connection import SessionLocal, engine
from services.user_service import UserService
from services.blog_service import BlogService
from models.base import Base
import jwt
# Create tables
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Blog API", version="1.0.0")
security = HTTPBearer()
# Dependency to get database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Authentication dependency
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)):
try:
payload = jwt.decode(credentials.credentials, "secret_key", algorithms=["HS256"])
user_id = payload.get("user_id")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token")
user_service = UserService(db)
user = user_service.get_user_by_id(user_id)
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
except jwt.PyJWTError:
raise HTTPException(status_code=401, detail="Invalid token")
# API endpoints
@app.post("/auth/register")
def register(user_data: dict, db: Session = Depends(get_db)):
"""Register a new user"""
user_service = UserService(db)
try:
user = user_service.register_user(**user_data)
return {"message": "User registered successfully", "user_id": user.id}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/auth/login")
def login(credentials: dict, db: Session = Depends(get_db)):
"""User login"""
user_service = UserService(db)
user = user_service.authenticate_user(
credentials["email"],
credentials["password"]
)
if not user:
raise HTTPException(status_code=401, detail="Invalid credentials")
# Generate JWT token
token = jwt.encode({"user_id": user.id}, "secret_key", algorithm="HS256")
return {"access_token": token, "token_type": "bearer"}
@app.get("/posts")
def get_posts(limit: int = 10, offset: int = 0, db: Session = Depends(get_db)):
"""Get published posts"""
blog_service = BlogService(db)
posts = blog_service.get_published_posts(limit=limit, offset=offset)
return {"posts": [post.to_dict() for post in posts]}
@app.post("/posts")
def create_post(post_data: dict,
current_user = Depends(get_current_user),
db: Session = Depends(get_db)):
"""Create a new post"""
blog_service = BlogService(db)
post = blog_service.create_post(
author_id=current_user.id,
**post_data
)
return {"message": "Post created successfully", "post_id": post.id}
@app.get("/posts/{post_slug}")
def get_post(post_slug: str, db: Session = Depends(get_db)):
"""Get post by slug"""
blog_service = BlogService(db)
post = blog_service.get_post_by_slug(post_slug)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
return post.to_dict()
@app.put("/posts/{post_id}/publish")
def publish_post(post_id: int,
current_user = Depends(get_current_user),
db: Session = Depends(get_db)):
"""Publish a post"""
blog_service = BlogService(db)
# Check if user owns the post
post = blog_service.get_post_by_id(post_id)
if not post or post.author_id != current_user.id:
raise HTTPException(status_code=404, detail="Post not found")
published_post = blog_service.publish_post(post_id)
return {"message": "Post published successfully"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)PythonPerformance Monitoring
# monitoring/performance.py
import time
import logging
from functools import wraps
from sqlalchemy import event
from sqlalchemy.engine import Engine
from contextlib import contextmanager
# Performance monitoring decorator
def monitor_performance(operation_name: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
logging.info(f"{operation_name} completed in {duration:.3f}s")
return result
except Exception as e:
duration = time.time() - start_time
logging.error(f"{operation_name} failed after {duration:.3f}s: {str(e)}")
raise
return wrapper
return decorator
# Database query monitoring
class QueryProfiler:
def __init__(self):
self.queries = []
self.slow_query_threshold = 0.1 # 100ms
def log_query(self, statement, duration, parameters=None):
query_info = {
'statement': statement,
'duration': duration,
'parameters': parameters,
'timestamp': time.time()
}
self.queries.append(query_info)
if duration > self.slow_query_threshold:
logging.warning(f"Slow query detected ({duration:.3f}s): {statement[:100]}...")
def get_stats(self):
if not self.queries:
return {"total_queries": 0}
durations = [q['duration'] for q in self.queries]
return {
"total_queries": len(self.queries),
"avg_duration": sum(durations) / len(durations),
"max_duration": max(durations),
"slow_queries": len([d for d in durations if d > self.slow_query_threshold])
}
# Global profiler instance
profiler = QueryProfiler()
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
duration = time.time() - context._query_start_time
profiler.log_query(statement, duration, parameters)
# Context manager for monitoring service operations
@contextmanager
def monitor_service_operation(operation_name: str):
start_time = time.time()
initial_query_count = len(profiler.queries)
try:
yield
duration = time.time() - start_time
query_count = len(profiler.queries) - initial_query_count
logging.info(f"{operation_name}: {duration:.3f}s, {query_count} queries")
if query_count > 10: # Flag operations with many queries
logging.warning(f"{operation_name} executed {query_count} queries - potential N+1 problem")
except Exception as e:
duration = time.time() - start_time
logging.error(f"{operation_name} failed after {duration:.3f}s: {str(e)}")
raise
# Usage in services
class MonitoredUserService(UserService):
@monitor_performance("create_user")
def register_user(self, username: str, email: str, password: str):
with monitor_service_operation("user_registration"):
return super().register_user(username, email, password)
@monitor_performance("get_user_posts")
def get_user_with_posts(self, user_id: int):
with monitor_service_operation("get_user_with_posts"):
return self.user_repo.get_by_id_with_posts(user_id)PythonThis completes the comprehensive SQLAlchemy guide covering everything from beginner concepts to expert-level patterns and real-world applications. The guide includes practical examples, best practices, and production-ready code patterns that can be applied to build robust database-driven applications.
Discover more from Altgr Blog
Subscribe to get the latest posts sent to your email.
