Table of Contents

    1. Introduction to SQLAlchemy
    2. Installation and Setup
    3. Core vs ORM
    4. Database Connections and Engines
    5. SQLAlchemy Core
    6. SQLAlchemy ORM Fundamentals
    7. Model Definition and Relationships
    8. Sessions and Transactions
    9. Querying with ORM
    10. Advanced ORM Features
    11. Performance Optimization
    12. Migration with Alembic
    13. Testing with SQLAlchemy
    14. Best Practices and Patterns
    15. Real-world Applications

    1. Introduction to SQLAlchemy

    SQLAlchemy is a widely used SQL toolkit and Object-Relational Mapping (ORM) library for Python. It provides a full suite of well-known enterprise-level persistence patterns, designed for efficient and high-performing database access.

    What is SQLAlchemy?

    SQLAlchemy consists of two main components:

    • SQLAlchemy Core: A schema-centric model for database interaction
    • SQLAlchemy ORM: An object-relational mapper built on top of Core

    graph TD
        A[SQLAlchemy] --> B[Core]
        A --> C[ORM]
        B --> D[Schema Definition]
        B --> E[SQL Expression Language]
        B --> F[Engine & Connection Pool]
        C --> G[Declarative Models]
        C --> H[Session Management]
        C --> I[Relationship Mapping]

    Why SQLAlchemy?

    • Database Agnostic: Works with PostgreSQL, MySQL, SQLite, Oracle, and more
    • Mature and Stable: Battle-tested in production environments
    • Flexible: Choose between Core and ORM based on your needs
    • Performance: Efficient query generation and connection pooling
    • Pythonic: Clean, readable syntax that follows Python conventions

    2. Installation and Setup

    Installing SQLAlchemy

    # Basic installation
    pip install sqlalchemy
    
    # With PostgreSQL support
    pip install sqlalchemy psycopg2-binary
    
    # With MySQL support
    pip install sqlalchemy PyMySQL
    
    # With several driver extras at once (the quotes stop shells such as zsh from expanding the brackets)
    pip install "sqlalchemy[postgresql,mysql,oracle]"

    Project Structure

    graph LR
        A[Project Root] --> B[models/]
        A --> C[database/]
        A --> D[migrations/]
        A --> E[tests/]
        B --> F[__init__.py]
        B --> G[user.py]
        B --> H[base.py]
        C --> I[connection.py]
        C --> J[session.py]
        D --> K[alembic/]

    Basic Setup

    # database/connection.py
    from sqlalchemy import create_engine
    from sqlalchemy.orm import declarative_base, sessionmaker  # declarative_base lives in sqlalchemy.orm since 1.4
    
    # Database URL - SQLite for this example
    DATABASE_URL = "sqlite:///./test.db"
    
    # Create engine
    engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
    
    # Create SessionLocal class
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
    # Create Base class
    Base = declarative_base()

    3. Core vs ORM

    SQLAlchemy Architecture

    graph TB
        subgraph "SQLAlchemy ORM"
            A[User Code] --> B[ORM]
            B --> C[Core]
        end
    
        subgraph "SQLAlchemy Core"
            C --> D[Schema/MetaData]
            C --> E[SQL Expression Language]
            C --> F[Engine]
        end
    
        F --> G[DBAPI]
        G --> H[Database]

    When to Use Core vs ORM

    Feature           | Core    | ORM
    ------------------|---------|-------------
    Performance       | Higher  | Good
    Learning Curve    | Steeper | Gentler
    SQL Control       | Full    | Abstracted
    Object Mapping    | Manual  | Automatic
    Complex Queries   | Easier  | More Complex
    Rapid Development | Slower  | Faster

    Core Example

    from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select
    
    engine = create_engine('sqlite:///example.db')
    metadata = MetaData()
    
    users = Table('users', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(50)),
        Column('email', String(120))
    )
    
    # Create tables
    metadata.create_all(engine)
    
    # Insert data
    with engine.connect() as conn:
        conn.execute(users.insert().values(name='John', email='john@example.com'))
        conn.commit()
    
    # Query data
    with engine.connect() as conn:
        result = conn.execute(select(users).where(users.c.name == 'John'))
        print(result.fetchall())

    ORM Example

    from sqlalchemy import create_engine, Column, Integer, String
    from sqlalchemy.orm import declarative_base, sessionmaker  # declarative_base lives in sqlalchemy.orm since 1.4
    
    Base = declarative_base()
    engine = create_engine('sqlite:///example.db')
    
    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(50))
        email = Column(String(120))
    
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    
    # Insert data
    user = User(name='John', email='john@example.com')
    session.add(user)
    session.commit()
    
    # Query data
    users = session.query(User).filter(User.name == 'John').all()
    print(users)
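
    The ORM lookup above can also be written with select() and Session.execute(), the style that SQLAlchemy 1.4 and 2.0 document as the forward-looking API. The following is a minimal sketch, assuming SQLAlchemy 1.4 or newer and an in-memory SQLite database; it re-declares a reduced User model so that it runs on its own.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(120))


# In-memory SQLite is an assumption made to keep the sketch self-contained
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(name="John", email="john@example.com"))
    session.commit()

    # select() builds the statement; scalars() unwraps each row to a User object
    stmt = select(User).where(User.name == "John")
    users = session.execute(stmt).scalars().all()
    emails = [user.email for user in users]

print(emails)
```

    Both styles return the same mapped objects, so session.query() code can be migrated one query at a time.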

    4. Database Connections and Engines

    Engine Creation

    graph LR
        A[create_engine] --> B[Engine]
        B --> C[Connection Pool]
        B --> D[Dialect]
        B --> E[DBAPI]
        C --> F[Connection 1]
        C --> G[Connection 2]
        C --> H[Connection N]

    Database URLs

    # SQLite
    engine = create_engine('sqlite:///path/to/database.db')
    engine = create_engine('sqlite:///:memory:')  # In-memory database
    
    # PostgreSQL
    engine = create_engine('postgresql://user:password@localhost/dbname')
    engine = create_engine('postgresql+psycopg2://user:password@localhost/dbname')
    
    # MySQL
    engine = create_engine('mysql://user:password@localhost/dbname')
    engine = create_engine('mysql+pymysql://user:password@localhost/dbname')
    
    # Oracle
    engine = create_engine('oracle://user:password@localhost:1521/dbname')
    
    # SQL Server
    engine = create_engine('mssql+pyodbc://user:password@server/database?driver=ODBC+Driver+17+for+SQL+Server')
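
    Hand-written URL strings break when a password contains characters such as @ or /. One way around this is to build the URL from its parts with URL.create(), which escapes them when the URL is rendered. The sketch below assumes SQLAlchemy 1.4 or newer; the credentials are made up for illustration, and no database driver is needed because no engine is created.

```python
from sqlalchemy.engine import URL, make_url

url = URL.create(
    drivername="postgresql+psycopg2",
    username="user",
    password="p@ss/word",  # hypothetical password containing URL-reserved characters
    host="localhost",
    port=5432,
    database="dbname",
)

# Render with the real password to see the escaped form
rendered = url.render_as_string(hide_password=False)

# Parsing the rendered string recovers the original password
parsed = make_url(rendered)

print(rendered)
print(parsed.password)
```

    The URL object can be passed to create_engine() directly in place of a string.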

    Engine Configuration

    from sqlalchemy import create_engine

    engine = create_engine(
        'sqlite:///example.db',
        # Connection pool settings. These are QueuePool options: SQLAlchemy 2.0 uses
        # QueuePool for file-based SQLite by default, while 1.4 used NullPool there,
        # which does not accept pool_size/max_overflow.
        pool_size=10,           # Number of connections to maintain
        max_overflow=20,        # Additional connections beyond pool_size
        pool_pre_ping=True,     # Validate connections before use
        pool_recycle=3600,      # Recycle connections after 1 hour

        # SQLite specific
        connect_args={
            "check_same_thread": False,
            "timeout": 30
        },

        # Echo SQL statements
        echo=True,              # Log all SQL
        echo_pool=True,         # Log connection pool events

        # Other options
        isolation_level="AUTOCOMMIT"  # Each statement commits immediately; omit this to keep transactional behavior
    )
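
    The pool class itself can also be chosen with the poolclass argument. A common case is an in-memory SQLite database that must be visible from more than one thread: StaticPool keeps a single connection and hands it to every caller. The following is a small sketch under those assumptions, with a throwaway users table created only for the demonstration.

```python
import threading

from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool

engine = create_engine(
    "sqlite://",                                # in-memory database
    connect_args={"check_same_thread": False},  # allow use from other threads
    poolclass=StaticPool,                       # one shared connection for all callers
)

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "Alice"})

names = []


def read_names():
    # Runs in a worker thread but reuses the same underlying connection,
    # so it sees the table created by the main thread
    with engine.connect() as conn:
        names.extend(conn.execute(text("SELECT name FROM users")).scalars().all())


worker = threading.Thread(target=read_names)
worker.start()
worker.join()

print(names)
```

    Sharing one connection means callers are not isolated from each other, so this arrangement is usually reserved for tests and small tools.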

    Connection Management

    from sqlalchemy import text

    # Method 1: Using engine.connect()
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM users"))
        print(result.fetchall())
    
    # Method 2: Using engine.begin() for transactions
    with engine.begin() as conn:
        conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
        conn.execute(text("INSERT INTO users (name) VALUES ('Bob')"))
        # Automatically commits on success, rolls back on exception
    
    # Method 3: Manual transaction control
    conn = engine.connect()
    trans = conn.begin()
    try:
        conn.execute(text("INSERT INTO users (name) VALUES ('Charlie')"))
        trans.commit()
    except Exception:
        trans.rollback()
        raise
    finally:
        conn.close()
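
    To make the commit-or-rollback behavior of engine.begin() concrete, the sketch below forces an error inside the block and then checks that the insert was discarded. It assumes an in-memory SQLite database and a throwaway users table; the RuntimeError stands in for any failure.

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))

try:
    with engine.begin() as conn:
        conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
        # Any exception leaving the block triggers a rollback
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

with engine.connect() as conn:
    count = conn.execute(text("SELECT COUNT(*) FROM users")).scalar()

print(count)
```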

    5. SQLAlchemy Core

    Metadata and Tables

    from sqlalchemy import MetaData, Table, Column, Integer, String, Text, Boolean, DateTime, ForeignKey
    from datetime import datetime
    
    metadata = MetaData()
    
    # Define tables
    users_table = Table(
        'users',
        metadata,
        Column('id', Integer, primary_key=True),
        Column('username', String(50), unique=True, nullable=False),
        Column('email', String(120), unique=True, nullable=False),
        Column('created_at', DateTime, default=datetime.utcnow),
        Column('is_active', Boolean, default=True)
    )
    
    posts_table = Table(
        'posts',
        metadata,
        Column('id', Integer, primary_key=True),
        Column('title', String(200), nullable=False),
        Column('content', Text),
        Column('user_id', Integer, ForeignKey('users.id')),
        Column('created_at', DateTime, default=datetime.utcnow),
        Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    )
    
    # Create all tables
    metadata.create_all(engine)
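
    It can help to look at the DDL that create_all() would emit for a table. The sketch below compiles a CREATE TABLE statement for a reduced version of the users table without touching a database; the SQLite dialect is chosen here only as an example.

```python
from sqlalchemy import Boolean, Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import sqlite
from sqlalchemy.schema import CreateTable

metadata = MetaData()

users_table = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String(50), unique=True, nullable=False),
    Column("is_active", Boolean, default=True),
)

# Compile the DDL for a specific dialect; nothing is executed
ddl = str(CreateTable(users_table).compile(dialect=sqlite.dialect()))
print(ddl)
```

    Note that default=True does not appear in the DDL: it is a Python-side default that SQLAlchemy applies when it builds the INSERT. A database-side default would use server_default instead.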

    Core Query Building

    graph TD
        A[SQL Expression] --> B[Select]
        A --> C[Insert]
        A --> D[Update]
        A --> E[Delete]
        B --> F[Where Clauses]
        B --> G[Joins]
        B --> H[Group By]
        B --> I[Order By]
        B --> J[Limit/Offset]

    CRUD Operations with Core

    from sqlalchemy import select, insert, update, delete, and_, or_
    
    # INSERT
    insert_stmt = insert(users_table).values(
        username='john_doe',
        email='john@example.com'
    )
    
    with engine.connect() as conn:
        result = conn.execute(insert_stmt)
        user_id = result.inserted_primary_key[0]
        conn.commit()
    
    # INSERT multiple records
    insert_stmt = insert(users_table)
    data = [
        {'username': 'alice', 'email': 'alice@example.com'},
        {'username': 'bob', 'email': 'bob@example.com'},
        {'username': 'charlie', 'email': 'charlie@example.com'}
    ]
    
    with engine.connect() as conn:
        conn.execute(insert_stmt, data)
        conn.commit()
    
    # SELECT
    select_stmt = select(users_table).where(users_table.c.username == 'john_doe')
    
    with engine.connect() as conn:
        result = conn.execute(select_stmt)
        user = result.fetchone()
        print(user)
    
    # SELECT with JOIN
    join_stmt = select(
        users_table.c.username,
        posts_table.c.title
    ).select_from(
        users_table.join(posts_table, users_table.c.id == posts_table.c.user_id)
    ).where(
        users_table.c.is_active == True
    )
    
    # Complex WHERE clauses
    complex_select = select(users_table).where(
        and_(
            users_table.c.is_active == True,
            or_(
                users_table.c.username.like('%john%'),
                users_table.c.email.like('%@gmail.com')
            )
        )
    )
    
    # UPDATE
    update_stmt = update(users_table).where(
        users_table.c.username == 'john_doe'
    ).values(
        email='john.doe@example.com'
    )
    
    with engine.connect() as conn:
        result = conn.execute(update_stmt)
        print(f"Updated {result.rowcount} rows")
        conn.commit()
    
    # DELETE
    delete_stmt = delete(users_table).where(
        users_table.c.is_active == False
    )
    
    with engine.connect() as conn:
        result = conn.execute(delete_stmt)
        print(f"Deleted {result.rowcount} rows")
        conn.commit()
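
    Each of the statements above is an object that is only turned into SQL when it is executed, with literal values sent separately as bound parameters. The sketch below compiles a statement by hand to show both parts; it uses a reduced users table and the default dialect, so the exact SQL text may differ slightly on a real backend.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select

metadata = MetaData()

users_table = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String(50)),
)

stmt = select(users_table.c.id).where(users_table.c.username == "john_doe")

compiled = stmt.compile()
sql = str(compiled)       # SQL text with placeholders
params = compiled.params  # values bound to those placeholders

print(sql)
print(params)
```

    Because the value travels as a parameter rather than being spliced into the SQL string, user input handled this way is not open to SQL injection.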

    Advanced Core Features

    from sqlalchemy import func, case, select
    
    # Aggregate functions
    count_stmt = select(func.count(users_table.c.id))
    avg_stmt = select(func.avg(posts_table.c.id))
    
    # CASE expressions
    case_stmt = select(
        users_table.c.username,
        case(
            (users_table.c.is_active == True, 'Active'),
            else_='Inactive'
        ).label('status')
    )
    
    # Subqueries
    subquery = select(func.count(posts_table.c.id)).where(
        posts_table.c.user_id == users_table.c.id
    ).scalar_subquery()
    
    select_with_subquery = select(
        users_table.c.username,
        subquery.label('post_count')
    )
    
    # Window functions
    from sqlalchemy import text
    
    window_stmt = select(
        users_table.c.username,
        func.row_number().over(order_by=users_table.c.created_at).label('row_num')
    )
    
    # Raw SQL with text()
    raw_sql = text("""
        SELECT u.username, COUNT(p.id) as post_count
        FROM users u
        LEFT JOIN posts p ON u.id = p.user_id
        GROUP BY u.username
        HAVING COUNT(p.id) > :min_posts
    """)
    
    with engine.connect() as conn:
        result = conn.execute(raw_sql, {"min_posts": 5})
        print(result.fetchall())
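
    The raw SQL report above can also be expressed with the SQL Expression Language. The following is a sketch of one equivalent form, assuming reduced users and posts tables, an in-memory SQLite database, and a handful of made-up rows.

```python
from sqlalchemy import (
    Column, ForeignKey, Integer, MetaData, String, Table,
    create_engine, func, insert, select,
)

metadata = MetaData()

users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String(50), nullable=False),
)
posts = Table(
    "posts", metadata,
    Column("id", Integer, primary_key=True),
    Column("title", String(200), nullable=False),
    Column("user_id", Integer, ForeignKey("users.id")),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(users), [
        {"id": 1, "username": "alice"},
        {"id": 2, "username": "bob"},
    ])
    conn.execute(insert(posts), [
        {"title": "a1", "user_id": 1},
        {"title": "a2", "user_id": 1},
        {"title": "b1", "user_id": 2},
    ])

min_posts = 1  # illustrative threshold
stmt = (
    select(users.c.username, func.count(posts.c.id).label("post_count"))
    .select_from(users.outerjoin(posts, users.c.id == posts.c.user_id))
    .group_by(users.c.username)
    .having(func.count(posts.c.id) > min_posts)
)

with engine.connect() as conn:
    rows = [tuple(row) for row in conn.execute(stmt)]

print(rows)
```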

    6. SQLAlchemy ORM Fundamentals

    Declarative Base

    graph TD
        A[declarative_base] --> B[Base Class]
        B --> C[User Model]
        B --> D[Post Model]
        B --> E[Category Model]
        C --> F[__tablename__]
        C --> G[Columns]
        C --> H[Relationships]
        C --> I[Methods]

    Basic Model Definition

    from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey
    from sqlalchemy.orm import declarative_base, relationship  # declarative_base lives in sqlalchemy.orm since 1.4
    from datetime import datetime
    
    Base = declarative_base()
    
    class User(Base):
        __tablename__ = 'users'
    
        # Primary key
        id = Column(Integer, primary_key=True, index=True)
    
        # Basic columns
        username = Column(String(50), unique=True, nullable=False, index=True)
        email = Column(String(120), unique=True, nullable=False)
        password_hash = Column(String(255), nullable=False)
    
        # Timestamps
        created_at = Column(DateTime, default=datetime.utcnow)
        updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
        # Boolean fields
        is_active = Column(Boolean, default=True)
        is_admin = Column(Boolean, default=False)
    
        # Relationships (will be defined later)
        posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")
    
        def __repr__(self):
            return f"<User(id={self.id}, username='{self.username}')>"
    
        def __str__(self):
            return self.username
    
    class Post(Base):
        __tablename__ = 'posts'
    
        id = Column(Integer, primary_key=True, index=True)
        title = Column(String(200), nullable=False)
        content = Column(Text)
    
        # Foreign key
        user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    
        # Timestamps
        created_at = Column(DateTime, default=datetime.utcnow)
        updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
        # Relationships
        author = relationship("User", back_populates="posts")
    
        def __repr__(self):
            return f"<Post(id={self.id}, title='{self.title[:30]}...')>"

    Column Types and Constraints

    from sqlalchemy import (
        Column, Integer, String, Text, DateTime, Date, Time, Boolean,
        Float, Numeric, DECIMAL, BigInteger, SmallInteger,
        JSON, ARRAY, Enum, LargeBinary, Unicode, UnicodeText,
        CheckConstraint, Index
    )
    from sqlalchemy.dialects.postgresql import UUID, JSONB
    from enum import Enum as PyEnum
    import uuid
    
    class UserStatus(PyEnum):
        ACTIVE = "active"
        INACTIVE = "inactive"
        BANNED = "banned"
    
    class User(Base):
        __tablename__ = 'users'
    
        # Numeric types
        id = Column(Integer, primary_key=True)
        age = Column(SmallInteger)
        salary = Column(Numeric(10, 2))  # 10 digits, 2 decimal places
        rating = Column(Float)
    
        # String types
        username = Column(String(50), nullable=False)  # VARCHAR(50)
        bio = Column(Text)  # TEXT
        profile_data = Column(JSON)  # JSON
    
        # Date/Time types
        birth_date = Column(Date)
        last_login = Column(DateTime)
        daily_login_time = Column(Time)
    
        # Special types
        status = Column(Enum(UserStatus), default=UserStatus.ACTIVE)
        profile_picture = Column(LargeBinary)  # BLOB
        external_id = Column(UUID(as_uuid=True), default=uuid.uuid4)
    
        # Constraints
        email = Column(
            String(120),
            unique=True,
            nullable=False,
            index=True
        )
    
        # Check constraints
        __table_args__ = (
            CheckConstraint('age >= 0', name='check_age_positive'),
            CheckConstraint('salary >= 0', name='check_salary_positive'),
            Index('idx_username_email', 'username', 'email'),
        )

    Model Methods and Properties

    from sqlalchemy.ext.hybrid import hybrid_property
    from sqlalchemy import Column, Integer, String, func
    import hashlib
    
    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        first_name = Column(String(50))
        last_name = Column(String(50))
        email = Column(String(120), unique=True)
        _password_hash = Column('password_hash', String(255))
    
        @hybrid_property
        def full_name(self):
            """Hybrid property for full name"""
            return f"{self.first_name} {self.last_name}"
    
        @full_name.expression
        def full_name(cls):
            """SQL expression for full_name"""
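            # The + operator compiles to each dialect's string concatenation;
            # a concat() function is not available on older SQLite versions
            return cls.first_name + ' ' + cls.last_name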
    
        @property
        def password(self):
            """Prevent password from being read"""
            raise AttributeError('Password is not readable')
    
        @password.setter
        def password(self, password):
            """Hash password when setting"""
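            # NOTE: a single unsalted SHA-256 pass keeps the example short;
            # it is not suitable for storing real passwords
            self._password_hash = hashlib.sha256(password.encode()).hexdigest()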
    
        def verify_password(self, password):
            """Verify password against hash"""
            return self._password_hash == hashlib.sha256(password.encode()).hexdigest()
    
        @classmethod
        def get_by_email(cls, session, email):
            """Class method to get user by email"""
            return session.query(cls).filter(cls.email == email).first()
    
        def to_dict(self):
            """Convert model to dictionary"""
            return {
                'id': self.id,
                'first_name': self.first_name,
                'last_name': self.last_name,
                'full_name': self.full_name,
                'email': self.email
            }
    
        @staticmethod
        def hash_password(password):
            """Static method to hash password"""
            return hashlib.sha256(password.encode()).hexdigest()
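
    The password setter and verify_password() above use a single unsalted SHA-256 pass, which is fast to brute-force. One possible hardening that stays within the standard library is salted PBKDF2 with a constant-time comparison. The sketch below shows the two helpers on their own; the iteration count and the "iterations$salt$digest" storage format are assumptions made for illustration, and a dedicated library such as bcrypt or argon2 may be a better fit in production.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # assumed work factor; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'iterations$salt$digest' using salted PBKDF2-HMAC-SHA256."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    iterations, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(candidate.hex(), digest_hex)


stored = hash_password("s3cret")
print(verify_password("s3cret", stored))
print(verify_password("wrong", stored))
```

    The resulting string fits in the String(255) password_hash column, so the model's setter could call hash_password() without a schema change.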

    7. Model Definition and Relationships

    Relationship Types

    graph TD
        A[SQLAlchemy Relationships] --> B[One-to-Many]
        A --> C[Many-to-One]
        A --> D[One-to-One]
        A --> E[Many-to-Many]
    
        B --> F[User → Posts]
        C --> G[Post → User]
        D --> H[User → Profile]
        E --> I[Posts ↔ Tags]

    One-to-Many Relationships

    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(50), unique=True)
        email = Column(String(120), unique=True)
    
        # One-to-Many: One user can have many posts
        posts = relationship(
            "Post",
            back_populates="author",
            cascade="all, delete-orphan",  # Delete posts when user is deleted
            lazy="dynamic"  # Return Query object instead of loading all posts
        )
    
        # One-to-Many: One user can have many orders
        # (assumes an Order model with a matching `customer` relationship, not shown here)
        orders = relationship("Order", back_populates="customer")
    
    class Post(Base):
        __tablename__ = 'posts'
    
        id = Column(Integer, primary_key=True)
        title = Column(String(200))
        content = Column(Text)
        user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    
        # Many-to-One: Many posts belong to one user
        author = relationship("User", back_populates="posts")

    One-to-One Relationships

    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(50), unique=True)
    
        # One-to-One: One user has one profile
        profile = relationship(
            "UserProfile",
            back_populates="user",
            uselist=False,  # This makes it one-to-one
            cascade="all, delete-orphan"
        )
    
    class UserProfile(Base):
        __tablename__ = 'user_profiles'
    
        id = Column(Integer, primary_key=True)
        user_id = Column(Integer, ForeignKey('users.id'), unique=True)  # unique=True enforces one-to-one
        bio = Column(Text)
        avatar_url = Column(String(255))
        birth_date = Column(Date)
    
        # One-to-One: One profile belongs to one user
        user = relationship("User", back_populates="profile")

    Many-to-Many Relationships

    # Association table for many-to-many relationship
    post_tags = Table(
        'post_tags',
        Base.metadata,
        Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
        Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
    )
    
    class Post(Base):
        __tablename__ = 'posts'
    
        id = Column(Integer, primary_key=True)
        title = Column(String(200))
        content = Column(Text)
    
        # Many-to-Many: Posts can have many tags, tags can belong to many posts
        tags = relationship(
            "Tag",
            secondary=post_tags,
            back_populates="posts",
            lazy="dynamic"
        )
    
    class Tag(Base):
        __tablename__ = 'tags'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(50), unique=True)
        description = Column(Text)
    
        # Many-to-Many: Tags can belong to many posts
        posts = relationship(
            "Post",
            secondary=post_tags,
            back_populates="tags",
            lazy="dynamic"
        )
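
    With a many-to-many relationship in place, rows in the association table are managed by SQLAlchemy as objects are added to or removed from the collections. The sketch below shows this end to end, assuming an in-memory SQLite database and reduced Post and Tag models; it uses the default list-based loading rather than lazy="dynamic" to keep the example short.

```python
from sqlalchemy import (
    Column, ForeignKey, Integer, String, Table, create_engine, func, select,
)
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)


class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(200))
    tags = relationship("Tag", secondary=post_tags, back_populates="posts")


class Tag(Base):
    __tablename__ = "tags"

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True)
    posts = relationship("Post", secondary=post_tags, back_populates="tags")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    python_tag, sql_tag = Tag(name="python"), Tag(name="sql")
    session.add_all([
        Post(title="Intro to ORM", tags=[python_tag, sql_tag]),
        Post(title="Raw SQL tips", tags=[sql_tag]),
    ])
    session.commit()  # also inserts three rows into post_tags

    found = session.execute(select(Tag).where(Tag.name == "sql")).scalar_one()
    titles = sorted(post.title for post in found.posts)
    link_rows = session.execute(
        select(func.count()).select_from(post_tags)
    ).scalar()

print(titles, link_rows)
```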

    Association Objects (Many-to-Many with Extra Data)

    class UserRole(Base):
        __tablename__ = 'user_roles'

        user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
        role_id = Column(Integer, ForeignKey('roles.id'), primary_key=True)
        assigned_at = Column(DateTime, default=datetime.utcnow)
        assigned_by = Column(Integer, ForeignKey('users.id'))
        is_active = Column(Boolean, default=True)

        # Relationships (foreign_keys disambiguates the two foreign keys that point at users.id;
        # back_populates links both directions so SQLAlchemy keeps them in sync)
        user = relationship("User", foreign_keys=[user_id], back_populates="role_associations")
        role = relationship("Role", back_populates="user_associations")
        assigner = relationship("User", foreign_keys=[assigned_by])

    class User(Base):
        __tablename__ = 'users'

        id = Column(Integer, primary_key=True)
        username = Column(String(50), unique=True)

        # Many-to-Many with association object
        role_associations = relationship(
            "UserRole",
            foreign_keys="UserRole.user_id",
            back_populates="user"
        )

        @property
        def roles(self):
            return [assoc.role for assoc in self.role_associations if assoc.is_active]

    class Role(Base):
        __tablename__ = 'roles'

        id = Column(Integer, primary_key=True)
        name = Column(String(50), unique=True)
        description = Column(Text)

        user_associations = relationship("UserRole", back_populates="role")

    Self-Referential Relationships

    class Category(Base):
        __tablename__ = 'categories'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(100), nullable=False)
        parent_id = Column(Integer, ForeignKey('categories.id'))
    
        # Self-referential relationship
        children = relationship(
            "Category",
            back_populates="parent",
            cascade="all, delete-orphan"
        )
        parent = relationship("Category", back_populates="children", remote_side=[id])
    
        def __repr__(self):
            return f"<Category(id={self.id}, name='{self.name}')>"
    
    # Usage example
    with SessionLocal() as session:
        # Create parent category
        electronics = Category(name="Electronics")
        session.add(electronics)
        session.flush()  # Get the ID
    
        # Create child categories
        phones = Category(name="Phones", parent_id=electronics.id)
        laptops = Category(name="Laptops", parent_id=electronics.id)
    
        session.add_all([phones, laptops])
        session.commit()
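
    The usage example above wires the hierarchy through parent_id, which requires a flush to obtain the parent's ID first. An alternative is to assign through the relationships and let SQLAlchemy fill in the foreign keys at flush time. The sketch below assumes an in-memory SQLite database and re-declares the Category model so that it runs on its own.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Category(Base):
    __tablename__ = "categories"

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    parent_id = Column(Integer, ForeignKey("categories.id"))

    children = relationship(
        "Category", back_populates="parent", cascade="all, delete-orphan"
    )
    parent = relationship("Category", back_populates="children", remote_side=[id])


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    electronics = Category(name="Electronics")
    # Assigning to the collection is enough; parent_id is set during the flush
    electronics.children = [Category(name="Phones"), Category(name="Laptops")]
    session.add(electronics)  # children are added through the cascade
    session.commit()

    phones = session.execute(
        select(Category).where(Category.name == "Phones")
    ).scalar_one()
    parent_name = phones.parent.name
    child_names = sorted(child.name for child in phones.parent.children)

print(parent_name, child_names)
```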

    Relationship Loading Strategies

    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(50))
    
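        # Different loading strategies (shown side by side for comparison only;
        # a real model would define a single relationship per foreign key)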
        posts_lazy = relationship("Post", lazy="select")        # Default: lazy loading
        posts_eager = relationship("Post", lazy="joined")       # JOIN eagerly
        posts_subquery = relationship("Post", lazy="subquery")  # Subquery eagerly
        posts_dynamic = relationship("Post", lazy="dynamic")    # Return Query object
        posts_selectin = relationship("Post", lazy="selectin")  # SELECT IN eagerly
    
    # Example usage of different loading strategies
    from sqlalchemy.orm import selectinload, joinedload, subqueryload
    
    # Eager loading with query options
    users = session.query(User).options(
        joinedload(User.posts),
        selectinload(User.profile)
    ).all()
    
    # Lazy loading (default)
    user = session.query(User).first()
    posts = user.posts  # This triggers a separate query
    
    # Dynamic relationship (requires lazy="dynamic" on User.posts, as in the one-to-many example)
    user = session.query(User).first()
    recent_posts = user.posts.filter(Post.created_at > datetime(2023, 1, 1)).all()
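
    The practical difference between lazy and eager loading is the number of SQL statements sent to the database. The sketch below counts them with a before_cursor_execute listener, comparing the default lazy loading against selectinload(). It assumes an in-memory SQLite database, reduced User and Post models, and three made-up users with one post each.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event, select
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50))
    posts = relationship("Post", back_populates="author")  # lazy="select" by default


class Post(Base):
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True)
    title = Column(String(200))
    user_id = Column(Integer, ForeignKey("users.id"))
    author = relationship("User", back_populates="posts")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

statements = []


@event.listens_for(engine, "before_cursor_execute")
def record_statement(conn, cursor, statement, parameters, context, executemany):
    # Called once for every statement sent to the DBAPI cursor
    statements.append(statement)


with Session(engine) as session:
    session.add_all(
        [User(username=f"user{i}", posts=[Post(title=f"post{i}")]) for i in range(3)]
    )
    session.commit()


def count_queries(options=()):
    """Load all users, touch their posts, and return (statement count, post count)."""
    statements.clear()
    stmt = select(User)
    if options:
        stmt = stmt.options(*options)
    with Session(engine) as session:
        users = session.execute(stmt).scalars().all()
        total_posts = sum(len(user.posts) for user in users)
    return len(statements), total_posts


lazy_count, lazy_posts = count_queries()
selectin_count, selectin_posts = count_queries([selectinload(User.posts)])

print(lazy_count, selectin_count)
```

    Lazy loading issues one statement for the users plus one per user whose posts are accessed, while selectinload() needs only two statements regardless of how many users are returned.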

    8. Sessions and Transactions

    Session Lifecycle

    graph TD
        A[Session Created] --> B[Objects Added/Modified]
        B --> C[Session.flush]
        C --> D[SQL Sent to DB]
        D --> E[Transaction Pending]
        E --> F[Session.commit]
        E --> G[Session.rollback]
        F --> H[Transaction Committed]
        G --> I[Changes Discarded]
        H --> J[Session Cleared]
        I --> J
        J --> K[Session Closed]

    Session Configuration

    from sqlalchemy.orm import Session, sessionmaker, scoped_session
    
    # Basic session configuration
    SessionLocal = sessionmaker(
        bind=engine,
        autocommit=False,    # Manual transaction control
        autoflush=False,     # Manual flushing
        expire_on_commit=True  # Expire objects after commit
    )
    
    # Scoped session for thread-safety
    ScopedSession = scoped_session(SessionLocal)
    
    # Session with custom configuration
    CustomSession = sessionmaker(
        bind=engine,
        autocommit=False,
        autoflush=True,      # Auto-flush before queries
        expire_on_commit=False,  # Keep objects accessible after commit
        class_=Session       # Session class to instantiate (the default; pass a subclass to customize)
    )

    Session Usage Patterns

    # Pattern 1: Context manager (Recommended)
    def create_user(username: str, email: str):
        with SessionLocal() as session:
            user = User(username=username, email=email)
            session.add(user)
            session.commit()
            session.refresh(user)  # Refresh to get database-generated values
            return user
    
    # Pattern 2: Try-except-finally
    def update_user(user_id: int, **kwargs):
        session = SessionLocal()
        try:
            user = session.query(User).filter(User.id == user_id).first()
            if user:
                for key, value in kwargs.items():
                    setattr(user, key, value)
                session.commit()
                session.refresh(user)  # Reload attributes expired by commit before the session closes
                return user
            return None
        except Exception:
            session.rollback()
            raise  # Re-raise with the original traceback
        finally:
            session.close()
    
    # Pattern 3: Using scoped session
    def get_user_by_id(user_id: int):
        user = ScopedSession.query(User).filter(User.id == user_id).first()
        ScopedSession.remove()  # Clean up the session
        return user

    Transaction Management

    # Automatic transaction management
    with SessionLocal() as session:
        # Transaction starts automatically
        user = User(username="john", email="john@example.com")
        session.add(user)
    
        post = Post(title="My First Post", content="Hello World!", author=user)
        session.add(post)
    
        # Commit all changes
        session.commit()  # Transaction committed
    
    # Manual transaction management
    session = SessionLocal()
    transaction = session.begin()
    try:
        # Multiple operations
        user = User(username="alice", email="alice@example.com")
        session.add(user)
        session.flush()  # Send to DB but don't commit
    
        post = Post(title="Alice's Post", content="Content", user_id=user.id)
        session.add(post)
    
        # Commit transaction
        transaction.commit()
    except Exception as e:
        # Rollback on error
        transaction.rollback()
        print(f"Error: {e}")
    finally:
        session.close()
    
    # Nested transactions (Savepoints)
    from sqlalchemy.exc import IntegrityError

    with SessionLocal() as session:
        user = User(username="bob", email="bob@example.com")
        session.add(user)

        try:
            # begin_nested() flushes pending objects, then emits SAVEPOINT;
            # the block releases the savepoint on success and rolls back to it on error
            with session.begin_nested():
                # This might fail
                duplicate_user = User(username="bob", email="duplicate@example.com")
                session.add(duplicate_user)
                session.flush()  # Raises IntegrityError due to the unique constraint
        except IntegrityError:
            print("Duplicate username, continuing without second user")

        # Main transaction continues
        session.commit()
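
    Session.begin() can also be used as a context manager, which removes the explicit commit() and rollback() calls from the manual pattern: the block commits when it exits normally and rolls back when it raises. The following is a minimal sketch, assuming SQLAlchemy 1.4 or newer, an in-memory SQLite database, and a reduced User model; the RuntimeError stands in for any failure.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)

# Commits when the block exits normally
with SessionLocal() as session, session.begin():
    session.add(User(username="alice"))

# Rolls back when the block raises
try:
    with SessionLocal() as session, session.begin():
        session.add(User(username="bob"))
        session.flush()  # the INSERT is sent, but not committed
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

with SessionLocal() as session:
    usernames = session.execute(
        select(User.username).order_by(User.username)
    ).scalars().all()

print(usernames)
```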

    Session State Management

    from sqlalchemy.orm import object_session
    from sqlalchemy import inspect
    
    # Object states
    def demonstrate_object_states():
        session = SessionLocal()
    
        # Transient: Object not in session
        user = User(username="temp", email="temp@example.com")
        print(f"Transient: {user in session}")  # False
    
        # Pending: Object added to session but not flushed
        session.add(user)
        print(f"Pending: {user in session}")  # True
        print(f"User ID before flush: {user.id}")  # None
    
        # Persistent: Object flushed to database
        session.flush()
        print(f"Persistent: {user in session}")  # True
        print(f"User ID after flush: {user.id}")  # Actual ID
    
        # Detached: Object removed from session
        session.expunge(user)
        print(f"Detached: {user in session}")  # False
    
        # Deleted: Object marked for deletion
        session.add(user)  # Re-attach
        session.delete(user)
        print(f"Deleted: {user in session}")  # True (still in session but marked for deletion)
    
        session.commit()
        session.close()
    
    # Session utilities
    def session_utilities():
        session = SessionLocal()
    
        user = User(username="test", email="test@example.com")
        session.add(user)
        session.flush()
    
        # Check object state
        state = inspect(user)
        print(f"Persistent: {state.persistent}")
        print(f"Pending: {state.pending}")
        print(f"Transient: {state.transient}")
        print(f"Detached: {state.detached}")
    
        # Get session from object
        obj_session = object_session(user)
        print(f"Same session: {obj_session is session}")
    
        # Session info
        print(f"New objects: {session.new}")
        print(f"Dirty objects: {session.dirty}")
        print(f"Deleted objects: {session.deleted}")
    
        # Merge detached object
        session.expunge(user)
        merged_user = session.merge(user)
        print(f"Merged user: {merged_user}")
    
        session.close()
    Python
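
The state transitions above can be checked end to end with `inspect()`. The following is a minimal sketch, assuming an in-memory SQLite database and a simplified `User` model; `state_name` and `collect_states` are hypothetical helper names, not part of SQLAlchemy.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50))

# Assumption: an in-memory SQLite database stands in for the real one
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)

def state_name(obj):
    """Return the lifecycle state that inspect() reports for a mapped object."""
    state = inspect(obj)
    for name in ("transient", "pending", "persistent", "deleted", "detached"):
        if getattr(state, name):
            return name
    return "unknown"

def collect_states():
    """Walk one object through its lifecycle and record each state."""
    seen = []
    session = SessionLocal()
    try:
        user = User(username="temp")
        seen.append(state_name(user))  # not yet in any session
        session.add(user)
        seen.append(state_name(user))  # added, not flushed
        session.flush()
        seen.append(state_name(user))  # row exists in the transaction
        session.delete(user)
        session.flush()
        seen.append(state_name(user))  # DELETE flushed, not committed
        session.commit()
        seen.append(state_name(user))  # no longer tracked by the session
    finally:
        session.close()
    return seen
```

Note that the "deleted" state only appears after the DELETE has been flushed; between `session.delete()` and the flush the object still reports as persistent.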

    Session Events

    from datetime import datetime
    
    from sqlalchemy import event
    
    # Session events
    @event.listens_for(SessionLocal, 'before_commit')
    def before_commit_handler(session):
        print("About to commit session")
        for obj in session.new:
            print(f"New object: {obj}")
        for obj in session.dirty:
            print(f"Modified object: {obj}")
        for obj in session.deleted:
            print(f"Deleted object: {obj}")
    
    @event.listens_for(SessionLocal, 'after_commit')
    def after_commit_handler(session):
        print("Session committed successfully")
    
    @event.listens_for(SessionLocal, 'after_rollback')
    def after_rollback_handler(session):
        print("Session rolled back")
    
    # Instance events
    @event.listens_for(User, 'before_insert')
    def before_insert_user(mapper, connection, target):
        print(f"About to insert user: {target.username}")
        target.created_at = datetime.utcnow()
    
    @event.listens_for(User, 'after_insert')
    def after_insert_user(mapper, connection, target):
        print(f"Inserted user with ID: {target.id}")
    Python
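
One detail worth knowing about the listeners above: `before_commit` runs before the flush that `commit()` performs, so `session.new` and `session.deleted` only contain changes that have not been flushed earlier. A `before_flush` listener sees pending changes on every flush. The sketch below assumes an in-memory SQLite database; `audit_log` and the listener names are hypothetical.

```python
from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50))

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)

audit_log = []  # hypothetical in-memory sink; a real application might log instead

@event.listens_for(SessionLocal, "before_flush")
def record_pending_changes(session, flush_context, instances):
    # session.new / session.deleted are populated right before each flush
    for obj in session.new:
        audit_log.append(("insert", type(obj).__name__))
    for obj in session.deleted:
        audit_log.append(("delete", type(obj).__name__))

@event.listens_for(SessionLocal, "after_commit")
def record_commit(session):
    audit_log.append(("commit", None))

def run_demo():
    """Insert and then delete one user, returning the recorded events."""
    audit_log.clear()
    session = SessionLocal()
    try:
        user = User(username="alice")
        session.add(user)
        session.commit()      # flushes the INSERT, then commits
        session.delete(user)
        session.commit()      # flushes the DELETE, then commits
    finally:
        session.close()
    return list(audit_log)
```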

    9. Querying with ORM

    Basic Querying

    graph TD
        A[Query Object] --> B[Filtering]
        A --> C[Ordering]
        A --> D[Limiting]
        A --> E[Grouping]
        A --> F[Joining]
        B --> G[filter]
        B --> H[filter_by]
        B --> I[where]
        C --> J[order_by]
        D --> K[limit]
        D --> L[offset]
        E --> M[group_by]
        E --> N[having]
        F --> O[join]
        F --> P[outerjoin]

    Query Construction

    # Basic queries
    def basic_queries():
        session = SessionLocal()
    
        # Get all users
        all_users = session.query(User).all()
    
        # Get first user
        first_user = session.query(User).first()
    
        # Get user by primary key
        user = session.query(User).get(1)  # Deprecated in 2.0
        user = session.get(User, 1)  # New way
    
        # Count
        user_count = session.query(User).count()
    
        # Check existence
        exists = session.query(User).filter(User.username == 'john').exists()
        has_user = session.query(exists).scalar()
    
        session.close()
    
    # Filtering
    def filtering_examples():
        session = SessionLocal()
    
        # filter() - uses column expressions
        users = session.query(User).filter(User.username == 'john').all()
        users = session.query(User).filter(User.age > 18).all()
        users = session.query(User).filter(User.email.like('%@gmail.com')).all()
    
        # filter_by() - uses keyword arguments
        users = session.query(User).filter_by(username='john').all()
        users = session.query(User).filter_by(is_active=True).all()
    
        # Multiple filters
        users = session.query(User).filter(
            User.age > 18,
            User.is_active == True,
            User.email.like('%@gmail.com')
        ).all()
    
        # OR conditions
        from sqlalchemy import or_
        users = session.query(User).filter(
            or_(
                User.username == 'john',
                User.username == 'jane'
            )
        ).all()
    
        # AND conditions (explicit)
        from sqlalchemy import and_
        users = session.query(User).filter(
            and_(
                User.age > 18,
                User.is_active == True
            )
        ).all()
    
        # IN clause
        users = session.query(User).filter(
            User.username.in_(['john', 'jane', 'bob'])
        ).all()
    
        # NOT conditions
        from sqlalchemy import not_
        users = session.query(User).filter(
            not_(User.username == 'admin')
        ).all()
    
        # IS NULL / IS NOT NULL
        users = session.query(User).filter(User.last_login.is_(None)).all()
        users = session.query(User).filter(User.last_login.is_not(None)).all()  # is_not() is the 1.4+ name for isnot()
    
        session.close()
    Python
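
The same filters can be written in the 2.0 style with `select()` and `Session.execute()`, which is the form that replaces `session.query()` in newer code. This is a sketch under assumptions: an in-memory SQLite database, a trimmed-down `User` model, and hypothetical function names.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine, or_, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50))
    age = Column(Integer)
    is_active = Column(Boolean, default=True)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

# Seed a few rows to filter against
with Session(engine) as session:
    session.add_all([
        User(username="john", age=30, is_active=True),
        User(username="jane", age=17, is_active=True),
        User(username="bob", age=45, is_active=False),
    ])
    session.commit()

def active_adult_names():
    """Multiple criteria in where() are combined with AND."""
    stmt = (
        select(User.username)
        .where(User.age > 18, User.is_active.is_(True))
        .order_by(User.username)
    )
    with Session(engine) as session:
        return session.execute(stmt).scalars().all()

def john_or_jane_count():
    """or_() builds an OR condition, exactly as with Query.filter()."""
    stmt = select(User).where(
        or_(User.username == "john", User.username == "jane")
    )
    with Session(engine) as session:
        return len(session.execute(stmt).scalars().all())
```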

    Advanced Filtering

    from sqlalchemy import func, case, extract, cast, String
    
    def advanced_filtering():
        session = SessionLocal()
    
        # String operations
        users = session.query(User).filter(
            User.username.startswith('j')
        ).all()
    
        users = session.query(User).filter(
            User.username.endswith('son')
        ).all()
    
        users = session.query(User).filter(
            User.username.contains('oh')
        ).all()
    
        # Case-insensitive operations
        users = session.query(User).filter(
            func.lower(User.username) == 'john'
        ).all()
    
        # Date operations
        from datetime import datetime, timedelta
    
        recent_users = session.query(User).filter(
            User.created_at > datetime.utcnow() - timedelta(days=30)
        ).all()
    
        # Extract date parts
        users_by_year = session.query(User).filter(
            extract('year', User.created_at) == 2023
        ).all()
    
        # Regex (PostgreSQL)
        users = session.query(User).filter(
            User.email.op('~')(r'.*@gmail\.com$')  # raw string keeps the backslash intact
        ).all()
    
        # CASE expressions
        user_status = session.query(
            User.username,
            case(
                (User.is_active == True, 'Active'),
                (User.is_active == False, 'Inactive'),
                else_='Unknown'
            ).label('status')
        ).all()
    
        # Subqueries: a scalar subquery avoids relying on auto-generated column names
        avg_age = session.query(func.avg(User.age)).scalar_subquery()
        above_avg_users = session.query(User).filter(
            User.age > avg_age
        ).all()
    
        session.close()
    Python
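
A scalar subquery can be compared directly against a column, which is a common way to express "above average" filters. The sketch below assumes an in-memory SQLite database with three sample users; the model and function name are illustrative only.

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50))
    age = Column(Integer)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        User(username="alice", age=20),
        User(username="bob", age=30),
        User(username="carol", age=40),
    ])
    session.commit()

def above_average_usernames():
    """Return usernames whose age is strictly above the average age."""
    # scalar_subquery() yields a single-value expression usable in comparisons
    avg_age = select(func.avg(User.age)).scalar_subquery()
    stmt = (
        select(User.username)
        .where(User.age > avg_age)
        .order_by(User.username)
    )
    with Session(engine) as session:
        return session.execute(stmt).scalars().all()
```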

    Joins and Relationships

    def join_examples():
        session = SessionLocal()
    
        # Inner join using relationship
        users_with_posts = session.query(User).join(User.posts).all()
    
        # Explicit inner join
        users_with_posts = session.query(User).join(Post, User.id == Post.user_id).all()
    
        # Left outer join
        all_users_with_posts = session.query(User).outerjoin(User.posts).all()
    
        # Select specific columns from joined tables
        user_post_data = session.query(
            User.username,
            Post.title,
            Post.created_at
        ).join(Post).all()
    
        # Multiple joins
        user_post_tag_data = session.query(
            User.username,
            Post.title,
            Tag.name
        ).join(Post).join(Post.tags).all()
    
        # Self join (for hierarchical data) using an alias of the same entity
        from sqlalchemy.orm import aliased
    
        ChildCategory = aliased(Category)
        parent_child = session.query(
            Category.name.label('parent'),
            ChildCategory.name.label('child')
        ).join(
            ChildCategory,
            Category.id == ChildCategory.parent_id
        ).all()
    
        # Subquery joins
        post_count_subq = session.query(
            Post.user_id,
            func.count(Post.id).label('post_count')
        ).group_by(Post.user_id).subquery()
    
        users_with_post_count = session.query(
            User,
            post_count_subq.c.post_count
        ).outerjoin(
            post_count_subq,
            User.id == post_count_subq.c.user_id
        ).all()
    
        session.close()
    Python
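
A self join needs a second name for the same table, which `aliased()` provides. The following sketch assumes a hypothetical self-referential `Category` model with a `parent_id` column and an in-memory SQLite database.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, aliased, declarative_base

Base = declarative_base()

class Category(Base):
    __tablename__ = "categories"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    parent_id = Column(Integer, ForeignKey("categories.id"))

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        Category(id=1, name="Electronics"),
        Category(id=2, name="Laptops", parent_id=1),
        Category(id=3, name="Phones", parent_id=1),
    ])
    session.commit()

def parent_child_pairs():
    """Return (parent name, child name) pairs via a self join."""
    child = aliased(Category)  # second reference to the same table
    with Session(engine) as session:
        rows = (
            session.query(Category.name, child.name)
            .join(child, child.parent_id == Category.id)
            .order_by(child.name)
            .all()
        )
    return [tuple(row) for row in rows]
```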

    Aggregation and Grouping

    def aggregation_examples():
        session = SessionLocal()
    
        # Count by group
        posts_per_user = session.query(
            User.username,
            func.count(Post.id).label('post_count')
        ).join(Post).group_by(User.username).all()
    
        # Having clause
        prolific_users = session.query(
            User.username,
            func.count(Post.id).label('post_count')
        ).join(Post).group_by(User.username).having(
            func.count(Post.id) > 5
        ).all()
    
        # Various aggregations
        user_stats = session.query(
            func.count(User.id).label('total_users'),
            func.avg(User.age).label('avg_age'),
            func.min(User.created_at).label('first_user'),
            func.max(User.created_at).label('latest_user')
        ).first()
    
        # Group by multiple columns
        monthly_posts = session.query(
            extract('year', Post.created_at).label('year'),
            extract('month', Post.created_at).label('month'),
            func.count(Post.id).label('post_count')
        ).group_by(
            extract('year', Post.created_at),
            extract('month', Post.created_at)
        ).order_by('year', 'month').all()
    
        # Window functions
        ranked_users = session.query(
            User.username,
            func.count(Post.id).label('post_count'),
            func.rank().over(
                order_by=func.count(Post.id).desc()
            ).label('rank')
        ).join(Post).group_by(User.username).all()
    
        session.close()
    Python
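
The inner joins used above drop users who have no posts. When a zero count matters, an outer join combined with `func.count()` on the child column keeps those users, because `COUNT` ignores NULLs. A minimal sketch, assuming simplified `User` and `Post` models on in-memory SQLite:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50))

class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    title = Column(String(100))

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        User(id=1, username="alice"),
        User(id=2, username="bob"),
        User(id=3, username="carol"),
        Post(user_id=1, title="a1"),
        Post(user_id=1, title="a2"),
        Post(user_id=1, title="a3"),
        Post(user_id=2, title="b1"),
    ])
    session.commit()

def post_counts(min_posts=0):
    """Posts per user, including users without posts, filtered with HAVING."""
    post_count = func.count(Post.id)
    with Session(engine) as session:
        rows = (
            session.query(User.username, post_count.label("post_count"))
            .outerjoin(Post, Post.user_id == User.id)
            .group_by(User.username)
            .having(post_count >= min_posts)
            .order_by(User.username)
            .all()
        )
    return [(name, count) for name, count in rows]
```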

    Query Optimization

    def query_optimization():
        session = SessionLocal()
    
        # Eager loading with joinedload
        from sqlalchemy.orm import joinedload, selectinload, subqueryload
    
        # Load users with their posts in one query
        users = session.query(User).options(
            joinedload(User.posts)
        ).all()
    
        # Load users with posts and tags
        users = session.query(User).options(
            joinedload(User.posts).joinedload(Post.tags)
        ).all()
    
        # Select in loading (better for one-to-many)
        users = session.query(User).options(
            selectinload(User.posts)
        ).all()
    
        # Load only specific columns
        user_names = session.query(User.username).all()
    
        # Load entities partially
        from sqlalchemy.orm import load_only
        users = session.query(User).options(
            load_only(User.username, User.email)
        ).all()
    
        # Defer loading of large columns
        from sqlalchemy.orm import defer
        users = session.query(User).options(
            defer(User.profile_picture)
        ).all()
    
        # Batch loading
        users = session.query(User).filter(
            User.id.in_([1, 2, 3, 4, 5])
        ).all()
    
        # Raw SQL for complex queries
        from sqlalchemy import text
        result = session.execute(text("""
            SELECT u.username, COUNT(p.id) as post_count
            FROM users u
            LEFT JOIN posts p ON u.id = p.user_id
            GROUP BY u.username
            ORDER BY post_count DESC
            LIMIT 10
        """))
    
        top_users = result.fetchall()
    
        session.close()
    Python

    Query Chaining and Reusability

    from datetime import datetime, timedelta
    
    from sqlalchemy.orm import joinedload
    
    class UserRepository:
        def __init__(self, session):
            self.session = session
    
        def base_query(self):
            """Base query for users"""
            return self.session.query(User)
    
        def active_users(self):
            """Get only active users"""
            return self.base_query().filter(User.is_active == True)
    
        def by_email_domain(self, domain):
            """Filter by email domain"""
            return self.active_users().filter(
                User.email.like(f'%@{domain}')
            )
    
        def with_posts(self):
            """Include posts in query"""
            return self.active_users().options(
                joinedload(User.posts)
            )
    
        def recent_users(self, days=30):
            """Users created in the last N days"""
            cutoff = datetime.utcnow() - timedelta(days=days)
            return self.active_users().filter(
                User.created_at > cutoff
            )
    
        def paginate(self, query, page=1, per_page=20):
            """Add pagination to query"""
            return query.offset(
                (page - 1) * per_page
            ).limit(per_page)
    
    # Usage
    def repository_usage():
        session = SessionLocal()
        repo = UserRepository(session)
    
        # Chain methods
        gmail_users = repo.by_email_domain('gmail.com').all()
    
        recent_gmail_users = repo.by_email_domain('gmail.com').filter(
            User.created_at > datetime(2023, 1, 1)
        ).all()
    
        # Pagination
        page_1_users = repo.paginate(repo.active_users(), page=1, per_page=10).all()
    
        session.close()
    Python

    10. Advanced ORM Features

    Inheritance Mapping

    graph TD
        A[Table Inheritance] --> B[Single Table]
        A --> C[Joined Table]
        A --> D[Concrete Table]
    
        B --> E[All classes in one table]
        C --> F[Base table + child tables]
        D --> G[Separate table per class]

    Single Table Inheritance

    from sqlalchemy import Column, Integer, String, ForeignKey
    from sqlalchemy.orm import declarative_base  # moved here from sqlalchemy.ext.declarative in 1.4
    
    Base = declarative_base()
    
    class Person(Base):
        __tablename__ = 'people'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(50))
        type = Column(String(20))
    
        __mapper_args__ = {
            'polymorphic_identity': 'person',
            'polymorphic_on': type
        }
    
    class Employee(Person):
        employee_id = Column(String(20))
        department = Column(String(50))
    
        __mapper_args__ = {
            'polymorphic_identity': 'employee'
        }
    
    class Customer(Person):
        customer_code = Column(String(20))
        credit_limit = Column(Integer)
    
        __mapper_args__ = {
            'polymorphic_identity': 'customer'
        }
    
    # Usage
    session = SessionLocal()
    
    # Create instances
    emp = Employee(name='John Doe', employee_id='E001', department='IT')
    cust = Customer(name='Jane Smith', customer_code='C001', credit_limit=5000)
    
    session.add_all([emp, cust])
    session.commit()
    
    # Query polymorphically
    all_people = session.query(Person).all()  # Returns both employees and customers
    employees_only = session.query(Employee).all()  # Returns only employees
    Python
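
Single table inheritance can be verified quickly: every class shares one table, and the discriminator column decides which class each row loads as. This sketch assumes in-memory SQLite and reduced versions of the models above; `class_names` is a hypothetical helper.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Person(Base):
    __tablename__ = "people"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    type = Column(String(20))  # discriminator column
    __mapper_args__ = {"polymorphic_identity": "person", "polymorphic_on": type}

class Employee(Person):
    # No __tablename__: the column is added to the shared "people" table
    department = Column(String(50))
    __mapper_args__ = {"polymorphic_identity": "employee"}

class Customer(Person):
    credit_limit = Column(Integer)
    __mapper_args__ = {"polymorphic_identity": "customer"}

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        Employee(name="John Doe", department="IT"),
        Customer(name="Jane Smith", credit_limit=5000),
    ])
    session.commit()

def class_names(entity):
    """Return the sorted Python class names of all rows loaded for entity."""
    with Session(engine) as session:
        return sorted(type(obj).__name__ for obj in session.query(entity).all())
```

Querying the base class returns instances of the subclasses, while querying a subclass adds a filter on the discriminator.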

    Joined Table Inheritance

    class Person(Base):
        __tablename__ = 'people'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(50))
        email = Column(String(120))
        type = Column(String(20))
    
        __mapper_args__ = {
            'polymorphic_identity': 'person',
            'polymorphic_on': type,
            'with_polymorphic': '*'
        }
    
    class Employee(Person):
        __tablename__ = 'employees'
    
        id = Column(Integer, ForeignKey('people.id'), primary_key=True)
        employee_id = Column(String(20))
        department = Column(String(50))
        salary = Column(Integer)
    
        __mapper_args__ = {
            'polymorphic_identity': 'employee'
        }
    
    class Customer(Person):
        __tablename__ = 'customers'
    
        id = Column(Integer, ForeignKey('people.id'), primary_key=True)
        customer_code = Column(String(20))
        credit_limit = Column(Integer)
    
        __mapper_args__ = {
            'polymorphic_identity': 'customer'
        }
    
    # This creates three tables: people, employees, customers
    # Queries automatically join the tables when needed
    Python

    Hybrid Properties and Methods

    from datetime import datetime
    
    from sqlalchemy import Column, Date, Integer, String, func, case
    from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
    
    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        first_name = Column(String(50))
        last_name = Column(String(50))
        email = Column(String(120))
        birth_date = Column(Date)
    
        @hybrid_property
        def full_name(self):
            """Instance-level full name"""
            return f"{self.first_name} {self.last_name}"
    
        @full_name.expression
        def full_name(cls):
            """Class-level expression for SQL"""
            return func.concat(cls.first_name, ' ', cls.last_name)
    
        @hybrid_property
        def age(self):
            """Calculate age from birth date"""
            if self.birth_date:
                return (datetime.now().date() - self.birth_date).days // 365
            return None
    
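        @age.expression
        def age(cls):
            """SQL expression for age calculation (PostgreSQL-specific)"""
            # func.extract() would render an invalid extract('days', ...) call;
            # date_part() takes the field name as an ordinary argument
            return func.floor(
                func.date_part('day', func.now() - cls.birth_date) / 365
            )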
    
        @hybrid_method
        def is_older_than(self, age_threshold):
            """Check if user is older than threshold"""
            return self.age > age_threshold
    
        @is_older_than.expression
        def is_older_than(cls, age_threshold):
            """SQL expression for age comparison"""
            return cls.age > age_threshold
    
    # Usage
    session = SessionLocal()
    
    # Instance usage
    user = session.query(User).first()
    print(user.full_name)  # "John Doe"
    print(user.age)  # 25
    
    # Query usage (generates SQL)
    adults = session.query(User).filter(User.age >= 18).all()
    seniors = session.query(User).filter(User.is_older_than(65)).all()
    
    # Can use hybrid properties in order_by, etc.
    ordered_users = session.query(User).order_by(User.full_name).all()
    Python
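
When the Python expression uses only operators that SQLAlchemy columns also support, a hybrid property needs no separate `.expression`. The sketch below relies on `+` between string columns being rendered as SQL concatenation, which avoids `func.concat` on backends that may lack it. It assumes in-memory SQLite and a reduced `User` model; the function names are hypothetical.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    first_name = Column(String(50))
    last_name = Column(String(50))

    @hybrid_property
    def full_name(self):
        # On an instance this is plain string concatenation;
        # on the class it builds a SQL concatenation expression
        return self.first_name + " " + self.last_name

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        User(first_name="John", last_name="Doe"),
        User(first_name="Jane", last_name="Smith"),
    ])
    session.commit()

def find_by_full_name(name):
    """Filter in SQL on the hybrid, then read it back on the instances."""
    with Session(engine) as session:
        users = session.query(User).filter(User.full_name == name).all()
        return [user.full_name for user in users]

def names_in_order():
    """Order by the hybrid expression in SQL."""
    with Session(engine) as session:
        return [u.full_name for u in session.query(User).order_by(User.full_name)]
```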

    Association Proxies

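    from datetime import datetime
    
    from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
    from sqlalchemy.ext.associationproxy import association_proxy
    from sqlalchemy.orm import relationship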
    
    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(50))
    
        # Many-to-many with User-Role association
        user_roles = relationship("UserRole", back_populates="user")
    
        # Association proxy to access roles directly
        roles = association_proxy(
            'user_roles', 'role',
            creator=lambda role: UserRole(role=role)
        )
    
        # Association proxy for role names
        role_names = association_proxy('user_roles', 'role_name')
    
    class Role(Base):
        __tablename__ = 'roles'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(50))
    
        user_roles = relationship("UserRole", back_populates="role")
        users = association_proxy('user_roles', 'user')
    
    class UserRole(Base):
        __tablename__ = 'user_roles'
    
        user_id = Column(Integer, ForeignKey('users.id'), primary_key=True)
        role_id = Column(Integer, ForeignKey('roles.id'), primary_key=True)
        assigned_at = Column(DateTime, default=datetime.utcnow)
    
        user = relationship("User", back_populates="user_roles")
        role = relationship("Role", back_populates="user_roles")
    
        @property
        def role_name(self):
            return self.role.name if self.role else None
    
    # Usage
    session = SessionLocal()
    
    user = User(username='john')
    admin_role = Role(name='admin')
    user_role = Role(name='user')
    
    # Direct assignment through association proxy
    user.roles.append(admin_role)
    user.roles.append(user_role)
    
    session.add_all([user, admin_role, user_role])
    session.commit()
    
    # Access role names
    print(user.role_names)  # ['admin', 'user']
    Python
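
The association-object pattern above can be exercised in a few lines. This is a reduced sketch on in-memory SQLite: appending a `Role` to the proxy calls the `creator`, which builds the intermediate `UserRole` row. The helper name `role_names_for` is hypothetical.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50))
    user_roles = relationship("UserRole", back_populates="user")
    # Proxy that exposes Role objects and hides the UserRole rows
    roles = association_proxy(
        "user_roles", "role", creator=lambda role: UserRole(role=role)
    )

class Role(Base):
    __tablename__ = "roles"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    user_roles = relationship("UserRole", back_populates="role")

class UserRole(Base):
    __tablename__ = "user_roles"
    user_id = Column(Integer, ForeignKey("users.id"), primary_key=True)
    role_id = Column(Integer, ForeignKey("roles.id"), primary_key=True)
    user = relationship("User", back_populates="user_roles")
    role = relationship("Role", back_populates="user_roles")

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    john = User(username="john")
    john.roles.append(Role(name="admin"))  # creator builds the UserRole
    john.roles.append(Role(name="user"))
    session.add(john)                      # cascades to UserRole and Role
    session.commit()

def role_names_for(username):
    """Load a user and read role names through the proxy."""
    with Session(engine) as session:
        user = session.query(User).filter_by(username=username).one()
        return sorted(role.name for role in user.roles)
```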

    Validators and Events

    from sqlalchemy import event
    from sqlalchemy.orm import validates
    
    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(50))
        email = Column(String(120))
        age = Column(Integer)
    
        @validates('email')
        def validate_email(self, key, address):
            """Validate email format"""
            import re
            if not re.match(r'^[^@]+@[^@]+\.[^@]+$', address):
                raise ValueError('Invalid email address')
            return address.lower()
    
        @validates('age')
        def validate_age(self, key, age):
            """Validate age range (None is allowed because the column is nullable)"""
            if age is not None and (age < 0 or age > 150):
                raise ValueError('Age must be between 0 and 150')
            return age
    
        @validates('username')
        def validate_username(self, key, username):
            """Validate username format"""
            if len(username) < 3:
                raise ValueError('Username must be at least 3 characters')
            if not username.isalnum():
                raise ValueError('Username must be alphanumeric')
            return username
    
    # Model events
    @event.listens_for(User, 'before_insert')
    def set_created_timestamp(mapper, connection, target):
        """Set created timestamp before insert"""
        # hasattr() is always True for a mapped column, so test the value instead
        if getattr(target, 'created_at', None) is None:
            target.created_at = datetime.utcnow()
    
    @event.listens_for(User, 'before_update')
    def set_updated_timestamp(mapper, connection, target):
        """Set updated timestamp before update"""
        target.updated_at = datetime.utcnow()
    
    @event.listens_for(User.email, 'set')
    def email_changed(target, value, oldvalue, initiator):
        """Log email changes"""
        if oldvalue != value and oldvalue is not None:
            print(f"Email changed from {oldvalue} to {value}")
    Python
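
Validators run in Python when the attribute is set, including through the constructor, so they can be tried without any database connection. A minimal sketch, with a deliberately simple check in place of the regular expression above; `try_email` is a hypothetical helper.

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base, validates

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String(120))

    @validates("email")
    def validate_email(self, key, address):
        # Simplified check for illustration; the returned value is what gets stored
        if address is None or "@" not in address:
            raise ValueError("Invalid email address")
        return address.lower()

def try_email(value):
    """Return the normalized email, or a message if validation rejects it."""
    try:
        return User(email=value).email
    except ValueError as exc:
        return f"rejected: {exc}"
```

Because the validator returns the value to store, it can normalize input (lowercasing here) as well as reject it.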

    11. Performance Optimization

    Query Performance

    graph TD
        A[Query Performance] --> B[Eager Loading]
        A --> C[Query Optimization]
        A --> D[Connection Pooling]
        A --> E[Caching]
    
        B --> F[joinedload]
        B --> G[selectinload]
        B --> H[subqueryload]
    
        C --> I[Index Usage]
        C --> J[Batch Operations]
        C --> K[Raw SQL]
    
        E --> L[Query Cache]
        E --> M[Session Cache]

    Eager Loading Strategies

    from sqlalchemy.orm import joinedload, selectinload, subqueryload, contains_eager
    
    # Different loading strategies comparison
    def loading_strategies():
        session = SessionLocal()
    
        # Lazy loading (default) - N+1 problem
        users = session.query(User).all()
        for user in users:
            print(f"{user.username}: {len(user.posts)}")  # Each access triggers a query
    
        # Joined loading - single query with LEFT JOIN
        users = session.query(User).options(
            joinedload(User.posts)
        ).all()
    
        # Select-in loading - two queries (better for one-to-many)
        users = session.query(User).options(
            selectinload(User.posts)
        ).all()
    
        # Subquery loading - nested query
        users = session.query(User).options(
            subqueryload(User.posts)
        ).all()
    
        # Contains eager - for explicit joins
        users = session.query(User).join(User.posts).options(
            contains_eager(User.posts)
        ).all()
    
        # Mixed loading strategies
        users = session.query(User).options(
            joinedload(User.profile),      # One-to-one: use joined
            selectinload(User.posts),      # One-to-many: use selectin
            selectinload(User.posts).joinedload(Post.tags)  # Chain loading
        ).all()
    
        session.close()
    
    # Optimize specific queries
    def optimized_queries():
        session = SessionLocal()
    
        # Load only needed columns
        from sqlalchemy.orm import load_only
        users = session.query(User).options(
            load_only(User.id, User.username, User.email)
        ).all()
    
        # Defer large columns
        from sqlalchemy.orm import defer
        users = session.query(User).options(
            defer(User.profile_picture),
            defer(User.large_text_field)
        ).all()
    
        # Batch queries with IN clause
        user_ids = [1, 2, 3, 4, 5]
        users = session.query(User).filter(User.id.in_(user_ids)).all()
    
        # Use exists() for checking existence
        has_active_user = session.query(
            session.query(User).filter(User.is_active == True).exists()
        ).scalar()
    
        session.close()
    Python
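
The N+1 effect can be measured by counting the SELECT statements the engine emits. The sketch below uses the `before_cursor_execute` engine event for that, assuming in-memory SQLite and three users with two posts each; `count_selects` is a hypothetical helper.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50))
    posts = relationship("Post")

class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    title = Column(String(100))

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        User(username=f"user_{i}", posts=[Post(title="a"), Post(title="b")])
        for i in range(3)
    ])
    session.commit()

def count_selects(use_selectin):
    """Count SELECT statements issued while reading every user's posts."""
    statements = []

    def on_execute(conn, cursor, statement, parameters, context, executemany):
        if statement.lstrip().upper().startswith("SELECT"):
            statements.append(statement)

    event.listen(engine, "before_cursor_execute", on_execute)
    try:
        with Session(engine) as session:
            query = session.query(User)
            if use_selectin:
                query = query.options(selectinload(User.posts))
            for user in query.all():
                len(user.posts)  # lazy loading issues one query per user here
    finally:
        event.remove(engine, "before_cursor_execute", on_execute)
    return len(statements)
```

With three users, lazy loading issues one query for the users plus one per user, while `selectinload` issues two queries regardless of the number of users.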

    Bulk Operations

    def bulk_operations():
        session = SessionLocal()
    
        # Bulk insert via the ORM (bypasses most unit-of-work overhead)
        users_data = [
            {'username': f'user_{i}', 'email': f'user_{i}@example.com'} 
            for i in range(1000)
        ]
        session.bulk_insert_mappings(User, users_data)
    
        # Bulk update
        session.bulk_update_mappings(User, [
            {'id': 1, 'username': 'updated_user_1'},
            {'id': 2, 'username': 'updated_user_2'},
        ])
    
        # Using Core for bulk operations (even faster)
        from sqlalchemy import insert, update
    
        # Bulk insert with Core (an alternative to bulk_insert_mappings above; running both inserts the rows twice)
        insert_stmt = insert(User.__table__)
        session.execute(insert_stmt, users_data)
    
        # Bulk update with Core
        update_stmt = update(User.__table__).where(
            User.id.in_([1, 2, 3])
        ).values(is_active=False)
        session.execute(update_stmt)
    
        session.commit()
        session.close()
    Python
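
The Core variant can be run on its own against a connection. Passing a list of dictionaries to `execute()` sends all rows in a single call, and a single UPDATE with a WHERE clause changes many rows at once. This sketch assumes in-memory SQLite and a reduced `User` model; `bulk_load` is a hypothetical function meant to be called once on an empty table.

```python
from sqlalchemy import (
    Boolean, Column, Integer, String, create_engine, func, insert, select, update
)
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50))
    email = Column(String(120))
    is_active = Column(Boolean, default=True, nullable=False)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

users = User.__table__
rows = [
    {"username": f"user_{i}", "email": f"user_{i}@example.com"}
    for i in range(1000)
]

def bulk_load():
    """Insert all rows in one call, deactivate the first ten, return counts."""
    with engine.begin() as conn:  # commits on success, rolls back on error
        conn.execute(insert(users), rows)
        conn.execute(
            update(users).where(users.c.id <= 10).values(is_active=False)
        )
        total = conn.execute(
            select(func.count()).select_from(users)
        ).scalar_one()
        inactive = conn.execute(
            select(func.count())
            .select_from(users)
            .where(users.c.is_active.is_(False))
        ).scalar_one()
    return total, inactive
```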

    Connection Pool Optimization

    from sqlalchemy import create_engine
    from sqlalchemy.pool import QueuePool, NullPool
    
    # Optimized engine configuration
    def create_optimized_engine(database_url):
        return create_engine(
            database_url,
            # Pool settings
            poolclass=QueuePool,
            pool_size=20,                 # Number of permanent connections
            max_overflow=30,              # Additional connections
            pool_pre_ping=True,           # Test connections before use
            pool_recycle=3600,            # Recycle connections after 1 hour
    
            # Connection settings (PostgreSQL-specific; connect_args go to the driver)
            connect_args={"application_name": "my_app"},
            isolation_level="READ COMMITTED",
    
            # Performance settings
            echo=False,                   # Disable SQL logging in production
            future=True                   # Use SQLAlchemy 2.0 style
        )
    
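    # For high-concurrency applications (requires the asyncpg driver)
    def create_async_db_engine(database_url):
        # Named differently from the imported factory to avoid shadowing it
        from sqlalchemy.ext.asyncio import create_async_engine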
    
        return create_async_engine(
            database_url.replace('postgresql://', 'postgresql+asyncpg://'),
            pool_size=50,
            max_overflow=100,
            pool_pre_ping=True
        )
    Python

    Query Caching

    from sqlalchemy.orm import Query
    import hashlib
    import pickle
    
    class CachedQuery:
        def __init__(self, cache_backend=None):
            self.cache = cache_backend or {}
    
        def _cache_key(self, query):
            """Generate cache key from query"""
            query_str = str(query.statement.compile(compile_kwargs={"literal_binds": True}))
            return hashlib.md5(query_str.encode()).hexdigest()
    
        def get_or_execute(self, query, timeout=300):
            """Get result from cache or execute query"""
            cache_key = self._cache_key(query)
    
            # Try to get from cache
            if cache_key in self.cache:
                return pickle.loads(self.cache[cache_key])
    
            # Execute query and cache result
            result = query.all()
            self.cache[cache_key] = pickle.dumps(result)
    
            return result
    
    # Usage with Redis cache
    def setup_redis_cache():
        import redis
    
        redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
        class RedisCache:
            def __init__(self, client):
                self.client = client
    
            def __contains__(self, key):
                return self.client.exists(key)
    
            def __getitem__(self, key):
                return self.client.get(key)
    
            def __setitem__(self, key, value):
                self.client.setex(key, 300, value)  # 5-minute expiry
    
        return CachedQuery(RedisCache(redis_client))
    
    # Application-level caching
    def cached_user_by_email(email, cache=None):
        # Compare against None: an empty dict is falsy and would never be filled
        if cache is not None and email in cache:
            return cache[email]
    
        session = SessionLocal()
        user = session.query(User).filter(User.email == email).first()
        session.close()
    
        if cache is not None and user is not None:
            cache[email] = user
    
        return user
    Python

    Database Indexes

    from datetime import datetime
    
    from sqlalchemy import Boolean, Column, DateTime, Index, Integer, String, text
    
    class User(Base):
        __tablename__ = 'users'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(50), unique=True)
        email = Column(String(120), unique=True)
        first_name = Column(String(50))
        last_name = Column(String(50))
        created_at = Column(DateTime, default=datetime.utcnow)
        is_active = Column(Boolean, default=True)
    
        # Table-level indexes
        __table_args__ = (
            # Composite index for common query patterns
            Index('idx_user_name', 'first_name', 'last_name'),
    
            # Partial index for active users only
            Index('idx_active_users', 'created_at', postgresql_where=text('is_active = true')),
    
            # Functional index for case-insensitive searches
            Index('idx_username_lower', text('lower(username)')),
    
            # Covering index (PostgreSQL 11+): extra columns are stored via INCLUDE
            Index(
                'idx_email_covering', 'email',
                postgresql_include=['first_name', 'last_name', 'is_active']
            ),
        )
    
    # Create indexes programmatically
    def create_custom_indexes(engine):
        # GIN index for full-text search (PostgreSQL)
        gin_index = text("""
            CREATE INDEX IF NOT EXISTS idx_user_fulltext 
            ON users USING gin(to_tsvector('english', first_name || ' ' || last_name))
        """)
    
        # Hash index for exact matches (PostgreSQL)
        hash_index = text("""
            CREATE INDEX IF NOT EXISTS idx_user_email_hash 
            ON users USING hash(email)
        """)
    
        with engine.connect() as conn:
            conn.execute(gin_index)
            conn.execute(hash_index)
            conn.commit()
    Python
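
Whether the declared indexes were actually created can be checked through the runtime inspector. The sketch below assumes in-memory SQLite, so it only uses backend-neutral indexes (the PostgreSQL-specific options above are left out); `index_columns` is a hypothetical helper.

```python
from sqlalchemy import Column, Index, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String(120), index=True)  # single-column index, auto-named
    first_name = Column(String(50))
    last_name = Column(String(50))

    __table_args__ = (
        # Composite index for lookups by full name
        Index("idx_user_name", "first_name", "last_name"),
    )

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

def index_columns():
    """Map each index name on the users table to its column list."""
    inspector = inspect(engine)
    return {
        ix["name"]: ix["column_names"]
        for ix in inspector.get_indexes("users")
    }
```

With the default naming convention, `index=True` on `email` produces an index named `ix_users_email`.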

    12. Migration with Alembic

    Reference: https://blog.altgr.in/?p=943

    Alembic Setup

    graph TD
        A[Alembic] --> B[alembic init]
        B --> C[alembic.ini]
        B --> D[alembic/]
        D --> E[env.py]
        D --> F[script.py.mako]
        D --> G[versions/]
    
        A --> H[Commands]
        H --> I[revision]
        H --> J[upgrade]
        H --> K[downgrade]
        H --> L[history]

    Initial Setup

    # Install Alembic
    pip install alembic
    
    # Initialize Alembic in your project
    alembic init alembic
    
    # Edit alembic.ini to set database URL
    # sqlalchemy.url = postgresql://user:pass@localhost/dbname
    Bash

    Alembic Configuration

    # alembic/env.py
    from logging.config import fileConfig
    from sqlalchemy import engine_from_config, pool
    from alembic import context
    import os
    import sys
    
    # Add your project to the path
    sys.path.append(os.path.dirname(os.path.dirname(__file__)))
    
    # Import your models
    from models.base import Base
    from models.user import User
    from models.post import Post
    
    # This is the Alembic Config object
    config = context.config
    
    # Set up loggers
    if config.config_file_name is not None:
        fileConfig(config.config_file_name)
    
    # Target metadata
    target_metadata = Base.metadata
    
    def get_url():
        """Get database URL from environment or config"""
        return os.getenv("DATABASE_URL") or config.get_main_option("sqlalchemy.url")
    
    def run_migrations_offline():
        """Run migrations in 'offline' mode."""
        url = get_url()
        context.configure(
            url=url,
            target_metadata=target_metadata,
            literal_binds=True,
            dialect_opts={"paramstyle": "named"},
        )
    
        with context.begin_transaction():
            context.run_migrations()
    
    def run_migrations_online():
        """Run migrations in 'online' mode."""
        configuration = config.get_section(config.config_ini_section)
        configuration["sqlalchemy.url"] = get_url()
    
        connectable = engine_from_config(
            configuration,
            prefix="sqlalchemy.",
            poolclass=pool.NullPool,
        )
    
        with connectable.connect() as connection:
            context.configure(
                connection=connection, target_metadata=target_metadata
            )
    
            with context.begin_transaction():
                context.run_migrations()
    
    if context.is_offline_mode():
        run_migrations_offline()
    else:
        run_migrations_online()
    Python

    Creating Migrations

    # Create a new migration
    alembic revision --autogenerate -m "Create user table"
    
    # Create empty migration
    alembic revision -m "Custom migration"
    
    # Check current revision
    alembic current
    
    # Show migration history
    alembic history --verbose
    
    # Upgrade to latest
    alembic upgrade head
    
    # Upgrade by a relative step or to a specific revision
    alembic upgrade +2  # Move forward 2 revisions
    alembic upgrade ae1027a6acf  # Upgrade to specific revision
    
    # Downgrade
    alembic downgrade -1  # Move back 1 revision
    alembic downgrade base  # Downgrade to beginning
    Bash
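
    Alembic orders migrations by following each script's down_revision pointer back to the base, and relative targets such as +2 and -1 step along that chain. The sketch below models a linear history in plain Python. It is an illustration only, not Alembic's implementation, it ignores branches, and every revision ID other than ae1027a6acf is made up.

```python
# Hypothetical revision graph: revision id -> down_revision (None marks the base).
REVISIONS = {
    "ae1027a6acf": None,
    "b2f4c8d91e0": "ae1027a6acf",
    "c7d1e5a3f42": "b2f4c8d91e0",
}


def upgrade_path(revisions, current=None):
    """Return the revisions to apply, oldest first, to go from `current` to head."""
    # Invert the down_revision pointers so we can walk forward from the base.
    children = {down: rev for rev, down in revisions.items()}
    path = []
    cursor = current
    while cursor in children:
        cursor = children[cursor]
        path.append(cursor)
    return path


def relative_target(revisions, current, steps):
    """Resolve a relative step such as +2 or -1 to a revision id (None = base)."""
    ordered = [None] + upgrade_path(revisions)  # base, then every revision in order
    index = ordered.index(current) + steps
    if not 0 <= index < len(ordered):
        raise ValueError("relative step moves outside the revision history")
    return ordered[index]
```

    Here relative_target(REVISIONS, None, 2) plays the role of "alembic upgrade +2" on an empty database, and a step of -1 from the head corresponds to "alembic downgrade -1".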

    Custom Migration Scripts

    # Example migration file: versions/001_create_user_table.py
    """Create user table
    
    Revision ID: ae1027a6acf
    Revises: 
    Create Date: 2023-01-01 12:00:00.000000
    
    """
    from alembic import op
    import sqlalchemy as sa
    from sqlalchemy.dialects import postgresql
    
    # revision identifiers
    revision = 'ae1027a6acf'
    down_revision = None
    branch_labels = None
    depends_on = None
    
    def upgrade():
        # Create users table
        op.create_table('users',
            sa.Column('id', sa.Integer(), nullable=False),
            sa.Column('username', sa.String(length=50), nullable=False),
            sa.Column('email', sa.String(length=120), nullable=False),
            sa.Column('password_hash', sa.String(length=255), nullable=False),
            sa.Column('created_at', sa.DateTime(), nullable=True),
            sa.Column('updated_at', sa.DateTime(), nullable=True),
            sa.Column('is_active', sa.Boolean(), nullable=True),
            sa.PrimaryKeyConstraint('id')
        )
    
        # Create indexes
        op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
        op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
    
        # Add check constraint
        op.create_check_constraint(
            'check_email_format',
            'users',
            sa.text("email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'")
        )
    
    def downgrade():
        # Drop constraints and indexes
        op.drop_constraint('check_email_format', 'users', type_='check')
        op.drop_index(op.f('ix_users_username'), table_name='users')
        op.drop_index(op.f('ix_users_email'), table_name='users')
    
        # Drop table
        op.drop_table('users')
    Python
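
    The check_email_format constraint relies on PostgreSQL's case-insensitive regex operator (~*). Before shipping a pattern like this in a migration, it can help to try it against sample values. The sketch below does that with Python's re module. Python and PostgreSQL regex engines differ in some edge cases, so treat it as a quick sanity check rather than proof of database behavior. The helper name is hypothetical.

```python
import re

# Same pattern as the check constraint; re.IGNORECASE mirrors the ~* operator.
EMAIL_PATTERN = re.compile(
    r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$", re.IGNORECASE
)


def matches_email_constraint(value: str) -> bool:
    """Return True if `value` would satisfy the email format pattern."""
    return EMAIL_PATTERN.search(value) is not None
```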

    Complex Migration Examples

    # Data migration example
    def upgrade():
        # Create new column
        op.add_column('users', sa.Column('full_name', sa.String(100), nullable=True))
    
        # Backfill every row using raw SQL; COALESCE keeps rows with a
        # missing first or last name from being left NULL
        connection = op.get_bind()
        connection.execute(
            sa.text("""
                UPDATE users 
                SET full_name = TRIM(COALESCE(first_name, '') || ' ' || COALESCE(last_name, ''))
            """)
        )
    
        # Make column non-nullable after populating data
        op.alter_column('users', 'full_name',
                        existing_type=sa.String(100),
                        nullable=False)
    
    # Table rename and restructure
    def upgrade():
        # Rename table
        op.rename_table('old_posts', 'posts')
    
        # Add new columns
        op.add_column('posts', sa.Column('slug', sa.String(200), nullable=True))
    
        # Rename column
        op.alter_column('posts', 'content', new_column_name='body')
    
        # Change column type
        op.alter_column('posts', 'views', 
                        type_=sa.BigInteger(), 
                        existing_type=sa.Integer())
    
    # Foreign key modifications
    def upgrade():
        # Drop foreign key
        op.drop_constraint('fk_posts_user_id', 'posts', type_='foreignkey')
    
        # Add new foreign key with different options
        op.create_foreign_key(
            'fk_posts_author_id', 'posts', 'users',
            ['author_id'], ['id'],
            ondelete='CASCADE',
            onupdate='RESTRICT'
        )
    Python
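
    A backfill has to cover every row before a column can be made non-nullable, because any row skipped by a WHERE filter keeps NULL and the later NOT NULL change fails. The sketch below walks through the add, backfill, verify sequence using the standard-library sqlite3 module instead of Alembic so it runs without a migration environment. The sample rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT)"
)
conn.executemany(
    "INSERT INTO users (first_name, last_name) VALUES (?, ?)",
    [("Ada", "Lovelace"), ("Grace", None), (None, None)],
)

# Step 1: add the column as nullable, as op.add_column does in the migration.
conn.execute("ALTER TABLE users ADD COLUMN full_name TEXT")

# Step 2: backfill every row; COALESCE turns missing name parts into ''.
conn.execute(
    "UPDATE users SET full_name = "
    "TRIM(COALESCE(first_name, '') || ' ' || COALESCE(last_name, ''))"
)

# Step 3: only tighten the column to NOT NULL once this count is zero.
remaining_nulls = conn.execute(
    "SELECT COUNT(*) FROM users WHERE full_name IS NULL"
).fetchone()[0]
full_names = [row[0] for row in conn.execute("SELECT full_name FROM users ORDER BY id")]
conn.close()
```

    Whether an empty string is an acceptable full_name for rows with no name at all is a data-modelling decision; the point is only that no row is left NULL.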

    Environment-Specific Migrations

    # alembic/env.py - Environment-specific configuration
    import os
    
    def run_migrations_online():
        """Run migrations in 'online' mode."""
        configuration = config.get_section(config.config_ini_section)
    
        # Environment-specific settings
        env = os.getenv('ENVIRONMENT', 'development')
    
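        if env == 'production':
            # Production settings; fail early if the URL is missing
            database_url = os.getenv("DATABASE_URL")
            if not database_url:
                raise RuntimeError("DATABASE_URL must be set in production")
            configuration["sqlalchemy.url"] = database_url
            configuration["sqlalchemy.pool_timeout"] = "60"
            configuration["sqlalchemy.pool_recycle"] = "3600"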
        elif env == 'test':
            # Test settings
            configuration["sqlalchemy.url"] = "sqlite:///:memory:"
        else:
            # Development settings
            configuration["sqlalchemy.url"] = "sqlite:///development.db"
    
        connectable = engine_from_config(configuration, prefix="sqlalchemy.")
    
        with connectable.connect() as connection:
            context.configure(
                connection=connection,
                target_metadata=target_metadata,
                compare_type=True,  # Compare column types
                compare_server_default=True  # Compare default values
            )
    
            with context.begin_transaction():
                context.run_migrations()
    Python
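
    The environment switch in env.py is easier to test when the URL selection lives in a small pure function. The following is a minimal sketch under that assumption. The function name resolve_database_url is hypothetical, and the defaults are copied from the configuration above.

```python
import os
from typing import Mapping, Optional


def resolve_database_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Pick the database URL for the current ENVIRONMENT value."""
    env = os.environ if env is None else env
    environment = env.get("ENVIRONMENT", "development")

    if environment == "production":
        url = env.get("DATABASE_URL")
        if not url:
            # Fail early instead of handing None to engine_from_config.
            raise RuntimeError("DATABASE_URL must be set when ENVIRONMENT=production")
        return url
    if environment == "test":
        return "sqlite:///:memory:"
    return "sqlite:///development.db"
```

    Passing a plain dict instead of os.environ keeps tests from depending on the machine's real environment variables.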

    13. Testing with SQLAlchemy

    Test Database Setup

    graph TD
        A[Test Setup] --> B[In-Memory DB]
        A --> C[Test Fixtures]
        A --> D[Factory Pattern]
        A --> E[Transaction Rollback]
    
        B --> F[SQLite :memory:]
        C --> G[pytest fixtures]
        D --> H[Factory Boy]
        E --> I[Session Rollback]

    Basic Test Configuration

    # tests/conftest.py
    import pytest
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from models.base import Base
    
    # Test database URL
    TEST_DATABASE_URL = "sqlite:///:memory:"
    
    @pytest.fixture(scope="session")
    def engine():
        """Create test database engine"""
        engine = create_engine(
            TEST_DATABASE_URL,
            connect_args={"check_same_thread": False},
            echo=False  # Set to True for SQL debugging
        )
        Base.metadata.create_all(engine)
        yield engine
        engine.dispose()
    
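    @pytest.fixture(scope="function")
    def session(engine):
        """Create database session for each test"""
        # Bind the session to a connection that already has an outer
        # transaction open, so session.commit() inside a test or fixture
        # does not persist anything beyond the test
        connection = engine.connect()
        transaction = connection.begin()
        SessionLocal = sessionmaker(bind=connection)
        session = SessionLocal()
    
        yield session
    
        # Rollback the outer transaction after the test
        session.close()
        if transaction.is_active:
            transaction.rollback()
        connection.close()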
    
    @pytest.fixture
    def sample_user(session):
        """Create a sample user for testing"""
        from models.user import User
    
        user = User(
            username="testuser",
            email="test@example.com",
            password="testpass123"
        )
        session.add(user)
        session.commit()
        session.refresh(user)
        return user
    Python
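
    The isolation idea behind the session fixture is that schema setup is committed once, while each test's writes stay inside a transaction that teardown rolls back. The sketch below shows that mechanism with the standard-library sqlite3 module rather than a SQLAlchemy Session, so it runs without extra dependencies. The helper names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
conn.commit()  # schema setup is committed once, like the session-scoped engine fixture


def run_isolated(connection, test_body):
    """Run test_body against the connection, then undo whatever it wrote."""
    try:
        return test_body(connection)
    finally:
        connection.rollback()  # teardown: discard the test's uncommitted changes


def insert_and_count(connection):
    """Stand-in for a test that writes a row and reads it back."""
    connection.execute("INSERT INTO users (username) VALUES ('testuser')")
    return connection.execute("SELECT COUNT(*) FROM users").fetchone()[0]


count_inside_test = run_isolated(conn, insert_and_count)
count_after_teardown = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
conn.close()
```

    The test sees its own row, and the table is empty again once teardown has run.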

    Model Testing

    # tests/test_models.py
    import pytest
    from datetime import datetime
    from models.user import User
    from models.post import Post
    
    class TestUser:
        def test_create_user(self, session):
            """Test user creation"""
            user = User(
                username="newuser",
                email="new@example.com",
                password="password123"
            )
            session.add(user)
            session.commit()
    
            assert user.id is not None
            assert user.username == "newuser"
            assert user.email == "new@example.com"
            assert user.created_at is not None
    
        def test_user_validation(self, session):
            """Test user validation"""
            # Test invalid email
            with pytest.raises(ValueError, match="Invalid email"):
                user = User(
                    username="user",
                    email="invalid-email",
                    password="pass"
                )
                session.add(user)
                session.commit()
    
        def test_user_relationships(self, session, sample_user):
            """Test user-post relationship"""
            post = Post(
                title="Test Post",
                content="This is a test post",
                author=sample_user
            )
            session.add(post)
            session.commit()
    
            assert len(sample_user.posts) == 1
            assert sample_user.posts[0].title == "Test Post"
            assert post.author == sample_user
    
        def test_user_methods(self, session):
            """Test user custom methods"""
            user = User(
                username="john",
                email="john@example.com",
                password="secret123"
            )
            session.add(user)
            session.commit()
    
            # Test password verification
            assert user.verify_password("secret123") is True
            assert user.verify_password("wrong") is False
    
            # Test class method
            found_user = User.get_by_email(session, "john@example.com")
            assert found_user == user
    Python

    Repository Testing

    # tests/test_repositories.py
    import pytest
    from repositories.user_repository import UserRepository
    from models.user import User
    
    class TestUserRepository:
        @pytest.fixture
        def user_repo(self, session):
            return UserRepository(session)
    
        def test_create_user(self, user_repo):
            """Test user creation through repository"""
            user_data = {
                'username': 'newuser',
                'email': 'new@example.com',
                'password': 'password123'
            }
    
            user = user_repo.create(**user_data)
    
            assert user.id is not None
            assert user.username == 'newuser'
    
        def test_get_by_id(self, user_repo, sample_user):
            """Test getting user by ID"""
            user = user_repo.get_by_id(sample_user.id)
            assert user == sample_user
    
        def test_get_by_email(self, user_repo, sample_user):
            """Test getting user by email"""
            user = user_repo.get_by_email(sample_user.email)
            assert user == sample_user
    
        def test_update_user(self, user_repo, sample_user):
            """Test user update"""
            updated_user = user_repo.update(
                sample_user.id,
                username='updated_username'
            )
    
            assert updated_user.username == 'updated_username'
    
        def test_delete_user(self, user_repo, sample_user):
            """Test user deletion"""
            user_id = sample_user.id
            user_repo.delete(user_id)
    
            deleted_user = user_repo.get_by_id(user_id)
            assert deleted_user is None
    
        def test_list_users_with_pagination(self, user_repo):
            """Test user listing with pagination"""
            # Create multiple users
            for i in range(15):
                user_repo.create(
                    username=f'user_{i}',
                    email=f'user_{i}@example.com',
                    password='password'
                )
    
            # Test pagination via limit/offset (page size 10)
            page_1 = user_repo.get_all(limit=10, offset=0)
            page_2 = user_repo.get_all(limit=10, offset=10)
    
            assert len(page_1) == 10
            assert len(page_2) == 5
    Python
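
    Page-based listing reduces to limit/offset arithmetic: page n with a given page size starts at offset (n - 1) * per_page. The sketch below shows that translation on an in-memory list standing in for 15 rows. The function names are hypothetical and not part of the repository classes in this guide.

```python
from typing import List, Tuple


def page_to_limit_offset(page: int, per_page: int) -> Tuple[int, int]:
    """Translate a 1-based page number into (limit, offset)."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be positive")
    return per_page, (page - 1) * per_page


# Stand-in for 15 rows; slicing plays the role of OFFSET/LIMIT.
rows = [f"user_{i}" for i in range(15)]


def fetch_page(page: int, per_page: int) -> List[str]:
    """Return the slice of rows that belongs to the requested page."""
    limit, offset = page_to_limit_offset(page, per_page)
    return rows[offset:offset + limit]
```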

    Testing with Factory Boy

    # tests/factories.py
    import factory
    from factory.alchemy import SQLAlchemyModelFactory
    from models.user import User
    from models.post import Post
    from datetime import datetime
    
    class UserFactory(SQLAlchemyModelFactory):
        class Meta:
            model = User
            sqlalchemy_session_persistence = "commit"
    
        username = factory.Sequence(lambda n: f"user_{n}")
        email = factory.LazyAttribute(lambda obj: f"{obj.username}@example.com")
        password = "defaultpassword"
        first_name = factory.Faker('first_name')
        last_name = factory.Faker('last_name')
        is_active = True
        created_at = factory.LazyFunction(datetime.utcnow)
    
    class PostFactory(SQLAlchemyModelFactory):
        class Meta:
            model = Post
            sqlalchemy_session_persistence = "commit"
    
        title = factory.Faker('sentence', nb_words=4)
        content = factory.Faker('text', max_nb_chars=500)
        author = factory.SubFactory(UserFactory)
        created_at = factory.LazyFunction(datetime.utcnow)
    
    # Usage in tests
    def test_with_factories(session):
        # Set the session for factories
        UserFactory._meta.sqlalchemy_session = session
        PostFactory._meta.sqlalchemy_session = session
    
        # Create test data
        user = UserFactory()
        posts = PostFactory.create_batch(5, author=user)
    
        assert len(user.posts) == 5
        assert all(post.author == user for post in posts)
    
    # Advanced factory usage
    class AdminUserFactory(UserFactory):
        is_admin = True
        username = factory.Sequence(lambda n: f"admin_{n}")
    
    class PublishedPostFactory(PostFactory):
        is_published = True
        published_at = factory.LazyFunction(datetime.utcnow)
    Python
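
    The two declarations doing most of the work in UserFactory are the sequence, which keeps usernames unique, and the lazy attribute, which derives the email from whatever username ends up being used. The sketch below imitates that behavior in plain Python to make the mechanics visible. It is not how Factory Boy is implemented, and FakeUser and make_user are hypothetical names.

```python
import itertools
from dataclasses import dataclass


@dataclass
class FakeUser:
    """Plain stand-in for the User model; no database involved."""
    username: str
    email: str
    is_active: bool = True


_sequence = itertools.count()


def make_user(**overrides) -> FakeUser:
    """Build a user with unique defaults; keyword arguments override any field."""
    n = next(_sequence)
    username = overrides.pop("username", f"user_{n}")
    # Derive the email from the final username, mirroring the lazy attribute above.
    email = overrides.pop("email", f"{username}@example.com")
    return FakeUser(username=username, email=email, **overrides)
```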

    Integration Testing

    # tests/test_integration.py
    import pytest
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from models.base import Base
    
    class TestIntegration:
        @pytest.fixture(scope="class")
        def integration_engine(self):
            """Use a real database for integration tests"""
            engine = create_engine("postgresql://test:test@localhost/test_db")
            Base.metadata.create_all(engine)
            yield engine
            Base.metadata.drop_all(engine)
    
        @pytest.fixture
        def integration_session(self, integration_engine):
            SessionLocal = sessionmaker(bind=integration_engine)
            session = SessionLocal()
            yield session
            session.close()
    
        def test_full_user_workflow(self, integration_session):
            """Test complete user workflow"""
            from services.user_service import UserService
    
            user_service = UserService(integration_session)
    
            # Create user
            user = user_service.register_user(
                username="integrationuser",
                email="integration@example.com",
                password="password123"
            )
    
            # Verify user exists
            found_user = user_service.get_user_by_email("integration@example.com")
            assert found_user == user
    
            # Update user
            updated_user = user_service.update_user_profile(
                user.id,
                first_name="John",
                last_name="Doe"
            )
            assert updated_user.first_name == "John"
    
            # Create posts
            from services.post_service import PostService
            post_service = PostService(integration_session)
    
            post = post_service.create_post(
                user_id=user.id,
                title="Integration Test Post",
                content="This is a test post for integration testing"
            )
    
            # Verify relationships
            integration_session.refresh(user)
            assert len(user.posts) == 1
            assert user.posts[0] == post
    
        def test_database_constraints(self, integration_session):
            """Test database-level constraints"""
            from sqlalchemy.exc import IntegrityError
            from models.user import User
    
            # Test unique constraint
            user1 = User(username="unique", email="unique@example.com")
            integration_session.add(user1)
            integration_session.commit()
    
            # This should fail due to unique constraint
            user2 = User(username="unique", email="different@example.com")
            integration_session.add(user2)
    
            with pytest.raises(IntegrityError):  # Unique constraint violation
                integration_session.commit()
    
            # Reset the session after the failed commit
            integration_session.rollback()
    Python
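
    Asserting on the specific integrity error, rather than on Exception, keeps a constraint test from passing for unrelated reasons such as a typo or a connection failure. The sketch below shows the same unique-constraint scenario at the driver level with the standard-library sqlite3 module. SQLAlchemy wraps driver errors of this kind in sqlalchemy.exc.IntegrityError.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL)"
)
conn.execute("INSERT INTO users (username) VALUES ('unique')")
conn.commit()

try:
    # Second insert with the same username violates the UNIQUE constraint.
    conn.execute("INSERT INTO users (username) VALUES ('unique')")
    conn.commit()
    violation = None
except sqlite3.IntegrityError as exc:
    # Roll back so the connection is usable for the next statement.
    conn.rollback()
    violation = str(exc)

row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
conn.close()
```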

    14. Best Practices and Patterns

    Repository Pattern

    # repositories/base_repository.py
    from abc import ABC, abstractmethod
    from typing import TypeVar, Generic, List, Optional
    from sqlalchemy.orm import Session
    
    T = TypeVar('T')
    
    class BaseRepository(Generic[T], ABC):
        def __init__(self, session: Session, model_class: type):
            self.session = session
            self.model_class = model_class
    
        def create(self, **kwargs) -> T:
            """Create new entity"""
            entity = self.model_class(**kwargs)
            self.session.add(entity)
            self.session.commit()
            self.session.refresh(entity)
            return entity
    
        def get_by_id(self, entity_id: int) -> Optional[T]:
            """Get entity by ID"""
            return self.session.query(self.model_class).filter(
                self.model_class.id == entity_id
            ).first()
    
        def get_all(self, limit: int = 100, offset: int = 0) -> List[T]:
            """Get all entities with pagination"""
            return self.session.query(self.model_class).offset(offset).limit(limit).all()
    
        def update(self, entity_id: int, **kwargs) -> Optional[T]:
            """Update entity"""
            entity = self.get_by_id(entity_id)
            if entity:
                for key, value in kwargs.items():
                    setattr(entity, key, value)
                self.session.commit()
                self.session.refresh(entity)
            return entity
    
        def delete(self, entity_id: int) -> bool:
            """Delete entity"""
            entity = self.get_by_id(entity_id)
            if entity:
                self.session.delete(entity)
                self.session.commit()
                return True
            return False
    
    # repositories/user_repository.py
    from typing import List, Optional
    from sqlalchemy import or_
    from sqlalchemy.orm import Session
    from models.user import User
    from .base_repository import BaseRepository
    
    class UserRepository(BaseRepository[User]):
        def __init__(self, session: Session):
            super().__init__(session, User)
    
        def get_by_email(self, email: str) -> Optional[User]:
            """Get user by email"""
            return self.session.query(User).filter(User.email == email).first()
    
        def get_by_username(self, username: str) -> Optional[User]:
            """Get user by username"""
            return self.session.query(User).filter(User.username == username).first()
    
        def get_active_users(self) -> List[User]:
            """Get all active users"""
            return self.session.query(User).filter(User.is_active.is_(True)).all()
    
        def search_users(self, query: str) -> List[User]:
            """Search users by name or email"""
            search_term = f"%{query}%"
            return self.session.query(User).filter(
                or_(
                    User.username.ilike(search_term),
                    User.email.ilike(search_term),
                    User.first_name.ilike(search_term),
                    User.last_name.ilike(search_term)
                )
            ).all()
    Python
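
    One caveat with building a search term as f"%{query}%" is that % and _ typed by the user act as wildcards. The sketch below escapes them before wrapping the term, and checks the result against SQLite through the standard-library sqlite3 module. The helper escape_like is hypothetical. In SQLAlchemy the same escape character can be passed through the escape argument of ilike().

```python
import sqlite3


def escape_like(term: str, escape_char: str = "\\") -> str:
    """Escape LIKE wildcards so user input is matched literally."""
    return (
        term.replace(escape_char, escape_char * 2)
        .replace("%", escape_char + "%")
        .replace("_", escape_char + "_")
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?)", [("100%_sure",), ("1000 sure",), ("alice",)]
)


def search(term: str):
    """Substring search that treats % and _ in `term` as ordinary characters."""
    pattern = f"%{escape_like(term)}%"
    rows = conn.execute(
        "SELECT username FROM users WHERE username LIKE ? ESCAPE '\\' ORDER BY username",
        (pattern,),
    )
    return [row[0] for row in rows]
```

    Without the escaping step, a search for "%" would match every row instead of only the username that contains a literal percent sign.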

    Service Layer Pattern

    # services/base_service.py
    from abc import ABC
    from sqlalchemy.orm import Session
    
    class BaseService(ABC):
        def __init__(self, session: Session):
            self.session = session
    
    # services/user_service.py
    from typing import Optional, List
    from sqlalchemy.orm import Session
    from models.user import User
    from repositories.user_repository import UserRepository
    from .base_service import BaseService
    import hashlib
    
    class UserService(BaseService):
        def __init__(self, session: Session):
            super().__init__(session)
            self.user_repo = UserRepository(session)
    
        def register_user(self, username: str, email: str, password: str) -> User:
            """Register a new user"""
            # Check if user already exists
            if self.user_repo.get_by_email(email):
                raise ValueError("User with this email already exists")
    
            if self.user_repo.get_by_username(username):
                raise ValueError("Username already taken")
    
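            # Hash password (unsalted SHA-256 is for illustration only; use a
            # dedicated password hasher such as bcrypt or Argon2 in production,
            # and keep it consistent with User.verify_password)
            password_hash = hashlib.sha256(password.encode()).hexdigest()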
    
            # Create user
            user = self.user_repo.create(
                username=username,
                email=email,
                password_hash=password_hash
            )
    
            return user
    
        def authenticate_user(self, email: str, password: str) -> Optional[User]:
            """Authenticate user login"""
            user = self.user_repo.get_by_email(email)
            if user and user.verify_password(password):
                return user
            return None
    
        def update_user_profile(self, user_id: int, **kwargs) -> Optional[User]:
            """Update user profile"""
            # Remove sensitive fields
            sensitive_fields = ['password_hash', 'id']
            for field in sensitive_fields:
                kwargs.pop(field, None)
    
            return self.user_repo.update(user_id, **kwargs)
    
        def deactivate_user(self, user_id: int) -> bool:
            """Deactivate user account"""
            return self.user_repo.update(user_id, is_active=False) is not None
    Python
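
    register_user stores a hash and authenticate_user relies on verify_password, so the two must agree on one scheme. A plain SHA-256 digest has no salt and is fast to brute-force. The sketch below shows one standard-library alternative, PBKDF2 with a per-user random salt and a constant-time comparison. The function names, the "salt$hash" storage format, and the iteration count are assumptions for illustration, not something defined by the models in this guide.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # assumed work factor; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'salt$hash' (both hex) using PBKDF2-HMAC-SHA256 with a random salt."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)
```

    Because the salt is random, hashing the same password twice gives different stored values, yet both verify correctly.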

    Unit of Work Pattern

    # patterns/unit_of_work.py
    from contextlib import contextmanager
    from sqlalchemy.orm import Session
    from repositories.user_repository import UserRepository
    from repositories.post_repository import PostRepository
    
    class UnitOfWork:
        def __init__(self, session: Session):
            self.session = session
            self._users = None
            self._posts = None
    
        @property
        def users(self) -> UserRepository:
            if self._users is None:
                self._users = UserRepository(self.session)
            return self._users
    
        @property
        def posts(self) -> PostRepository:
            if self._posts is None:
                self._posts = PostRepository(self.session)
            return self._posts
    
        def commit(self):
            """Commit all changes"""
            self.session.commit()
    
        def rollback(self):
            """Rollback all changes"""
            self.session.rollback()
    
    @contextmanager
    def unit_of_work(session: Session):
        """Context manager for unit of work"""
        uow = UnitOfWork(session)
        try:
            yield uow
            uow.commit()
        except Exception:
            uow.rollback()
            raise
    
    # Usage
    def transfer_posts(from_user_id: int, to_user_id: int, session: Session):
        """Transfer all posts from one user to another"""
        with unit_of_work(session) as uow:
            from_user = uow.users.get_by_id(from_user_id)
            to_user = uow.users.get_by_id(to_user_id)
    
            if not from_user or not to_user:
                raise ValueError("One or both users not found")
    
            # Transfer posts
            posts = uow.posts.get_by_user_id(from_user_id)
            for post in posts:
                uow.posts.update(post.id, user_id=to_user_id)
    
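            # Update user stats (add to the recipient's existing count)
            uow.users.update(from_user_id, post_count=0)
            uow.users.update(
                to_user_id,
                post_count=(to_user.post_count or 0) + len(posts)
            )
    
            # Note: the changes are only atomic if the repositories flush
            # instead of committing; BaseRepository commits inside each method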
    Python
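
    The contract of the unit_of_work context manager is a single commit when the block succeeds and a rollback followed by a re-raise when it fails. The sketch below checks that contract with a recording stand-in instead of a real Session. RecordingSession is a hypothetical test double, not a SQLAlchemy class.

```python
from contextlib import contextmanager


class RecordingSession:
    """Hypothetical stand-in for a Session that only records what was called."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


@contextmanager
def unit_of_work(session):
    """Commit once if the block succeeds; roll back and re-raise if it fails."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise


ok_session = RecordingSession()
with unit_of_work(ok_session):
    pass  # work that succeeds

failed_session = RecordingSession()
try:
    with unit_of_work(failed_session):
        raise ValueError("One or both users not found")
except ValueError:
    pass  # the error still reaches the caller after the rollback
```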

    Configuration Management

    # config/database.py
    import os
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.pool import QueuePool, StaticPool
    
    class DatabaseConfig:
        """Database configuration management"""
    
        def __init__(self):
            self.environment = os.getenv('ENVIRONMENT', 'development')
            self.database_url = self._get_database_url()
            self.engine_options = self._get_engine_options()
    
        def _get_database_url(self) -> str:
            """Get database URL based on environment"""
            if self.environment == 'production':
                url = os.getenv('DATABASE_URL')
                if not url:
                    raise RuntimeError("DATABASE_URL must be set in production")
                return url
            elif self.environment == 'test':
                return os.getenv('TEST_DATABASE_URL', 'sqlite:///:memory:')
            else:
                return os.getenv('DEV_DATABASE_URL', 'sqlite:///development.db')
    
        def _get_engine_options(self) -> dict:
            """Get engine options based on environment"""
            base_options = {
                'poolclass': QueuePool,
                'pool_pre_ping': True,
                'echo': False
            }
    
            if self.environment == 'production':
                base_options.update({
                    'pool_size': 20,
                    'max_overflow': 30,
                    'pool_recycle': 3600,
                    'connect_args': {
                        'connect_timeout': 10,
                        'application_name': 'myapp_prod'
                    }
                })
            elif self.environment == 'test':
                if self.database_url.startswith('sqlite'):
                    # In-memory SQLite must share one connection, and
                    # StaticPool does not accept pool_size/max_overflow
                    base_options.update({
                        'poolclass': StaticPool,
                        'connect_args': {'check_same_thread': False}
                    })
                else:
                    base_options.update({
                        'pool_size': 5,
                        'max_overflow': 10
                    })
            else:
                base_options.update({
                    'echo': True,  # Enable SQL logging in development
                    'pool_size': 5,
                    'max_overflow': 10
                })
    
            return base_options
    
        def create_engine(self):
            """Create configured database engine"""
            return create_engine(self.database_url, **self.engine_options)
    
        def create_session_factory(self, engine):
            """Create session factory"""
            return sessionmaker(
                bind=engine,
                autocommit=False,
                autoflush=False,
                expire_on_commit=False
            )
    
    # Usage
    config = DatabaseConfig()
    engine = config.create_engine()
    SessionLocal = config.create_session_factory(engine)
    Python

    Error Handling and Logging

    # utils/exceptions.py
    class DatabaseError(Exception):
        """Base database exception"""
        pass
    
    class ValidationError(DatabaseError):
        """Data validation error"""
        pass
    
    class NotFoundError(DatabaseError):
        """Entity not found error"""
        pass
    
    class DuplicateError(DatabaseError):
        """Duplicate entity error"""
        pass
    
    # utils/logging_config.py
    import logging
    from sqlalchemy import event
    from sqlalchemy.engine import Engine
    import time
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Log slow queries
    @event.listens_for(Engine, "before_cursor_execute")
    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        context._query_start_time = time.time()
    
    @event.listens_for(Engine, "after_cursor_execute")
    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        total = time.time() - context._query_start_time
    
        if total > 0.5:  # Log queries slower than 500ms
            logger.warning(f"Slow query: {total:.2f}s - {statement[:100]}...")
    
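    # Enhanced repository with error handling
    from sqlalchemy.exc import IntegrityError
    from models.user import User
    from repositories.user_repository import UserRepository
    
    class EnhancedUserRepository(UserRepository):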
        def create(self, **kwargs) -> User:
            """Create user with error handling"""
            try:
                return super().create(**kwargs)
            except IntegrityError as e:
                self.session.rollback()
                # Chain the original error so the database message is not lost
                if 'email' in str(e):
                    raise DuplicateError("Email already exists") from e
                elif 'username' in str(e):
                    raise DuplicateError("Username already exists") from e
                else:
                    raise DatabaseError(f"Database error: {str(e)}") from e
            except Exception as e:
                self.session.rollback()
                logger.error(f"Unexpected error creating user: {str(e)}")
                raise DatabaseError("Failed to create user") from e
    
        def get_by_id(self, user_id: int) -> User:
            """Get user by ID with error handling"""
            user = super().get_by_id(user_id)
            if not user:
                raise NotFoundError(f"User with ID {user_id} not found")
            return user
    Python
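
    The slow-query listeners boil down to taking a timestamp before the statement, taking one after, and logging when the difference exceeds a threshold. The sketch below isolates that logic in a context manager with an injectable clock so it can be tested without a database. The names timed, slow_log and the logger name are hypothetical. time.perf_counter is used as the default because it is a monotonic clock intended for measuring intervals.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("slow_queries")  # hypothetical logger name
SLOW_THRESHOLD_SECONDS = 0.5


@contextmanager
def timed(statement: str, clock=time.perf_counter, slow_log=None):
    """Time the wrapped block and record it when it exceeds the threshold."""
    start = clock()
    try:
        yield
    finally:
        elapsed = clock() - start
        if elapsed > SLOW_THRESHOLD_SECONDS:
            logger.warning("Slow query: %.2fs - %s", elapsed, statement[:100])
            if slow_log is not None:
                slow_log.append((statement, elapsed))
```

    In tests, passing a fake clock that returns fixed values makes the threshold check deterministic instead of depending on real elapsed time.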

    15. Real-world Applications

    E-commerce Application

    # models/ecommerce.py
    from sqlalchemy import Column, Integer, String, Text, DECIMAL, DateTime, Boolean, ForeignKey, Table
    from sqlalchemy.orm import relationship
    from datetime import datetime
    from models.base import Base
    
    # Association table for order items
    order_items = Table(
        'order_items',
        Base.metadata,
        Column('order_id', Integer, ForeignKey('orders.id'), primary_key=True),
        Column('product_id', Integer, ForeignKey('products.id'), primary_key=True),
        Column('quantity', Integer, nullable=False),
        Column('price', DECIMAL(10, 2), nullable=False)  # Price at time of order
    )
    
    class Customer(Base):
        __tablename__ = 'customers'
    
        id = Column(Integer, primary_key=True)
        first_name = Column(String(50), nullable=False)
        last_name = Column(String(50), nullable=False)
        email = Column(String(120), unique=True, nullable=False)
        phone = Column(String(20))
        created_at = Column(DateTime, default=datetime.utcnow)
    
        # Relationships
        addresses = relationship("Address", back_populates="customer", cascade="all, delete-orphan")
        orders = relationship("Order", back_populates="customer")
    
    class Address(Base):
        __tablename__ = 'addresses'
    
        id = Column(Integer, primary_key=True)
        customer_id = Column(Integer, ForeignKey('customers.id'), nullable=False)
        street_address = Column(String(200), nullable=False)
        city = Column(String(100), nullable=False)
        state = Column(String(50), nullable=False)
        postal_code = Column(String(20), nullable=False)
        country = Column(String(50), nullable=False)
        is_default = Column(Boolean, default=False)
    
        # Relationships
        customer = relationship("Customer", back_populates="addresses")
    
    class Category(Base):
        __tablename__ = 'categories'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(100), nullable=False, unique=True)
        description = Column(Text)
        parent_id = Column(Integer, ForeignKey('categories.id'))
    
        # Self-referential relationship
        children = relationship("Category", back_populates="parent", cascade="all, delete-orphan")
        parent = relationship("Category", back_populates="children", remote_side=[id])
    
        # Products in this category
        products = relationship("Product", back_populates="category")
    
    class Product(Base):
        __tablename__ = 'products'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(200), nullable=False)
        description = Column(Text)
        price = Column(DECIMAL(10, 2), nullable=False)
        stock_quantity = Column(Integer, default=0)
        category_id = Column(Integer, ForeignKey('categories.id'))
        sku = Column(String(50), unique=True, nullable=False)
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, default=datetime.utcnow)
    
        # Relationships
        category = relationship("Category", back_populates="products")
        orders = relationship("Order", secondary=order_items, back_populates="products")
    
    class Order(Base):
        __tablename__ = 'orders'
    
        id = Column(Integer, primary_key=True)
        customer_id = Column(Integer, ForeignKey('customers.id'), nullable=False)
        order_date = Column(DateTime, default=datetime.utcnow)
        total_amount = Column(DECIMAL(10, 2), nullable=False)
        status = Column(String(20), default='pending')  # pending, processing, shipped, delivered, cancelled
        shipping_address_id = Column(Integer, ForeignKey('addresses.id'))
    
        # Relationships
        customer = relationship("Customer", back_populates="orders")
        products = relationship("Product", secondary=order_items, back_populates="orders")
        shipping_address = relationship("Address")
    Python
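
    Because the order_items table carries quantity and price, a plain secondary relationship cannot fill those columns when a product is appended to an order. One common alternative is the association object pattern, sketched below under some assumptions: SQLAlchemy 1.4 or later, an in-memory SQLite database, and trimmed-down models. The names OrderItem, items, and the total_amount property are illustrative and do not come from the models above.

```python
from decimal import Decimal

from sqlalchemy import Column, ForeignKey, Integer, Numeric, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Product(Base):
    __tablename__ = "products"

    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    price = Column(Numeric(10, 2), nullable=False)


class OrderItem(Base):
    """Association object (hypothetical name): one row per (order, product) pair."""

    __tablename__ = "order_items"

    order_id = Column(Integer, ForeignKey("orders.id"), primary_key=True)
    product_id = Column(Integer, ForeignKey("products.id"), primary_key=True)
    quantity = Column(Integer, nullable=False)
    price = Column(Numeric(10, 2), nullable=False)  # price at time of order

    order = relationship("Order", back_populates="items")
    product = relationship("Product")


class Order(Base):
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True)
    items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")

    @property
    def total_amount(self):
        # Sum the snapshotted line prices, not the current product prices
        return sum((item.price * item.quantity for item in self.items), Decimal("0"))


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    widget = Product(name="Widget", price=Decimal("9.99"))
    order = Order(items=[OrderItem(product=widget, quantity=3, price=widget.price)])
    session.add(order)  # cascades to the order item and the product
    session.commit()
    order_total = order.total_amount  # reloaded from the database after commit

print(order_total)
```

    SQLite has no native decimal type, so SQLAlchemy may emit a warning about Decimal handling here; a backend such as PostgreSQL stores the values exactly.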

    Blog Application

    # models/blog.py
    from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, ForeignKey, Table
    from sqlalchemy.orm import relationship
    from sqlalchemy.ext.hybrid import hybrid_property
    from datetime import datetime
    from models.base import Base
    
    # Many-to-many relationship between posts and tags
    post_tags = Table(
        'post_tags',
        Base.metadata,
        Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
        Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
    )
    
    class Author(Base):
        __tablename__ = 'authors'
    
        id = Column(Integer, primary_key=True)
        username = Column(String(50), unique=True, nullable=False)
        email = Column(String(120), unique=True, nullable=False)
        first_name = Column(String(50))
        last_name = Column(String(50))
        bio = Column(Text)
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, default=datetime.utcnow)
    
        # Relationships
        posts = relationship("Post", back_populates="author")
        comments = relationship("Comment", back_populates="author")
    
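        # Evaluated in Python on loaded instances only: no SQL expression is
        # defined here, so full_name cannot be used in query filters as written
        @hybrid_property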
        def full_name(self):
            if self.first_name and self.last_name:
                return f"{self.first_name} {self.last_name}"
            return self.username
    
    class Category(Base):
        __tablename__ = 'categories'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(100), unique=True, nullable=False)
        slug = Column(String(100), unique=True, nullable=False)
        description = Column(Text)
    
        # Relationships
        posts = relationship("Post", back_populates="category")
    
    class Tag(Base):
        __tablename__ = 'tags'
    
        id = Column(Integer, primary_key=True)
        name = Column(String(50), unique=True, nullable=False)
        slug = Column(String(50), unique=True, nullable=False)
    
        # Relationships
        posts = relationship("Post", secondary=post_tags, back_populates="tags")
    
    class Post(Base):
        __tablename__ = 'posts'
    
        id = Column(Integer, primary_key=True)
        title = Column(String(200), nullable=False)
        slug = Column(String(200), unique=True, nullable=False)
        content = Column(Text, nullable=False)
        excerpt = Column(Text)
        author_id = Column(Integer, ForeignKey('authors.id'), nullable=False)
        category_id = Column(Integer, ForeignKey('categories.id'))
        is_published = Column(Boolean, default=False)
        published_at = Column(DateTime)
        created_at = Column(DateTime, default=datetime.utcnow)
        updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
        view_count = Column(Integer, default=0)
    
        # Relationships
        author = relationship("Author", back_populates="posts")
        category = relationship("Category", back_populates="posts")
        tags = relationship("Tag", secondary=post_tags, back_populates="posts")
        comments = relationship("Comment", back_populates="post", cascade="all, delete-orphan")
    
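        @hybrid_property
        def is_published_and_visible(self):
            return self.is_published and self.published_at is not None
    
        @is_published_and_visible.expression
        def is_published_and_visible(cls):
            # SQL form of the check: Python's `and` / `is not` cannot be
            # translated to SQL, so build the expression explicitly
            return cls.is_published & cls.published_at.isnot(None)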
    
    class Comment(Base):
        __tablename__ = 'comments'
    
        id = Column(Integer, primary_key=True)
        post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
        author_id = Column(Integer, ForeignKey('authors.id'), nullable=False)
        content = Column(Text, nullable=False)
        is_approved = Column(Boolean, default=False)
        created_at = Column(DateTime, default=datetime.utcnow)
        parent_id = Column(Integer, ForeignKey('comments.id'))  # For threaded comments
    
        # Relationships
        post = relationship("Post", back_populates="comments")
        author = relationship("Author", back_populates="comments")
        replies = relationship("Comment", cascade="all, delete-orphan")
    
    # services/blog_service.py
    from datetime import datetime
    from sqlalchemy.orm import Session
    from models.blog import Post
    from repositories.blog_repository import PostRepository, AuthorRepository
    from utils.slug import generate_slug
    
    class BlogService:
        def __init__(self, session: Session):
            self.session = session
            self.post_repo = PostRepository(session)
            self.author_repo = AuthorRepository(session)
    
        def create_post(self, author_id: int, title: str, content: str, **kwargs) -> Post:
            """Create a new blog post"""
            # Generate slug from title
            slug = generate_slug(title)
    
            # Ensure slug is unique
            counter = 1
            original_slug = slug
            while self.post_repo.get_by_slug(slug):
                slug = f"{original_slug}-{counter}"
                counter += 1
    
            post_data = {
                'title': title,
                'slug': slug,
                'content': content,
                'author_id': author_id,
                **kwargs
            }
    
            return self.post_repo.create(**post_data)
    
        def publish_post(self, post_id: int) -> Post:
            """Publish a post"""
            return self.post_repo.update(post_id, 
                                       is_published=True, 
                                       published_at=datetime.utcnow())
    
        def get_published_posts(self, limit: int = 10, offset: int = 0):
            """Get published posts with pagination"""
            return self.post_repo.get_published_posts(limit=limit, offset=offset)
    
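        def search_posts(self, query: str):
            """Search posts by title and content"""
            return self.post_repo.search_posts(query)
    
        # The API layer calls the two lookups below. get_by_id is assumed to
        # exist on the repository, as get_by_slug does above.
        def get_post_by_id(self, post_id: int):
            """Get a post by primary key, or None if it does not exist"""
            return self.post_repo.get_by_id(post_id)
    
        def get_post_by_slug(self, slug: str):
            """Get a post by slug, or None if it does not exist"""
            return self.post_repo.get_by_slug(slug)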
    Python
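
    The service above imports generate_slug from utils.slug, but that helper is not shown. A minimal sketch of what it might look like follows, together with the same counter-based uniqueness loop that create_post uses. Both functions are assumptions for illustration; unique_slug checks an in-memory set where the service checks the repository.

```python
import re
import unicodedata


def generate_slug(title: str) -> str:
    """Hypothetical stand-in for utils.slug.generate_slug."""
    # Fold accented characters to ASCII and drop anything that cannot be encoded
    ascii_title = (
        unicodedata.normalize("NFKD", title).encode("ascii", "ignore").decode("ascii")
    )
    # Collapse every run of non-alphanumeric characters into a single hyphen
    return re.sub(r"[^a-z0-9]+", "-", ascii_title.lower()).strip("-")


def unique_slug(title: str, existing: set) -> str:
    """Append -1, -2, ... until the slug is not in `existing`."""
    base = generate_slug(title)
    slug, counter = base, 1
    while slug in existing:
        slug = f"{base}-{counter}"
        counter += 1
    return slug


taken = {"hello-world", "hello-world-1"}
print(generate_slug("Héllo, World!"))     # hello-world
print(unique_slug("Hello World", taken))  # hello-world-2
```

    A check-then-insert loop like this can still race under concurrent writes, so the unique constraint on Post.slug remains the final safeguard.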

    FastAPI Integration

    # api/main.py
    from fastapi import FastAPI, Depends, HTTPException, status
    from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
    from sqlalchemy.orm import Session
    from database.connection import SessionLocal, engine
    from services.user_service import UserService
    from services.blog_service import BlogService
    from models.base import Base
    import jwt
    
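    # Create tables at import time (convenient for a demo; in production,
    # manage schema changes with Alembic migrations instead)
    Base.metadata.create_all(bind=engine)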
    
    app = FastAPI(title="Blog API", version="1.0.0")
    security = HTTPBearer()
    
    # Dependency to get database session
    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()
    
    # Authentication dependency
    def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), 
                        db: Session = Depends(get_db)):
        try:
            # NOTE: "secret_key" is a placeholder; load the real signing key from configuration
            payload = jwt.decode(credentials.credentials, "secret_key", algorithms=["HS256"])
            user_id = payload.get("user_id")
            if user_id is None:
                raise HTTPException(status_code=401, detail="Invalid token")
    
            user_service = UserService(db)
            user = user_service.get_user_by_id(user_id)
            if user is None:
                raise HTTPException(status_code=401, detail="User not found")
            return user
        except jwt.PyJWTError:
            raise HTTPException(status_code=401, detail="Invalid token")
    
    # API endpoints
    @app.post("/auth/register")
    def register(user_data: dict, db: Session = Depends(get_db)):
        """Register a new user"""
        user_service = UserService(db)
        try:
            user = user_service.register_user(**user_data)
            return {"message": "User registered successfully", "user_id": user.id}
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
    
    @app.post("/auth/login")
    def login(credentials: dict, db: Session = Depends(get_db)):
        """User login"""
        user_service = UserService(db)
        user = user_service.authenticate_user(
            credentials["email"], 
            credentials["password"]
        )
    
        if not user:
            raise HTTPException(status_code=401, detail="Invalid credentials")
    
        # Generate JWT token (placeholder key; must match the key used in get_current_user)
        token = jwt.encode({"user_id": user.id}, "secret_key", algorithm="HS256")
        return {"access_token": token, "token_type": "bearer"}
    
    @app.get("/posts")
    def get_posts(limit: int = 10, offset: int = 0, db: Session = Depends(get_db)):
        """Get published posts"""
        blog_service = BlogService(db)
        posts = blog_service.get_published_posts(limit=limit, offset=offset)
        return {"posts": [post.to_dict() for post in posts]}
    
    @app.post("/posts")
    def create_post(post_data: dict, 
                   current_user = Depends(get_current_user),
                   db: Session = Depends(get_db)):
        """Create a new post"""
        blog_service = BlogService(db)
        post = blog_service.create_post(
            author_id=current_user.id,
            **post_data
        )
        return {"message": "Post created successfully", "post_id": post.id}
    
    @app.get("/posts/{post_slug}")
    def get_post(post_slug: str, db: Session = Depends(get_db)):
        """Get post by slug"""
        blog_service = BlogService(db)
        post = blog_service.get_post_by_slug(post_slug)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        return post.to_dict()
    
    @app.put("/posts/{post_id}/publish")
    def publish_post(post_id: int,
                    current_user = Depends(get_current_user),
                    db: Session = Depends(get_db)):
        """Publish a post"""
        blog_service = BlogService(db)
    
        # Check if user owns the post
        post = blog_service.get_post_by_id(post_id)
        if not post or post.author_id != current_user.id:
            raise HTTPException(status_code=404, detail="Post not found")
    
        blog_service.publish_post(post_id)
        return {"message": "Post published successfully"}
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000)
    Python
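
    The handlers above sign and verify tokens with the literal string "secret_key". A small sketch of reading the key from the environment instead is shown below. The variable name JWT_SECRET_KEY, the load_jwt_secret helper, and the MissingSecretError class are assumptions for illustration and are not part of the API above.

```python
import os


class MissingSecretError(RuntimeError):
    """Raised when the signing key is not configured (hypothetical error type)."""


def load_jwt_secret(env_var: str = "JWT_SECRET_KEY") -> str:
    """Return the signing key from the environment, refusing to fall back to a default."""
    secret = os.environ.get(env_var, "")
    if not secret:
        raise MissingSecretError(f"Set {env_var} before starting the API")
    return secret


# Demonstration only: a real deployment sets the variable outside the process
os.environ["JWT_SECRET_KEY"] = "example-value-for-demo"
SECRET_KEY = load_jwt_secret()
```

    SECRET_KEY would then replace the string literal in both the jwt.encode and jwt.decode calls, so the two always agree.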

    Performance Monitoring

    # monitoring/performance.py
    import time
    import logging
    from functools import wraps
    from sqlalchemy import event
    from sqlalchemy.engine import Engine
    from contextlib import contextmanager
    from services.user_service import UserService  # base class for MonitoredUserService below
    
    # Performance monitoring decorator
    def monitor_performance(operation_name: str):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                    duration = time.time() - start_time
                    logging.info(f"{operation_name} completed in {duration:.3f}s")
                    return result
                except Exception as e:
                    duration = time.time() - start_time
                    logging.error(f"{operation_name} failed after {duration:.3f}s: {str(e)}")
                    raise
            return wrapper
        return decorator
    
    # Database query monitoring
    class QueryProfiler:
        def __init__(self):
            self.queries = []
            self.slow_query_threshold = 0.1  # 100ms
    
        def log_query(self, statement, duration, parameters=None):
            query_info = {
                'statement': statement,
                'duration': duration,
                'parameters': parameters,
                'timestamp': time.time()
            }
            self.queries.append(query_info)
    
            if duration > self.slow_query_threshold:
                logging.warning(f"Slow query detected ({duration:.3f}s): {statement[:100]}...")
    
        def get_stats(self):
            if not self.queries:
                return {"total_queries": 0}
    
            durations = [q['duration'] for q in self.queries]
            return {
                "total_queries": len(self.queries),
                "avg_duration": sum(durations) / len(durations),
                "max_duration": max(durations),
                "slow_queries": len([d for d in durations if d > self.slow_query_threshold])
            }
    
    # Global profiler instance
    profiler = QueryProfiler()
    
    @event.listens_for(Engine, "before_cursor_execute")
    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        # Keep start times on a per-connection stack so nested executions pair up correctly
        conn.info.setdefault("query_start_time", []).append(time.time())
    
    @event.listens_for(Engine, "after_cursor_execute")
    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        duration = time.time() - conn.info["query_start_time"].pop(-1)
        profiler.log_query(statement, duration, parameters)
    
    # Context manager for monitoring service operations
    @contextmanager
    def monitor_service_operation(operation_name: str):
        start_time = time.time()
        initial_query_count = len(profiler.queries)
    
        try:
            yield
            duration = time.time() - start_time
            query_count = len(profiler.queries) - initial_query_count
    
            logging.info(f"{operation_name}: {duration:.3f}s, {query_count} queries")
    
            if query_count > 10:  # Flag operations with many queries
                logging.warning(f"{operation_name} executed {query_count} queries - potential N+1 problem")
    
        except Exception as e:
            duration = time.time() - start_time
            logging.error(f"{operation_name} failed after {duration:.3f}s: {str(e)}")
            raise
    
    # Usage in services
    class MonitoredUserService(UserService):
        @monitor_performance("create_user")
        def register_user(self, username: str, email: str, password: str):
            with monitor_service_operation("user_registration"):
                return super().register_user(username, email, password)
    
        @monitor_performance("get_user_posts")
        def get_user_with_posts(self, user_id: int):
            with monitor_service_operation("get_user_with_posts"):
                return self.user_repo.get_by_id_with_posts(user_id)
    Python
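
    The QueryProfiler above appends every query to a list that never shrinks, which can grow without limit in a long-running process. One possible variation, sketched here with the standard library only, keeps a fixed-size window of recent queries. The class name BoundedQueryProfiler and the max_entries parameter are assumptions for illustration.

```python
from collections import deque
from statistics import mean


class BoundedQueryProfiler:
    """Keeps only the most recent `max_entries` query timings."""

    def __init__(self, max_entries: int = 1000, slow_query_threshold: float = 0.1):
        # A deque with maxlen discards the oldest entry when a new one arrives
        self.entries = deque(maxlen=max_entries)
        self.slow_query_threshold = slow_query_threshold

    def log_query(self, statement: str, duration: float) -> None:
        self.entries.append((statement, duration))

    def get_stats(self) -> dict:
        if not self.entries:
            return {"total_queries": 0}
        durations = [duration for _, duration in self.entries]
        return {
            "total_queries": len(durations),
            "avg_duration": mean(durations),
            "max_duration": max(durations),
            "slow_queries": sum(d > self.slow_query_threshold for d in durations),
        }


bounded = BoundedQueryProfiler(max_entries=3)
for seconds in (0.01, 0.02, 0.5, 0.25):
    bounded.log_query("SELECT 1", seconds)

stats = bounded.get_stats()
print(stats["total_queries"], stats["slow_queries"])  # 3 2
```

    The trade-off is that statistics cover only the retained window, so total_queries reports at most max_entries rather than a lifetime count.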

    This completes the SQLAlchemy guide, which has moved from beginner concepts through expert-level patterns to real-world applications. The examples and best practices are starting points to adapt when building robust database-driven applications.

