A Comprehensive Guide to Distributed Task Processing

    Table of Contents

    Part I: Foundations (Beginner)

    1. Introduction to Celery
      • What is Celery?
      • Why Use Celery?
      • When to Use Celery
      • Celery Architecture Overview
    2. Installation and Setup
      • Prerequisites
      • Installing Celery
      • Message Brokers (Redis, RabbitMQ)
      • Development Environment Setup
    3. Your First Celery Application
      • Creating a Basic Task
      • Running Workers
      • Calling Tasks
      • Understanding the Workflow
    4. Basic Celery Concepts
      • Tasks and Task Objects
      • Workers and Concurrency
      • Brokers and Backends
      • Serialization Basics

    Part II: Building Skills (Intermediate)

    1. Task Routing and Organization
      • Task Discovery
      • Routing Tasks to Specific Workers
      • Task Naming and Organization
      • Multiple Queues
    2. Configuration and Settings
      • Celery Configuration
      • Broker Settings
      • Result Backend Configuration
      • Worker Configuration
    3. Error Handling and Retries
      • Exception Handling in Tasks
      • Automatic Retries
      • Custom Retry Logic
      • Dead Letter Queues
    4. Monitoring and Debugging
      • Celery Events
      • Flower Monitoring
      • Logging Best Practices
      • Debugging Failed Tasks
    5. Workflows and Chains
      • Task Signatures
      • Chains and Groups
      • Chords and Maps
      • Complex Workflows

    Part III: Advanced Techniques (Advanced)

    1. Performance Optimization
      • Worker Optimization
      • Memory Management
      • Concurrency Models
      • Bulk Operations
    2. Security and Authentication
      • Message Encryption
      • SSL/TLS Configuration
      • Authentication Mechanisms
      • Security Best Practices
    3. Custom Components
      • Custom Serializers
      • Custom Task Classes
      • Custom Result Backends
      • Middleware and Signals
    4. Scaling and Distribution
      • Horizontal Scaling
      • Load Balancing
      • Geographic Distribution
      • Container Orchestration

    Part IV: Expert Level

    1. Production Deployment
      • Deployment Strategies
      • Process Management
      • High Availability Setup
      • Disaster Recovery
    2. Advanced Patterns
      • Event-Driven Architecture
      • Saga Pattern
      • Circuit Breakers
      • Rate Limiting
    3. Integration and Ecosystem
      • Django Integration
      • Flask Integration
      • FastAPI Integration
      • Third-party Extensions
    4. Troubleshooting and Maintenance
      • Common Issues and Solutions
      • Performance Profiling
      • Memory Leaks
      • Network Issues
    5. Real-World Case Studies
      • E-commerce Order Processing
      • Data Pipeline Automation
      • Image Processing Service
      • Notification System

    Appendices

    A. Configuration Reference
    B. CLI Commands Reference
    C. Troubleshooting Guide
    D. Resources and Further Reading


    Chapter 1: Introduction to Celery

    What is Celery?

    Celery is a distributed task queue system for Python that enables you to execute tasks asynchronously across multiple machines or processes. It’s designed to handle millions of tasks per minute while providing flexibility, reliability, and ease of use.

    graph TD
        A[Client Application] -->|Submit Task| B[Message Broker]
        B -->|Distribute| C[Worker 1]
        B -->|Distribute| D[Worker 2]
        B -->|Distribute| E[Worker N]
        C -->|Store Result| F[Result Backend]
        D -->|Store Result| F
        E -->|Store Result| F
        F -->|Retrieve Result| A
    
        style A fill:#e1f5fe
        style B fill:#fff3e0
        style C fill:#e8f5e8
        style D fill:#e8f5e8
        style E fill:#e8f5e8
        style F fill:#fce4ec

    Key Features

    • Asynchronous Task Execution: Execute time-consuming tasks in the background
    • Distributed Processing: Scale across multiple machines and processes
    • Multiple Broker Support: Redis, RabbitMQ, Amazon SQS, and more
    • Flexible Routing: Route tasks to specific workers or queues
    • Monitoring and Management: Built-in monitoring tools and web interfaces
    • Fault Tolerance: Automatic retries and error handling
    • Integration Ready: Works seamlessly with Django, Flask, and other frameworks

    Why Use Celery?

    1. Improved User Experience

    Long-running tasks can make web applications unresponsive. Celery allows you to offload these tasks to background workers, keeping your application fast and responsive.

    sequenceDiagram
        participant User
        participant WebApp
        participant Celery
        participant Worker
    
        User->>WebApp: Upload large file
        WebApp->>Celery: Queue processing task
        WebApp->>User: Immediate response (202 Accepted)
        Celery->>Worker: Assign task
        Worker->>Worker: Process file
        Worker->>WebApp: Task complete notification
        WebApp->>User: Send completion notification
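
    As a rough illustration of this pattern, here is a minimal sketch assuming a Flask view and a hypothetical process_upload task defined elsewhere in the project:

    # Minimal sketch of the "queue and return 202 Accepted" pattern shown above.
    # `process_upload` is a hypothetical Celery task, not defined in this book.
    from flask import Flask, request, jsonify
    from app.tasks import process_upload  # hypothetical task
    
    flask_app = Flask(__name__)
    
    @flask_app.route('/uploads', methods=['POST'])
    def upload():
        # Queue the heavy work and respond immediately.
        result = process_upload.delay(request.get_json())
        return jsonify({'task_id': result.id}), 202
    Python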

    2. Scalability

    Celery makes it easy to scale your application horizontally by adding more worker processes or machines as your workload grows.

    3. Reliability

    With features like automatic retries, dead letter queues, and result persistence, Celery ensures that important tasks are completed even in the face of failures.

    4. Flexibility

    Support for multiple message brokers, serialization formats, and result backends gives you the flexibility to choose the best tools for your specific use case.

    When to Use Celery

    Perfect Use Cases

    1. Web Application Tasks
      • Sending emails
      • Image/video processing
      • Report generation
      • Data imports/exports
    2. Data Processing
      • ETL pipelines
      • Machine learning training
      • Batch processing
      • Data synchronization
    3. System Integration
      • API calls to external services
      • File processing
      • Backup operations
      • Cleanup tasks
    4. Scheduled Tasks (see the sketch below)
      • Periodic reports
      • Database maintenance
      • Cache warming
      • Health checks
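
    For the scheduled-tasks use case above, Celery ships its own scheduler, celery beat. A minimal sketch, assuming the app from app/celery.py and a hypothetical generate_report task in app/tasks.py:

    # Hypothetical periodic schedule; task name and timing are illustrative.
    from celery.schedules import crontab
    from app.celery import app
    
    app.conf.beat_schedule = {
        'nightly-sales-report': {
            'task': 'app.tasks.generate_report',    # hypothetical task
            'schedule': crontab(hour=2, minute=0),  # every day at 02:00
        },
    }
    # Run the scheduler alongside the workers with: celery -A app.celery beat
    Python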

    When NOT to Use Celery

    • Simple Scripts: For one-off scripts or simple applications
    • Real-time Requirements: When sub-second response times are critical
    • Low Latency Needs: When task overhead exceeds processing time
    • Simple Cron Jobs: When basic scheduling is sufficient

    Celery Architecture Overview

    Celery follows a distributed architecture pattern with several key components:

    graph TB
        subgraph "Client/Producer"
            A[Application Code]
            B[Celery Client]
        end
    
        subgraph "Message Infrastructure"
            C[Message Broker<br/>Redis/RabbitMQ]
            D[Result Backend<br/>Redis/Database]
        end
    
        subgraph "Workers"
            E[Worker Process 1]
            F[Worker Process 2]
            G[Worker Process N]
        end
    
        subgraph "Monitoring"
            H[Flower]
            I[Celery Events]
            J[Management Commands]
        end
    
        A --> B
        B -->|Send Task| C
        C -->|Get Task| E
        C -->|Get Task| F
        C -->|Get Task| G
        E -->|Store Result| D
        F -->|Store Result| D
        G -->|Store Result| D
        D -->|Get Result| B
        B --> A
    
        C -.->|Monitor| H
        E -.->|Events| I
        F -.->|Events| I
        G -.->|Events| I
        I --> H
        J -.->|Control| E
        J -.->|Control| F
        J -.->|Control| G
    
        style A fill:#e3f2fd
        style C fill:#fff8e1
        style D fill:#f3e5f5
        style E fill:#e8f5e8
        style F fill:#e8f5e8
        style G fill:#e8f5e8
        style H fill:#fce4ec

    Core Components

    1. Producer/Client: Your application that creates and sends tasks
    2. Message Broker: Stores and routes messages between producers and workers
    3. Workers: Processes that execute tasks
    4. Result Backend: Stores task results (optional)
    5. Monitoring Tools: Tools for observing and managing the system
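
    Concretely, these components map onto a small amount of code. The sketch below assumes Redis as both broker and backend; any supported combination works the same way:

    # Minimal sketch tying the components together (URLs are illustrative).
    from celery import Celery
    
    # Producer/Client: the app where tasks are defined and sent from.
    app = Celery(
        'guide',
        broker='redis://localhost:6379/0',   # Message Broker
        backend='redis://localhost:6379/0',  # Result Backend (optional)
    )
    
    @app.task
    def ping():
        return 'pong'
    
    # Workers run separately (celery -A <module> worker); monitoring tools
    # such as Flower attach to the same broker.
    if __name__ == '__main__':
        result = ping.delay()         # client sends the task to the broker
        print(result.get(timeout=5))  # result fetched from the backend
    Python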

    Chapter 2: Installation and Setup

    Prerequisites

    Before installing Celery, ensure you have:

    • Python 3.8 or higher (recent Celery 5.x releases no longer support 3.7)
    • A message broker (Redis or RabbitMQ recommended)
    • Basic knowledge of Python and command-line usage

    Installing Celery

    Basic Installation

    pip install celery
    Bash

    With Redis Support

    pip install celery[redis]
    Bash

    With RabbitMQ Support

    RabbitMQ (AMQP) support is included in the base install via the py-amqp transport. The optional C-based client can be added with:

    pip install celery[librabbitmq]
    Bash

    Installing Multiple Extras

    pip install celery[redis,auth,msgpack]
    Bash

    Message Brokers

    Celery requires a message broker to send and receive messages. Here are the most popular options:

    Redis Setup

    Redis is often the easiest broker to get started with:

    # Install Redis
    # Ubuntu/Debian
    sudo apt-get install redis-server
    
    # macOS
    brew install redis
    
    # Windows (using Docker)
    docker run -d -p 6379:6379 redis:latest
    Bash

    Redis Configuration for Celery:

    # celeryconfig.py
    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/0'
    Python

    RabbitMQ Setup

    RabbitMQ offers more advanced features for complex scenarios:

    # Ubuntu/Debian
    sudo apt-get install rabbitmq-server
    
    # macOS
    brew install rabbitmq
    
    # Windows (using Docker)
    docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    Bash

    RabbitMQ Configuration for Celery:

    # celeryconfig.py
    broker_url = 'pyamqp://guest@localhost//'
    result_backend = 'rpc://'
    Python
    graph LR
        subgraph "Message Broker Options"
            A[Redis<br/>Simple, Fast]
            B[RabbitMQ<br/>Feature Rich]
            C[Amazon SQS<br/>Cloud Native]
            D[Apache Kafka<br/>High Throughput]
        end
    
        subgraph "Use Cases"
            E[Development<br/>& Simple Apps]
            F[Enterprise<br/>Applications]
            G[AWS<br/>Deployments]
            H[Big Data<br/>Pipelines]
        end
    
        A --> E
        B --> F
        C --> G
        D --> H
    
        style A fill:#ffebee
        style B fill:#e8f5e8
        style C fill:#fff3e0
        style D fill:#e3f2fd

    Development Environment Setup

    Project Structure

    Create a well-organized project structure:

    my_celery_project/
    ├── app/
    │   ├── __init__.py
    │   ├── celery.py          # Celery configuration
    │   ├── tasks.py           # Task definitions
    │   └── config.py          # Application config
    ├── requirements.txt
    ├── docker-compose.yml     # For broker services
    └── README.md
    Bash

    Basic Celery App Setup

    Create the main Celery application:

    # app/celery.py
    from celery import Celery
    
    # Create Celery instance
    app = Celery('my_celery_project')
    
    # Load configuration
    app.config_from_object('app.config')
    
    # Auto-discover tasks
    app.autodiscover_tasks(['app'])
    
    if __name__ == '__main__':
        app.start()
    Python

    Configuration File

    # app/config.py
    from kombu import Queue
    
    # Broker settings
    broker_url = 'redis://localhost:6379/0'
    result_backend = 'redis://localhost:6379/0'
    
    # Task settings
    task_serializer = 'json'
    accept_content = ['json']
    result_serializer = 'json'
    timezone = 'UTC'
    enable_utc = True
    
    # Worker settings
    worker_prefetch_multiplier = 1
    task_acks_late = True
    
    # Queue configuration
    task_routes = {
        'app.tasks.send_email': {'queue': 'email'},
        'app.tasks.process_image': {'queue': 'images'},
    }
    
    task_default_queue = 'default'
    task_queues = (
        Queue('default'),
        Queue('email'),
        Queue('images'),
    )
    Python

    Docker Compose for Development

    # docker-compose.yml
    version: '3.8'
    
    services:
      redis:
        image: redis:7-alpine
        ports:
          - "6379:6379"
    
      rabbitmq:
        image: rabbitmq:3-management-alpine
        ports:
          - "5672:5672"
          - "15672:15672"
        environment:
          RABBITMQ_DEFAULT_USER: guest
          RABBITMQ_DEFAULT_PASS: guest
    
      flower:
        image: mher/flower:latest
        ports:
          - "5555:5555"
        environment:
          CELERY_BROKER_URL: redis://redis:6379/0
          FLOWER_PORT: 5555
        depends_on:
          - redis
    YAML

    Requirements File

    # requirements.txt
    celery[redis]==5.3.4
    flower==2.0.1
    redis==5.0.1
    kombu==5.3.4
    Text

    Chapter 3: Your First Celery Application

    Creating a Basic Task

    Let’s create your first Celery task step by step:

    # app/tasks.py
    from .celery import app
    import time
    
    @app.task
    def add_numbers(x, y):
        """A simple task that adds two numbers."""
        time.sleep(2)  # Simulate some work
        return x + y
    
    @app.task
    def send_notification(message, recipient):
        """Simulate sending a notification."""
        print(f"Sending '{message}' to {recipient}")
        time.sleep(3)  # Simulate network delay
        return f"Notification sent to {recipient}"
    
    @app.task(bind=True)
    def long_running_task(self, duration):
        """A task that reports its progress."""
        for i in range(duration):
            time.sleep(1)
            self.update_state(
                state='PROGRESS',
                meta={'current': i + 1, 'total': duration}
            )
        return {'current': duration, 'total': duration, 'status': 'Complete!'}
    Python

    Understanding the Task Flow

    sequenceDiagram
        participant Client as Client Code
        participant Broker as Redis Broker
        participant Worker as Celery Worker
        participant Backend as Result Backend
    
        Client->>Broker: Send task message
        Note over Client: Returns AsyncResult immediately
    
        Worker->>Broker: Poll for tasks
        Broker->>Worker: Deliver task
    
        Worker->>Worker: Execute task
        Worker->>Backend: Store result
    
        Client->>Backend: Check result
        Backend->>Client: Return result

    Running Workers

    Start a Celery worker to process tasks:

    # Basic worker
    celery -A app.celery worker --loglevel=info
    
    # Worker with specific queues
    celery -A app.celery worker --loglevel=info -Q email,images
    
    # Worker with concurrency control
    celery -A app.celery worker --loglevel=info --concurrency=4
    
    # Development auto-reload is not built into Celery; a common approach is
    # watchmedo from the watchdog package, which restarts the worker on code changes:
    watchmedo auto-restart --directory=./ --pattern='*.py' --recursive -- \
        celery -A app.celery worker --loglevel=info
    Bash

    Worker Output Explanation

     -------------- celery@hostname v5.3.4 (emerald-rush)
    --- ***** ----- 
    -- ******* ---- Linux-5.15.0 2023-10-01 10:00:00
    - *** --- * --- 
    - ** ---------- [config]
    - ** ---------- .> app:         app:0x7f8b8c0a1234
    - ** ---------- .> transport:   redis://localhost:6379/0
    - ** ---------- .> results:     redis://localhost:6379/0
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    
    [tasks]
      . app.tasks.add_numbers
      . app.tasks.send_notification
      . app.tasks.long_running_task
    
    [2023-10-01 10:00:00,000: INFO/MainProcess] Connected to redis://localhost:6379/0
    [2023-10-01 10:00:00,000: INFO/MainProcess] mingle: searching for neighbors
    [2023-10-01 10:00:00,000: INFO/MainProcess] mingle: all alone
    [2023-10-01 10:00:00,000: INFO/MainProcess] celery@hostname ready.
    ANSI
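
    The lines worth reading in this banner are transport (the broker URL the worker connected to), results (the result backend), concurrency (the number of child processes and the pool type, here prefork), the [queues] section (this worker consumes only the default celery queue), and the [tasks] list (every task the worker discovered and registered). If a task you expect to run is missing from [tasks], it was never imported or discovered.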

    Calling Tasks

    There are several ways to call Celery tasks:

    1. Basic Task Calling

    # client.py
    from app.tasks import add_numbers, send_notification
    
    # Asynchronous call (non-blocking)
    result = add_numbers.delay(4, 4)
    print(f"Task ID: {result.id}")
    print(f"Task State: {result.state}")
    
    # Get result (blocking)
    answer = result.get(timeout=10)
    print(f"Result: {answer}")
    
    # Non-blocking result check
    if result.ready():
        print(f"Task completed: {result.result}")
    else:
        print("Task still running...")
    Python

    2. Using apply_async for More Control

    from app.tasks import send_notification
    from datetime import datetime, timedelta
    
    # Schedule task for later
    eta = datetime.now() + timedelta(minutes=5)
    result = send_notification.apply_async(
        args=['Hello!', 'user@example.com'],
        eta=eta,
        retry=True,
        retry_policy={
            'max_retries': 3,
            'interval_start': 0,
            'interval_step': 0.2,
            'interval_max': 0.2,
        }
    )
    Python

    3. Monitoring Task Progress

    from app.tasks import long_running_task
    import time
    
    # Start a long-running task
    result = long_running_task.delay(10)
    
    # Monitor progress
    while not result.ready():
        if result.state == 'PROGRESS':
            meta = result.info
            print(f"Progress: {meta['current']}/{meta['total']}")
        time.sleep(1)
    
    print(f"Final result: {result.result}")
    Python

    Task States and Results

    stateDiagram-v2
        [*] --> PENDING
        PENDING --> STARTED : Worker picks up task
        STARTED --> PROGRESS : Task reports progress
        PROGRESS --> PROGRESS : Multiple progress updates
        STARTED --> SUCCESS : Task completes successfully
        PROGRESS --> SUCCESS : Task completes successfully
        STARTED --> FAILURE : Task raises exception
        PROGRESS --> FAILURE : Task raises exception
        STARTED --> RETRY : Task requests retry
        PROGRESS --> RETRY : Task requests retry
        RETRY --> STARTED : Retry attempt
        SUCCESS --> [*]
        FAILURE --> [*]
    
        SUCCESS : Result available
        FAILURE : Exception info available
        PROGRESS : Meta info available

    Task State Examples

    from app.tasks import add_numbers
    
    result = add_numbers.delay(4, 4)
    
    # Check various states
    print(f"State: {result.state}")
    print(f"Info: {result.info}")
    
    # State-specific information
    if result.state == 'PENDING':
        print("Task is waiting to be processed")
    elif result.state == 'PROGRESS':
        print(f"Progress: {result.info}")
    elif result.state == 'SUCCESS':
        print(f"Result: {result.result}")
    elif result.state == 'FAILURE':
        print(f"Error: {result.info}")
    Python

    Error Handling in Tasks

    # app/tasks.py
    from celery.exceptions import Retry
    import random
    
    @app.task(bind=True)
    def unreliable_task(self):
        """A task that sometimes fails."""
        if random.random() < 0.5:
            raise Exception("Random failure occurred")
        return "Task completed successfully"
    
    @app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
    def auto_retry_task(self):
        """A task with automatic retry on exceptions."""
        if random.random() < 0.7:
            raise Exception("Task failed, will retry automatically")
        return "Success after retries"
    
    @app.task(bind=True)
    def manual_retry_task(self, data):
        """A task with manual retry logic."""
        try:
            # Simulate processing
            if random.random() < 0.5:
                raise Exception("Processing failed")
            return f"Processed: {data}"
        except Exception as exc:
            # Retry with exponential backoff
            raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    Python

    Complete Example Application

    Here’s a complete working example:

    # complete_example.py
    from celery import Celery
    import time
    import random
    
    # Create Celery app
    app = Celery('complete_example')
    app.conf.update(
        broker_url='redis://localhost:6379/0',
        result_backend='redis://localhost:6379/0',
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
    )
    
    @app.task
    def process_order(order_id, items):
        """Process an e-commerce order."""
        print(f"Processing order {order_id} with {len(items)} items")
    
        # Simulate processing time
        time.sleep(2)
    
        # Calculate total
        total = sum(item['price'] * item['quantity'] for item in items)
    
        # Simulate occasional failure
        if random.random() < 0.1:
            raise Exception(f"Payment failed for order {order_id}")
    
        return {
            'order_id': order_id,
            'total': total,
            'status': 'completed',
            'processed_at': time.time()
        }
    
    @app.task
    def send_confirmation_email(order_data):
        """Send order confirmation email."""
        print(f"Sending confirmation for order {order_data['order_id']}")
        time.sleep(1)  # Simulate email sending
        return f"Email sent for order {order_data['order_id']}"
    
    # Usage example
    if __name__ == '__main__':
        # Simulate order processing
        order_items = [
            {'name': 'Widget A', 'price': 10.00, 'quantity': 2},
            {'name': 'Widget B', 'price': 15.00, 'quantity': 1},
        ]
    
        # Process order asynchronously
        order_result = process_order.delay('ORD-001', order_items)
    
        print(f"Order processing started: {order_result.id}")
    
        # Wait for result
        try:
            order_data = order_result.get(timeout=10)
            print(f"Order processed: {order_data}")
    
            # Send confirmation email
            email_result = send_confirmation_email.delay(order_data)
            email_response = email_result.get(timeout=5)
            print(f"Email result: {email_response}")
    
        except Exception as e:
            print(f"Order processing failed: {e}")
    Python

    To run this example:

    1. Start Redis: redis-server
    2. Start worker: celery -A complete_example worker --loglevel=info
    3. Run client: python complete_example.py

    This completes Chapter 3. The progression so far shows how to move from basic concepts to practical applications with proper error handling and monitoring.


    Chapter 4: Basic Celery Concepts

    Tasks and Task Objects

    Understanding Celery Tasks

    A Celery task is a Python function decorated with @app.task. When called, it becomes a task object that can be executed asynchronously.

    classDiagram
        class Task {
            +name: str
            +request: Context
            +retry(): void
            +apply_async(): AsyncResult
            +delay(): AsyncResult
            +si(): Signature
            +s(): Signature
        }
    
        class AsyncResult {
            +id: str
            +state: str
            +result: Any
            +info: dict
            +ready(): bool
            +get(): Any
            +forget(): void
        }
    
        class Signature {
            +task: str
            +args: tuple
            +kwargs: dict
            +apply_async(): AsyncResult
            +clone(): Signature
        }
    
        Task --> AsyncResult : creates
        Task --> Signature : creates
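
    In code, the three objects in the diagram look like this (a small sketch reusing add_numbers from Chapter 3):

    # Sketch of Task, AsyncResult and Signature; reuses add_numbers from app/tasks.py.
    from app.tasks import add_numbers
    
    # .delay()/.apply_async() send the task and return an AsyncResult handle.
    async_result = add_numbers.delay(2, 3)
    print(async_result.id, async_result.state)
    print(async_result.get(timeout=10))  # blocks until the result is stored
    
    # .s() builds a Signature: the task plus its arguments, not yet sent.
    sig = add_numbers.s(2, 3)
    later_result = sig.apply_async()     # send it now, or pass it into a chain
    print(later_result.get(timeout=10))
    Python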

    Task Types and Decorators

    # app/task_types.py
    from celery import Task
    from .celery import app
    import time
    
    # Basic task
    @app.task
    def simple_task(name):
        return f"Hello, {name}!"
    
    # Task with custom name
    @app.task(name='custom.task.name')
    def named_task():
        return "Task with custom name"
    
    # Bound task (has access to self)
    @app.task(bind=True)
    def bound_task(self, x, y):
        print(f"Task ID: {self.request.id}")
        print(f"Task name: {self.name}")
        return x + y
    
    # Task with custom base class
    class CallbackTask(Task):
        """Custom task class with callbacks."""
    
        def on_success(self, retval, task_id, args, kwargs):
            print(f"Task {task_id} succeeded: {retval}")
    
        def on_failure(self, exc, task_id, args, kwargs, einfo):
            print(f"Task {task_id} failed: {exc}")
    
    @app.task(base=CallbackTask)
    def callback_task(value):
        if value < 0:
            raise ValueError("Value must be positive")
        return value * 2
    
    # Task with multiple decorators
    @app.task(
        bind=True,
        autoretry_for=(Exception,),
        retry_kwargs={'max_retries': 3, 'countdown': 60},
        retry_backoff=True,
        retry_jitter=True
    )
    def robust_task(self, data):
        # Simulate unreliable operation
        import random
        if random.random() < 0.3:
            raise Exception("Random failure")
        return f"Processed: {data}"
    Python

    Task Context and Request Object

    # app/context_tasks.py
    from .celery import app
    
    @app.task(bind=True)
    def inspect_context(self):
        """Task that inspects its execution context."""
        request = self.request
    
        context_info = {
            'task_id': request.id,
            'task_name': self.name,
            'args': request.args,
            'kwargs': request.kwargs,
            'retries': request.retries,
            'is_eager': request.is_eager,
            'eta': request.eta,
            'expires': request.expires,
            'hostname': request.hostname,
            'delivery_info': request.delivery_info,
        }
    
        return context_info
    
    # Usage
    result = inspect_context.delay()
    print(result.get())
    Python

    Workers and Concurrency

    Worker Architecture

    graph TB
        subgraph "Celery Worker Process"
            A[Main Process] --> B[Pool Process]
            A --> C[Consumer Thread]
            A --> D[Event Loop]
    
            B --> E[Worker 1]
            B --> F[Worker 2]
            B --> G[Worker N]
    
            C --> H[Message Queue]
            D --> I[Periodic Tasks]
            D --> J[Monitoring]
        end
    
        subgraph "Concurrency Models"
            K[Prefork - multiprocessing]
            L[Eventlet - green threads]
            M[Gevent - green threads]
            N[Threads - threading]
            O[Solo - single process]
        end
    
        B -.-> K
        E -.-> L
        F -.-> M
        G -.-> N
        A -.-> O
    
        style A fill:#e3f2fd
        style B fill:#e8f5e8
        style C fill:#fff3e0
        style D fill:#fce4ec

    Concurrency Configuration

    # worker_config.py
    # NOTE: each app.conf.update() block below overwrites the previous one;
    # in a real project keep only the block that matches your workload.
    from celery import Celery
    
    app = Celery('worker_demo')
    
    # Prefork (default) - CPU-bound tasks
    # Good for: CPU-intensive tasks, fault isolation
    app.conf.update(
        worker_concurrency=4,
        worker_pool='prefork',
        worker_prefetch_multiplier=1,
    )
    
    # Eventlet - I/O-bound tasks
    # Good for: High concurrency I/O operations
    app.conf.update(
        worker_concurrency=1000,
        worker_pool='eventlet',
        worker_prefetch_multiplier=1,
    )
    
    # Gevent - I/O-bound tasks
    # Good for: Network operations, database queries
    app.conf.update(
        worker_concurrency=1000,
        worker_pool='gevent',
        worker_prefetch_multiplier=1,
    )
    
    # Threads - Balanced approach
    # Good for: Mixed workloads
    app.conf.update(
        worker_concurrency=10,
        worker_pool='threads',
        worker_prefetch_multiplier=1,
    )
    Python

    Worker Pool Comparison

    | Pool Type | Best For            | Pros                               | Cons                                       |
    |-----------|---------------------|------------------------------------|--------------------------------------------|
    | Prefork   | CPU-bound tasks     | Fault isolation, true parallelism  | Memory overhead, slow startup              |
    | Eventlet  | I/O-bound tasks     | High concurrency, low memory       | No true parallelism, library compatibility |
    | Gevent    | Network operations  | Good I/O performance               | No true parallelism, C extension issues    |
    | Threads   | Mixed workloads     | Balanced approach                  | GIL limitations, shared state issues       |
    | Solo      | Development/Testing | Simple debugging                   | No concurrency                             |
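
    To check which pool and concurrency a running worker actually uses, you can query the inspect API. A small sketch, assuming the app from worker_config.py; the exact fields in the stats payload can vary between pool implementations:

    # Query running workers for their pool statistics (sketch).
    from worker_config import app
    
    stats = app.control.inspect().stats() or {}
    for worker_name, info in stats.items():
        pool = info.get('pool', {})
        print(worker_name,
              pool.get('implementation'),    # e.g. celery.concurrency.prefork:TaskPool
              pool.get('max-concurrency'))   # configured concurrency
    Python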

    Starting Workers with Different Configurations

    # Default prefork worker
    celery -A app worker --loglevel=info
    
    # High concurrency with eventlet
    celery -A app worker --pool=eventlet --concurrency=1000
    
    # Gevent worker for I/O operations
    celery -A app worker --pool=gevent --concurrency=500
    
    # Thread-based worker
    celery -A app worker --pool=threads --concurrency=10
    
    # Single process for debugging
    celery -A app worker --pool=solo
    
    # Multiple workers with different pools
    celery -A app worker --pool=prefork --concurrency=4 -n worker1@%h &
    celery -A app worker --pool=eventlet --concurrency=1000 -n worker2@%h &
    Bash

    Brokers and Backends

    Message Broker Deep Dive

    graph TB
        subgraph "Producer Side"
            A[Application] --> B[Celery Client]
            B --> C[Serializer]
            C --> D[Message]
        end
    
        subgraph "Broker"
            E[Queue Manager]
            F[Routing Logic]
            G[Persistence Layer]
            H[Exchange/Topics]
        end
    
        subgraph "Consumer Side"
            I[Worker]
            J[Deserializer]
            K[Task Execution]
            L[Result Storage]
        end
    
        D --> E
        E --> F
        F --> G
        G --> H
        H --> I
        I --> J
        J --> K
        K --> L
    
        style E fill:#fff3e0
        style F fill:#e8f5e8
        style G fill:#fce4ec
        style H fill:#e3f2fd

    Broker Configuration Comparison

    # Redis Configuration
    redis_config = {
        'broker_url': 'redis://localhost:6379/0',
        'result_backend': 'redis://localhost:6379/0',
    
        # Redis-specific settings
        'redis_max_connections': 20,
        'redis_retry_on_timeout': True,
        'redis_socket_connect_timeout': 30,
        'redis_socket_timeout': 30,
    
        # Connection pool settings
        'broker_pool_limit': 10,
        'broker_connection_timeout': 4,
        'broker_connection_retry_on_startup': True,
    }
    
    # RabbitMQ Configuration
    rabbitmq_config = {
        'broker_url': 'pyamqp://guest@localhost//',
        'result_backend': 'rpc://',
    
        # RabbitMQ-specific settings
        'broker_heartbeat': 30,
        'broker_connection_timeout': 30,
        'broker_connection_retry_on_startup': True,
        'broker_connection_max_retries': 3,
    
        # Queue settings
        'task_queue_max_priority': 10,
        'worker_prefetch_multiplier': 1,
        'task_acks_late': True,
    }
    
    # Amazon SQS Configuration
    sqs_config = {
        'broker_url': 'sqs://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY@',
        'broker_transport_options': {
            'region': 'us-east-1',
            'visibility_timeout': 60,
            'polling_interval': 1,
            # Map queue names to pre-created SQS queue URLs
            'predefined_queues': {
                'celery': {
                    'url': 'https://sqs.us-east-1.amazonaws.com/123456789012/celery'
                }
            },
        },
        'task_default_queue': 'celery',
    }
    Python
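
    Whichever dictionary matches your deployment can then be applied to the app in one call, for example:

    # Apply one of the dictionaries above (sketch; assumes `app` and
    # `redis_config` are in scope or importable).
    app.conf.update(**redis_config)
    Python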

    Result Backend Options

    # Redis Result Backend
    redis_result_config = {
        'result_backend': 'redis://localhost:6379/0',
        'result_backend_transport_options': {
            'master_name': 'mymaster',
            'retry_on_timeout': True,
        },
        'result_expires': 3600,  # 1 hour
        'result_persistent': True,
    }
    
    # Database Result Backend
    database_result_config = {
        'result_backend': 'db+postgresql://user:pass@localhost/celery',
        'database_engine_options': {
            'echo': True,  # SQLAlchemy query logging
        },
        'database_table_names': {
            'task': 'custom_task_table',
            'group': 'custom_group_table',
        }
    }
    
    # File System Result Backend
    filesystem_result_config = {
        'result_backend': 'file:///var/celery/results',
        'result_expires': 3600,
    }
    
    # Cache Result Backend (Memcached)
    cache_result_config = {
        'result_backend': 'cache+memcached://127.0.0.1:11211/',
        'cache_backend_options': {
            'binary': True,
            'behaviors': {'tcp_nodelay': True}
        }
    }
    Python

    Serialization Basics

    Serialization Formats

    graph TD
        subgraph "Serialization Formats"
            A[JSON - Default]
            B[Pickle - Python Objects]
            C[YAML - Human Readable]
            D[MessagePack - Binary]
            E[XML - Structured]
        end
    
        subgraph "Characteristics"
            F[Fast, Secure, Limited Types]
            G[Full Python Support, Security Risk]
            H[Readable, Slow]
            I[Compact, Fast]
            J[Verbose, Structured]
        end
    
        A --> F
        B --> G
        C --> H
        D --> I
        E --> J
    
        style A fill:#e8f5e8
        style B fill:#ffebee
        style C fill:#e3f2fd
        style D fill:#fff3e0
        style E fill:#fce4ec

    Configuring Serialization

    # serialization_config.py
    from kombu.serialization import register
    import json
    
    from app.celery import app  # the Celery app created in app/celery.py
    
    # Basic serialization configuration
    app.conf.update(
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
    
        # Enable compression
        task_compression='gzip',
        result_compression='gzip',
    )
    
    # Custom JSON serializer with datetime support
    from datetime import datetime
    
    class DateTimeEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            return super().default(obj)
    
    def datetime_decoder(dct):
        for key, value in dct.items():
            if isinstance(value, str):
                try:
                    dct[key] = datetime.fromisoformat(value)
                except ValueError:
                    pass
        return dct
    
    def dumps(obj):
        return json.dumps(obj, cls=DateTimeEncoder)
    
    def loads(data):
        return json.loads(data, object_hook=datetime_decoder)
    
    # Register custom serializer
    register('datetime_json', dumps, loads,
             content_type='application/x-datetime-json',
             content_encoding='utf-8')
    
    # Use custom serializer
    app.conf.update(
        task_serializer='datetime_json',
        accept_content=['datetime_json'],
        result_serializer='datetime_json',
    )
    Python

    Serialization Security

    # security_config.py
    from app.celery import app  # the Celery app created in app/celery.py
    
    # NEVER use pickle in production with untrusted data
    INSECURE_CONFIG = {
        'task_serializer': 'pickle',
        'accept_content': ['pickle'],
        'result_serializer': 'pickle',
    }
    
    # Secure configuration
    SECURE_CONFIG = {
        'task_serializer': 'json',
        'accept_content': ['json'],
        'result_serializer': 'json',
    
        # Additional hardening (logging and worker-loss behaviour)
        'worker_hijack_root_logger': False,
        'worker_log_color': False,
        'task_reject_on_worker_lost': True,
    }
    
    # Content type validation
    app.conf.update(
        accept_content=['json'],
        task_serializer='json',
        result_serializer='json',
    
        # Keep eager (in-process) execution off; when tests enable it, propagate errors
        task_always_eager=False,
        task_eager_propagates=True,
    )
    Python

    Handling Complex Data Types

    # complex_serialization.py
    from decimal import Decimal
    from datetime import datetime, date
    import uuid
    
    from app.celery import app  # the Celery app created in app/celery.py
    
    # Custom serialization for complex types
    class ComplexTypeTask:
    
        @staticmethod
        def serialize_complex_data(data):
            """Recursively convert complex Python objects to JSON-compatible values."""
            if isinstance(data, dict):
                return {k: ComplexTypeTask.serialize_complex_data(v)
                        for k, v in data.items()}
            elif isinstance(data, (list, tuple)):
                return [ComplexTypeTask.serialize_complex_data(v) for v in data]
            elif isinstance(data, Decimal):
                return {'__type__': 'Decimal', 'value': str(data)}
            elif isinstance(data, datetime):
                return {'__type__': 'datetime', 'value': data.isoformat()}
            elif isinstance(data, date):
                return {'__type__': 'date', 'value': data.isoformat()}
            elif isinstance(data, uuid.UUID):
                return {'__type__': 'UUID', 'value': str(data)}
            elif isinstance(data, set):
                return {'__type__': 'set',
                        'value': [ComplexTypeTask.serialize_complex_data(v) for v in data]}
            return data
    
        @staticmethod
        def deserialize_complex_data(data):
            """Recursively restore Python objects from their JSON representation."""
            if isinstance(data, dict) and '__type__' in data:
                type_name = data['__type__']
                value = data['value']

                if type_name == 'Decimal':
                    return Decimal(value)
                elif type_name == 'datetime':
                    return datetime.fromisoformat(value)
                elif type_name == 'date':
                    return date.fromisoformat(value)
                elif type_name == 'UUID':
                    return uuid.UUID(value)
                elif type_name == 'set':
                    return {ComplexTypeTask.deserialize_complex_data(v) for v in value}
            elif isinstance(data, dict):
                return {k: ComplexTypeTask.deserialize_complex_data(v)
                        for k, v in data.items()}
            elif isinstance(data, list):
                return [ComplexTypeTask.deserialize_complex_data(v) for v in data]
            return data
    
    @app.task
    def process_complex_data(data):
        """Task that handles complex data types."""
        # Deserialize incoming data
        processed_data = ComplexTypeTask.deserialize_complex_data(data)
    
        # Process the data
        if isinstance(processed_data, Decimal):
            result = processed_data * 2
        elif isinstance(processed_data, datetime):
            result = processed_data.strftime('%Y-%m-%d %H:%M:%S')
        else:
            result = f"Processed: {processed_data}"
    
        # Return serializable result
        return ComplexTypeTask.serialize_complex_data(result)
    
    # Usage example
    complex_data = {
        'price': Decimal('19.99'),
        'created_at': datetime.now(),
        'tags': {'python', 'celery', 'async'},
        'id': uuid.uuid4(),
    }
    
    # Serialize before sending
    serialized_data = ComplexTypeTask.serialize_complex_data(complex_data)
    result = process_complex_data.delay(serialized_data)
    Python

    Chapter 5: Task Routing and Organization

    Task Discovery

    Celery automatically discovers tasks in your application through several mechanisms:

    graph TB
        subgraph "Task Discovery Methods"
            A[Explicit Include] --> B[app.conf.include]
            C[Auto Discovery] --> D[app.autodiscover_tasks]
            E[Manual Import] --> F[Import modules]
            G[Package Discovery] --> H[Scan packages]
        end
    
        subgraph "Discovery Flow"
            I[Celery App Start] --> J[Load Configuration]
            J --> K[Run Discovery]
            K --> L[Register Tasks]
            L --> M[Workers Ready]
        end
    
        A --> I
        C --> I
        E --> I
        G --> I
    
        style A fill:#e8f5e8
        style C fill:#e3f2fd
        style E fill:#fff3e0
        style G fill:#fce4ec

    Automatic Task Discovery

    # app/__init__.py
    from celery import Celery
    
    app = Celery('myapp')
    
    # Method 1: Explicit include
    app.conf.include = [
        'app.tasks.email',
        'app.tasks.image_processing',
        'app.tasks.reports',
    ]
    
    # Method 2: Auto-discovery from installed apps (Django-style)
    app.autodiscover_tasks(['app.tasks', 'app.background_jobs'])
    
    # Method 3: Auto-discovery with lambda
    app.autodiscover_tasks(lambda: ['app.tasks'])
    
    # Method 4: Scan all Python packages
    import pkgutil
    def find_task_modules():
        """Find all modules containing tasks."""
        task_modules = []
        for importer, modname, ispkg in pkgutil.walk_packages(['app']):
            if 'tasks' in modname:
                task_modules.append(f'app.{modname}')
        return task_modules
    
    app.autodiscover_tasks(find_task_modules)
    Python

    Organizing Tasks by Module

    app/
    ├── __init__.py
    ├── celery.py
    ├── tasks/
    │   ├── __init__.py
    │   ├── email.py          # Email-related tasks
    │   ├── image.py          # Image processing tasks
    │   ├── reports.py        # Report generation tasks
    │   ├── cleanup.py        # Maintenance tasks
    │   └── integrations/
    │       ├── __init__.py
    │       ├── social.py     # Social media tasks
    │       └── payments.py   # Payment processing tasks
    Bash
    # app/tasks/email.py
    from ..celery import app
    
    @app.task
    def send_welcome_email(user_id):
        """Send welcome email to new user."""
        pass
    
    @app.task
    def send_newsletter(subscriber_list):
        """Send newsletter to subscribers."""
        pass
    
    @app.task
    def send_password_reset(email, token):
        """Send password reset email."""
        pass
    
    # app/tasks/image.py
    from ..celery import app
    
    @app.task
    def resize_image(image_path, width, height):
        """Resize image to specified dimensions."""
        pass
    
    @app.task
    def generate_thumbnail(image_path):
        """Generate thumbnail for image."""
        pass
    
    @app.task
    def process_image_upload(upload_data):
        """Process uploaded image."""
        pass
    Python

    Routing Tasks to Specific Workers

    Queue-Based Routing

    graph TB
        subgraph "Client Application"
            A[Email Task] --> B[email queue]
            C[Image Task] --> D[image queue]
            E[Report Task] --> F[report queue]
            G[Default Task] --> H[default queue]
        end
    
        subgraph "Workers"
            I[Email Worker] --> B
            J[Image Worker] --> D
            K[Report Worker] --> F
            L[General Worker] --> H
            L --> B
            L --> D
        end
    
        style A fill:#e8f5e8
        style C fill:#e3f2fd
        style E fill:#fff3e0
        style G fill:#fce4ec
        style I fill:#e8f5e8
        style J fill:#e3f2fd
        style K fill:#fff3e0
        style L fill:#fce4ec

    Configuration-Based Routing

    # routing_config.py
    from kombu import Queue, Exchange
    
    from app.celery import app  # the Celery app created in app/celery.py
    
    # Define exchanges
    default_exchange = Exchange('default', type='direct')
    email_exchange = Exchange('email', type='direct')
    image_exchange = Exchange('image', type='direct')
    
    # Configure routing
    app.conf.update(
        task_routes={
            # Route by task name
            'app.tasks.email.send_welcome_email': {'queue': 'email'},
            'app.tasks.email.*': {'queue': 'email'},
    
            # Route by pattern
            'app.tasks.image.*': {
                'queue': 'image',
                'routing_key': 'image.processing',
            },
    
            # Route with custom exchange
            'app.tasks.reports.*': {
                'queue': 'reports',
                'exchange': 'reports',
                'routing_key': 'reports.generate',
            },
    
            # Complex routing logic
            'app.tasks.priority.*': {
                'queue': 'priority',
                'priority': 9,
            },
        },
    
        # Define queues
        task_queues=(
            Queue('default', default_exchange, routing_key='default'),
            Queue('email', email_exchange, routing_key='email'),
            Queue('image', image_exchange, routing_key='image'),
            Queue('reports', routing_key='reports'),
            Queue('priority', routing_key='priority'),
        ),
    
        # Default queue
        task_default_queue='default',
        task_default_exchange='default',
        task_default_routing_key='default',
    )
    Python

    Dynamic Routing with Custom Router

    # custom_router.py
    from app.celery import app  # the Celery app created in app/celery.py
    
    class CustomRouter:
        """Custom task router with advanced logic."""
    
        def route_for_task(self, task, args=None, kwargs=None):
            """Route tasks based on custom logic."""
    
            # Route based on task arguments
            if task == 'app.tasks.process_user_data':
                user_id = args[0] if args else kwargs.get('user_id')
                if user_id:
                    # Route premium users to faster queue
                    if self.is_premium_user(user_id):
                        return {'queue': 'premium'}
                    else:
                        return {'queue': 'standard'}
    
            # Route based on task load
            if task.startswith('app.tasks.heavy_'):
                return {
                    'queue': 'heavy_processing',
                    'routing_key': 'heavy.processing',
                    'priority': 5,
                }
    
            # Route based on time of day
            from datetime import datetime
            current_hour = datetime.now().hour
            if 9 <= current_hour <= 17:  # Business hours
                if task.startswith('app.tasks.report'):
                    return {'queue': 'business_hours'}
    
            # Default routing
            return None
    
        def is_premium_user(self, user_id):
            """Check if user is premium (mock implementation)."""
            # In real implementation, check database/cache
            return user_id % 10 == 0  # Every 10th user is premium
    
    # Configure custom router
    app.conf.task_routes = (CustomRouter(),)
    Python

    Priority-Based Routing

    # priority_routing.py
    from kombu import Queue
    
    from app.celery import app  # the Celery app created in app/celery.py
    
    # Configure priority queues
    app.conf.update(
        task_routes={
            'app.tasks.critical.*': {'queue': 'critical', 'priority': 9},
            'app.tasks.high.*': {'queue': 'high', 'priority': 7},
            'app.tasks.normal.*': {'queue': 'normal', 'priority': 5},
            'app.tasks.low.*': {'queue': 'low', 'priority': 3},
            'app.tasks.background.*': {'queue': 'background', 'priority': 1},
        },
    
        task_queues=(
            Queue('critical', routing_key='critical', queue_arguments={'x-max-priority': 10}),
            Queue('high', routing_key='high', queue_arguments={'x-max-priority': 10}),
            Queue('normal', routing_key='normal', queue_arguments={'x-max-priority': 10}),
            Queue('low', routing_key='low', queue_arguments={'x-max-priority': 10}),
            Queue('background', routing_key='background', queue_arguments={'x-max-priority': 10}),
        ),
    
        # Enable priority support
        task_queue_max_priority=10,
        worker_prefetch_multiplier=1,  # Important for priority to work
    )
    
    # Usage with explicit priority
    @app.task
    def urgent_notification(message):
        """Send urgent notification."""
        pass
    
    # Call with priority
    urgent_notification.apply_async(
        args=['Critical system alert!'],
        priority=9,
        queue='critical'
    )
    Python

    Task Naming and Organization

    Naming Conventions

    # naming_conventions.py
    
    # Bad naming - unclear and inconsistent
    @app.task
    def func1(x):
        pass
    
    @app.task
    def send_mail(email):
        pass
    
    @app.task
    def img_proc(path):
        pass
    
    # Good naming - clear and consistent
    @app.task(name='email.send.welcome')
    def send_welcome_email(user_id):
        """Send welcome email to new user."""
        pass
    
    @app.task(name='email.send.newsletter')
    def send_newsletter_email(subscriber_ids):
        """Send newsletter to multiple subscribers."""
        pass
    
    @app.task(name='image.process.resize')
    def resize_image(image_path, dimensions):
        """Resize image to specified dimensions."""
        pass
    
    @app.task(name='image.process.thumbnail')
    def generate_image_thumbnail(image_path):
        """Generate thumbnail for uploaded image."""
        pass
    
    @app.task(name='report.generate.sales')
    def generate_sales_report(start_date, end_date):
        """Generate sales report for date range."""
        pass
    
    @app.task(name='maintenance.cleanup.temp_files')
    def cleanup_temporary_files():
        """Clean up temporary files older than 24 hours."""
        pass
    Python

    Namespace Organization

    # namespace_organization.py
    from celery import Celery
    
    app = Celery('enterprise_app')
    
    # Organize tasks by business domain
    class TaskNamespaces:
        """Centralized task name definitions."""
    
        # User management tasks
        class User:
            REGISTER = 'user.register'
            ACTIVATE = 'user.activate'
            DEACTIVATE = 'user.deactivate'
            SEND_WELCOME = 'user.email.welcome'
            SEND_VERIFICATION = 'user.email.verification'
    
        # Order processing tasks
        class Order:
            CREATE = 'order.create'
            PROCESS_PAYMENT = 'order.payment.process'
            SEND_CONFIRMATION = 'order.email.confirmation'
            UPDATE_INVENTORY = 'order.inventory.update'
            FULFILL = 'order.fulfillment.process'
    
        # Content management tasks
        class Content:
            UPLOAD_IMAGE = 'content.image.upload'
            RESIZE_IMAGE = 'content.image.resize'
            GENERATE_THUMBNAIL = 'content.image.thumbnail'
            PROCESS_VIDEO = 'content.video.process'
            EXTRACT_METADATA = 'content.metadata.extract'
    
        # Analytics and reporting
        class Analytics:
            TRACK_EVENT = 'analytics.event.track'
            GENERATE_REPORT = 'analytics.report.generate'
            UPDATE_METRICS = 'analytics.metrics.update'
            PROCESS_BATCH = 'analytics.batch.process'
    
    # Task implementations
    @app.task(name=TaskNamespaces.User.REGISTER)
    def register_user(user_data):
        """Register a new user account."""
        pass
    
    @app.task(name=TaskNamespaces.Order.PROCESS_PAYMENT)
    def process_order_payment(order_id, payment_data):
        """Process payment for an order."""
        pass
    
    @app.task(name=TaskNamespaces.Content.RESIZE_IMAGE)
    def resize_content_image(image_id, width, height):
        """Resize uploaded image content."""
        pass
    Python

    Multiple Queues

    Queue Architecture Design

    graph TB
        subgraph "Application Layer"
            A[Web App] --> B[API Layer]
            C[Admin Panel] --> B
            D[Scheduled Jobs] --> B
        end
    
        subgraph "Queue Layer"
            E[email Queue]
            F[image Queue]
            G[reports Queue]
            H[background Queue]
            I[priority Queue]
            J[dlq Queue]
        end
    
        subgraph "Worker Layer"
            K[Email Workers]
            L[Image Workers]
            M[Report Workers]
            N[Background Workers]
            O[Priority Workers]
        end
    
        B --> E
        B --> F
        B --> G
        B --> H
        B --> I
    
        E --> K
        F --> L
        G --> M
        H --> N
        I --> O
    
        E -.-> J
        F -.-> J
        G -.-> J
    
        style E fill:#e8f5e8
        style F fill:#e3f2fd
        style G fill:#fff3e0
        style H fill:#fce4ec
        style I fill:#ffebee
        style J fill:#f3e5f5

    Advanced Queue Configuration

    # advanced_queues.py
    from kombu import Queue, Exchange
    
    from app.celery import app  # the Celery app created in app/celery.py
    
    # Define exchanges for different message types
    email_exchange = Exchange('email', type='direct')
    image_exchange = Exchange('image', type='topic')
    reports_exchange = Exchange('reports', type='fanout')
    priority_exchange = Exchange('priority', type='direct')
    
    app.conf.update(
        # Queue definitions with advanced options
        task_queues=[
            # Email processing queue
            Queue(
                'email',
                email_exchange,
                routing_key='email',
                queue_arguments={
                    'x-message-ttl': 3600000,  # 1 hour TTL
                    'x-max-length': 10000,     # Max 10k messages
                    'x-overflow': 'reject-publish',
                }
            ),
    
            # Image processing with different priorities
            Queue(
                'image.high',
                image_exchange,
                routing_key='image.high',
                queue_arguments={
                    'x-max-priority': 10,
                    'x-message-ttl': 1800000,  # 30 minutes TTL
                }
            ),
            Queue(
                'image.normal',
                image_exchange,
                routing_key='image.normal',
                queue_arguments={'x-max-priority': 5}
            ),
    
            # Reports with delayed processing
            Queue(
                'reports.immediate',
                reports_exchange,
                routing_key='reports.immediate'
            ),
            Queue(
                'reports.scheduled',
                reports_exchange,
                routing_key='reports.scheduled',
                queue_arguments={
                    'x-message-ttl': 86400000,  # 24 hours TTL
                }
            ),
    
            # Dead letter queue for failed tasks
            Queue(
                'failed',
                Exchange('failed', type='direct'),
                routing_key='failed',
                queue_arguments={
                    'x-message-ttl': 604800000,  # 7 days TTL
                }
            ),
    
            # Priority queue for critical tasks
            Queue(
                'critical',
                priority_exchange,
                routing_key='critical',
                queue_arguments={
                    'x-max-priority': 10,
                    'x-expires': 3600000,  # Queue expires if unused for 1 hour
                }
            ),
        ],
    
        # Routing configuration
        task_routes={
            # Email tasks
            'app.tasks.email.*': {
                'queue': 'email',
                'exchange': 'email',
                'routing_key': 'email',
            },
    
            # Image tasks with conditional routing
            'app.tasks.image.resize': {
                'queue': 'image.high',
                'exchange': 'image',
                'routing_key': 'image.high',
            },
            'app.tasks.image.thumbnail': {
                'queue': 'image.normal',
                'exchange': 'image',
                'routing_key': 'image.normal',
            },
    
            # Report tasks
            'app.tasks.reports.realtime': {
                'queue': 'reports.immediate',
                'exchange': 'reports',
            },
            'app.tasks.reports.batch': {
                'queue': 'reports.scheduled',
                'exchange': 'reports',
            },
    
            # Critical tasks
            'app.tasks.critical.*': {
                'queue': 'critical',
                'exchange': 'priority',
                'routing_key': 'critical',
                'priority': 9,
            },
        },
    
        # Default configurations
        task_default_queue='default',
        task_default_exchange='default',
        task_default_routing_key='default',
    
        # Queue-specific settings
        task_queue_max_priority=10,
        worker_prefetch_multiplier=1,
        task_acks_late=True,
    )
    Python

    Starting Specialized Workers

    #!/bin/bash
    # worker_startup.sh
    
    # Email processing workers (I/O bound)
    celery -A app worker -Q email -n email_worker@%h \
        --pool=eventlet --concurrency=100 --loglevel=info &
    
    # Image processing workers (CPU bound)
    celery -A app worker -Q image.high,image.normal -n image_worker@%h \
        --pool=prefork --concurrency=4 --loglevel=info &
    
    # Report generation workers (mixed workload)
    celery -A app worker -Q reports.immediate,reports.scheduled -n reports_worker@%h \
        --pool=threads --concurrency=8 --loglevel=info &
    
    # Critical task worker (dedicated)
    celery -A app worker -Q critical -n critical_worker@%h \
        --pool=prefork --concurrency=2 --loglevel=info &
    
    # Background task worker (low priority)
    celery -A app worker -Q background,cleanup -n background_worker@%h \
        --pool=prefork --concurrency=2 --loglevel=warning &
    
    # General purpose worker (handles multiple queues)
    celery -A app worker -Q default,email,image.normal -n general_worker@%h \
        --pool=prefork --concurrency=4 --loglevel=info &
    
    echo "All workers started"
    Bash

    Queue Monitoring and Management

    # queue_management.py
    from celery import Celery
    from kombu import Connection
    
    app = Celery('queue_manager')
    
    class QueueManager:
        """Utility class for queue management."""
    
        def __init__(self, broker_url):
            self.broker_url = broker_url
    
        def get_queue_info(self, queue_name):
            """Get information about a specific queue."""
            with Connection(self.broker_url) as conn:
                queue = conn.SimpleQueue(queue_name)
                try:
                    return {
                        'name': queue_name,
                        'size': queue.qsize(),
                        'consumer_count': getattr(queue, 'consumer_count', 0),
                    }
                finally:
                    queue.close()
    
        def purge_queue(self, queue_name):
            """Remove all messages from a queue."""
            with Connection(self.broker_url) as conn:
                queue = conn.SimpleQueue(queue_name)
                try:
                    purged = queue.clear()
                    return purged
                finally:
                    queue.close()
    
        def move_messages(self, source_queue, dest_queue, max_messages=None):
            """Move messages from one queue to another."""
            with Connection(self.broker_url) as conn:
                source = conn.SimpleQueue(source_queue)
                dest = conn.SimpleQueue(dest_queue)
                moved = 0
    
                try:
                    while True:
                        if max_messages and moved >= max_messages:
                            break
    
                        try:
                            message = source.get(block=False)
                            dest.put(message.payload)
                            message.ack()
                            moved += 1
                        except Empty:
                            break  # no more messages in the source queue
    
                    return moved
                finally:
                    source.close()
                    dest.close()
    
    # Usage
    manager = QueueManager('redis://localhost:6379/0')
    
    # Check queue status
    info = manager.get_queue_info('email')
    print(f"Email queue has {info['size']} messages")
    
    # Move failed messages back to processing queue
    moved = manager.move_messages('failed', 'email', max_messages=100)
    print(f"Moved {moved} messages from failed to email queue")
    Python
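    
    The same utility can be wired into a periodic maintenance task so that requeueing does not depend on someone running a script by hand. A minimal sketch, assuming the broker URL and the hourly cadence fit your deployment:
    
    # maintenance_tasks.py
    from celery import Celery
    from queue_management import QueueManager
    
    app = Celery('maintenance', broker='redis://localhost:6379/0')
    
    @app.task
    def requeue_failed_messages(batch_size=100):
        """Drain a batch of messages from 'failed' back onto the 'email' queue."""
        manager = QueueManager('redis://localhost:6379/0')
        moved = manager.move_messages('failed', 'email', max_messages=batch_size)
        return f"Requeued {moved} messages"
    
    # Run it every hour via Celery beat
    app.conf.beat_schedule = {
        'requeue-failed-hourly': {
            'task': 'maintenance_tasks.requeue_failed_messages',
            'schedule': 3600.0,
        },
    }
    Python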

    This completes the intermediate chapters on task routing, organization, and queue management.


    Chapter 6: Configuration and Settings

    Celery Configuration Architecture

    graph TB
        subgraph "Configuration Sources"
            A[Environment Variables] --> E[Final Config]
            B[Configuration Files] --> E
            C[Command Line Args] --> E
            D[Code Configuration] --> E
        end
    
        subgraph "Configuration Categories"
            F[Broker Settings]
            G[Worker Settings]
            H[Task Settings]
            I[Result Backend]
            J[Security Settings]
            K[Monitoring Settings]
        end
    
        E --> F
        E --> G
        E --> H
        E --> I
        E --> J
        E --> K
    
        style A fill:#e8f5e8
        style B fill:#e3f2fd
        style C fill:#fff3e0
        style D fill:#fce4ec

    Comprehensive Configuration System

    # config/base.py
    """Base configuration for all environments."""
    import os
    from datetime import timedelta
    from kombu import Queue, Exchange
    
    class BaseConfig:
        """Base configuration class."""
    
        # Broker Configuration
        broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
        result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
    
        # Serialization
        task_serializer = 'json'
        accept_content = ['json']
        result_serializer = 'json'
        timezone = 'UTC'
        enable_utc = True
    
        # Task Configuration
        task_always_eager = False
        task_eager_propagates = True
        task_ignore_result = False
        task_store_eager_result = True
    
        # Worker Configuration
        worker_prefetch_multiplier = 1
        worker_max_tasks_per_child = 1000
        worker_disable_rate_limits = False
        worker_log_color = True
    
        # Security
        worker_hijack_root_logger = False
        task_reject_on_worker_lost = True
    
        # Monitoring
        worker_send_task_events = True
        task_send_sent_event = True
    
        # Result Configuration
        result_expires = 3600  # 1 hour
        result_persistent = True
        result_compression = 'gzip'
    
        # Error Handling
        task_annotations = {
            '*': {
                'rate_limit': '100/s',
                'time_limit': 300,  # 5 minutes
                'soft_time_limit': 240,  # 4 minutes
            }
        }
    
    # config/development.py
    """Development environment configuration."""
    from kombu import Queue
    from .base import BaseConfig
    
    class DevelopmentConfig(BaseConfig):
        """Development configuration."""
    
        # Development-specific settings
        worker_log_level = 'DEBUG'
        worker_reload = True
        task_always_eager = False  # Enable for synchronous testing
    
        # Reduced limits for development
        worker_max_tasks_per_child = 100
        result_expires = 300  # 5 minutes
    
        # Development queues
        task_routes = {
            'app.tasks.test.*': {'queue': 'test'},
            'app.tasks.dev.*': {'queue': 'development'},
        }
    
        task_queues = (
            Queue('default'),
            Queue('test'),
            Queue('development'),
        )
    
    # config/production.py
    """Production environment configuration."""
    import os
    from .base import BaseConfig
    from kombu import Queue, Exchange
    
    class ProductionConfig(BaseConfig):
        """Production configuration."""
    
        # Production broker with connection pooling
        broker_url = os.environ.get('CELERY_BROKER_URL')
        broker_pool_limit = 10
        broker_connection_timeout = 30
        broker_connection_retry_on_startup = True
        broker_connection_max_retries = 3
    
        # Redis-specific production settings
        redis_max_connections = 20
        redis_retry_on_timeout = True
        redis_socket_connect_timeout = 30
        redis_socket_timeout = 30
    
        # Worker optimizations
        worker_prefetch_multiplier = 4
        worker_max_tasks_per_child = 10000
        worker_log_level = 'INFO'
        worker_proc_alive_timeout = 60
    
        # Task timeouts and limits
        task_time_limit = 1800  # 30 minutes
        task_soft_time_limit = 1500  # 25 minutes
    
        # Result backend optimization
        result_backend_transport_options = {
            'retry_on_timeout': True,
            'retry_policy': {
                'timeout': 5.0
            }
        }
    
        # Production queues with exchanges
        default_exchange = Exchange('default', type='direct')
        email_exchange = Exchange('email', type='direct')
        image_exchange = Exchange('image', type='topic')
        reports_exchange = Exchange('reports', type='fanout')
    
        task_queues = (
            Queue('default', default_exchange, routing_key='default'),
            Queue('email', email_exchange, routing_key='email'),
            Queue('image_high', image_exchange, routing_key='image.high'),
            Queue('image_normal', image_exchange, routing_key='image.normal'),
            Queue('reports', reports_exchange),
            Queue('background', routing_key='background'),
        )
    
        # Complex routing
        task_routes = {
            'app.tasks.email.*': {
                'queue': 'email',
                'exchange': 'email',
                'routing_key': 'email',
            },
            'app.tasks.image.priority.*': {
                'queue': 'image_high',
                'exchange': 'image',
                'routing_key': 'image.high',
                'priority': 8,
            },
            'app.tasks.image.*': {
                'queue': 'image_normal',
                'exchange': 'image',
                'routing_key': 'image.normal',
            },
            'app.tasks.reports.*': {
                'queue': 'reports',
                'exchange': 'reports',
            },
        }
    
        # Security settings
        worker_hijack_root_logger = False
        task_reject_on_worker_lost = True
    
        # Monitoring
        worker_send_task_events = True
        task_send_sent_event = True
    
        # Performance annotations
        task_annotations = {
            'app.tasks.email.*': {
                'rate_limit': '200/s',
                'time_limit': 300,
            },
            'app.tasks.image.*': {
                'rate_limit': '50/s',
                'time_limit': 1800,
                'soft_time_limit': 1500,
            },
            'app.tasks.reports.*': {
                'rate_limit': '10/s',
                'time_limit': 3600,
                'soft_time_limit': 3300,
            },
        }
    
    # config/__init__.py
    """Configuration module."""
    import os
    
    config_mapping = {
        'development': 'config.development.DevelopmentConfig',
        'production': 'config.production.ProductionConfig',
        'testing': 'config.testing.TestingConfig',
    }
    
    def get_config():
        """Get configuration based on environment."""
        env = os.environ.get('CELERY_ENV', 'development')
        config_class = config_mapping.get(env)
    
        if not config_class:
            raise ValueError(f"Unknown environment: {env}")
    
        module_name, class_name = config_class.rsplit('.', 1)
        module = __import__(module_name, fromlist=[class_name])
        return getattr(module, class_name)()
    Python
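    
    These configuration classes only take effect once they are loaded into a Celery app. A minimal wiring sketch, assuming the config package sits next to the application module; config_from_object accepts any object whose attributes are Celery settings, so the instance returned by get_config() can be applied directly:
    
    # celery_app.py
    from celery import Celery
    from config import get_config
    
    app = Celery('app')
    app.config_from_object(get_config())
    Python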

    Advanced Worker Configuration

    # worker_config.py
    """Advanced worker configuration and optimization."""
    import multiprocessing
    import os
    
    class WorkerConfig:
        """Advanced worker configuration."""
    
        @staticmethod
        def get_optimal_concurrency():
            """Calculate optimal worker concurrency."""
            cpu_count = multiprocessing.cpu_count()
    
            # For CPU-bound tasks
            cpu_bound_concurrency = cpu_count
    
            # For I/O-bound tasks
            io_bound_concurrency = cpu_count * 4
    
            # For mixed workloads
            mixed_concurrency = cpu_count * 2
    
            return {
                'cpu_bound': cpu_bound_concurrency,
                'io_bound': io_bound_concurrency,
                'mixed': mixed_concurrency,
            }
    
        @staticmethod
        def get_memory_optimized_config():
            """Get memory-optimized configuration."""
            return {
                'worker_max_memory_per_child': 200000,  # 200MB
                'worker_prefetch_multiplier': 1,
                'worker_max_tasks_per_child': 1000,
                'task_acks_late': True,
                'task_reject_on_worker_lost': True,
            }
    
        @staticmethod
        def get_performance_config():
            """Get performance-optimized configuration."""
            return {
                'worker_prefetch_multiplier': 4,
                'worker_max_tasks_per_child': 10000,
                'worker_disable_rate_limits': True,
                'task_compression': 'gzip',
                'result_compression': 'gzip',
            }
    
    # Dynamic configuration based on environment
    def configure_worker_dynamically():
        """Configure worker based on system resources."""
        import psutil
    
        # Get system information
        memory_gb = psutil.virtual_memory().total / (1024**3)
        cpu_count = psutil.cpu_count()
    
        config = {}
    
        # Memory-based configuration
        if memory_gb < 2:
            config.update({
                'worker_max_tasks_per_child': 100,
                'worker_prefetch_multiplier': 1,
            })
        elif memory_gb < 8:
            config.update({
                'worker_max_tasks_per_child': 1000,
                'worker_prefetch_multiplier': 2,
            })
        else:
            config.update({
                'worker_max_tasks_per_child': 10000,
                'worker_prefetch_multiplier': 4,
            })
    
        # CPU-based configuration
        if cpu_count <= 2:
            config['worker_concurrency'] = cpu_count
        else:
            config['worker_concurrency'] = cpu_count * 2
    
        return config
    Python
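    
    A short usage sketch for the helpers above, assuming psutil is installed and workers are launched against this module; conf.update merges the computed settings into the app before the worker starts:
    
    # apply_worker_config.py
    from celery import Celery
    from worker_config import WorkerConfig, configure_worker_dynamically
    
    app = Celery('app', broker='redis://localhost:6379/0')
    
    # Resource-aware settings computed at startup on this host
    app.conf.update(configure_worker_dynamically())
    
    # Or pick a static profile explicitly, e.g. on a memory-constrained host
    app.conf.update(WorkerConfig.get_memory_optimized_config())
    Python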

    Environment-Specific Settings

    # environments.py
    """Environment-specific configuration management."""
    import os
    from typing import Dict, Any
    
    class EnvironmentManager:
        """Manage environment-specific configurations."""
    
        def __init__(self):
            self.env = os.environ.get('CELERY_ENV', 'development')
    
        def get_broker_config(self) -> Dict[str, Any]:
            """Get broker configuration for current environment."""
            configs = {
                'development': {
                    'broker_url': 'redis://localhost:6379/0',
                    'broker_connection_retry_on_startup': True,
                },
                'testing': {
                    'broker_url': 'memory://',
                    'task_always_eager': True,
                    'task_eager_propagates': True,
                },
                'staging': {
                    'broker_url': os.environ.get('REDIS_URL'),
                    'broker_pool_limit': 5,
                    'broker_connection_timeout': 10,
                },
                'production': {
                    'broker_url': os.environ.get('REDIS_URL'),
                    'broker_pool_limit': 20,
                    'broker_connection_timeout': 30,
                    'broker_connection_retry_on_startup': True,
                    'broker_connection_max_retries': 5,
                }
            }
            return configs.get(self.env, configs['development'])
    
        def get_result_backend_config(self) -> Dict[str, Any]:
            """Get result backend configuration."""
            configs = {
                'development': {
                    'result_backend': 'redis://localhost:6379/0',
                    'result_expires': 300,  # 5 minutes
                },
                'testing': {
                    'result_backend': 'cache+memory://',
                    'result_expires': 60,
                },
                'production': {
                    'result_backend': os.environ.get('REDIS_URL'),
                    'result_expires': 3600,  # 1 hour
                    'result_persistent': True,
                    'result_backend_transport_options': {
                        'retry_on_timeout': True,
                    }
                }
            }
            return configs.get(self.env, configs['development'])
    
        def get_logging_config(self) -> Dict[str, Any]:
            """Get logging configuration."""
            configs = {
                'development': {
                    'worker_log_level': 'DEBUG',
                    'worker_log_color': True,
                    'worker_redirect_stdouts_level': 'DEBUG',
                },
                'production': {
                    'worker_log_level': 'INFO',
                    'worker_log_color': False,
                    'worker_redirect_stdouts_level': 'WARNING',
                    'worker_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
                    'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s',
                }
            }
            return configs.get(self.env, configs['development'])
    
    # Configuration factory
    def create_app_config():
        """Create application configuration."""
        env_manager = EnvironmentManager()
    
        config = {}
        config.update(env_manager.get_broker_config())
        config.update(env_manager.get_result_backend_config())
        config.update(env_manager.get_logging_config())
    
        # Common settings
        config.update({
            'task_serializer': 'json',
            'accept_content': ['json'],
            'result_serializer': 'json',
            'timezone': 'UTC',
            'enable_utc': True,
        })
    
        return config
    Python
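    
    Because create_app_config() returns a plain dictionary, it can be merged straight into the app configuration. A minimal sketch, assuming CELERY_ENV is set in the process environment before the worker starts:
    
    # app_factory.py
    from celery import Celery
    from environments import create_app_config
    
    def make_celery(name='app'):
        """Build a Celery app configured for the current CELERY_ENV."""
        celery_app = Celery(name)
        celery_app.conf.update(create_app_config())
        return celery_app
    
    app = make_celery()
    Python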

    Chapter 7: Error Handling and Retries

    Exception Handling Architecture

    graph TB
        subgraph "Task Execution Flow"
            A[Task Starts] --> B{Task Executes}
            B -->|Success| C[Return Result]
            B -->|Exception| D[Exception Handler]
        end
    
        subgraph "Error Handling Options"
            D --> E{Retry Logic}
            E -->|Auto Retry| F[Automatic Retry]
            E -->|Manual Retry| G[Custom Retry Logic]
            E -->|No Retry| H[Mark as Failed]
    
            F --> I[Exponential Backoff]
            F --> J[Fixed Delay]
            F --> K[Custom Strategy]
    
            G --> L[Business Logic Check]
            G --> M[Conditional Retry]
            G --> N[Circuit Breaker]
        end
    
        subgraph "Final States"
            C --> O[SUCCESS]
            H --> P[FAILURE]
            I --> Q{Max Retries?}
            J --> Q
            K --> Q
            L --> Q
            M --> Q
            N --> Q
    
            Q -->|No| B
            Q -->|Yes| P
        end
    
        style A fill:#e8f5e8
        style C fill:#e8f5e8
        style D fill:#ffebee
        style H fill:#ffebee
        style P fill:#ffebee

    Comprehensive Error Handling

    # error_handling.py
    """Comprehensive error handling examples."""
    from celery import Celery
    from celery.exceptions import Retry, Ignore, WorkerLostError
    import random
    import time
    import logging
    from typing import Optional, Dict, Any
    
    app = Celery('error_handling_demo')
    
    # Custom exceptions
    class BusinessLogicError(Exception):
        """Exception for business logic failures."""
        pass
    
    class ExternalServiceError(Exception):
        """Exception for external service failures."""
        pass
    
    class ValidationError(Exception):
        """Exception for input validation failures."""
        pass
    
    # Basic exception handling
    @app.task(bind=True)
    def basic_error_handling(self, data):
        """Basic task with exception handling."""
        try:
            # Simulate processing
            if not data:
                raise ValidationError("Data cannot be empty")
    
            if data.get('fail'):
                raise BusinessLogicError("Simulated business logic failure")
    
            return f"Processed: {data}"
    
        except ValidationError as exc:
            # Don't retry validation errors
            logging.error(f"Validation error in task {self.request.id}: {exc}")
            raise Ignore()
    
        except BusinessLogicError as exc:
            # Log business logic errors but don't retry
            logging.error(f"Business logic error in task {self.request.id}: {exc}")
            raise
    
        except Exception as exc:
            # Unexpected errors - log and re-raise
            logging.error(f"Unexpected error in task {self.request.id}: {exc}")
            raise
    
    # Automatic retry configuration
    @app.task(
        bind=True,
        autoretry_for=(ExternalServiceError, ConnectionError),
        retry_kwargs={'max_retries': 3, 'countdown': 60},
        retry_backoff=True,
        retry_backoff_max=700,
        retry_jitter=True
    )
    def auto_retry_task(self, url):
        """Task with automatic retry configuration."""
        try:
            # Simulate external service call
            import requests
            response = requests.get(url, timeout=10)
    
            if response.status_code >= 500:
                raise ExternalServiceError(f"Server error: {response.status_code}")
    
            return response.json()
    
        except requests.RequestException as exc:
            logging.warning(f"Request failed, will retry: {exc}")
            raise ExternalServiceError(f"Request failed: {exc}")
    
    # Manual retry with custom logic
    @app.task(bind=True)
    def manual_retry_task(self, operation_data):
        """Task with manual retry logic."""
        try:
            # Simulate operation
            success_rate = operation_data.get('success_rate', 0.5)
    
            if random.random() > success_rate:
                raise ExternalServiceError("Operation failed")
    
            return f"Operation succeeded: {operation_data}"
    
        except ExternalServiceError as exc:
            # Custom retry logic
            current_retry = self.request.retries
            max_retries = 5
    
            if current_retry < max_retries:
                # Calculate exponential backoff with jitter
                base_delay = 2 ** current_retry
                jitter = random.uniform(0.5, 1.5)
                countdown = base_delay * jitter
    
                logging.warning(
                    f"Task failed (attempt {current_retry + 1}/{max_retries + 1}). "
                    f"Retrying in {countdown:.2f} seconds. Error: {exc}"
                )
    
                raise self.retry(countdown=countdown, exc=exc)
            else:
                logging.error(f"Task failed after {max_retries + 1} attempts: {exc}")
                raise
    
    # Conditional retry logic
    @app.task(bind=True)
    def conditional_retry_task(self, data):
        """Task with conditional retry based on error type."""
        try:
            # Simulate different types of failures
            error_type = data.get('error_type')
    
            if error_type == 'temporary':
                raise ExternalServiceError("Temporary service unavailable")
            elif error_type == 'permanent':
                raise BusinessLogicError("Invalid data format")
            elif error_type == 'network':
                raise ConnectionError("Network timeout")
    
            return f"Success: {data}"
    
        except ExternalServiceError as exc:
            # Retry external service errors
            if self.request.retries < 3:
                raise self.retry(countdown=60, exc=exc)
            raise
    
        except ConnectionError as exc:
            # Retry network errors with exponential backoff
            if self.request.retries < 5:
                countdown = (2 ** self.request.retries) * 60
                raise self.retry(countdown=countdown, exc=exc)
            raise
    
        except BusinessLogicError as exc:
            # Don't retry business logic errors
            logging.error(f"Business logic error: {exc}")
            raise
    
    # Circuit breaker pattern implementation
    class CircuitBreaker:
        """Simple circuit breaker implementation."""
    
        def __init__(self, failure_threshold=5, recovery_timeout=300):
            self.failure_threshold = failure_threshold
            self.recovery_timeout = recovery_timeout
            self.failure_count = 0
            self.last_failure_time = None
            self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
        def call(self, func, *args, **kwargs):
            """Execute function with circuit breaker protection."""
            if self.state == 'OPEN':
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = 'HALF_OPEN'
                else:
                    raise ExternalServiceError("Circuit breaker is OPEN")
    
            try:
                result = func(*args, **kwargs)
                self.on_success()
                return result
            except Exception as exc:
                self.on_failure()
                raise
    
        def on_success(self):
            """Handle successful call."""
            self.failure_count = 0
            self.state = 'CLOSED'
    
        def on_failure(self):
            """Handle failed call."""
            self.failure_count += 1
            self.last_failure_time = time.time()
    
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
    
    # Global circuit breaker instance.
    # Note: this state lives in each worker process; coordinating the breaker
    # across workers would require a shared store such as Redis.
    external_service_breaker = CircuitBreaker()
    
    @app.task(bind=True)
    def circuit_breaker_task(self, service_url):
        """Task using circuit breaker pattern."""
        def external_service_call():
            import requests
            response = requests.get(service_url, timeout=10)
            if response.status_code >= 500:
                raise ExternalServiceError(f"Service error: {response.status_code}")
            return response.json()
    
        try:
            return external_service_breaker.call(external_service_call)
        except ExternalServiceError as exc:
            if self.request.retries < 3:
                raise self.retry(countdown=300, exc=exc)  # Wait 5 minutes
            raise
    
    # Dead letter queue implementation
    @app.task(bind=True)
    def dead_letter_handler(self, original_task_data, error_info):
        """Handle tasks that have exhausted all retries."""
        logging.error(
            f"Task moved to dead letter queue: "
            f"Task: {original_task_data.get('task_name')}, "
            f"Args: {original_task_data.get('args')}, "
            f"Error: {error_info}"
        )
    
        # Store in database for manual review
        # Send notification to administrators
        # Implement recovery logic if needed
    
        return f"Dead letter processed: {original_task_data['task_id']}"
    
    @app.task(bind=True)
    def task_with_dead_letter(self, data):
        """Task that sends failed attempts to dead letter queue."""
        try:
            # Simulate unreliable operation
            if random.random() < 0.8:  # 80% failure rate
                raise ExternalServiceError("Simulated failure")
    
            return f"Success: {data}"
    
        except ExternalServiceError as exc:
            if self.request.retries < 3:
                raise self.retry(countdown=60, exc=exc)
            else:
                # Send to dead letter queue
                dead_letter_data = {
                    'task_id': self.request.id,
                    'task_name': self.name,
                    'args': self.request.args,
                    'kwargs': self.request.kwargs,
                    'retries': self.request.retries,
                }
    
                error_info = {
                    'error_type': type(exc).__name__,
                    'error_message': str(exc),
                    'final_attempt': True,
                }
    
                dead_letter_handler.delay(dead_letter_data, error_info)
    
                # Mark task as failed
                raise
    Python
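    
    A quick dispatch sketch for the tasks above, assuming a worker is consuming the default queue and a result backend is configured. AsyncResult.get() re-raises the task's exception, which makes it easy to observe non-retryable failures from the caller's side:
    
    # trigger_error_demo.py
    from error_handling import basic_error_handling, conditional_retry_task
    
    # Succeeds and returns "Processed: ..."
    ok = basic_error_handling.delay({'name': 'demo'})
    print(ok.get(timeout=30))
    
    # Raises BusinessLogicError immediately; no retries are attempted
    failing = conditional_retry_task.delay({'error_type': 'permanent'})
    try:
        failing.get(timeout=30)
    except Exception as exc:
        print(f"Task failed as expected: {exc}")
    Python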

    Advanced Retry Strategies

    # retry_strategies.py
    """Advanced retry strategies and patterns."""
    import time
    import random
    import logging
    from celery import Celery
    
    # ExternalServiceError is the custom exception defined in error_handling.py
    from error_handling import ExternalServiceError
    
    app = Celery('retry_strategies')
    
    class RetryStrategy:
        """Base class for retry strategies."""
    
        def get_delay(self, retry_count: int) -> float:
            """Get delay in seconds for given retry count."""
            raise NotImplementedError
    
    class ExponentialBackoffStrategy(RetryStrategy):
        """Exponential backoff with optional jitter."""
    
        def __init__(self, base_delay=1, max_delay=300, jitter=True, backoff_factor=2):
            self.base_delay = base_delay
            self.max_delay = max_delay
            self.jitter = jitter
            self.backoff_factor = backoff_factor
    
        def get_delay(self, retry_count: int) -> float:
            """Calculate exponential backoff delay."""
            delay = self.base_delay * (self.backoff_factor ** retry_count)
            delay = min(delay, self.max_delay)
    
            if self.jitter:
                # Add ±25% jitter
                jitter_factor = random.uniform(0.75, 1.25)
                delay *= jitter_factor
    
            return delay
    
    class LinearBackoffStrategy(RetryStrategy):
        """Linear backoff strategy."""
    
        def __init__(self, base_delay=60, increment=30, max_delay=600):
            self.base_delay = base_delay
            self.increment = increment
            self.max_delay = max_delay
    
        def get_delay(self, retry_count: int) -> float:
            """Calculate linear backoff delay."""
            delay = self.base_delay + (self.increment * retry_count)
            return min(delay, self.max_delay)
    
    class FibonacciBackoffStrategy(RetryStrategy):
        """Fibonacci sequence backoff strategy."""
    
        def __init__(self, base_delay=1, max_delay=600):
            self.base_delay = base_delay
            self.max_delay = max_delay
    
        def get_delay(self, retry_count: int) -> float:
            """Calculate Fibonacci backoff delay."""
            def fibonacci(n):
                if n <= 1:
                    return n
                a, b = 0, 1
                for _ in range(2, n + 1):
                    a, b = b, a + b
                return b
    
            delay = self.base_delay * fibonacci(retry_count + 1)
            return min(delay, self.max_delay)
    
    class AdaptiveRetryStrategy(RetryStrategy):
        """Adaptive retry strategy based on recent success rates."""
    
        def __init__(self, base_delay=60, min_delay=30, max_delay=900):
            self.base_delay = base_delay
            self.min_delay = min_delay
            self.max_delay = max_delay
            self.recent_attempts = []
            self.window_size = 100
    
        def record_attempt(self, success: bool):
            """Record the result of an attempt."""
            self.recent_attempts.append({
                'success': success,
                'timestamp': time.time()
            })
    
            # Keep only recent attempts
            if len(self.recent_attempts) > self.window_size:
                self.recent_attempts.pop(0)
    
        def get_success_rate(self) -> float:
            """Calculate recent success rate."""
            if not self.recent_attempts:
                return 0.5  # Default assumption
    
            successes = sum(1 for attempt in self.recent_attempts if attempt['success'])
            return successes / len(self.recent_attempts)
    
        def get_delay(self, retry_count: int) -> float:
            """Calculate adaptive delay based on success rate."""
            success_rate = self.get_success_rate()
    
            # Lower success rate = longer delay
            failure_factor = 1 / (success_rate + 0.1)  # Avoid division by zero
    
            delay = self.base_delay * failure_factor * (retry_count + 1)
            return max(self.min_delay, min(delay, self.max_delay))
    
    # Task with custom retry strategy
    @app.task(bind=True)
    def task_with_custom_retry(self, data, strategy_name='exponential'):
        """Task using custom retry strategies."""
        # Strategies are rebuilt on every invocation, so the adaptive strategy's
        # success history resets each time; share a module-level instance if the
        # history should persist across task runs.
        strategies = {
            'exponential': ExponentialBackoffStrategy(base_delay=2, max_delay=300),
            'linear': LinearBackoffStrategy(base_delay=60, increment=30),
            'fibonacci': FibonacciBackoffStrategy(base_delay=5, max_delay=600),
            'adaptive': AdaptiveRetryStrategy(base_delay=60),
        }
    
        strategy = strategies.get(strategy_name, strategies['exponential'])
    
        try:
            # Simulate unreliable operation
            if random.random() < 0.7:  # 70% failure rate
                raise ExternalServiceError("Simulated failure")
    
            # Record success for adaptive strategy
            if isinstance(strategy, AdaptiveRetryStrategy):
                strategy.record_attempt(True)
    
            return f"Success with {strategy_name} strategy: {data}"
    
        except ExternalServiceError as exc:
            # Record failure for adaptive strategy
            if isinstance(strategy, AdaptiveRetryStrategy):
                strategy.record_attempt(False)
    
            if self.request.retries < 5:
                delay = strategy.get_delay(self.request.retries)
    
                logging.warning(
                    f"Task failed (retry {self.request.retries + 1}/6) "
                    f"using {strategy_name} strategy. "
                    f"Retrying in {delay:.2f} seconds."
                )
    
                raise self.retry(countdown=delay, exc=exc)
    
            raise
    
    # Time-based retry limits
    @app.task(bind=True)
    def time_limited_retry_task(self, data, max_retry_time=3600, start_time=None):
        """Task with a time-based retry limit (1 hour by default).
    
        Call it with keyword arguments (e.g. .apply_async(kwargs={'data': ...}))
        so that retries can carry start_time forward; attributes set on
        self.request do not survive the re-delivery triggered by self.retry().
        """
        if start_time is None:
            # First attempt: record when the overall operation began
            start_time = time.time()
    
        try:
            # Simulate operation
            if random.random() < 0.6:
                raise ExternalServiceError("Simulated failure")
    
            return f"Success: {data}"
    
        except ExternalServiceError as exc:
            elapsed_time = time.time() - start_time
    
            if elapsed_time < max_retry_time:
                # Calculate delay with exponential backoff
                delay = min(60 * (2 ** self.request.retries), 900)  # Max 15 minutes
    
                logging.warning(
                    f"Task failed after {elapsed_time:.0f}s. "
                    f"Retrying in {delay}s. "
                    f"Time limit: {max_retry_time}s"
                )
    
                raise self.retry(
                    countdown=delay,
                    exc=exc,
                    kwargs={
                        'data': data,
                        'max_retry_time': max_retry_time,
                        'start_time': start_time,
                    },
                )
            else:
                logging.error(
                    f"Task exceeded time limit of {max_retry_time}s. "
                    f"Total elapsed: {elapsed_time:.0f}s"
                )
                raise
    
    # Conditional retry based on error analysis
    @app.task(bind=True)
    def smart_retry_task(self, data):
        """Task with intelligent retry logic based on error analysis."""
    
        def analyze_error(error):
            """Analyze error to determine retry strategy."""
            error_str = str(error).lower()
    
            if 'timeout' in error_str or 'connection' in error_str:
                return {'should_retry': True, 'delay': 30, 'max_retries': 5}
            elif 'rate limit' in error_str:
                return {'should_retry': True, 'delay': 300, 'max_retries': 3}
            elif 'server error' in error_str or '500' in error_str:
                return {'should_retry': True, 'delay': 120, 'max_retries': 4}
            elif 'authentication' in error_str or '401' in error_str:
                return {'should_retry': False, 'reason': 'Authentication error'}
            elif 'not found' in error_str or '404' in error_str:
                return {'should_retry': False, 'reason': 'Resource not found'}
            else:
                return {'should_retry': True, 'delay': 60, 'max_retries': 3}
    
        try:
            # Simulate various types of failures
            failure_types = [
                "Connection timeout",
                "Rate limit exceeded",
                "Internal server error",
                "Authentication failed",
                "Resource not found",
                "Unknown error"
            ]
    
            if random.random() < 0.7:
                error_type = random.choice(failure_types)
                raise ExternalServiceError(error_type)
    
            return f"Success: {data}"
    
        except ExternalServiceError as exc:
            retry_info = analyze_error(exc)
    
            if not retry_info['should_retry']:
                logging.error(f"Not retrying: {retry_info.get('reason', 'Unknown')}")
                raise
    
            max_retries = retry_info['max_retries']
            if self.request.retries < max_retries:
                delay = retry_info['delay']
    
                logging.warning(
                    f"Retrying task due to: {exc}. "
                    f"Attempt {self.request.retries + 1}/{max_retries + 1}. "
                    f"Delay: {delay}s"
                )
    
                raise self.retry(countdown=delay, exc=exc)
    
            logging.error(f"Task failed after {max_retries + 1} attempts: {exc}")
            raise
    Python
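    
    Because each strategy is a plain object, the delay schedules can be compared offline without a broker. A small sketch that prints the first few delays each strategy would produce (jitter disabled so the numbers are deterministic):
    
    # compare_delays.py
    from retry_strategies import (
        ExponentialBackoffStrategy, LinearBackoffStrategy, FibonacciBackoffStrategy
    )
    
    strategies = {
        'exponential': ExponentialBackoffStrategy(base_delay=2, max_delay=300, jitter=False),
        'linear': LinearBackoffStrategy(base_delay=60, increment=30),
        'fibonacci': FibonacciBackoffStrategy(base_delay=5, max_delay=600),
    }
    
    for name, strategy in strategies.items():
        schedule = [round(strategy.get_delay(retry), 1) for retry in range(5)]
        print(f"{name:>12}: {schedule}")
    Python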

    The examples in this chapter progress from basic exception handling to sophisticated retry mechanisms, including circuit breakers, adaptive strategies, and dead letter handling.


    Chapter 8: Monitoring and Debugging

    Monitoring Architecture

    graph TB
        subgraph "Monitoring Components"
            A[Celery Events] --> B[Event Monitor]
            C[Worker Status] --> B
            D[Task States] --> B
            E[Queue Metrics] --> B
        end
    
        subgraph "Monitoring Tools"
            B --> F[Flower Web UI]
            B --> G[Custom Dashboard]
            B --> H[Prometheus/Grafana]
            B --> I[Application Logs]
        end
    
        subgraph "Alerting"
            F --> J[Email Alerts]
            G --> K[Slack Notifications]
            H --> L[PagerDuty]
            I --> M[Log Aggregation]
        end
    
        subgraph "Metrics"
            N[Task Throughput]
            O[Worker Utilization]
            P[Queue Depth]
            Q[Error Rates]
            R[Response Times]
        end
    
        B --> N
        B --> O
        B --> P
        B --> Q
        B --> R
    
        style A fill:#e8f5e8
        style F fill:#e3f2fd
        style G fill:#fff3e0
        style H fill:#fce4ec

    Comprehensive Monitoring Setup

    # monitoring.py
    """Comprehensive monitoring and metrics collection."""
    import time
    import logging
    import json
    from datetime import datetime, timedelta
    from collections import defaultdict, deque
    from celery import Celery
    from celery.events import EventReceiver
    from celery.signals import (
        after_task_publish, task_prerun, task_success,
        task_failure, task_retry, worker_ready, worker_shutdown
    )
    import threading
    
    app = Celery('monitoring_demo')
    
    class TaskMetrics:
        """Collect and track task metrics."""
    
        def __init__(self):
            self.task_counts = defaultdict(int)
            self.task_times = defaultdict(list)
            self.task_errors = defaultdict(list)
            self.recent_tasks = deque(maxlen=1000)
            self.worker_stats = defaultdict(dict)
            self.queue_stats = defaultdict(dict)
            self.lock = threading.Lock()
    
        def record_task_sent(self, task_id, task_name, queue):
            """Record when a task is sent."""
            with self.lock:
                self.task_counts[f"{task_name}.sent"] += 1
                self.recent_tasks.append({
                    'task_id': task_id,
                    'task_name': task_name,
                    'queue': queue,
                    'event': 'sent',
                    'timestamp': time.time()
                })
    
        def record_task_started(self, task_id, task_name, worker):
            """Record when a task starts executing."""
            with self.lock:
                self.task_counts[f"{task_name}.started"] += 1
                self.worker_stats[worker]['current_tasks'] = self.worker_stats[worker].get('current_tasks', 0) + 1
    
        def record_task_success(self, task_id, task_name, runtime, worker):
            """Record successful task completion."""
            with self.lock:
                self.task_counts[f"{task_name}.success"] += 1
                self.task_times[task_name].append(runtime)
                self.worker_stats[worker]['current_tasks'] = max(0, self.worker_stats[worker].get('current_tasks', 1) - 1)
                self.worker_stats[worker]['total_completed'] = self.worker_stats[worker].get('total_completed', 0) + 1
    
        def record_task_failure(self, task_id, task_name, exception, worker):
            """Record task failure."""
            with self.lock:
                self.task_counts[f"{task_name}.failure"] += 1
                self.task_errors[task_name].append({
                    'timestamp': time.time(),
                    'exception': str(exception),
                    'worker': worker
                })
                self.worker_stats[worker]['current_tasks'] = max(0, self.worker_stats[worker].get('current_tasks', 1) - 1)
                self.worker_stats[worker]['total_failed'] = self.worker_stats[worker].get('total_failed', 0) + 1
    
        def get_task_stats(self, task_name):
            """Get statistics for a specific task."""
            with self.lock:
                sent = self.task_counts.get(f"{task_name}.sent", 0)
                started = self.task_counts.get(f"{task_name}.started", 0)
                success = self.task_counts.get(f"{task_name}.success", 0)
                failure = self.task_counts.get(f"{task_name}.failure", 0)
    
                times = self.task_times.get(task_name, [])
                avg_time = sum(times) / len(times) if times else 0
    
                recent_errors = [
                    err for err in self.task_errors.get(task_name, [])
                    if time.time() - err['timestamp'] < 3600  # Last hour
                ]
    
                return {
                    'sent': sent,
                    'started': started,
                    'success': success,
                    'failure': failure,
                    'success_rate': success / max(1, success + failure),
                    'avg_runtime': avg_time,
                    'recent_errors': len(recent_errors),
                    'pending': sent - started,
                    'executing': started - success - failure
                }
    
        def get_worker_stats(self):
            """Get worker statistics."""
            with self.lock:
                return dict(self.worker_stats)
    
        def get_system_overview(self):
            """Get system-wide overview."""
            with self.lock:
                total_tasks = sum(
                    count for key, count in self.task_counts.items()
                    if key.endswith('.sent')
                )
                total_success = sum(
                    count for key, count in self.task_counts.items()
                    if key.endswith('.success')
                )
                total_failure = sum(
                    count for key, count in self.task_counts.items()
                    if key.endswith('.failure')
                )
    
                return {
                    'total_tasks': total_tasks,
                    'total_success': total_success,
                    'total_failure': total_failure,
                    'overall_success_rate': total_success / max(1, total_success + total_failure),
                    'active_workers': len(self.worker_stats),
                    'recent_activity': len(self.recent_tasks)
                }
    
    # Global metrics collector
    metrics = TaskMetrics()
    
    # Signal handlers for automatic metrics collection
    @after_task_publish.connect
    def task_sent_handler(sender=None, headers=None, routing_key=None, **kwds):
        """Record published tasks (the old task_sent signal was removed in Celery 4.0)."""
        task_id = (headers or {}).get('id')
        metrics.record_task_sent(task_id, sender, routing_key or 'default')
    
    @task_prerun.connect
    def task_started_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
        """Record task start (task_prerun fires just before execution)."""
        worker = getattr(getattr(task, 'request', None), 'hostname', None) or 'unknown'
        metrics.record_task_started(task_id, sender.name if sender else 'unknown', worker)
    
    @task_success.connect
    def task_success_handler(sender=None, result=None, **kwds):
        """Record task success; the signal does not report runtime, so default to 0."""
        request = getattr(sender, 'request', None)
        worker = getattr(request, 'hostname', None) or 'unknown'
        task_id = getattr(request, 'id', None)
        metrics.record_task_success(task_id, sender.name if sender else 'unknown',
                                    kwds.get('runtime') or 0.0, worker)
    
    @task_failure.connect
    def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwds):
        """Record task failure."""
        request = getattr(sender, 'request', None)
        worker = getattr(request, 'hostname', None) or 'unknown'
        task_name = sender.name if sender else 'unknown'
        metrics.record_task_failure(task_id, task_name, exception, worker)
    
    # Custom monitoring tasks
    @app.task
    def system_health_check():
        """Perform system health check."""
        stats = metrics.get_system_overview()
        worker_stats = metrics.get_worker_stats()
    
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'system_stats': stats,
            'worker_count': len(worker_stats),
            'workers': worker_stats,
            'alerts': []
        }
    
        # Check for alerts
        if stats['overall_success_rate'] < 0.9:
            health_report['alerts'].append({
                'level': 'warning',
                'message': f"Low success rate: {stats['overall_success_rate']:.2%}"
            })
    
        if len(worker_stats) == 0:
            health_report['alerts'].append({
                'level': 'critical',
                'message': "No active workers detected"
            })
    
        # Check individual worker health
        for worker, worker_info in worker_stats.items():
            current_tasks = worker_info.get('current_tasks', 0)
            if current_tasks > 100:  # Assumed high-load threshold
                health_report['alerts'].append({
                    'level': 'warning',
                    'message': f"Worker {worker} has high load: {current_tasks} current tasks"
                })
    
        return health_report
    
    @app.task
    def generate_performance_report():
        """Generate detailed performance report."""
        report = {
            'timestamp': datetime.now().isoformat(),
            'period': 'last_hour',
            'task_performance': {},
            'worker_performance': metrics.get_worker_stats(),
            'system_overview': metrics.get_system_overview()
        }
    
        # Get performance data for each task type
        task_names = set()
        for key in metrics.task_counts.keys():
            task_name = key.rsplit('.', 1)[0]
            task_names.add(task_name)
    
        for task_name in task_names:
            report['task_performance'][task_name] = metrics.get_task_stats(task_name)
    
        return report
    Python
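    
    The health check and performance report are most useful when they run on a schedule. One way to wire that up is through Celery beat; the intervals below are examples:
    
    # beat_schedule.py
    from monitoring import app
    
    app.conf.beat_schedule = {
        'system-health-check-every-minute': {
            'task': 'monitoring.system_health_check',
            'schedule': 60.0,
        },
        'performance-report-hourly': {
            'task': 'monitoring.generate_performance_report',
            'schedule': 3600.0,
        },
    }
    Python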

    Flower Integration and Custom Dashboards

    # flower_integration.py
    """Enhanced Flower integration and custom monitoring."""
    import json
    import time
    import threading
    import logging
    from datetime import datetime
    from celery.events import EventReceiver
    from kombu import Connection
    
    # Reuse the metrics collector defined in monitoring.py
    from monitoring import TaskMetrics
    
    class CeleryEventMonitor:
        """Real-time Celery event monitor."""
    
        def __init__(self, broker_url):
            self.broker_url = broker_url
            self.running = False
            self.event_handlers = {}
            self.metrics = TaskMetrics()
    
        def register_handler(self, event_type, handler):
            """Register event handler."""
            if event_type not in self.event_handlers:
                self.event_handlers[event_type] = []
            self.event_handlers[event_type].append(handler)
    
        def start_monitoring(self):
            """Start event monitoring in a separate thread."""
            def monitor():
                with Connection(self.broker_url) as connection:
                    recv = EventReceiver(connection, handlers={
                        'task-sent': self.on_task_sent,
                        'task-received': self.on_task_received,
                        'task-started': self.on_task_started,
                        'task-succeeded': self.on_task_succeeded,
                        'task-failed': self.on_task_failed,
                        'task-retried': self.on_task_retried,
                        'worker-online': self.on_worker_online,
                        'worker-offline': self.on_worker_offline,
                        'worker-heartbeat': self.on_worker_heartbeat,
                    })
    
                    self.running = True
                    logging.info("Celery event monitoring started")
    
                    try:
                        recv.capture(limit=None, timeout=None, wakeup=True)
                    except KeyboardInterrupt:
                        logging.info("Event monitoring stopped")
                    finally:
                        self.running = False
    
            thread = threading.Thread(target=monitor, daemon=True)
            thread.start()
            return thread
    
        def on_task_sent(self, event):
            """Handle task-sent event."""
            for handler in self.event_handlers.get('task-sent', []):
                handler(event)
    
        def on_task_received(self, event):
            """Handle task-received event."""
            for handler in self.event_handlers.get('task-received', []):
                handler(event)
    
        def on_task_started(self, event):
            """Handle task-started event."""
            for handler in self.event_handlers.get('task-started', []):
                handler(event)
    
        def on_task_succeeded(self, event):
            """Handle task-succeeded event."""
            for handler in self.event_handlers.get('task-succeeded', []):
                handler(event)
    
        def on_task_failed(self, event):
            """Handle task-failed event."""
            for handler in self.event_handlers.get('task-failed', []):
                handler(event)
    
        def on_task_retried(self, event):
            """Handle task-retried event."""
            for handler in self.event_handlers.get('task-retried', []):
                handler(event)
    
        def on_worker_online(self, event):
            """Handle worker-online event."""
            logging.info(f"Worker came online: {event.get('hostname')}")
            for handler in self.event_handlers.get('worker-online', []):
                handler(event)
    
        def on_worker_offline(self, event):
            """Handle worker-offline event."""
            logging.warning(f"Worker went offline: {event.get('hostname')}")
            for handler in self.event_handlers.get('worker-offline', []):
                handler(event)
    
        def on_worker_heartbeat(self, event):
            """Handle worker-heartbeat event."""
            for handler in self.event_handlers.get('worker-heartbeat', []):
                handler(event)
    
    # Custom dashboard implementation
    from flask import Flask, jsonify, render_template_string
    
    dashboard_app = Flask(__name__)
    monitor = CeleryEventMonitor('redis://localhost:6379/0')
    
    @dashboard_app.route('/api/metrics')
    def get_metrics():
        """Get current metrics."""
        return jsonify({
            'system_overview': monitor.metrics.get_system_overview(),
            'worker_stats': monitor.metrics.get_worker_stats(),
            'timestamp': time.time()
        })
    
    @dashboard_app.route('/api/task/<task_name>')
    def get_task_metrics(task_name):
        """Get metrics for specific task."""
        return jsonify(monitor.metrics.get_task_stats(task_name))
    
    @dashboard_app.route('/api/health')
    def health_check():
        """Health check endpoint."""
        stats = monitor.metrics.get_system_overview()
    
        is_healthy = (
            stats['overall_success_rate'] > 0.95 and
            stats['active_workers'] > 0
        )
    
        return jsonify({
            'healthy': is_healthy,
            'stats': stats,
            'timestamp': time.time()
        })
    
    @dashboard_app.route('/')
    def dashboard():
        """Main dashboard page."""
        return render_template_string(DASHBOARD_HTML)
    
    # Dashboard HTML template rendered inline by the route above
    DASHBOARD_HTML = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Celery Monitoring Dashboard</title>
        <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
        <style>
            body { font-family: Arial, sans-serif; margin: 20px; }
            .metric-card { 
                border: 1px solid #ddd; 
                border-radius: 8px; 
                padding: 15px; 
                margin: 10px; 
                display: inline-block; 
                min-width: 200px;
            }
            .metric-value { font-size: 2em; font-weight: bold; }
            .metric-label { color: #666; }
            .alert { background-color: #f8d7da; border: 1px solid #f5c6cb; padding: 10px; margin: 10px 0; border-radius: 4px; }
            .success { color: #28a745; }
            .warning { color: #ffc107; }
            .danger { color: #dc3545; }
        </style>
    </head>
    <body>
        <h1>Celery Monitoring Dashboard</h1>
    
        <div id="overview">
            <div class="metric-card">
                <div class="metric-value" id="total-tasks">-</div>
                <div class="metric-label">Total Tasks</div>
            </div>
            <div class="metric-card">
                <div class="metric-value" id="success-rate">-</div>
                <div class="metric-label">Success Rate</div>
            </div>
            <div class="metric-card">
                <div class="metric-value" id="active-workers">-</div>
                <div class="metric-label">Active Workers</div>
            </div>
        </div>
    
        <div id="alerts"></div>
    
        <canvas id="taskChart" width="400" height="200"></canvas>
    
        <script>
            function updateDashboard() {
                fetch('/api/metrics')
                    .then(response => response.json())
                    .then(data => {
                        const overview = data.system_overview;
    
                        document.getElementById('total-tasks').textContent = overview.total_tasks;
                        document.getElementById('success-rate').textContent = 
                            (overview.overall_success_rate * 100).toFixed(1) + '%';
                        document.getElementById('active-workers').textContent = overview.active_workers;
    
                        // Update chart
                        updateChart(overview);
                    })
                    .catch(error => console.error('Error:', error));
            }
    
            function updateChart(data) {
                // Chart update logic here
            }
    
            // Update every 5 seconds
            setInterval(updateDashboard, 5000);
            updateDashboard();
        </script>
    </body>
    </html>
    """
    
    # Alerting system
    class AlertManager:
        """Manage alerts and notifications."""
    
        def __init__(self):
            self.alert_rules = []
            self.notification_channels = []
    
        def add_rule(self, name, condition, severity='warning'):
            """Add an alert rule."""
            self.alert_rules.append({
                'name': name,
                'condition': condition,
                'severity': severity
            })
    
        def add_notification_channel(self, channel):
            """Add notification channel."""
            self.notification_channels.append(channel)
    
        def check_alerts(self, metrics):
            """Check all alert rules against current metrics."""
            alerts = []
    
            for rule in self.alert_rules:
                try:
                    if rule['condition'](metrics):
                        alert = {
                            'name': rule['name'],
                            'severity': rule['severity'],
                            'timestamp': time.time(),
                            'metrics': metrics
                        }
                        alerts.append(alert)
                        self.send_alert(alert)
                except Exception as e:
                    logging.error(f"Error checking alert rule {rule['name']}: {e}")
    
            return alerts
    
        def send_alert(self, alert):
            """Send alert through all notification channels."""
            for channel in self.notification_channels:
                try:
                    channel.send(alert)
                except Exception as e:
                    logging.error(f"Error sending alert: {e}")
    
    # Email notification channel
    import smtplib
    from email.mime.text import MIMEText
    
    class EmailNotificationChannel:
        """Email notification channel."""
    
        def __init__(self, smtp_host, smtp_port, username, password, recipients):
            self.smtp_host = smtp_host
            self.smtp_port = smtp_port
            self.username = username
            self.password = password
            self.recipients = recipients
    
        def send(self, alert):
            """Send alert via email."""
            subject = f"Celery Alert: {alert['name']} ({alert['severity']})"
            body = f"""
            Alert: {alert['name']}
            Severity: {alert['severity']}
            Time: {datetime.fromtimestamp(alert['timestamp'])}
    
            Metrics:
            {json.dumps(alert['metrics'], indent=2)}
            """
    
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.username
            msg['To'] = ', '.join(self.recipients)
    
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.send_message(msg)
    
    # Setup example
    alert_manager = AlertManager()
    
    # Add alert rules
    alert_manager.add_rule(
        'Low Success Rate',
        lambda m: m.get_system_overview()['overall_success_rate'] < 0.9,
        'warning'
    )
    
    alert_manager.add_rule(
        'No Active Workers',
        lambda m: m.get_system_overview()['active_workers'] == 0,
        'critical'
    )
    
    # Add notification channels
    email_channel = EmailNotificationChannel(
        smtp_host='smtp.gmail.com',
        smtp_port=587,
        username='alerts@company.com',
        password='password',  # in production, load credentials from an environment variable or secret store
        recipients=['admin@company.com']
    )
    alert_manager.add_notification_channel(email_channel)
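
    # The checks above can be scheduled with Celery beat so they run
    # periodically. A minimal sketch -- it assumes the `app` instance and a
    # `metrics_collector` object (with a get_system_overview() method) defined
    # earlier in this chapter.
    @app.task(name='run_alert_checks', ignore_result=True)
    def run_alert_checks():
        """Evaluate all alert rules against the current metrics."""
        alert_manager.check_alerts(metrics_collector)

    app.conf.beat_schedule = {
        'check-alerts-every-minute': {
            'task': 'run_alert_checks',
            'schedule': 60.0,  # seconds
        },
    }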

    Chapter 9: Workflows and Chains

    Workflow Architecture

    graph TB
        subgraph "Workflow Patterns"
            A[Simple Chain] --> B[Task 1 → Task 2 → Task 3]
            C[Group] --> D[Task 1, Task 2, Task 3 in parallel]
            E[Chord] --> F[Group → Callback Task]
            G[Map] --> H[Same Task with different args]
            I[Starmap] --> J[Task with argument unpacking]
            K[Chain of Groups] --> L[Complex Workflow]
        end
    
        subgraph "Workflow Control"
            M[Conditional Execution]
            N[Error Handling]
            O[Partial Failures]
            P[Workflow State]
            Q[Result Aggregation]
        end
    
        B --> M
        D --> N
        F --> O
        H --> P
        J --> Q
        L --> M
    
        style A fill:#e8f5e8
        style C fill:#e3f2fd
        style E fill:#fff3e0
        style G fill:#fce4ec
        style I fill:#f3e5f5
        style K fill:#e1f5fe
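
    Before diving into the full implementation, it helps to see the canvas primitives from the diagram in isolation. The sketch below is illustrative only: it assumes an already-configured app instance and two throwaway tasks (add, tsum) just to show how chain, group, chord, map, and starmap signatures are composed.

    # canvas_primitives.py -- minimal sketch of the patterns in the diagram
    # (assumes `app` is an already-configured Celery instance)
    from celery import chain, chord, group

    @app.task
    def add(x, y):
        return x + y

    @app.task
    def tsum(numbers):
        return sum(numbers)

    # Chain: each result is passed as the first argument of the next task
    chain(add.s(2, 2), add.s(4), add.s(8))                 # ((2 + 2) + 4) + 8

    # Group: run signatures in parallel and collect a list of results
    group(add.s(i, i) for i in range(10))

    # Chord: a group whose collected results feed a single callback task
    chord(group(add.s(i, i) for i in range(10)), tsum.s())

    # Map / starmap: apply one task to many argument sets in a single message
    tsum.map([[1, 2, 3], [4, 5, 6]])                       # yields [6, 15] when run
    add.starmap([(1, 2), (3, 4)])                          # yields [3, 7] when run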

    Advanced Workflow Implementation

    # workflows.py
    """Advanced workflow patterns and implementations."""
    from celery import Celery, group, chain, chord, chunks, signature
    from celery.result import GroupResult, ResultSet
    import time
    import random
    import logging
    from typing import List, Dict, Any, Optional
    
    app = Celery('workflow_demo')
    
    # Basic workflow tasks
    @app.task
    def download_data(url: str) -> Dict[str, Any]:
        """Download data from URL."""
        time.sleep(random.uniform(1, 3))  # Simulate download
        return {
            'url': url,
            'size': random.randint(1000, 10000),
            'data': f"content_from_{url.split('/')[-1]}",
            'timestamp': time.time()
        }
    
    @app.task
    def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Process downloaded data."""
        time.sleep(random.uniform(0.5, 2))  # Simulate processing
    
        processed = {
            'original_size': data['size'],
            'processed_size': data['size'] * random.uniform(0.7, 1.3),
            'url': data['url'],
            'processed_at': time.time(),
            'processing_time': random.uniform(0.5, 2)
        }
    
        return processed
    
    @app.task
    def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate processed data."""
        time.sleep(random.uniform(0.2, 0.8))
    
        # Simulate validation logic
        is_valid = random.random() > 0.1  # 90% success rate
    
        result = {
            'url': data['url'],
            'is_valid': is_valid,
            'validation_time': time.time(),
            'errors': [] if is_valid else ['Invalid data format'],
            'original_data': data
        }
    
        if not is_valid:
            raise ValueError(f"Data validation failed for {data['url']}")
    
        return result
    
    @app.task
    def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate multiple results."""
        time.sleep(random.uniform(0.5, 1))
    
        valid_results = [r for r in results if r.get('is_valid', False)]
        total_size = sum(r['original_data']['processed_size'] for r in valid_results)
    
        return {
            'total_items': len(results),
            'valid_items': len(valid_results),
            'total_size': total_size,
            'success_rate': len(valid_results) / len(results) if results else 0,
            'aggregated_at': time.time()
        }
    
    @app.task
    def send_notification(result: Dict[str, Any], recipient: str) -> str:
        """Send notification about workflow completion."""
        time.sleep(random.uniform(0.3, 0.7))
    
        message = f"""
        Workflow completed for {recipient}
        Total items: {result['total_items']}
        Valid items: {result['valid_items']}
        Success rate: {result['success_rate']:.2%}
        Total size: {result['total_size']:.2f}
        """
    
        return f"Notification sent to {recipient}: {message.strip()}"
    
    # Simple workflow chains
    def create_simple_chain(urls: List[str]) -> chain:
        """Create a simple sequential chain for the first URL."""
        # Download → Process → Validate for a single URL
        return chain(
            download_data.s(urls[0]),
            process_data.s(),
            validate_data.s()
        )
    
    def create_parallel_processing(urls: List[str]) -> group:
        """Create parallel processing workflow."""
        # Process multiple URLs in parallel
        return group(
            chain(
                download_data.s(url),
                process_data.s(),
                validate_data.s()
            ) for url in urls
        )
    
    def create_chord_workflow(urls: List[str], recipient: str) -> chord:
        """Create chord workflow (parallel + callback)."""
        # Process all URLs in parallel, then aggregate and notify
        parallel_tasks = group(
            chain(
                download_data.s(url),
                process_data.s(),
                validate_data.s()
            ) for url in urls
        )
    
        return chord(
            parallel_tasks,
            chain(
                aggregate_results.s(),
                send_notification.s(recipient)
            )
        )
    
    # Advanced workflow patterns
    class WorkflowBuilder:
        """Build complex workflows with error handling."""
    
        def __init__(self):
            self.tasks = []
            self.error_handlers = {}
    
        def add_task(self, task_sig, error_handler=None):
            """Add task to workflow."""
            self.tasks.append(task_sig)
            if error_handler:
                self.error_handlers[len(self.tasks) - 1] = error_handler
            return self
    
        def add_parallel_group(self, task_sigs, error_handler=None):
            """Add parallel group to workflow."""
            group_sig = group(task_sigs)
            return self.add_task(group_sig, error_handler)
    
        def build_chain(self):
            """Build chain from added tasks."""
            if not self.tasks:
                raise ValueError("No tasks added to workflow")
    
            return chain(*self.tasks)
    
        def build_resilient_chain(self):
            """Build chain with error handling."""
            # Implementation would include try/catch logic
            # This is a simplified version
            return self.build_chain()
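
    # One concrete way to add error handling to a canvas (a sketch, not part of
    # WorkflowBuilder): attach an error callback with link_error. The errback
    # receives the failed task's request, the exception, and the traceback.
    @app.task
    def workflow_errback(request, exc, traceback):
        """Log a failed task from a workflow."""
        logging.error(f"Workflow task {request.id} failed: {exc}")

    def run_chain_with_errback(url: str):
        """Run a download chain and route failures to workflow_errback."""
        wf = chain(download_data.s(url), process_data.s(), validate_data.s())
        return wf.apply_async(link_error=workflow_errback.s())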
    
    # Conditional workflow execution
    @app.task(bind=True)
    def conditional_task(self, data: Dict[str, Any], condition_func: str) -> Dict[str, Any]:
        """Execute task based on condition."""
    
        # Define condition functions
        conditions = {
            'size_check': lambda d: d.get('size', 0) > 5000,
            'url_check': lambda d: 'important' in d.get('url', ''),
            'time_check': lambda d: time.time() - d.get('timestamp', 0) < 3600,
        }
    
        condition = conditions.get(condition_func)
        if not condition:
            raise ValueError(f"Unknown condition: {condition_func}")
    
        if condition(data):
            # Execute additional processing
            time.sleep(random.uniform(1, 2))
            data['conditional_processing'] = True
            data['processed_at'] = time.time()
        else:
            data['conditional_processing'] = False
            data['skipped_reason'] = f"Failed condition: {condition_func}"
    
        return data
    
    def create_conditional_workflow(urls: List[str]) -> chord:
        """Create workflow with per-item conditional execution."""
        # For each URL: download, apply the conditional step, then process and
        # validate; aggregate once every parallel branch has finished (a chord).
        return chord(
            group(
                chain(
                    download_data.s(url),
                    conditional_task.s('size_check'),
                    process_data.s(),
                    validate_data.s()
                ) for url in urls
            ),
            aggregate_results.s()
        )
    
    # Workflow with retry and error handling
    @app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
    def resilient_download(self, url: str) -> Dict[str, Any]:
        """Download with automatic retry."""
        if random.random() < 0.2:  # 20% failure rate
            raise ConnectionError(f"Failed to download from {url}")
    
        # calling the task as a plain function runs its body in-process (no new message)
        return download_data(url)
    
    @app.task(bind=True)
    def error_recovery_task(self, failed_results: List[Any], backup_urls: List[str]) -> List[Dict[str, Any]]:
        """Recover from failed tasks using backup sources."""
        recovered_results = []
    
        for i, result in enumerate(failed_results):
            if isinstance(result, Exception) and i < len(backup_urls):
                # Try backup URL
                try:
                    backup_result = download_data(backup_urls[i])
                    recovered_results.append(backup_result)
                except Exception as e:
                    logging.error(f"Backup also failed: {e}")
                    recovered_results.append({'error': str(e), 'url': backup_urls[i]})
            else:
                recovered_results.append(result)
    
        return recovered_results
    
    # Dynamic workflow generation
    class DynamicWorkflowGenerator:
        """Generate workflows based on runtime conditions."""
    
        @staticmethod
        def generate_etl_workflow(config: Dict[str, Any]) -> Any:
            """Generate ETL workflow based on configuration."""
            source_type = config.get('source_type', 'url')
            processing_steps = config.get('processing_steps', [])
            output_format = config.get('output_format', 'json')
    
            workflow_tasks = []

            # Extract phase (read_file, save_as_json, save_as_csv and
            # calculate_average used below are assumed to be tasks defined
            # elsewhere in the project)
            if source_type == 'url':
                extract_task = download_data.s(config['source'])
            elif source_type == 'file':
                extract_task = read_file.s(config['source'])
            else:
                raise ValueError(f"Unknown source type: {source_type}")
    
            workflow_tasks.append(extract_task)
    
            # Transform phase
            for step in processing_steps:
                if step == 'validate':
                    workflow_tasks.append(validate_data.s())
                elif step == 'process':
                    workflow_tasks.append(process_data.s())
                elif step == 'conditional':
                    workflow_tasks.append(conditional_task.s(config.get('condition', 'size_check')))
    
            # Load phase
            if output_format == 'json':
                workflow_tasks.append(save_as_json.s(config.get('output_path')))
            elif output_format == 'csv':
                workflow_tasks.append(save_as_csv.s(config.get('output_path')))
    
            return chain(*workflow_tasks)
    
        @staticmethod
        def generate_fan_out_fan_in(data_sources: List[str], aggregation_func: str) -> chord:
            """Generate fan-out/fan-in pattern."""
    
            # Fan-out: process each source independently
            fan_out_tasks = group(
                chain(
                    download_data.s(source),
                    process_data.s(),
                    validate_data.s()
                ) for source in data_sources
            )
    
            # Fan-in: aggregate results
            if aggregation_func == 'sum':
                fan_in_task = aggregate_results.s()
            elif aggregation_func == 'average':
                fan_in_task = calculate_average.s()
            else:
                fan_in_task = aggregate_results.s()
    
            return chord(fan_out_tasks, fan_in_task)
    
    # Workflow state management
    class WorkflowState:
        """Manage workflow state and progress."""
    
        def __init__(self, workflow_id: str):
            self.workflow_id = workflow_id
            self.start_time = time.time()
            self.tasks = {}
            self.current_step = 0
            self.total_steps = 0
            self.status = 'running'
    
        def add_task(self, task_id: str, task_name: str, status: str = 'pending'):
            """Add task to workflow state."""
            self.tasks[task_id] = {
                'name': task_name,
                'status': status,
                'start_time': None,
                'end_time': None,
                'result': None,
                'error': None
            }
            self.total_steps += 1
    
        def update_task(self, task_id: str, status: str, result: Any = None, error: str = None):
            """Update task status."""
            if task_id in self.tasks:
                task = self.tasks[task_id]
                task['status'] = status
    
                if status == 'started':
                    task['start_time'] = time.time()
                elif status in ['success', 'failure']:
                    task['end_time'] = time.time()
                    if result is not None:
                        task['result'] = result
                    if error:
                        task['error'] = error
    
                    if status == 'success':
                        self.current_step += 1
    
        def get_progress(self) -> Dict[str, Any]:
            """Get workflow progress."""
            completed_tasks = sum(1 for task in self.tasks.values() if task['status'] in ['success', 'failure'])
    
            return {
                'workflow_id': self.workflow_id,
                'status': self.status,
                'progress': completed_tasks / max(1, self.total_steps),
                'current_step': self.current_step,
                'total_steps': self.total_steps,
                'elapsed_time': time.time() - self.start_time,
                'tasks': self.tasks
            }
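
    # Illustrative use of WorkflowState (a sketch -- in practice you would
    # persist this in Redis or a database and update it from task signals or
    # from inside the tasks themselves rather than keeping it in memory):
    def demo_workflow_state():
        state = WorkflowState('demo-workflow-1')
        state.add_task('t1', 'download_data')
        state.add_task('t2', 'process_data')

        state.update_task('t1', 'started')
        state.update_task('t1', 'success', result={'size': 1234})
        state.update_task('t2', 'started')

        return state.get_progress()  # progress == 0.5 after one of two tasks completes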
    
    # Workflow execution examples
    def execute_simple_workflow():
        """Execute simple workflow example."""
        urls = [
            'https://api.example.com/data/1',
            'https://api.example.com/data/2',
            'https://api.example.com/data/3'
        ]
    
        # Simple chain
        workflow = create_simple_chain(urls[:1])
        result = workflow.apply_async()
    
        print(f"Simple workflow started: {result.id}")
        return result
    
    def execute_parallel_workflow():
        """Execute parallel workflow example."""
        urls = [
            'https://api.example.com/data/1',
            'https://api.example.com/data/2',
            'https://api.example.com/data/3',
            'https://api.example.com/data/4'
        ]
    
        # Parallel processing
        workflow = create_parallel_processing(urls)
        result = workflow.apply_async()
    
        print(f"Parallel workflow started: {result.id}")
        return result
    
    def execute_chord_workflow():
        """Execute chord workflow example."""
        urls = [
            'https://api.example.com/data/1',
            'https://api.example.com/data/2',
            'https://api.example.com/data/3',
            'https://api.example.com/data/4',
            'https://api.example.com/data/5'
        ]
    
        # Chord workflow (parallel + callback)
        workflow = create_chord_workflow(urls, 'admin@example.com')
        result = workflow.apply_async()
    
        print(f"Chord workflow started: {result.id}")
        return result
    
    # Workflow monitoring
    def monitor_workflow(result):
        """Monitor workflow execution."""
        while not result.ready():
            if hasattr(result, 'children') and result.children:
                # Group or chord result
                completed = sum(1 for child in result.children if child.ready())
                total = len(result.children)
                print(f"Progress: {completed}/{total} tasks completed")
            else:
                # Single task result
                print(f"Status: {result.status}")
    
            time.sleep(2)
    
        if result.successful():
            output = result.get()
            print(f"Workflow completed successfully: {output}")
            return output
        else:
            # .get() would re-raise on failure; report what we can instead
            print(f"Workflow failed: {getattr(result, 'result', 'see worker logs')}")
            return None
    
    # Example usage
    if __name__ == '__main__':
        # Execute different workflow patterns
    
        print("=== Simple Workflow ===")
        simple_result = execute_simple_workflow()
        monitor_workflow(simple_result)
    
        print("\n=== Parallel Workflow ===")
        parallel_result = execute_parallel_workflow()
        monitor_workflow(parallel_result)
    
        print("\n=== Chord Workflow ===")
        chord_result = execute_chord_workflow()
        monitor_workflow(chord_result)

    This continues our comprehensive Celery book with advanced monitoring and debugging, plus sophisticated workflow patterns. The examples demonstrate real-world scenarios with proper error handling, state management, and progress tracking.


    Chapter 10: Performance Optimization

    Performance Analysis Framework

    graph TB
        subgraph "Performance Metrics"
            A[Throughput] --> E[Tasks/Second]
            B[Latency] --> F[Task Duration]
            C[Resource Usage] --> G[CPU/Memory]
            D[Queue Depth] --> H[Pending Tasks]
        end
    
        subgraph "Optimization Areas"
            I[Worker Optimization]
            J[Task Optimization]
            K[Broker Optimization]
            L[Network Optimization]
            M[Memory Optimization]
        end
    
        subgraph "Optimization Techniques"
            N[Connection Pooling]
            O[Batch Processing]
            P[Prefetching Control]
            Q[Serialization Tuning]
            R[Concurrency Tuning]
        end
    
        E --> I
        F --> J
        G --> K
        H --> L
    
        I --> N
        J --> O
        K --> P
        L --> Q
        M --> R
    
        style A fill:#e8f5e8
        style B fill:#e3f2fd
        style C fill:#fff3e0
        style D fill:#fce4ec

    Comprehensive Performance Optimization

    # performance_optimization.py
    """Comprehensive performance optimization techniques."""
    import time
    import psutil
    import threading
    import multiprocessing
    from collections import deque, defaultdict
    from contextlib import contextmanager
    from celery import Celery
    from celery.signals import task_prerun, task_postrun
    import logging
    
    app = Celery('performance_optimized')
    
    class PerformanceProfiler:
        """Profile task and worker performance."""
    
        def __init__(self):
            self.task_metrics = defaultdict(lambda: {
                'count': 0,
                'total_time': 0,
                'max_time': 0,
                'min_time': float('inf'),
                'memory_usage': deque(maxlen=100),
                'cpu_usage': deque(maxlen=100)
            })
            self.system_metrics = {
                'peak_memory': 0,
                'peak_cpu': 0,
                'total_tasks': 0
            }
            self.lock = threading.Lock()
    
        @contextmanager
        def profile_task(self, task_name):
            """Context manager for profiling task execution."""
            start_time = time.time()
            start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
            start_cpu = psutil.cpu_percent()
    
            try:
                yield
            finally:
                end_time = time.time()
                end_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
                end_cpu = psutil.cpu_percent()
    
                duration = end_time - start_time
                memory_used = end_memory - start_memory
                # cpu_percent() without an interval reports usage since the last
                # call, so this delta is only a rough (and possibly negative) hint
                cpu_used = end_cpu - start_cpu
    
                with self.lock:
                    metrics = self.task_metrics[task_name]
                    metrics['count'] += 1
                    metrics['total_time'] += duration
                    metrics['max_time'] = max(metrics['max_time'], duration)
                    metrics['min_time'] = min(metrics['min_time'], duration)
                    metrics['memory_usage'].append(memory_used)
                    metrics['cpu_usage'].append(cpu_used)
    
                    self.system_metrics['peak_memory'] = max(
                        self.system_metrics['peak_memory'], end_memory
                    )
                    self.system_metrics['peak_cpu'] = max(
                        self.system_metrics['peak_cpu'], end_cpu
                    )
                    self.system_metrics['total_tasks'] += 1
    
        def get_task_stats(self, task_name):
            """Get performance statistics for a task."""
            with self.lock:
                metrics = self.task_metrics[task_name]
                if metrics['count'] == 0:
                    return None
    
                avg_time = metrics['total_time'] / metrics['count']
                avg_memory = sum(metrics['memory_usage']) / len(metrics['memory_usage']) if metrics['memory_usage'] else 0
                avg_cpu = sum(metrics['cpu_usage']) / len(metrics['cpu_usage']) if metrics['cpu_usage'] else 0
    
                return {
                    'count': metrics['count'],
                    'avg_time': avg_time,
                    'max_time': metrics['max_time'],
                    'min_time': metrics['min_time'] if metrics['min_time'] != float('inf') else 0,
                    'avg_memory_mb': avg_memory,
                    'avg_cpu_percent': avg_cpu,
                    'throughput': metrics['count'] / metrics['total_time'] if metrics['total_time'] > 0 else 0
                }
    
        def get_system_stats(self):
            """Get system-wide performance statistics."""
            return self.system_metrics.copy()
    
    # Global profiler instance
    profiler = PerformanceProfiler()
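
    # Optionally hook the profiler into Celery's task signals so every task is
    # timed, not only those wrapped in profile_task(). A minimal sketch (not
    # enabled for the example tasks below, which use the context manager):
    _signal_start_times = {}

    @task_prerun.connect
    def _record_start(sender=None, task_id=None, **kwargs):
        _signal_start_times[task_id] = time.time()

    @task_postrun.connect
    def _record_duration(sender=None, task_id=None, **kwargs):
        start = _signal_start_times.pop(task_id, None)
        if start is not None and sender is not None:
            duration = time.time() - start
            with profiler.lock:
                metrics = profiler.task_metrics[sender.name]
                metrics['count'] += 1
                metrics['total_time'] += duration
                metrics['max_time'] = max(metrics['max_time'], duration)
                metrics['min_time'] = min(metrics['min_time'], duration)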
    
    # Optimized task implementations
    @app.task(bind=True)
    def optimized_data_processing(self, data_batch, chunk_size=1000):
        """Optimized task for processing large datasets."""
        task_name = self.name
    
        with profiler.profile_task(task_name):
            results = []
    
            # Process in chunks to manage memory
            for i in range(0, len(data_batch), chunk_size):
                chunk = data_batch[i:i + chunk_size]
    
                # Efficient processing logic
                chunk_results = [
                    process_single_item(item) for item in chunk
                ]
                results.extend(chunk_results)
    
                # Periodic garbage collection for large datasets
                if i % (chunk_size * 10) == 0:
                    import gc
                    gc.collect()
    
            return {
                'processed_count': len(results),
                'results': results[:100],  # Return only first 100 for memory efficiency
                'total_batches': (len(data_batch) + chunk_size - 1) // chunk_size
            }
    
    def process_single_item(item):
        """Optimized single item processing."""
        # Simulate processing with minimal memory allocation
        return {
            'id': item.get('id'),
            'processed': True,
            'value': item.get('value', 0) * 2
        }
    
    # Connection pooling optimization
    class OptimizedCeleryConfig:
        """Optimized Celery configuration for high performance."""
    
        # Broker optimizations
        broker_url = 'redis://localhost:6379/0'
        broker_pool_limit = 20  # Increase connection pool
        broker_connection_timeout = 30
        broker_connection_retry_on_startup = True
        broker_connection_max_retries = 5
    
        # Result backend optimizations
        result_backend = 'redis://localhost:6379/0'
        result_backend_transport_options = {
            'retry_on_timeout': True,
            'retry_policy': {'timeout': 5.0}
        }
    
        # Worker optimizations
        worker_prefetch_multiplier = 4  # Optimize based on task duration
        worker_max_tasks_per_child = 10000  # Prevent memory leaks
        worker_max_memory_per_child = 200000  # 200MB limit
    
        # Task optimizations
        task_compression = 'gzip'
        result_compression = 'gzip'
        task_serializer = 'json'
        result_serializer = 'json'
        accept_content = ['json']
    
        # Performance settings
        task_acks_late = True  # Better fault tolerance
        task_reject_on_worker_lost = True
        worker_disable_rate_limits = True  # For high throughput
    
        # Monitoring
        worker_send_task_events = True
        task_send_sent_event = True
    
    app.config_from_object(OptimizedCeleryConfig)
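
    # With this configuration, workers would typically be started with matching
    # CLI options, for example (illustrative values):
    #   celery -A performance_optimization worker \
    #       --pool=prefork --concurrency=8 \
    #       --prefetch-multiplier=4 --max-tasks-per-child=10000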
    
    # Batch processing optimization
    @app.task(bind=True)
    def batch_processor(self, items, batch_size=100):
        """Process items in optimized batches."""
        task_name = self.name
    
        with profiler.profile_task(task_name):
            start_time = time.time()
            total_items = len(items)
            processed_count = 0
            results = []
    
            for i in range(0, total_items, batch_size):
                batch = items[i:i + batch_size]
    
                # Process batch efficiently
                batch_results = process_batch_optimized(batch)
                results.extend(batch_results)
                processed_count += len(batch)
    
                # Update progress
                progress = processed_count / total_items
                self.update_state(
                    state='PROGRESS',
                    meta={'current': processed_count, 'total': total_items, 'progress': progress}
                )
    
                # Yield control periodically
                if i % (batch_size * 10) == 0:
                    time.sleep(0.001)  # Micro-sleep to yield
    
            return {
                'processed_count': processed_count,
                'success_rate': len([r for r in results if r.get('success')]) / max(1, processed_count),
                'total_time': time.time() - start_time
            }
    
    def process_batch_optimized(batch):
        """Optimized batch processing function."""
        # Use list comprehension for efficiency
        return [
            {
                'id': item['id'],
                'success': True,
                'result': item.get('value', 0) ** 2
            }
            for item in batch
            if item.get('value') is not None
        ]
    
    # Memory-efficient task for large files
    @app.task(bind=True)
    def process_large_file(self, file_path, chunk_size=8192):
        """Process large files with memory efficiency."""
        task_name = self.name
    
        with profiler.profile_task(task_name):
            processed_lines = 0
            processed_size = 0
    
            try:
                with open(file_path, 'r', buffering=chunk_size) as file:
                    while True:
                        chunk = file.read(chunk_size)
                        if not chunk:
                            break
    
                        # Process chunk
                        lines = chunk.count('\n')
                        processed_lines += lines
                        processed_size += len(chunk.encode('utf-8'))
    
                        # Update progress
                        self.update_state(
                            state='PROGRESS',
                            meta={
                                'lines_processed': processed_lines,
                                'bytes_processed': processed_size
                            }
                        )
    
            except IOError as e:
                raise Exception(f"Error processing file {file_path}: {e}")
    
            return {
                'file_path': file_path,
                'lines_processed': processed_lines,
                'bytes_processed': processed_size
            }
    
    # CPU-intensive task optimization
    @app.task(bind=True)
    def cpu_intensive_task(self, data, use_multiprocessing=True):
        """CPU-intensive task with multiprocessing optimization."""
        task_name = self.name
    
        with profiler.profile_task(task_name):
            if use_multiprocessing and len(data) > 1000:
                # Use multiprocessing for large datasets.
                # Note: the default prefork worker runs tasks in daemonic processes,
                # which cannot spawn a multiprocessing.Pool; run this task on a
                # worker started with --pool=threads or --pool=solo instead.
                num_processes = min(multiprocessing.cpu_count(), 8)
                chunk_size = len(data) // num_processes
    
                with multiprocessing.Pool(processes=num_processes) as pool:
                    chunks = [
                        data[i:i + chunk_size]
                        for i in range(0, len(data), chunk_size)
                    ]
    
                    results = pool.map(cpu_intensive_function, chunks)
    
                    # Flatten results
                    final_result = []
                    for chunk_result in results:
                        final_result.extend(chunk_result)
    
                    return {
                        'processed_count': len(final_result),
                        'processes_used': num_processes,
                        'results': final_result
                    }
            else:
                # Single-threaded processing for smaller datasets
                result = cpu_intensive_function(data)
                return {
                    'processed_count': len(result),
                    'processes_used': 1,
                    'results': result
                }
    
    def cpu_intensive_function(data_chunk):
        """CPU-intensive processing function."""
        results = []
        for item in data_chunk:
            # Simulate CPU-intensive computation
            value = item.get('value', 1)
            computed = sum(i ** 2 for i in range(value % 1000))
            results.append({
                'id': item.get('id'),
                'computed_value': computed,
                'original_value': value
            })
        return results
    
    # I/O optimization with connection pooling
    import redis
    
    class RedisConnectionPool:
        """Optimized Redis connection pool."""
    
        def __init__(self, host='localhost', port=6379, max_connections=20):
            self.pool = redis.ConnectionPool(
                host=host,
                port=port,
                max_connections=max_connections,
                retry_on_timeout=True,
                socket_keepalive=True,
                socket_keepalive_options={}
            )
    
        @contextmanager
        def get_connection(self):
            """Get Redis connection from pool."""
            client = redis.Redis(connection_pool=self.pool)
            try:
                yield client
            finally:
                client.close()
    
    # Global connection pool
    redis_pool = RedisConnectionPool()
    
    @app.task(bind=True)
    def io_optimized_task(self, keys_to_process):
        """I/O optimized task using connection pooling."""
        task_name = self.name
    
        with profiler.profile_task(task_name):
            results = []
    
            with redis_pool.get_connection() as redis_client:
                # Use pipeline for bulk operations
                pipe = redis_client.pipeline(transaction=False)
    
                # Batch Redis operations
                for key in keys_to_process:
                    pipe.get(key)
    
                # Execute all operations at once
                redis_results = pipe.execute()
    
                # Process results
                for i, value in enumerate(redis_results):
                    if value:
                        results.append({
                            'key': keys_to_process[i],
                            'value': value.decode('utf-8') if isinstance(value, bytes) else value,
                            'processed': True
                        })
    
            return {
                'processed_count': len(results),
                'total_keys': len(keys_to_process),
                'success_rate': len(results) / len(keys_to_process)
            }
    
    # Performance monitoring task
    @app.task
    def performance_report():
        """Generate performance report."""
        system_stats = profiler.get_system_stats()
    
        # Get stats for all tracked tasks
        task_stats = {}
        for task_name in profiler.task_metrics.keys():
            stats = profiler.get_task_stats(task_name)
            if stats:
                task_stats[task_name] = stats
    
        # System resource usage
        current_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        current_cpu = psutil.cpu_percent(interval=1)
    
        return {
            'timestamp': time.time(),
            'system_stats': system_stats,
            'current_memory_mb': current_memory,
            'current_cpu_percent': current_cpu,
            'task_performance': task_stats
        }
    
    # Performance optimization utilities
    class PerformanceOptimizer:
        """Utilities for performance optimization."""
    
        @staticmethod
        def optimize_worker_settings(task_type='mixed'):
            """Get optimized worker settings based on task type."""
            settings = {
                'cpu_bound': {
                    'pool': 'prefork',
                    'concurrency': multiprocessing.cpu_count(),
                    'prefetch_multiplier': 1,
                    'max_tasks_per_child': 1000,
                },
                'io_bound': {
                    'pool': 'eventlet',
                    'concurrency': 500,
                    'prefetch_multiplier': 10,
                    'max_tasks_per_child': 10000,
                },
                'mixed': {
                    'pool': 'threads',
                    'concurrency': multiprocessing.cpu_count() * 2,
                    'prefetch_multiplier': 4,
                    'max_tasks_per_child': 5000,
                }
            }
    
            return settings.get(task_type, settings['mixed'])
    
        @staticmethod
        def calculate_optimal_batch_size(item_size_bytes, available_memory_mb=1000):
            """Calculate optimal batch size based on memory constraints."""
            available_bytes = available_memory_mb * 1024 * 1024
            safety_factor = 0.8  # Use 80% of available memory
    
            optimal_batch_size = int((available_bytes * safety_factor) / item_size_bytes)
    
            # Ensure reasonable bounds
            return max(10, min(optimal_batch_size, 10000))
    
        @staticmethod
        def monitor_memory_usage():
            """Monitor current memory usage."""
            process = psutil.Process()
            memory_info = process.memory_info()
    
            return {
                'rss_mb': memory_info.rss / 1024 / 1024,
                'vms_mb': memory_info.vms / 1024 / 1024,
                'percent': process.memory_percent(),
                'available_mb': psutil.virtual_memory().available / 1024 / 1024
            }
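
    # Example: size batches from the optimizer's helpers (illustrative values)
    def tune_batch_size():
        mem = PerformanceOptimizer.monitor_memory_usage()
        # assume roughly 2 KB per item; substitute your real item size
        return PerformanceOptimizer.calculate_optimal_batch_size(
            item_size_bytes=2048,
            available_memory_mb=mem['available_mb']
        )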
    
    # Example optimization implementations
    def run_performance_comparison():
        """Compare performance of different implementations."""
    
        # Test data
        test_data = [{'id': i, 'value': i * 2} for i in range(10000)]
    
        print("=== Performance Comparison ===")
    
        # Test batch processing
        start_time = time.time()
        result1 = batch_processor.delay(test_data, batch_size=100)
        batch_time = time.time() - start_time
        print(f"Batch processing (100 items/batch): {batch_time:.2f}s")
    
        # Test larger batches
        start_time = time.time()
        result2 = batch_processor.delay(test_data, batch_size=1000)
        large_batch_time = time.time() - start_time
        print(f"Batch processing (1000 items/batch): {large_batch_time:.2f}s")
    
        # Wait for results and compare
        batch_result = result1.get()
        large_batch_result = result2.get()
    
        print(f"Small batches processed: {batch_result['processed_count']}")
        print(f"Large batches processed: {large_batch_result['processed_count']}")
    
        # Generate performance report
        report = performance_report.delay()
        print("Performance report:")
        print(report.get())
    
    if __name__ == '__main__':
        run_performance_comparison()

    Chapter 11: Security and Authentication

    Security Architecture

    graph TB
        subgraph "Security Layers"
            A[Transport Security] --> B[TLS/SSL Encryption]
            C[Message Security] --> D[Message Signing/Encryption]
            E[Access Control] --> F[Authentication & Authorization]
            G[Network Security] --> H[Firewalls & VPNs]
        end
    
        subgraph "Authentication Methods"
            I[Username/Password]
            J[Certificate-based]
            K[Token-based]
            L[SASL Authentication]
        end
    
        subgraph "Security Threats"
            M[Message Tampering]
            N[Unauthorized Access]
            O[Data Interception]
            P[Privilege Escalation]
        end
    
        subgraph "Mitigation Strategies"
            Q[Message Encryption]
            R[Access Controls]
            S[Secure Channels]
            T[Principle of Least Privilege]
        end
    
        B --> S
        D --> Q
        F --> R
        H --> S
    
        M --> Q
        N --> R
        O --> S
        P --> T
    
        style A fill:#e8f5e8
        style C fill:#e3f2fd
        style E fill:#fff3e0
        style G fill:#fce4ec

    Comprehensive Security Implementation

    # security.py
    """Comprehensive security implementation for Celery."""
    import os
    import ssl
    import hmac
    import hashlib
    import secrets
    import base64
    import json
    from datetime import datetime, timedelta
    from cryptography import x509
    from cryptography.fernet import Fernet
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.asymmetric import rsa, padding
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
    from functools import wraps
    from celery import Celery
    from kombu.serialization import register
    import logging
    
    # Secure configuration class
    class SecureCeleryConfig:
        """Secure Celery configuration with authentication and encryption."""
    
        # Secure broker configuration with SSL
        broker_use_ssl = {
            'keyfile': '/path/to/client-key.pem',
            'certfile': '/path/to/client-cert.pem',
            'ca_certs': '/path/to/ca-cert.pem',
            'cert_reqs': ssl.CERT_REQUIRED,
            'ssl_version': ssl.PROTOCOL_TLSv1_2,
            'ciphers': 'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS'
        }
    
        # Redis SSL configuration
        redis_backend_use_ssl = {
            'ssl_keyfile': '/path/to/client-key.pem',
            'ssl_certfile': '/path/to/client-cert.pem',
            'ssl_ca_certs': '/path/to/ca-cert.pem',
            'ssl_cert_reqs': ssl.CERT_REQUIRED,
        }
    
        # Secure serialization
        task_serializer = 'auth_json'
        accept_content = ['auth_json']
        result_serializer = 'auth_json'
    
        # Security settings
        worker_hijack_root_logger = False
        task_reject_on_worker_lost = True
    
        # Authentication
        broker_url = 'rediss://username:password@redis.example.com:6380/0'
        result_backend = 'rediss://username:password@redis.example.com:6380/0'
    
    class MessageSecurity:
        """Handle message encryption and authentication."""
    
        def __init__(self, secret_key=None):
            if secret_key is None:
                secret_key = os.environ.get('CELERY_SECRET_KEY')
                if not secret_key:
                    raise ValueError("CELERY_SECRET_KEY environment variable required")
    
            self.secret_key = secret_key.encode() if isinstance(secret_key, str) else secret_key
            self.fernet = self._create_cipher()
    
        def _create_cipher(self):
            """Create Fernet cipher from secret key."""
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=b'celery_salt',  # In production, use a random salt
                iterations=100000,
            )
            key = base64.urlsafe_b64encode(kdf.derive(self.secret_key))
            return Fernet(key)
    
        def encrypt_message(self, data):
            """Encrypt message data."""
            try:
                json_data = json.dumps(data, sort_keys=True).encode()
                encrypted_data = self.fernet.encrypt(json_data)
                return base64.urlsafe_b64encode(encrypted_data).decode()
            except Exception as e:
                raise ValueError(f"Encryption failed: {e}")
    
        def decrypt_message(self, encrypted_data):
            """Decrypt message data."""
            try:
                encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
                decrypted_data = self.fernet.decrypt(encrypted_bytes)
                return json.loads(decrypted_data.decode())
            except Exception as e:
                raise ValueError(f"Decryption failed: {e}")
    
        def sign_message(self, data):
            """Create HMAC signature for message."""
            message = json.dumps(data, sort_keys=True).encode()
            signature = hmac.new(
                self.secret_key,
                message,
                hashlib.sha256
            ).hexdigest()
            return signature
    
        def verify_signature(self, data, signature):
            """Verify HMAC signature."""
            expected_signature = self.sign_message(data)
            return hmac.compare_digest(signature, expected_signature)
    
    # Global security instance
    message_security = MessageSecurity()
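
    # Quick round-trip check (illustrative): encrypt/decrypt and sign/verify a
    # payload with the helpers above.
    def _security_self_test():
        payload = {'task': 'example', 'amount': 42}
        token = message_security.encrypt_message(payload)
        assert message_security.decrypt_message(token) == payload

        signature = message_security.sign_message(payload)
        assert message_security.verify_signature(payload, signature)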
    
    # Custom serializer with authentication and encryption
    def secure_dumps(obj):
        """Serialize with encryption and authentication."""
        timestamp = datetime.utcnow().isoformat()
    
        # Add timestamp to prevent replay attacks
        message_data = {
            'payload': obj,
            'timestamp': timestamp,
            'version': '1.0'
        }
    
        # Create signature
        signature = message_security.sign_message(message_data)
    
        # Encrypt the entire message
        encrypted_message = message_security.encrypt_message(message_data)
    
        # Final message structure
        final_message = {
            'encrypted_data': encrypted_message,
            'signature': signature,
            'algorithm': 'AES-256-GCM',
            'auth': 'HMAC-SHA256'
        }
    
        return json.dumps(final_message)
    
    def secure_loads(data):
        """Deserialize with decryption and authentication."""
        try:
            message = json.loads(data)
    
            # Decrypt the message
            decrypted_data = message_security.decrypt_message(message['encrypted_data'])
    
            # Verify signature
            if not message_security.verify_signature(decrypted_data, message['signature']):
                raise ValueError("Message signature verification failed")
    
            # Check timestamp to prevent replay attacks (optional)
            timestamp = datetime.fromisoformat(decrypted_data['timestamp'])
            age = datetime.utcnow() - timestamp
            if age > timedelta(hours=1):  # 1 hour timeout
                raise ValueError("Message too old, possible replay attack")
    
            return decrypted_data['payload']
    
        except Exception as e:
            logging.error(f"Message deserialization failed: {e}")
            raise
    
    # Register custom secure serializer
    register('auth_json', secure_dumps, secure_loads,
             content_type='application/x-auth-json',
             content_encoding='utf-8')
    
    # Secure Celery app
    app = Celery('secure_app')
    app.config_from_object(SecureCeleryConfig)
    
    # Role-based access control
    class RoleBasedAccessControl:
        """Implement role-based access control for tasks."""
    
        def __init__(self):
            self.roles = {
                'admin': {
                    'permissions': ['*'],  # All permissions
                    'task_patterns': ['*']
                },
                'operator': {
                    'permissions': ['execute', 'monitor'],
                    'task_patterns': ['app.tasks.operations.*', 'app.tasks.monitoring.*']
                },
                'user': {
                    'permissions': ['execute'],
                    'task_patterns': ['app.tasks.user.*']
                },
                'readonly': {
                    'permissions': ['monitor'],
                    'task_patterns': ['app.tasks.monitoring.*']
                }
            }
    
            self.user_roles = {}  # Store user -> role mappings
    
        def assign_role(self, user_id, role):
            """Assign role to user."""
            if role not in self.roles:
                raise ValueError(f"Unknown role: {role}")
            self.user_roles[user_id] = role
    
        def check_permission(self, user_id, task_name, permission='execute'):
            """Check if user has permission to perform action on task."""
            user_role = self.user_roles.get(user_id)
            if not user_role:
                return False
    
            role_config = self.roles[user_role]
    
            # Check permission
            if '*' not in role_config['permissions'] and permission not in role_config['permissions']:
                return False
    
            # Check task pattern
            task_patterns = role_config['task_patterns']
            if '*' in task_patterns:
                return True
    
            for pattern in task_patterns:
                if pattern.endswith('*'):
                    if task_name.startswith(pattern[:-1]):
                        return True
                elif pattern == task_name:
                    return True
    
            return False
    
    # Global access control
    access_control = RoleBasedAccessControl()
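
    # Example (illustrative): how role assignments translate into task access
    def _rbac_demo():
        access_control.assign_role('alice', 'operator')
        assert access_control.check_permission('alice', 'app.tasks.operations.restart', 'execute')
        assert not access_control.check_permission('alice', 'app.tasks.user.update_profile', 'execute')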
    
    # Secure task decorator
    def secure_task(func):
        """Decorator for secure tasks with access control."""
        # Preserve the wrapped function's metadata so Celery registers each
        # task under its own name (instead of every task being named "wrapper").
        @wraps(func)
        def wrapper(self, *task_args, **task_kwargs):
            # Extract user context (in a real implementation, this would come from authentication)
            user_id = task_kwargs.pop('_user_id', 'anonymous')

            # Check permissions
            if not access_control.check_permission(user_id, self.name, 'execute'):
                raise PermissionError(f"User {user_id} not authorized to execute {self.name}")

            # Log access
            logging.info(f"User {user_id} executing task {self.name}")

            try:
                return func(self, *task_args, **task_kwargs)
            except Exception as e:
                logging.error(f"Task {self.name} failed for user {user_id}: {e}")
                raise

        # Register the wrapper as a bound Celery task
        return app.task(bind=True)(wrapper)
    
    # Example secure tasks
    @secure_task
    def secure_user_task(self, user_data):
        """Secure task for user operations."""
        return f"Processing user data: {user_data.get('name', 'Unknown')}"
    
    @secure_task
    def secure_admin_task(self, admin_operation):
        """Secure task for admin operations."""
        return f"Executing admin operation: {admin_operation}"
    
    @secure_task
    def secure_financial_task(self, transaction_data):
        """Secure task for financial operations."""
        # Additional security for sensitive operations
        if not transaction_data.get('verified'):
            raise ValueError("Transaction not verified")
    
        return f"Processing transaction: {transaction_data['amount']}"
    
    # Certificate-based authentication
    class CertificateAuth:
        """Certificate-based authentication for workers and clients."""
    
        def __init__(self, ca_cert_path, client_cert_path, client_key_path):
            self.ca_cert_path = ca_cert_path
            self.client_cert_path = client_cert_path
            self.client_key_path = client_key_path
    
        def verify_certificate(self, cert_path):
            """Verify certificate against CA."""
            try:
                # Load CA certificate
                with open(self.ca_cert_path, 'rb') as f:
                    ca_cert = x509.load_pem_x509_certificate(f.read())
    
                # Load client certificate
                with open(cert_path, 'rb') as f:
                    client_cert = x509.load_pem_x509_certificate(f.read())
    
                # Verify certificate chain (simplified)
                ca_public_key = ca_cert.public_key()
    
                try:
                    ca_public_key.verify(
                        client_cert.signature,
                        client_cert.tbs_certificate_bytes,
                        padding.PKCS1v15(),
                        client_cert.signature_hash_algorithm
                    )
                    return True
                except Exception:
                    return False
    
            except Exception as e:
                logging.error(f"Certificate verification failed: {e}")
                return False
    
        def get_ssl_context(self):
            """Create SSL context for secure connections."""
            context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
            context.check_hostname = False  # For self-signed certificates
            context.verify_mode = ssl.CERT_REQUIRED
    
            # Load client certificate
            context.load_cert_chain(self.client_cert_path, self.client_key_path)
    
            # Load CA certificate
            context.load_verify_locations(self.ca_cert_path)
    
            return context
    
    # Audit logging
    class SecurityAuditLogger:
        """Security audit logging system."""
    
        def __init__(self, log_file='security_audit.log'):
            self.logger = logging.getLogger('security_audit')
            self.logger.setLevel(logging.INFO)
    
            # Create file handler
            handler = logging.FileHandler(log_file)
            handler.setLevel(logging.INFO)
    
            # Create formatter
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
    
            self.logger.addHandler(handler)
    
        def log_task_execution(self, user_id, task_name, status, details=None):
            """Log task execution for audit."""
            log_entry = {
                'event_type': 'task_execution',
                'user_id': user_id,
                'task_name': task_name,
                'status': status,
                'timestamp': datetime.utcnow().isoformat(),
                'details': details or {}
            }
    
            self.logger.info(json.dumps(log_entry))
    
        def log_authentication(self, user_id, method, success, ip_address=None):
            """Log authentication attempts."""
            log_entry = {
                'event_type': 'authentication',
                'user_id': user_id,
                'method': method,
                'success': success,
                'ip_address': ip_address,
                'timestamp': datetime.utcnow().isoformat()
            }
    
            self.logger.info(json.dumps(log_entry))
    
        def log_security_violation(self, user_id, violation_type, details):
            """Log security violations."""
            log_entry = {
                'event_type': 'security_violation',
                'user_id': user_id,
                'violation_type': violation_type,
                'details': details,
                'timestamp': datetime.utcnow().isoformat(),
                'severity': 'HIGH'
            }
    
            self.logger.warning(json.dumps(log_entry))
    
    # Global audit logger
    audit_logger = SecurityAuditLogger()
    
    # Security middleware
    from celery.signals import before_task_publish, task_prerun, task_postrun
    
    @before_task_publish.connect
    def secure_task_publish(sender=None, headers=None, body=None, properties=None, **kwargs):
        """Security check before task publish."""
        # Add security headers
        if headers is None:
            headers = {}
    
        headers['security_version'] = '1.0'
        headers['published_at'] = datetime.utcnow().isoformat()
    
    @task_prerun.connect
    def secure_task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
        """Security check before task execution."""
        # Extract user context
        user_id = kwargs.get('_user_id', 'system') if kwargs else 'system'
    
        # Log task start (use the task name; the task object itself is not JSON-serializable)
        audit_logger.log_task_execution(user_id, task.name if task else 'unknown', 'started')
    
    @task_postrun.connect
    def secure_task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
        """Security logging after task execution."""
        # Extract user context
        user_id = kwargs.get('_user_id', 'system') if kwargs else 'system'
    
        # Log task completion
        audit_logger.log_task_execution(
            user_id, task.name if task else 'unknown', state,
            details={'task_id': task_id, 'return_value_type': type(retval).__name__}
        )
    
    # Example usage
    def setup_security():
        """Setup security configuration."""
        # Setup roles
        access_control.assign_role('admin_user', 'admin')
        access_control.assign_role('operator_user', 'operator')
        access_control.assign_role('regular_user', 'user')
    
        # Log security setup
        audit_logger.log_authentication('system', 'setup', True)
    
        print("Security configuration completed")
    
    if __name__ == '__main__':
        setup_security()
    
        # Example secure task execution
        try:
            result = secure_user_task.delay(
                {'name': 'John Doe', 'operation': 'profile_update'},
                _user_id='regular_user'
            )
            print(f"Task result: {result.get()}")
        except PermissionError as e:
            print(f"Permission denied: {e}")

    The preceding chapters covered advanced performance optimization techniques and production-grade security implementations. The examples demonstrate real-world production scenarios with proper security measures: authentication, encryption, role-based access control, and structured audit logging of task activity.


    Chapter 12: Production Deployment

    Production Architecture Design

    graph TB
        subgraph "Load Balancer Layer"
            LB[Load Balancer]
        end
    
        subgraph "Application Layer"
            A1[App Server 1]
            A2[App Server 2]
            A3[App Server N]
        end
    
        subgraph "Message Infrastructure"
            MB[Message Broker Cluster]
            RB[Result Backend Cluster]
        end
    
        subgraph "Worker Clusters"
            WC1[Worker Cluster 1<br/>CPU Tasks]
            WC2[Worker Cluster 2<br/>I/O Tasks]
            WC3[Worker Cluster 3<br/>Priority Tasks]
        end
    
        subgraph "Monitoring & Management"
            M1[Flower Monitoring]
            M2[Prometheus/Grafana]
            M3[Log Aggregation]
            M4[Alerting System]
        end
    
        subgraph "Storage Layer"
            DB[(Primary Database)]
            CACHE[(Cache Layer)]
            FS[File Storage]
        end
    
        LB --> A1
        LB --> A2
        LB --> A3
    
        A1 --> MB
        A2 --> MB
        A3 --> MB
    
        MB --> WC1
        MB --> WC2
        MB --> WC3
    
        WC1 --> RB
        WC2 --> RB
        WC3 --> RB
    
        WC1 --> DB
        WC2 --> CACHE
        WC3 --> FS
    
        MB -.-> M1
        WC1 -.-> M2
        WC2 -.-> M3
        WC3 -.-> M4
    
        style LB fill:#e8f5e8
        style MB fill:#e3f2fd
        style RB fill:#fff3e0
        style WC1 fill:#fce4ec
        style WC2 fill:#f3e5f5
        style WC3 fill:#e1f5fe

    Production Deployment Strategy

    # production_deployment.py
    """Production deployment configuration and management."""
    import os
    import logging
    import signal
    import sys
    import time
    from pathlib import Path
    from celery import Celery
    from kombu import Queue, Exchange
    import redis.sentinel
    import psutil
    
    class ProductionConfig:
        """Production-ready Celery configuration."""
    
        # Environment-based configuration
        ENVIRONMENT = os.environ.get('ENVIRONMENT', 'production')
    
        # Redis Sentinel for high availability
        REDIS_SENTINELS = [
            ('sentinel1.example.com', 26379),
            ('sentinel2.example.com', 26379),
            ('sentinel3.example.com', 26379),
        ]
        REDIS_MASTER_NAME = 'mymaster'
        REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD')
    
        # Broker configuration with failover
        broker_url = f'sentinel://:{REDIS_PASSWORD}@sentinel1.example.com:26379;sentinel://:{REDIS_PASSWORD}@sentinel2.example.com:26379;sentinel://:{REDIS_PASSWORD}@sentinel3.example.com:26379'
        broker_transport_options = {
            'sentinels': REDIS_SENTINELS,
            'password': REDIS_PASSWORD,
            'master_name': REDIS_MASTER_NAME,
            'socket_keepalive': True,
            'socket_keepalive_options': {
                'TCP_KEEPIDLE': 1,
                'TCP_KEEPINTVL': 3,
                'TCP_KEEPCNT': 5,
            },
            'retry_on_timeout': True,
            'health_check_interval': 30,
        }
    
        # Result backend with failover
        result_backend = f'sentinel://:{REDIS_PASSWORD}@sentinel1.example.com:26379;sentinel://:{REDIS_PASSWORD}@sentinel2.example.com:26379;sentinel://:{REDIS_PASSWORD}@sentinel3.example.com:26379'
        result_backend_transport_options = broker_transport_options
    
        # Connection settings
        broker_pool_limit = 20
        broker_connection_timeout = 30
        broker_connection_retry_on_startup = True
        broker_connection_max_retries = 10
    
        # Worker settings
        worker_prefetch_multiplier = 1
        worker_max_tasks_per_child = 1000
        worker_max_memory_per_child = 200000  # 200MB
        worker_disable_rate_limits = False
        worker_proc_alive_timeout = 60
    
        # Task settings
        task_serializer = 'json'
        result_serializer = 'json'
        accept_content = ['json']
        task_compression = 'gzip'
        result_compression = 'gzip'
    
        # Reliability settings
        task_acks_late = True
        task_reject_on_worker_lost = True
        task_track_started = True
    
        # Timeout settings
        task_time_limit = 3600  # 1 hour
        task_soft_time_limit = 3300  # 55 minutes
    
        # Result settings
        result_expires = 86400  # 24 hours
        result_persistent = True
    
        # Monitoring
        worker_send_task_events = True
        task_send_sent_event = True
    
        # Security
        worker_hijack_root_logger = False
    
        # Queue configuration for different environments
        if ENVIRONMENT == 'production':
            # Production queues with priority and routing
            task_queues = (
                Queue('critical', Exchange('critical', type='direct'), routing_key='critical',
                      queue_arguments={'x-max-priority': 10}),
                Queue('high', Exchange('high', type='direct'), routing_key='high',
                      queue_arguments={'x-max-priority': 8}),
                Queue('normal', Exchange('normal', type='direct'), routing_key='normal',
                      queue_arguments={'x-max-priority': 5}),
                Queue('low', Exchange('low', type='direct'), routing_key='low',
                      queue_arguments={'x-max-priority': 2}),
                Queue('email', Exchange('email', type='direct'), routing_key='email'),
                Queue('reports', Exchange('reports', type='direct'), routing_key='reports'),
                Queue('background', Exchange('background', type='direct'), routing_key='background'),
            )
    
            task_routes = {
                'app.tasks.critical.*': {'queue': 'critical', 'priority': 9},
                'app.tasks.notifications.*': {'queue': 'email'},
                'app.tasks.reports.*': {'queue': 'reports'},
                'app.tasks.background.*': {'queue': 'background', 'priority': 1},
            }
    
        # Logging configuration
        worker_log_level = 'INFO'
        worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
        worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
    
    class DeploymentManager:
        """Manage production deployment lifecycle."""
    
        def __init__(self, app_name='celery_app'):
            self.app_name = app_name
            self.app = Celery(app_name)
            self.app.config_from_object(ProductionConfig)
            self.logger = self._setup_logging()
    
        def _setup_logging(self):
            """Setup production logging."""
            logger = logging.getLogger(self.app_name)
            logger.setLevel(logging.INFO)
    
            # Console handler
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
    
            # File handler
            log_file = f'/var/log/{self.app_name}/celery.log'
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.INFO)
    
            # Formatter
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(formatter)
            file_handler.setFormatter(formatter)
    
            logger.addHandler(console_handler)
            logger.addHandler(file_handler)
    
            return logger
    
        def health_check(self):
            """Perform health check of the Celery system."""
            try:
                # Check broker connectivity
                inspect = self.app.control.inspect()
                stats = inspect.stats()
    
                if not stats:
                    return {'status': 'unhealthy', 'reason': 'No workers available'}
    
                # Check worker health
                unhealthy_workers = []
                for worker, worker_stats in stats.items():
                    if worker_stats.get('pool', {}).get('max-concurrency', 0) == 0:
                        unhealthy_workers.append(worker)
    
                if unhealthy_workers:
                    return {
                        'status': 'degraded',
                        'reason': f'Unhealthy workers: {unhealthy_workers}'
                    }
    
                # Check queue lengths
                active_queues = inspect.active_queues()
                high_queue_lengths = {}
    
                for worker, queues in active_queues.items():
                    for queue_info in queues:
                        # In production, implement actual queue length checking
                        # This is a simplified version
                        pass
    
                return {
                    'status': 'healthy',
                    'workers': len(stats),
                    'timestamp': time.time()
                }
    
            except Exception as e:
                self.logger.error(f"Health check failed: {e}")
                return {'status': 'unhealthy', 'reason': str(e)}
    
        def graceful_shutdown(self, signum=None, frame=None):
            """Implement graceful shutdown."""
            self.logger.info("Initiating graceful shutdown...")
    
            try:
                # Stop accepting new tasks
                self.app.control.cancel_consumer('*')
    
                # Wait for current tasks to complete
                inspect = self.app.control.inspect()
                active_tasks = inspect.active()
    
                if active_tasks:
                    task_count = sum(len(tasks) for tasks in active_tasks.values())
                    self.logger.info(f"Waiting for {task_count} active tasks to complete...")
                    # Implementation would wait for tasks to finish
                    # with a timeout
    
                # Shutdown workers
                self.app.control.shutdown()
    
                self.logger.info("Graceful shutdown completed")
    
            except Exception as e:
                self.logger.error(f"Error during graceful shutdown: {e}")
    
            finally:
                sys.exit(0)
    
        def setup_signal_handlers(self):
            """Setup signal handlers for graceful shutdown."""
            signal.signal(signal.SIGTERM, self.graceful_shutdown)
            signal.signal(signal.SIGINT, self.graceful_shutdown)
    
        def start_worker(self, queue_config):
            """Start worker with specific configuration."""
            self.setup_signal_handlers()
    
            worker_args = [
                '--app', self.app_name,
                '--loglevel', queue_config.get('loglevel', 'INFO'),
                '--concurrency', str(queue_config.get('concurrency', 4)),
                '--pool', queue_config.get('pool', 'prefork'),
            ]
    
            if queue_config.get('queues'):
                worker_args.extend(['--queues', ','.join(queue_config['queues'])])
    
            if queue_config.get('hostname'):
                worker_args.extend(['--hostname', queue_config['hostname']])
    
            self.logger.info(f"Starting worker with args: {worker_args}")
    
            # In production, this would use the Celery worker
            # self.app.worker_main(worker_args)
    
    class DockerDeployment:
        """Docker-based deployment configuration."""
    
        @staticmethod
        def generate_dockerfile():
            """Generate Dockerfile for production deployment."""
            dockerfile_content = """
    FROM python:3.11-slim
    
    # Set environment variables
    ENV PYTHONUNBUFFERED=1
    ENV PYTHONDONTWRITEBYTECODE=1
    ENV CELERY_ENV=production
    
    # Install system dependencies
    RUN apt-get update && apt-get install -y \\
        build-essential \\
        libpq-dev \\
        && rm -rf /var/lib/apt/lists/*
    
    # Create app directory
    WORKDIR /app
    
    # Copy requirements and install Python dependencies
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    # Copy application code
    COPY . .
    
    # Create non-root user
    RUN groupadd -r celery && useradd -r -g celery celery
    RUN chown -R celery:celery /app
    USER celery
    
    # Health check
    HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \\
        CMD python -c "from health_check import check_health; exit(0 if check_health() else 1)"
    
    # Default command
    CMD ["celery", "worker", "--app=app.celery", "--loglevel=info"]
    """
            return dockerfile_content
    
        @staticmethod
        def generate_docker_compose():
            """Generate docker-compose.yml for production."""
            compose_content = """
    version: '3.8'
    
    services:
      redis-master:
        image: redis:7-alpine
        command: redis-server --requirepass ${REDIS_PASSWORD}
        volumes:
          - redis_data:/data
        ports:
          - "6379:6379"
        environment:
          - REDIS_PASSWORD=${REDIS_PASSWORD}
        healthcheck:
          test: ["CMD", "redis-cli", "--raw", "incr", "ping"]
          interval: 30s
          timeout: 10s
          retries: 3
    
      redis-sentinel:
        image: redis:7-alpine
        command: redis-sentinel /etc/redis/sentinel.conf
        volumes:
          - ./sentinel.conf:/etc/redis/sentinel.conf
        depends_on:
          - redis-master
        deploy:
          replicas: 3
    
      celery-worker-high:
        build: .
        command: celery --app=app.celery worker --loglevel=info --queues=critical,high --concurrency=4
        environment:
          - CELERY_ENV=production
          - REDIS_PASSWORD=${REDIS_PASSWORD}
        depends_on:
          - redis-master
        volumes:
          - ./logs:/var/log/celery
        deploy:
          replicas: 2
          resources:
            limits:
              memory: 512M
            reservations:
              memory: 256M
    
      celery-worker-normal:
        build: .
        command: celery --app=app.celery worker --loglevel=info --queues=normal,email --concurrency=8
        environment:
          - CELERY_ENV=production
          - REDIS_PASSWORD=${REDIS_PASSWORD}
        depends_on:
          - redis-master
        volumes:
          - ./logs:/var/log/celery
        deploy:
          replicas: 3
          resources:
            limits:
              memory: 1G
            reservations:
              memory: 512M
    
      celery-worker-background:
        build: .
        command: celery --app=app.celery worker --loglevel=info --queues=background,reports --concurrency=2
        environment:
          - CELERY_ENV=production
          - REDIS_PASSWORD=${REDIS_PASSWORD}
        depends_on:
          - redis-master
        volumes:
          - ./logs:/var/log/celery
        deploy:
          replicas: 1
          resources:
            limits:
              memory: 256M
            reservations:
              memory: 128M
    
      flower:
        build: .
        command: celery --app=app.celery flower --port=5555
        ports:
          - "5555:5555"
        environment:
          - CELERY_ENV=production
          - REDIS_PASSWORD=${REDIS_PASSWORD}
        depends_on:
          - redis-master
    
      prometheus:
        image: prom/prometheus:latest
        ports:
          - "9090:9090"
        volumes:
          - ./prometheus.yml:/etc/prometheus/prometheus.yml
          - prometheus_data:/prometheus
    
      grafana:
        image: grafana/grafana:latest
        ports:
          - "3000:3000"
        environment:
          - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
        volumes:
          - grafana_data:/var/lib/grafana
    
    volumes:
      redis_data:
      prometheus_data:
      grafana_data:
    """
            return compose_content
    
    class KubernetesDeployment:
        """Kubernetes deployment configuration."""
    
        @staticmethod
        def generate_deployment_yaml():
            """Generate Kubernetes deployment YAML."""
            deployment_yaml = """
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: celery-worker-high
      labels:
        app: celery-worker
        tier: high
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: celery-worker
          tier: high
      template:
        metadata:
          labels:
            app: celery-worker
            tier: high
        spec:
          containers:
          - name: celery-worker
            image: myapp/celery:latest
            command: ["celery", "worker"]
            args:
              - "--app=app.celery"
              - "--loglevel=info"
              - "--queues=critical,high"
              - "--concurrency=4"
            env:
            - name: CELERY_ENV
              value: "production"
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-secret
                  key: password
            resources:
              requests:
                memory: "256Mi"
                cpu: "250m"
              limits:
                memory: "512Mi"
                cpu: "500m"
            livenessProbe:
              exec:
                command:
                  - celery
                  - "-A"
                  - app.celery
                  - inspect
                  - ping
              initialDelaySeconds: 30
              periodSeconds: 60
            readinessProbe:
              exec:
                command:
                  - celery
                  - "-A"
                  - app.celery
                  - inspect
                  - active
              initialDelaySeconds: 15
              periodSeconds: 30
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: celery-worker-service
    spec:
      selector:
        app: celery-worker
      ports:
        - protocol: TCP
          port: 80
          targetPort: 5555
    ---
    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: celery-worker-hpa
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: celery-worker-high
      minReplicas: 2
      maxReplicas: 10
      metrics:
      - type: Resource
        resource:
          name: cpu
          target:
            type: Utilization
            averageUtilization: 70
      - type: Resource
        resource:
          name: memory
          target:
            type: Utilization
            averageUtilization: 80
    """
            return deployment_yaml
    
    # Monitoring and alerting for production
    class ProductionMonitoring:
        """Production monitoring and alerting system."""
    
        def __init__(self, deployment_manager):
            self.deployment_manager = deployment_manager
            self.metrics = {
                'task_success_rate': 0.0,
                'average_task_duration': 0.0,
                'queue_lengths': {},
                'worker_count': 0,
                'system_resources': {}
            }
    
        def collect_metrics(self):
            """Collect system metrics."""
            try:
                inspect = self.deployment_manager.app.control.inspect()
    
                # Worker statistics
                stats = inspect.stats()
                self.metrics['worker_count'] = len(stats) if stats else 0
    
                # Queue statistics
                active_queues = inspect.active_queues()
                if active_queues:
                    for worker, queues in active_queues.items():
                        for queue_info in queues:
                            queue_name = queue_info['name']
                            # In production, get actual queue lengths from broker
                            self.metrics['queue_lengths'][queue_name] = 0
    
                # System resources
                self.metrics['system_resources'] = {
                    'cpu_percent': psutil.cpu_percent(interval=1),
                    'memory_percent': psutil.virtual_memory().percent,
                    'disk_percent': psutil.disk_usage('/').percent
                }
    
                return self.metrics
    
            except Exception as e:
                self.deployment_manager.logger.error(f"Failed to collect metrics: {e}")
                return None
    
        def check_alerts(self):
            """Check for alert conditions."""
            alerts = []
    
            # High CPU usage
            if self.metrics['system_resources'].get('cpu_percent', 0) > 80:
                alerts.append({
                    'severity': 'warning',
                    'message': f"High CPU usage: {self.metrics['system_resources']['cpu_percent']:.1f}%"
                })
    
            # High memory usage
            if self.metrics['system_resources'].get('memory_percent', 0) > 85:
                alerts.append({
                    'severity': 'warning',
                    'message': f"High memory usage: {self.metrics['system_resources']['memory_percent']:.1f}%"
                })
    
            # No workers available
            if self.metrics['worker_count'] == 0:
                alerts.append({
                    'severity': 'critical',
                    'message': "No Celery workers available"
                })
    
            # High queue lengths
            for queue_name, length in self.metrics['queue_lengths'].items():
                if length > 1000:  # Threshold
                    alerts.append({
                        'severity': 'warning',
                        'message': f"High queue length for {queue_name}: {length}"
                    })
    
            return alerts
    
    # Production deployment scripts
    def deploy_production():
        """Deploy to production environment."""
        print("Deploying Celery application to production...")
    
        # Initialize deployment manager
        deployment_manager = DeploymentManager('production_app')
    
        # Perform health check before deployment
        health = deployment_manager.health_check()
        if health['status'] == 'unhealthy':
            print(f"Pre-deployment health check failed: {health['reason']}")
            return False
    
        # Deploy workers
        worker_configs = [
            {
                'name': 'critical-worker',
                'queues': ['critical'],
                'concurrency': 2,
                'pool': 'prefork',
                'loglevel': 'INFO'
            },
            {
                'name': 'high-worker',
                'queues': ['high'],
                'concurrency': 4,
                'pool': 'prefork',
                'loglevel': 'INFO'
            },
            {
                'name': 'normal-worker',
                'queues': ['normal', 'email'],
                'concurrency': 8,
                'pool': 'eventlet',
                'loglevel': 'INFO'
            },
            {
                'name': 'background-worker',
                'queues': ['background', 'reports'],
                'concurrency': 2,
                'pool': 'prefork',
                'loglevel': 'WARNING'
            }
        ]
    
        for config in worker_configs:
            print(f"Starting {config['name']}...")
            # In production, this would start actual workers
            # deployment_manager.start_worker(config)
    
        # Setup monitoring
        monitoring = ProductionMonitoring(deployment_manager)
    
        print("Production deployment completed successfully!")
        return True
    
    if __name__ == '__main__':
        deploy_production()
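
    The deployment helpers above only generate configuration strings; they still need to be written to disk and paired with ongoing monitoring. The sketch below is a minimal example of tying them together, assuming the classes above are importable from production_deployment.py; the output directory, file names, and 60-second polling interval are illustrative choices, not fixed conventions.

    # deploy_artifacts.py
    """Sketch: write the generated deployment artifacts to disk and run a
    simple monitoring loop. Assumes the classes above are importable from
    production_deployment; paths and intervals are illustrative."""
    import time
    from pathlib import Path

    from production_deployment import (
        DeploymentManager,
        DockerDeployment,
        KubernetesDeployment,
        ProductionMonitoring,
    )

    def write_artifacts(output_dir='deploy'):
        """Render the Dockerfile, docker-compose.yml, and Kubernetes manifest."""
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)

        (out / 'Dockerfile').write_text(DockerDeployment.generate_dockerfile())
        (out / 'docker-compose.yml').write_text(DockerDeployment.generate_docker_compose())
        (out / 'celery-workers.yaml').write_text(KubernetesDeployment.generate_deployment_yaml())

        print(f"Deployment artifacts written to {out.resolve()}")

    def monitoring_loop(poll_seconds=60):
        """Collect metrics and print alerts until interrupted."""
        manager = DeploymentManager('production_app')
        monitoring = ProductionMonitoring(manager)

        while True:
            if monitoring.collect_metrics() is not None:
                for alert in monitoring.check_alerts():
                    print(f"[{alert['severity'].upper()}] {alert['message']}")
            time.sleep(poll_seconds)

    if __name__ == '__main__':
        write_artifacts()
        monitoring_loop()

    Running a loop like this alongside the worker containers gives a lightweight health signal even before Prometheus and Grafana are fully wired up.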

    Chapter 13: Real-World Case Studies

    Case Study 1: E-commerce Order Processing System

    graph TB
        subgraph "Order Processing Flow"
            A[Customer Order] --> B[Order Validation]
            B --> C[Inventory Check]
            C --> D[Payment Processing]
            D --> E[Order Fulfillment]
            E --> F[Shipping Notification]
            F --> G[Order Complete]
        end
    
        subgraph "Celery Tasks"
            H[validate_order_task]
            I[check_inventory_task]
            J[process_payment_task]
            K[fulfill_order_task]
            L[send_notification_task]
            M[update_analytics_task]
        end
    
        subgraph "Error Handling"
            N[Payment Failed] --> O[Refund Process]
            P[Inventory Shortage] --> Q[Backorder Process]
            R[Shipping Error] --> S[Customer Support Alert]
        end
    
        B --> H
        C --> I
        D --> J
        E --> K
        F --> L
        G --> M
    
        J --> N
        I --> P
        K --> R
    
        style A fill:#e8f5e8
        style G fill:#e8f5e8
        style N fill:#ffebee
        style P fill:#ffebee
        style R fill:#ffebee

    E-commerce Implementation

    # ecommerce_case_study.py
    """
    E-commerce order processing system using Celery.
    Handles complex workflows with error recovery and monitoring.
    """
    from celery import Celery, chain, group, chord, signature
    from celery.exceptions import Retry
    import time
    import random
    import logging
    from datetime import datetime, timedelta
    from dataclasses import dataclass
    from typing import List, Dict, Any, Optional
    import json
    
    # Configuration
    app = Celery('ecommerce_orders')
    app.conf.update(
        broker_url='redis://localhost:6379/0',
        result_backend='redis://localhost:6379/0',
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        task_routes={
            'ecommerce.orders.critical.*': {'queue': 'critical'},
            'ecommerce.orders.payment.*': {'queue': 'payment'},
            'ecommerce.orders.fulfillment.*': {'queue': 'fulfillment'},
            'ecommerce.orders.notifications.*': {'queue': 'notifications'},
        }
    )
    
    @dataclass
    class OrderItem:
        product_id: str
        quantity: int
        price: float
        variant_id: Optional[str] = None
    
    @dataclass
    class Order:
        order_id: str
        customer_id: str
        items: List[OrderItem]
        shipping_address: Dict[str, str]
        payment_method: Dict[str, str]
        total_amount: float
        currency: str = 'USD'
        status: str = 'pending'
        created_at: Optional[str] = None

        def __post_init__(self):
            # Items arrive as plain dicts after JSON round-trips; convert them
            # back into OrderItem instances so tasks can use attribute access.
            self.items = [
                OrderItem(**item) if isinstance(item, dict) else item
                for item in self.items
            ]
    
    class OrderProcessingError(Exception):
        """Custom exception for order processing errors."""
        pass
    
    class PaymentError(OrderProcessingError):
        """Payment-specific errors."""
        pass
    
    class InventoryError(OrderProcessingError):
        """Inventory-specific errors."""
        pass
    
    # Core order processing tasks
    @app.task(bind=True, name='ecommerce.orders.critical.validate_order')
    def validate_order_task(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate order data and customer information."""
        try:
            order = Order(**order_data)
    
            # Simulate validation logic
            time.sleep(random.uniform(0.1, 0.5))
    
            # Validation checks
            if order.total_amount <= 0:
                raise OrderProcessingError("Invalid order total")
    
            if not order.customer_id:
                raise OrderProcessingError("Missing customer information")
    
            if not order.items:
                raise OrderProcessingError("Order has no items")
    
            # Customer verification (simulate external API call)
            if random.random() < 0.05:  # 5% failure rate
                raise OrderProcessingError("Customer verification failed")
    
            # Update order status
            order.status = 'validated'
    
            logging.info(f"Order {order.order_id} validated successfully")
    
            return {
                'order_id': order.order_id,
                'status': order.status,
                'validated_at': datetime.now().isoformat(),
                'order_data': order_data
            }
    
        except Exception as exc:
            logging.error(f"Order validation failed for {order_data.get('order_id')}: {exc}")
    
            # Retry logic for temporary failures
            if isinstance(exc, OrderProcessingError) and "verification failed" in str(exc):
                if self.request.retries < 3:
                    raise self.retry(countdown=60, exc=exc)
    
            raise
    
    @app.task(bind=True, name='ecommerce.orders.fulfillment.check_inventory')
    def check_inventory_task(self, validated_order: Dict[str, Any]) -> Dict[str, Any]:
        """Check inventory availability for all order items."""
        try:
            order_data = validated_order['order_data']
            order = Order(**order_data)
    
            inventory_results = []
            total_available = True
    
            for item in order.items:
                # Simulate inventory check
                time.sleep(random.uniform(0.1, 0.3))
    
                # Simulate occasional inventory shortage
                available_quantity = random.randint(0, item.quantity + 5)
                is_available = available_quantity >= item.quantity
    
                if not is_available:
                    total_available = False
    
                inventory_results.append({
                    'product_id': item.product_id,
                    'requested_quantity': item.quantity,
                    'available_quantity': available_quantity,
                    'is_available': is_available
                })
    
            if not total_available:
                # Handle partial inventory
                shortage_items = [
                    item for item in inventory_results
                    if not item['is_available']
                ]
    
                raise InventoryError(f"Insufficient inventory for items: {shortage_items}")
    
            # Reserve inventory
            reservation_id = f"res_{order.order_id}_{int(time.time())}"
    
            logging.info(f"Inventory reserved for order {order.order_id}")
    
            return {
                'order_id': order.order_id,
                'inventory_status': 'reserved',
                'reservation_id': reservation_id,
                'inventory_results': inventory_results,
                'reserved_at': datetime.now().isoformat(),
                'order_data': order_data
            }
    
        except InventoryError as exc:
            logging.warning(f"Inventory shortage for order {validated_order['order_id']}: {exc}")
    
            # Trigger backorder process
            handle_inventory_shortage.delay(validated_order, str(exc))
            raise
    
        except Exception as exc:
            logging.error(f"Inventory check failed for order {validated_order['order_id']}: {exc}")
    
            if self.request.retries < 2:
                raise self.retry(countdown=30, exc=exc)
            raise
    
    @app.task(bind=True, name='ecommerce.orders.payment.process_payment')
    def process_payment_task(self, inventory_result: Dict[str, Any]) -> Dict[str, Any]:
        """Process payment for the order."""
        try:
            order_data = inventory_result['order_data']
            order = Order(**order_data)
    
            # Simulate payment processing
            time.sleep(random.uniform(1, 3))
    
            # Simulate payment gateway interaction
            payment_success_rate = 0.95  # 95% success rate
    
            if random.random() > payment_success_rate:
                error_types = ['card_declined', 'insufficient_funds', 'expired_card', 'gateway_error']
                error_type = random.choice(error_types)
                raise PaymentError(f"Payment failed: {error_type}")
    
            # Generate transaction details
            transaction_id = f"txn_{order.order_id}_{int(time.time())}"
    
            payment_result = {
                'transaction_id': transaction_id,
                'amount': order.total_amount,
                'currency': order.currency,
                'payment_method': order.payment_method.get('type', 'card'),
                'status': 'completed',
                'processed_at': datetime.now().isoformat()
            }
    
            logging.info(f"Payment processed for order {order.order_id}: {transaction_id}")
    
            return {
                'order_id': order.order_id,
                'payment_status': 'completed',
                'payment_result': payment_result,
                'reservation_id': inventory_result['reservation_id'],
                'order_data': order_data
            }
    
        except PaymentError as exc:
            logging.error(f"Payment failed for order {inventory_result['order_id']}: {exc}")
    
            # Release inventory reservation
            release_inventory_reservation.delay(inventory_result['reservation_id'])
    
            # Handle payment failure
            handle_payment_failure.delay(inventory_result, str(exc))
            raise
    
        except Exception as exc:
            logging.error(f"Payment processing error for order {inventory_result['order_id']}: {exc}")
    
            if self.request.retries < 2:
                raise self.retry(countdown=60, exc=exc)
            raise
    
    @app.task(bind=True, name='ecommerce.orders.fulfillment.fulfill_order')
    def fulfill_order_task(self, payment_result: Dict[str, Any]) -> Dict[str, Any]:
        """Fulfill the order by preparing items for shipment."""
        try:
            order_data = payment_result['order_data']
            order = Order(**order_data)
    
            # Simulate fulfillment process
            time.sleep(random.uniform(2, 5))
    
            fulfillment_items = []
            for item in order.items:
                # Simulate item picking and packing
                fulfillment_items.append({
                    'product_id': item.product_id,
                    'quantity': item.quantity,
                    'picked_at': datetime.now().isoformat(),
                    'packed': True
                })
    
            # Generate shipment details
            tracking_number = f"TRK{order.order_id}{random.randint(1000, 9999)}"
            estimated_delivery = datetime.now() + timedelta(days=random.randint(2, 7))
    
            fulfillment_result = {
                'tracking_number': tracking_number,
                'estimated_delivery': estimated_delivery.isoformat(),
                'fulfillment_items': fulfillment_items,
                'fulfilled_at': datetime.now().isoformat(),
                'carrier': random.choice(['UPS', 'FedEx', 'DHL', 'USPS'])
            }
    
            logging.info(f"Order {order.order_id} fulfilled: {tracking_number}")
    
            return {
                'order_id': order.order_id,
                'fulfillment_status': 'completed',
                'fulfillment_result': fulfillment_result,
                'payment_result': payment_result['payment_result'],
                'order_data': order_data
            }
    
        except Exception as exc:
            logging.error(f"Fulfillment failed for order {payment_result['order_id']}: {exc}")
    
            if self.request.retries < 2:
                raise self.retry(countdown=120, exc=exc)
            raise
    
    @app.task(name='ecommerce.orders.notifications.send_confirmation')
    def send_order_confirmation(fulfillment_result: Dict[str, Any]) -> Dict[str, Any]:
        """Send order confirmation and shipping notification."""
        try:
            order_data = fulfillment_result['order_data']
            order = Order(**order_data)
            fulfillment = fulfillment_result['fulfillment_result']
    
            # Simulate email sending
            time.sleep(random.uniform(0.5, 1.5))
    
            notification_data = {
                'order_id': order.order_id,
                'customer_id': order.customer_id,
                'tracking_number': fulfillment['tracking_number'],
                'estimated_delivery': fulfillment['estimated_delivery'],
                'total_amount': order.total_amount,
                'notification_sent_at': datetime.now().isoformat()
            }
    
            logging.info(f"Order confirmation sent for {order.order_id}")
    
            return {
                'order_id': order.order_id,
                'notification_status': 'sent',
                'notification_data': notification_data,
                'final_status': 'completed'
            }
    
        except Exception as exc:
            logging.error(f"Failed to send confirmation for order {fulfillment_result['order_id']}: {exc}")
            raise
    
    # Error handling tasks
    @app.task(name='ecommerce.orders.fulfillment.handle_inventory_shortage')
    def handle_inventory_shortage(order_data: Dict[str, Any], error_message: str):
        """Handle inventory shortage by creating backorder."""
        try:
            order_id = order_data['order_id']
    
            # Create backorder
            backorder_id = f"BO_{order_id}_{int(time.time())}"
    
            # Notify customer about backorder
            send_backorder_notification.delay(order_data, backorder_id)
    
            # Schedule inventory recheck
            recheck_inventory.apply_async(
                args=[order_data, backorder_id],
                countdown=3600  # Recheck in 1 hour
            )
    
            logging.info(f"Backorder created for {order_id}: {backorder_id}")
    
            return {'backorder_id': backorder_id, 'status': 'backordered'}
    
        except Exception as exc:
            logging.error(f"Failed to handle inventory shortage: {exc}")
            raise
    
    @app.task(name='ecommerce.orders.payment.handle_payment_failure')
    def handle_payment_failure(order_data: Dict[str, Any], error_message: str):
        """Handle payment failure."""
        try:
            order_id = order_data['order_id']
    
            # Send payment failure notification
            send_payment_failure_notification.delay(order_data, error_message)
    
            # Log for fraud analysis
            log_payment_failure.delay(order_data, error_message)
    
            logging.warning(f"Payment failure handled for order {order_id}")
    
            return {'status': 'payment_failed', 'error': error_message}
    
        except Exception as exc:
            logging.error(f"Failed to handle payment failure: {exc}")
            raise
    
    @app.task(name='ecommerce.orders.fulfillment.release_inventory_reservation')
    def release_inventory_reservation(reservation_id: str):
        """Release inventory reservation."""
        try:
            # Simulate inventory release
            time.sleep(random.uniform(0.1, 0.3))
    
            logging.info(f"Inventory reservation released: {reservation_id}")
    
            return {'reservation_id': reservation_id, 'status': 'released'}
    
        except Exception as exc:
            logging.error(f"Failed to release reservation {reservation_id}: {exc}")
            raise
    
    # Notification tasks
    @app.task(name='ecommerce.orders.notifications.send_backorder_notification')
    def send_backorder_notification(order_data: Dict[str, Any], backorder_id: str):
        """Send backorder notification to customer."""
        # Implementation would send actual email
        logging.info(f"Backorder notification sent for {order_data['order_id']}")
    
    @app.task(name='ecommerce.orders.notifications.send_payment_failure_notification')
    def send_payment_failure_notification(order_data: Dict[str, Any], error_message: str):
        """Send payment failure notification."""
        # Implementation would send actual email
        logging.info(f"Payment failure notification sent for {order_data['order_id']}")
    
    @app.task(name='ecommerce.orders.analytics.log_payment_failure')
    def log_payment_failure(order_data: Dict[str, Any], error_message: str):
        """Log payment failure for analysis."""
        # Implementation would log to analytics system
        logging.info(f"Payment failure logged for analysis: {order_data['order_id']}")
    
    @app.task(name='ecommerce.orders.fulfillment.recheck_inventory')
    def recheck_inventory(order_data: Dict[str, Any], backorder_id: str):
        """Recheck inventory for backordered items."""
        try:
            # Trigger the inventory check again. The AsyncResult is not ready
            # immediately after delay(), so attach a success callback instead
            # of polling result.successful() right away.
            check_inventory_task.apply_async(
                args=[order_data],
                link=continue_order_processing.s(backorder_id)
            )

            logging.info(f"Inventory recheck scheduled for backorder {backorder_id}")
    
        except Exception as exc:
            logging.error(f"Failed to recheck inventory for {backorder_id}: {exc}")
    
    @app.task(name='ecommerce.orders.fulfillment.continue_order_processing')
    def continue_order_processing(inventory_result: Dict[str, Any], backorder_id: str):
        """Continue order processing after inventory becomes available."""
        # Continue with payment and fulfillment
        order_workflow = chain(
            process_payment_task.s(inventory_result),
            fulfill_order_task.s(),
            send_order_confirmation.s()
        )
    
        result = order_workflow.apply_async()
        logging.info(f"Continued processing for backorder {backorder_id}")

        # AsyncResult objects are not JSON-serializable, so return the id instead
        return {'workflow_id': result.id, 'backorder_id': backorder_id}
    
    # Main order processing workflow
    def create_order_workflow(order_data: Dict[str, Any]):
        """Create the complete order processing workflow."""
        return chain(
            validate_order_task.s(order_data),
            check_inventory_task.s(),
            process_payment_task.s(),
            fulfill_order_task.s(),
            send_order_confirmation.s()
        )
    
    # Batch order processing
    @app.task(name='ecommerce.orders.batch.process_orders')
    def process_order_batch(order_batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process multiple orders efficiently."""
        try:
            # Create workflows for all orders
            workflows = []
            for order_data in order_batch:
                workflow = create_order_workflow(order_data)
                workflows.append(workflow)
    
            # Execute all workflows in parallel
            job = group(workflows)
            result = job.apply_async()
    
            # Monitor progress
            batch_id = f"batch_{int(time.time())}"
    
            logging.info(f"Processing order batch {batch_id} with {len(order_batch)} orders")
    
            return {
                'batch_id': batch_id,
                'total_orders': len(order_batch),
                'job_id': result.id,
                'status': 'processing'
            }
    
        except Exception as exc:
            logging.error(f"Failed to process order batch: {exc}")
            raise
    
    # Example usage and testing
    def simulate_order_processing():
        """Simulate order processing for testing."""
    
        # Sample order data
        sample_order = {
            'order_id': f'ORD-{int(time.time())}',
            'customer_id': 'CUST-12345',
            'items': [
                {
                    'product_id': 'PROD-001',
                    'quantity': 2,
                    'price': 29.99
                },
                {
                    'product_id': 'PROD-002',
                    'quantity': 1,
                    'price': 49.99
                }
            ],
            'shipping_address': {
                'street': '123 Main St',
                'city': 'Anytown',
                'state': 'CA',
                'zip': '12345',
                'country': 'US'
            },
            'payment_method': {
                'type': 'credit_card',
                'last_four': '1234'
            },
            'total_amount': 109.97,
            'currency': 'USD',
            'created_at': datetime.now().isoformat()
        }
    
        # Process single order
        print("Processing single order...")
        workflow = create_order_workflow(sample_order)
        result = workflow.apply_async()
    
        print(f"Order workflow started: {result.id}")
    
        # Monitor workflow progress
        try:
            final_result = result.get(timeout=300)  # 5 minute timeout
            print(f"Order processed successfully: {final_result}")
        except Exception as e:
            print(f"Order processing failed: {e}")
    
        # Process batch of orders
        print("\nProcessing order batch...")
        order_batch = [sample_order.copy() for _ in range(5)]
    
        # Update order IDs for batch
        for i, order in enumerate(order_batch):
            order['order_id'] = f'ORD-BATCH-{int(time.time())}-{i}'
    
        batch_result = process_order_batch.delay(order_batch)
        print(f"Batch processing started: {batch_result.get()}")
    
    if __name__ == '__main__':
        simulate_order_processing()

    This case study demonstrates a complete e-commerce order processing system with:

    1. Complex Workflow Management: Multi-step order processing with proper error handling
    2. Error Recovery: Sophisticated handling of payment failures and inventory shortages
    3. Batch Processing: Efficient handling of multiple orders
    4. Real-world Complexity: Realistic business logic with external service integrations
    5. Monitoring and Logging: Comprehensive logging for business intelligence

    The system handles the complete order lifecycle from validation through fulfillment, with error recovery and customer notifications at each failure point. With appropriately sized broker and worker pools, the same design scales to thousands of orders per day.
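
    For operational visibility, the chain returned by create_order_workflow can also be inspected step by step: Celery links the results of a chain through AsyncResult.parent. The helper below is a small sketch of that traversal; the function name and printed fields are illustrative additions, not part of the case-study code.

    # chain_progress_sketch.py
    """Sketch: report the per-step state of an order-processing chain.
    Celery chains link their results via AsyncResult.parent, so walking the
    parent links yields one entry per task. Names here are illustrative."""
    from celery.result import AsyncResult

    def describe_chain_progress(result: AsyncResult):
        """Return (task_id, state) pairs for every step in a chain, oldest first."""
        steps = []
        node = result
        while node is not None:
            steps.append((node.id, node.state))
            node = node.parent
        return list(reversed(steps))

    # Usage, assuming a workflow started as in simulate_order_processing():
    #   result = create_order_workflow(sample_order).apply_async()
    #   for task_id, state in describe_chain_progress(result):
    #       print(f"{task_id}: {state}")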


    Case Study 2: Data Pipeline Automation

    graph TB
        subgraph "Data Sources"
            A[API Endpoints]
            B[CSV Files]
            C[Database Tables]
            D[Real-time Streams]
        end
    
        subgraph "Extraction Layer"
            E[Extract API Data]
            F[Extract File Data]
            G[Extract DB Data]
            H[Extract Stream Data]
        end
    
        subgraph "Transformation Layer"
            I[Data Validation]
            J[Data Cleaning]
            K[Data Enrichment]
            L[Data Aggregation]
        end
    
        subgraph "Loading Layer"
            M[Load to Data Warehouse]
            N[Load to Analytics DB]
            O[Generate Reports]
            P[Update Dashboards]
        end
    
        A --> E
        B --> F
        C --> G
        D --> H
    
        E --> I
        F --> I
        G --> I
        H --> I
    
        I --> J
        J --> K
        K --> L
    
        L --> M
        L --> N
        L --> O
        L --> P
    
        style A fill:#e8f5e8
        style E fill:#e3f2fd
        style I fill:#fff3e0
        style M fill:#fce4ec
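
    The diagram maps naturally onto Celery's canvas primitives: the extraction layer becomes a group, validation acts as a chord callback that merges the extracted batches, and the remaining transform and load stages form a chain. The sketch below wires these stages together; the task bodies, names, and broker URL are placeholders for illustration, not a complete pipeline.

    # data_pipeline_sketch.py
    """Sketch of the ETL flow above: parallel extraction, a chord callback that
    merges results, then transform and load stages in a chain. Task bodies,
    names, and the broker URL are illustrative placeholders."""
    from celery import Celery, chain, chord, group

    app = Celery(
        'data_pipeline',
        broker='redis://localhost:6379/1',
        backend='redis://localhost:6379/1',
    )

    # Extraction layer: each source gets its own task so they run in parallel
    @app.task
    def extract_api_data(endpoint):
        return {'source': 'api', 'endpoint': endpoint, 'rows': []}

    @app.task
    def extract_file_data(path):
        return {'source': 'file', 'path': path, 'rows': []}

    # Transformation layer
    @app.task
    def validate_and_clean(extract_results):
        # Chord callback: receives the list of results from the extract group
        rows = [row for result in extract_results for row in result['rows']]
        return {'rows': rows}

    @app.task
    def enrich_and_aggregate(cleaned):
        return {'rows': cleaned['rows'], 'aggregates': {}}

    # Loading layer
    @app.task
    def load_to_warehouse(transformed):
        return {'loaded_rows': len(transformed['rows'])}

    def build_pipeline():
        """Extract in parallel, then validate, enrich, and load sequentially."""
        extract_stage = group(
            extract_api_data.s('https://example.com/api/orders'),
            extract_file_data.s('/data/incoming/orders.csv'),
        )
        return chord(
            extract_stage,
            chain(validate_and_clean.s(), enrich_and_aggregate.s(), load_to_warehouse.s()),
        )

    if __name__ == '__main__':
        result = build_pipeline().apply_async()
        print(f"Pipeline started: {result.id}")

    Scheduling build_pipeline() from Celery beat would turn this into a recurring batch job, which is how the extraction layer above would typically be triggered.
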

    Case Study 3: Image Processing Service

    Complete Image Processing Pipeline

    # image_processing_service.py
    """
    Comprehensive image processing service using Celery.
    Handles upload, processing, optimization, and CDN distribution.
    """
    from celery import Celery, group, chain, chord
    from PIL import Image, ImageFilter, ImageEnhance
    import boto3
    import os
    import logging
    from typing import Dict, List, Tuple, Any
    import time
    import hashlib
    import json
    
    app = Celery('image_service')
    app.conf.update(
        broker_url='redis://localhost:6379/0',
        result_backend='redis://localhost:6379/0',
        task_routes={
            'image.upload.*': {'queue': 'upload'},
            'image.process.*': {'queue': 'processing'},
            'image.optimize.*': {'queue': 'optimization'},
            'image.distribute.*': {'queue': 'distribution'},
        }
    )
    
    # AWS S3 client
    s3_client = boto3.client('s3')
    
    class ImageProcessingConfig:
        """Configuration for image processing pipeline."""
    
        UPLOAD_BUCKET = 'uploads-bucket'
        PROCESSED_BUCKET = 'processed-images-bucket'
        CDN_BUCKET = 'cdn-images-bucket'
    
        # Supported formats
        SUPPORTED_FORMATS = ['JPEG', 'PNG', 'WEBP', 'GIF']
    
        # Processing variations
        THUMBNAIL_SIZES = [(150, 150), (300, 300), (600, 600)]
        RESPONSIVE_WIDTHS = [480, 768, 1024, 1200, 1920]
    
        # Quality settings
        JPEG_QUALITY = 85
        WEBP_QUALITY = 80
        PNG_OPTIMIZE = True
    
    @app.task(bind=True, name='image.upload.validate_upload')
    def validate_image_upload(self, upload_data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate uploaded image file."""
        try:
            file_path = upload_data['file_path']
            file_size = upload_data['file_size']
            content_type = upload_data['content_type']
    
            # Size validation (max 50MB)
            if file_size > 50 * 1024 * 1024:
                raise ValueError("File size exceeds 50MB limit")
    
            # Content type validation
            allowed_types = ['image/jpeg', 'image/png', 'image/webp', 'image/gif']
            if content_type not in allowed_types:
                raise ValueError(f"Unsupported content type: {content_type}")
    
            # Open and validate image
            with Image.open(file_path) as img:
                # Check image format
                if img.format not in ImageProcessingConfig.SUPPORTED_FORMATS:
                    raise ValueError(f"Unsupported image format: {img.format}")
    
                # Check dimensions (min 100x100, max 8000x8000)
                width, height = img.size
                if width < 100 or height < 100:
                    raise ValueError("Image too small (minimum 100x100)")
                if width > 8000 or height > 8000:
                    raise ValueError("Image too large (maximum 8000x8000)")
    
                # Generate metadata
                metadata = {
                    'original_format': img.format,
                    'original_size': img.size,
                    'mode': img.mode,
                    'has_transparency': img.mode in ['RGBA', 'LA'] or 'transparency' in img.info,
                    'file_size': file_size
                }
    
            # Generate unique image ID
            image_id = hashlib.sha256(f"{file_path}{time.time()}".encode()).hexdigest()[:16]
    
            logging.info(f"Image validated: {image_id}")
    
            return {
                'image_id': image_id,
                'status': 'validated',
                'metadata': metadata,
                'upload_data': upload_data
            }
    
        except Exception as exc:
            logging.error(f"Image validation failed: {exc}")
            raise
    
    @app.task(bind=True, name='image.upload.upload_to_s3')
    def upload_original_to_s3(self, validation_result: Dict[str, Any]) -> Dict[str, Any]:
        """Upload original image to S3."""
        try:
            image_id = validation_result['image_id']
            upload_data = validation_result['upload_data']
            file_path = upload_data['file_path']
    
            # Upload to S3
            s3_key = f"originals/{image_id}/original.{validation_result['metadata']['original_format'].lower()}"
    
            s3_client.upload_file(
                file_path,
                ImageProcessingConfig.UPLOAD_BUCKET,
                s3_key,
                ExtraArgs={
                    'ContentType': upload_data['content_type'],
                    'Metadata': {
                        'image_id': image_id,
                        'original_filename': upload_data.get('filename', ''),
                        'upload_timestamp': str(time.time())
                    }
                }
            )
    
            # Clean up local file
            os.remove(file_path)
    
            logging.info(f"Original image uploaded to S3: {s3_key}")
    
            return {
                'image_id': image_id,
                'original_s3_key': s3_key,
                'bucket': ImageProcessingConfig.UPLOAD_BUCKET,
                'metadata': validation_result['metadata'],
                'status': 'uploaded'
            }
    
        except Exception as exc:
            logging.error(f"S3 upload failed for {validation_result['image_id']}: {exc}")
            raise
    
    @app.task(bind=True, name='image.process.generate_thumbnails')
    def generate_thumbnails(self, upload_result: Dict[str, Any]) -> Dict[str, Any]:
        """Generate thumbnail versions of the image."""
        try:
            image_id = upload_result['image_id']
            s3_key = upload_result['original_s3_key']
            bucket = upload_result['bucket']
    
            # Download original from S3
            local_path = f"/tmp/{image_id}_original"
            s3_client.download_file(bucket, s3_key, local_path)
    
            thumbnail_results = []
    
            with Image.open(local_path) as img:
                for size in ImageProcessingConfig.THUMBNAIL_SIZES:
                    # Create thumbnail
                    thumbnail = img.copy()
                    thumbnail.thumbnail(size, Image.Resampling.LANCZOS)

                    # JPEG has no alpha channel, so flatten transparent images first
                    if thumbnail.mode in ('RGBA', 'LA', 'P'):
                        thumbnail = thumbnail.convert('RGB')

                    # Save thumbnail
                    thumb_path = f"/tmp/{image_id}_thumb_{size[0]}x{size[1]}.jpg"
                    thumbnail.save(
                        thumb_path,
                        'JPEG',
                        quality=ImageProcessingConfig.JPEG_QUALITY,
                        optimize=True
                    )
    
                    # Upload thumbnail to S3
                    thumb_s3_key = f"thumbnails/{image_id}/thumb_{size[0]}x{size[1]}.jpg"
                    s3_client.upload_file(
                        thumb_path,
                        ImageProcessingConfig.PROCESSED_BUCKET,
                        thumb_s3_key,
                        ExtraArgs={'ContentType': 'image/jpeg'}
                    )
    
                    thumbnail_results.append({
                        'size': size,
                        's3_key': thumb_s3_key,
                        'local_path': thumb_path
                    })
    
                    os.remove(thumb_path)
    
            os.remove(local_path)
    
            logging.info(f"Generated {len(thumbnail_results)} thumbnails for {image_id}")
    
            return {
                'image_id': image_id,
                'thumbnails': thumbnail_results,
                'metadata': upload_result['metadata'],
                'status': 'thumbnails_generated'
            }
    
        except Exception as exc:
            logging.error(f"Thumbnail generation failed for {upload_result['image_id']}: {exc}")
            raise
    
    @app.task(bind=True, name='image.process.generate_responsive')
    def generate_responsive_images(self, upload_result: Dict[str, Any]) -> Dict[str, Any]:
        """Generate responsive image variations."""
        try:
            image_id = upload_result['image_id']
            s3_key = upload_result['original_s3_key']
            bucket = upload_result['bucket']
    
            # Download original from S3
            local_path = f"/tmp/{image_id}_original"
            s3_client.download_file(bucket, s3_key, local_path)
    
            responsive_results = []
    
            with Image.open(local_path) as img:
                original_width, original_height = img.size
    
                for width in ImageProcessingConfig.RESPONSIVE_WIDTHS:
                    if width >= original_width:
                        continue  # Skip sizes larger than original
    
                    # Calculate proportional height
                    height = int((width * original_height) / original_width)
    
                    # Resize image
                    resized = img.resize((width, height), Image.Resampling.LANCZOS)
    
                    # Generate multiple formats
                    formats = [
                        ('webp', ImageProcessingConfig.WEBP_QUALITY),
                        ('jpg', ImageProcessingConfig.JPEG_QUALITY)
                    ]
    
                    for format_name, quality in formats:
                        if format_name == 'webp':
                            format_ext = 'webp'
                            pil_format = 'WEBP'
                            content_type = 'image/webp'
                        else:
                            format_ext = 'jpg'
                            pil_format = 'JPEG'
                            content_type = 'image/jpeg'
    
                        # Save resized image
                        resized_path = f"/tmp/{image_id}_responsive_{width}w.{format_ext}"
                        resized.save(
                            resized_path,
                            pil_format,
                            quality=quality,
                            optimize=True
                        )
    
                        # Upload to S3
                        responsive_s3_key = f"responsive/{image_id}/{width}w.{format_ext}"
                        s3_client.upload_file(
                            resized_path,
                            ImageProcessingConfig.PROCESSED_BUCKET,
                            responsive_s3_key,
                            ExtraArgs={'ContentType': content_type}
                        )
    
                        responsive_results.append({
                            'width': width,
                            'height': height,
                            'format': format_name,
                            's3_key': responsive_s3_key,
                            'file_size': os.path.getsize(resized_path)
                        })
    
                        os.remove(resized_path)
    
            os.remove(local_path)
    
            logging.info(f"Generated {len(responsive_results)} responsive images for {image_id}")
    
            return {
                'image_id': image_id,
                'responsive_images': responsive_results,
                'metadata': upload_result['metadata'],
                'status': 'responsive_generated'
            }
    
        except Exception as exc:
            logging.error(f"Responsive image generation failed for {upload_result['image_id']}: {exc}")
            raise
    
    @app.task(bind=True, name='image.optimize.apply_filters')
    def apply_image_filters(self, upload_result: Dict[str, Any], filters: List[str]) -> Dict[str, Any]:
        """Apply image filters and enhancements."""
        try:
            image_id = upload_result['image_id']
            s3_key = upload_result['original_s3_key']
            bucket = upload_result['bucket']
    
            if not filters:
                return {
                    'image_id': image_id,
                    'filtered_images': [],
                    'status': 'no_filters_applied'
                }
    
            # Download original from S3
            local_path = f"/tmp/{image_id}_original"
            s3_client.download_file(bucket, s3_key, local_path)
    
            filtered_results = []
    
            with Image.open(local_path) as img:
                for filter_name in filters:
                    filtered_img = img.copy()
    
                    # Apply specific filters
                    if filter_name == 'blur':
                        filtered_img = filtered_img.filter(ImageFilter.BLUR)
                    elif filter_name == 'sharpen':
                        filtered_img = filtered_img.filter(ImageFilter.SHARPEN)
                    elif filter_name == 'enhance_contrast':
                        enhancer = ImageEnhance.Contrast(filtered_img)
                        filtered_img = enhancer.enhance(1.2)
                    elif filter_name == 'enhance_brightness':
                        enhancer = ImageEnhance.Brightness(filtered_img)
                        filtered_img = enhancer.enhance(1.1)
                    elif filter_name == 'grayscale':
                        filtered_img = filtered_img.convert('L').convert('RGB')
    
                    # Save filtered image
                    filtered_path = f"/tmp/{image_id}_filtered_{filter_name}.jpg"
                    filtered_img.save(
                        filtered_path,
                        'JPEG',
                        quality=ImageProcessingConfig.JPEG_QUALITY,
                        optimize=True
                    )
    
                    # Upload to S3
                    filtered_s3_key = f"filtered/{image_id}/{filter_name}.jpg"
                    s3_client.upload_file(
                        filtered_path,
                        ImageProcessingConfig.PROCESSED_BUCKET,
                        filtered_s3_key,
                        ExtraArgs={'ContentType': 'image/jpeg'}
                    )
    
                    filtered_results.append({
                        'filter': filter_name,
                        's3_key': filtered_s3_key,
                        'file_size': os.path.getsize(filtered_path)
                    })
    
                    os.remove(filtered_path)
    
            os.remove(local_path)
    
            logging.info(f"Applied {len(filters)} filters to {image_id}")
    
            return {
                'image_id': image_id,
                'filtered_images': filtered_results,
                'status': 'filters_applied'
            }
    
        except Exception as exc:
            logging.error(f"Filter application failed for {upload_result['image_id']}: {exc}")
            raise
    
    @app.task(bind=True, name='image.distribute.update_cdn')
    def distribute_to_cdn(self, processing_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Distribute processed images to CDN."""
        try:
            # Aggregate all processed images
            image_id = processing_results[0]['image_id']
            all_images = []
    
            for result in processing_results:
                if 'thumbnails' in result:
                    all_images.extend(result['thumbnails'])
                if 'responsive_images' in result:
                    all_images.extend(result['responsive_images'])
                if 'filtered_images' in result:
                    all_images.extend(result['filtered_images'])
    
            cdn_urls = []
    
            for image_info in all_images:
                s3_key = image_info['s3_key']
    
                # Copy to CDN bucket
                copy_source = {
                    'Bucket': ImageProcessingConfig.PROCESSED_BUCKET,
                    'Key': s3_key
                }
    
                cdn_key = f"cdn/{s3_key}"
                # copy_object accepts ACL directly (ExtraArgs is only for the managed transfer APIs)
                s3_client.copy_object(
                    CopySource=copy_source,
                    Bucket=ImageProcessingConfig.CDN_BUCKET,
                    Key=cdn_key,
                    ACL='public-read'
                )
    
                # Generate CDN URL
                cdn_url = f"https://cdn.example.com/{cdn_key}"
                cdn_urls.append({
                    'type': image_info.get('size') or image_info.get('width') or image_info.get('filter'),
                    'url': cdn_url,
                    'format': image_info.get('format', 'jpeg')
                })
    
            logging.info(f"Distributed {len(cdn_urls)} images to CDN for {image_id}")
    
            return {
                'image_id': image_id,
                'cdn_urls': cdn_urls,
                'status': 'distributed_to_cdn',
                'distribution_timestamp': time.time()
            }
    
        except Exception as exc:
            logging.error(f"CDN distribution failed: {exc}")
            raise
    
    @app.task(name='image.process.finalize_processing')
    def finalize_image_processing(distribution_result: Dict[str, Any]) -> Dict[str, Any]:
        """Finalize image processing and update database."""
        try:
            image_id = distribution_result['image_id']
    
            # Update database with processing results
            # (In real implementation, update your database)
    
            # Generate image manifest
            manifest = {
                'image_id': image_id,
                'processing_completed_at': time.time(),
                'cdn_urls': distribution_result['cdn_urls'],
                'status': 'completed'
            }
    
            # Store manifest in database or cache
            logging.info(f"Image processing completed for {image_id}")
    
            return manifest
    
        except Exception as exc:
            logging.error(f"Failed to finalize processing for {distribution_result['image_id']}: {exc}")
            raise
    
    # Main image processing workflow
    def create_image_processing_workflow(upload_data: Dict[str, Any], filters: List[str] = None):
        """Create complete image processing workflow."""
        filters = filters or []
    
        # Validation and upload
        validation_chain = chain(
            validate_image_upload.s(upload_data),
            upload_original_to_s3.s()
        )
    
        # Parallel processing (include the filter task only when filters were requested)
        processing_tasks = [
            generate_thumbnails.s(),
            generate_responsive_images.s(),
        ]
        if filters:
            processing_tasks.append(apply_image_filters.s(filters))
        processing_group = group(processing_tasks)
    
        # Distribution and finalization
        distribution_chain = chain(
            distribute_to_cdn.s(),
            finalize_image_processing.s()
        )
    
        # Complete workflow
        workflow = chain(
            validation_chain,
            chord(processing_group, distribution_chain)
        )
    
        return workflow
    
    # Batch image processing
    @app.task(name='image.batch.process_images')
    def process_image_batch(upload_batch: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process multiple images in batch."""
        try:
            batch_id = f"batch_{int(time.time())}"
            workflows = []
    
            for upload_data in upload_batch:
                workflow = create_image_processing_workflow(upload_data)
                workflows.append(workflow)
    
            # Execute all workflows in parallel
            job = group(workflows)
            result = job.apply_async()
    
            logging.info(f"Processing image batch {batch_id} with {len(upload_batch)} images")
    
            return {
                'batch_id': batch_id,
                'total_images': len(upload_batch),
                'job_id': result.id,
                'status': 'processing'
            }
    
        except Exception as exc:
            logging.error(f"Failed to process image batch: {exc}")
            raise
    
    # Example usage
    def simulate_image_processing():
        """Simulate image processing workflow."""
    
        # Sample upload data
        sample_upload = {
            'file_path': '/tmp/sample_image.jpg',
            'file_size': 2048000,  # 2MB
            'content_type': 'image/jpeg',
            'filename': 'sample_image.jpg',
            'uploader_id': 'user123'
        }
    
        # Create sample image file for testing
        # (In real implementation, this would be an actual uploaded file)
    
        # Single image processing
        print("Processing single image...")
        filters = ['enhance_contrast', 'sharpen']
        workflow = create_image_processing_workflow(sample_upload, filters)
        result = workflow.apply_async()
    
        print(f"Image processing workflow started: {result.id}")
    
        # Batch processing
        print("Processing image batch...")
        batch_uploads = [sample_upload.copy() for _ in range(3)]
    
        batch_result = process_image_batch.delay(batch_uploads)
        print(f"Batch processing started: {batch_result.get()}")
    
    if __name__ == '__main__':
        simulate_image_processing()
    Python
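
    The workflow helpers above return immediately with a task or group id rather than blocking on results. Below is a minimal sketch of how a caller might check progress later, assuming the same app instance from this case study; the helper name get_processing_status is illustrative and not part of the original code.

    # status_check.py - illustrative sketch, not part of the case study above
    from celery.result import AsyncResult

    def get_processing_status(task_id: str) -> dict:
        """Return the current state of a previously submitted workflow."""
        res = AsyncResult(task_id, app=app)
        return {
            'task_id': task_id,
            'state': res.state,      # PENDING, STARTED, SUCCESS, FAILURE, ...
            'ready': res.ready(),    # True once the workflow has finished
        }
    Python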

    Appendix A: Configuration Reference

    Complete Configuration Options

    # complete_celery_config.py
    """Complete Celery configuration reference with all options."""

    import ssl

    class CompleteCeleryConfig:
        """Comprehensive Celery configuration with all available options."""
    
        # ========== BROKER SETTINGS ==========
    
        # Broker URL and transport
        broker_url = 'redis://localhost:6379/0'
        broker_transport = 'redis'  # redis, pyamqp, sqs, etc.
    
        # Connection settings
        broker_connection_timeout = 30
        broker_connection_retry_on_startup = True
        broker_connection_max_retries = 100
        broker_failover_strategy = 'round-robin'
        broker_heartbeat = 120
        broker_heartbeat_checkrate = 2.0
    
        # Pool settings
        broker_pool_limit = 10
        broker_channel_error_retry = False
    
        # Transport options
        broker_transport_options = {
            'region': 'us-east-1',  # For SQS
            'predefined_queues': {},
            'socket_keepalive': True,
            'socket_keepalive_options': {},
            'socket_timeout': 30.0,
            'retry_on_timeout': True,
            'max_connections': 20,
        }
    
        # Login method
        broker_login_method = 'AMQPLAIN'
    
        # SSL settings
        broker_use_ssl = {
            'keyfile': '/path/to/key.pem',
            'certfile': '/path/to/cert.pem',
            'ca_certs': '/path/to/ca.pem',
            'cert_reqs': ssl.CERT_REQUIRED,   # pass the ssl module constants, not strings
            'ssl_version': ssl.PROTOCOL_TLS,
            'ciphers': None
        }
    
        # ========== TASK SETTINGS ==========
    
        # Serialization
        task_serializer = 'json'
        task_compression = 'gzip'
        task_compression_level = 6
    
        # Task execution
        task_always_eager = False
        task_eager_propagates = True
        task_ignore_result = False
        task_store_eager_result = True
    
        # Task routing
        task_routes = {}
        task_default_queue = 'celery'
        task_default_exchange = 'celery'
        task_default_exchange_type = 'direct'
        task_default_routing_key = 'celery'
    
        # Task annotations
        task_annotations = {
            '*': {
                'rate_limit': '100/s',
                'time_limit': 300,
                'soft_time_limit': 240,
            }
        }
    
        # Task inheritance
        task_inherit_parent_priority = True
        task_default_priority = 5
        task_queue_max_priority = 10
    
        # Task tracking
        task_track_started = False
        task_send_sent_event = False
    
        # Task time limits
        task_time_limit = 60 * 60  # 1 hour
        task_soft_time_limit = 60 * 55  # 55 minutes
    
        # Task acknowledgment
        task_acks_late = False
        task_acks_on_failure_or_timeout = True
        task_reject_on_worker_lost = False
    
        # Task retry settings
        task_publish_retry = True
        task_publish_retry_policy = {
            'max_retries': 3,
            'interval_start': 0,
            'interval_step': 0.2,
            'interval_max': 0.2,
        }
    
        # ========== RESULT BACKEND SETTINGS ==========
    
        # Result backend URL
        result_backend = 'redis://localhost:6379/0'
    
        # Result serialization
        result_serializer = 'json'
        result_compression = 'gzip'
        result_compression_level = 6
    
        # Result expiration
        result_expires = 3600  # 1 hour
        result_persistent = True
    
        # Result caching
        result_cache_max = 100
        result_backend_always_retry = False
        result_backend_max_retries = 3
        result_backend_retry_delay = 0
    
        # Result backend transport options
        result_backend_transport_options = {
            'master_name': 'sentinel-master',
            'socket_keepalive': True,
            'socket_keepalive_options': {},
            'socket_timeout': 30.0,
            'retry_on_timeout': True,
        }
    
        # ========== WORKER SETTINGS ==========
    
        # Worker concurrency
        worker_concurrency = 4
        worker_prefetch_multiplier = 4
    
        # Worker processes
        worker_max_tasks_per_child = 1000
        worker_max_memory_per_child = 200000  # 200MB
        worker_disable_rate_limits = False
    
        # Worker behavior
        worker_redirect_stdouts = True
        worker_redirect_stdouts_level = 'WARNING'
        worker_hijack_root_logger = True
        worker_log_color = True
    
        # Worker state
        worker_state_db = '/var/lib/celery/worker.db'
        worker_timer_precision = 1.0
    
        # Worker autoscaling (the max/min bounds are given on the command line
        # via --autoscale=max,min; only the autoscaler class is a setting)
        worker_autoscaler = 'celery.worker.autoscale:Autoscaler'
    
        # Worker process management
        worker_proc_alive_timeout = 60.0
        worker_lost_wait = 10.0
    
        # Worker consumer settings
        worker_consumer = 'celery.worker.consumer:Consumer'
        worker_direct = False
        worker_enable_remote_control = True
    
        # Worker heartbeat
        worker_send_task_events = False
    
        # ========== LOGGING SETTINGS ==========
    
        # Worker logging
        worker_log_level = 'INFO'
        worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
        worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
    
        # ========== SECURITY SETTINGS ==========
    
        # Content types
        accept_content = ['json']
    
        # Message signing (used with the auth serializer); security_key is a path to the private key
        security_key = '/path/to/private-key.pem'
        security_certificate = '/path/to/cert.pem'
        security_cert_store = '/path/to/cert-store'
    
        # ========== BEAT SETTINGS ==========
    
        # Beat scheduler
        beat_scheduler = 'celery.beat:PersistentScheduler'
        beat_schedule_filename = 'celerybeat-schedule'
        beat_sync_every = 0
        beat_max_loop_interval = 300
    
        # Timezone
        timezone = 'UTC'
        enable_utc = True
    
        # ========== MONITORING SETTINGS ==========
    
        # Events (re-declared here; these later True values override the
        # defaults set in the task and worker sections above)
        worker_send_task_events = True
        task_send_sent_event = True
    
        # Monitoring
        control_queue_ttl = 300.0
        control_queue_expires = 10.0
        control_exchange = 'celeryctl'
    
        # ========== DATABASE SETTINGS ==========
    
        # Database (SQLAlchemy) result backend -- selected via a db+ prefixed result_backend URL
        # result_backend = 'db+postgresql://user:pass@localhost/celery'
        database_engine_options = {
            'echo': False,
            'pool_recycle': 3600,
            'pool_size': 10,
            'pool_timeout': 30,
            'max_overflow': 20,
        }
    
        database_table_names = {
            'task': 'celery_taskmeta',
            'group': 'celery_tasksetmeta',
        }
    
        database_table_schemas = {
            'task': 'celery',
            'group': 'celery',
        }
    
        # ========== CACHE SETTINGS ==========
    
        # Cache backend
        cache_backend = 'memcached://127.0.0.1:11211/'
        cache_backend_options = {
            'binary': True,
            'behaviors': {
                'tcp_nodelay': True,
                'tcp_keepalive': True,
            }
        }
    
        # ========== ADVANCED SETTINGS ==========
    
        # Include
        include = []
    
        # Imports
        imports = []
    
        # Late binding
        task_create_missing_queues = True
    
        # Queue configuration
        task_queues = ()
    
        # Result chord settings
        result_chord_retry_delay = 1.0
        result_chord_retry_max = 3
    
        # Deprecated settings (for reference)
        # CELERY_ALWAYS_EAGER = task_always_eager
        # CELERY_EAGER_PROPAGATES_EXCEPTIONS = task_eager_propagates
        # BROKER_URL = broker_url
        # CELERY_RESULT_BACKEND = result_backend
    Python
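
    A class-based configuration like the one above is applied with config_from_object; a minimal sketch, assuming an application named myapp:

    # apply_config.py - minimal sketch of loading the configuration class
    from celery import Celery

    app = Celery('myapp')
    app.config_from_object(CompleteCeleryConfig)
    # or by dotted path: app.config_from_object('complete_celery_config.CompleteCeleryConfig')
    Python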

    Appendix B: CLI Commands Reference

    Complete Celery CLI Reference

    # ========== WORKER COMMANDS ==========
    
    # Start worker
    celery -A myapp worker --loglevel=info
    
    # Worker with specific queues
    celery -A myapp worker --loglevel=info -Q queue1,queue2
    
    # Worker with concurrency
    celery -A myapp worker --loglevel=info --concurrency=8
    
    # Worker with specific pool
    celery -A myapp worker --pool=eventlet --concurrency=1000
    
    # Worker with hostname
    celery -A myapp worker --hostname=worker1@%h
    
    # Worker with autoscaling
    celery -A myapp worker --autoscale=10,3
    
    # Worker with time limits
    celery -A myapp worker --time-limit=300 --soft-time-limit=240
    
    # Worker with max tasks per child
    celery -A myapp worker --max-tasks-per-child=1000
    
    # Worker with max memory per child
    celery -A myapp worker --max-memory-per-child=200000
    
    # ========== BEAT COMMANDS ==========
    
    # Start beat scheduler
    celery -A myapp beat --loglevel=info
    
    # Beat with custom schedule file
    celery -A myapp beat --schedule=/var/run/celerybeat-schedule
    
    # Beat with pidfile
    celery -A myapp beat --pidfile=/var/run/celerybeat.pid
    
    # ========== FLOWER COMMANDS ==========
    
    # Start flower monitoring
    celery -A myapp flower
    
    # Flower with custom port
    celery -A myapp flower --port=5555
    
    # Flower with authentication
    celery -A myapp flower --basic_auth=user1:password1,user2:password2
    
    # ========== MONITORING COMMANDS ==========
    
    # List active tasks
    celery -A myapp inspect active
    
    # List scheduled tasks
    celery -A myapp inspect scheduled
    
    # List reserved tasks
    celery -A myapp inspect reserved
    
    # Worker statistics
    celery -A myapp inspect stats
    
    # Registered tasks
    celery -A myapp inspect registered
    
    # Worker ping
    celery -A myapp inspect ping
    
    # Queues each worker is consuming from (queue depth itself must be checked on the broker)
    celery -A myapp inspect active_queues
    
    # ========== CONTROL COMMANDS ==========
    
    # Enable events
    celery -A myapp control enable_events
    
    # Disable events
    celery -A myapp control disable_events
    
    # Cancel task
    celery -A myapp control revoke task_id
    
    # Cancel task and terminate
    celery -A myapp control revoke task_id --terminate
    
    # Shutdown workers
    celery -A myapp control shutdown
    
    # Restart worker pool
    celery -A myapp control pool_restart
    
    # Add consumer
    celery -A myapp control add_consumer queue_name
    
    # Cancel consumer
    celery -A myapp control cancel_consumer queue_name
    
    # Set rate limit
    celery -A myapp control rate_limit task_name 100/s
    
    # Set time limits for a task (soft then hard, in seconds)
    celery -A myapp control time_limit task_name 240 300
    
    # ========== UTILITY COMMANDS ==========
    
    # Purge all messages
    celery -A myapp purge
    
    # Purge specific queue
    celery -A myapp purge -Q queue_name
    
    # Declare a queue via the AMQP admin shell
    celery -A myapp amqp queue.declare queue_name
    
    # Migrate tasks from one broker to another
    celery -A myapp migrate redis://localhost:6379/0 amqp://guest@localhost//
    
    # Shell
    celery -A myapp shell
    
    # Check configuration
    celery -A myapp report
    
    # Events monitoring
    celery -A myapp events
    
    # Render the worker bootstep dependency graph
    celery -A myapp graph bootsteps
    
    # ========== RESULT COMMANDS ==========
    
    # Get task result
    celery -A myapp result task_id
    
    # Query information about a task by id
    celery -A myapp inspect query_task task_id
    
    # ========== MULTI COMMANDS ==========
    
    # Start multiple workers
    celery multi start worker1 worker2 -A myapp
    
    # Stop multiple workers
    celery multi stop worker1 worker2
    
    # Restart multiple workers
    celery multi restart worker1 worker2
    
    # Show worker status
    celery multi show worker1 worker2
    
    # Start workers with different configs
    celery multi start worker1 worker2 \
        -A myapp \
        -c 4 \
        --loglevel=info \
        -Q:worker1 queue1,queue2 \
        -Q:worker2 queue3,queue4 \
        --pidfile=/var/run/celery/%n.pid \
        --logfile=/var/log/celery/%n.log
    
    # ========== ADVANCED EXAMPLES ==========
    
    # Production worker with full configuration
    celery -A myapp worker \
        --loglevel=info \
        --concurrency=8 \
        --pool=prefork \
        --queues=high,normal,low \
        --hostname=worker1@%h \
        --time-limit=3600 \
        --soft-time-limit=3300 \
        --max-tasks-per-child=1000 \
        --max-memory-per-child=200000 \
        --pidfile=/var/run/celery/worker1.pid \
        --logfile=/var/log/celery/worker1.log \
        --autoscale=10,2
    
    # High-performance I/O worker
    celery -A myapp worker \
        --pool=eventlet \
        --concurrency=1000 \
        --queues=io_intensive \
        --loglevel=warning \
        --without-mingle \
        --without-gossip
    
    # CPU-intensive worker
    celery -A myapp worker \
        --pool=prefork \
        --concurrency=4 \
        --queues=cpu_intensive \
        --time-limit=7200 \
        --max-tasks-per-child=10
    
    # Development worker with debugging
    celery -A myapp worker \
        --loglevel=debug \
        --pool=solo \
        --queues=debug \
        --without-heartbeat \
        --without-mingle \
        --without-gossip
    
    # Monitoring and inspection examples
    celery -A myapp inspect active --destination=celery@worker1
    celery -A myapp control revoke task_id --destination=celery@worker1
    celery -A myapp control rate_limit myapp.tasks.heavy_task 10/m
    Bash
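
    Most of the inspect and control commands above can also be issued from Python through the app.control API; a brief sketch, assuming an application instance named app:

    # remote_control.py - programmatic equivalents of the CLI commands above
    insp = app.control.inspect()

    insp.active()      # same data as `celery -A myapp inspect active`
    insp.stats()       # same data as `celery -A myapp inspect stats`

    app.control.revoke('some-task-id', terminate=True)          # revoke and terminate
    app.control.rate_limit('myapp.tasks.heavy_task', '10/m')    # set a rate limit
    Python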

    Appendix C: Troubleshooting Guide

    Common Issues and Solutions

    Worker Issues

    Problem: Workers not starting

    # Check configuration
    celery -A myapp report
    
    # Check broker connectivity
    celery -A myapp inspect ping
    
    # Start worker with debug logging
    celery -A myapp worker --loglevel=debug
    Bash

    Problem: High memory usage

    # Add to configuration
    worker_max_memory_per_child = 200000  # 200MB
    worker_max_tasks_per_child = 1000
    Python

    Problem: Tasks hanging

    # Add time limits
    task_time_limit = 300
    task_soft_time_limit = 240
    
    # Or in task annotation
    @app.task(time_limit=300, soft_time_limit=240)
    def my_task():
        pass
    Python
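
    When the soft limit fires, Celery raises SoftTimeLimitExceeded inside the task, which gives it a window to clean up before the hard limit kills the worker process. A brief sketch (the two helper functions are hypothetical):

    from celery.exceptions import SoftTimeLimitExceeded

    @app.task(time_limit=300, soft_time_limit=240)
    def my_task():
        try:
            do_long_running_work()        # hypothetical work function
        except SoftTimeLimitExceeded:
            clean_up_partial_results()    # hypothetical cleanup; runs before the 300s hard kill
            raise
    Python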

    Broker Issues

    Problem: Connection errors

    # Increase connection timeout
    broker_connection_timeout = 30
    broker_connection_retry_on_startup = True
    broker_connection_max_retries = 10
    Python

    Problem: Redis connection pool exhausted

    # Configure Redis pool
    redis_max_connections = 20
    redis_retry_on_timeout = True
    Python

    Performance Issues

    Problem: Low throughput

    # Optimize prefetch
    worker_prefetch_multiplier = 1  # For long tasks
    worker_prefetch_multiplier = 10  # For short tasks
    
    # Use appropriate pool
    worker_pool = 'eventlet'  # For I/O bound
    worker_pool = 'prefork'   # For CPU bound
    Python
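
    For long-running tasks, the low prefetch value is usually paired with late acknowledgment so that a task is redelivered if the worker dies mid-execution rather than being lost; a brief sketch:

    # Typical pairing for long-running tasks
    worker_prefetch_multiplier = 1
    task_acks_late = True  # redeliver the task if the worker process dies while executing it
    Python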

    Problem: Queue backlog

    # Check which queues workers are consuming from
    celery -A myapp inspect active_queues
    
    # Add more workers
    celery multi start worker1 worker2 worker3 -A myapp
    
    # Use autoscaling
    celery -A myapp worker --autoscale=10,2
    Bash
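
    If a single slow task is responsible for the backlog, routing it to a dedicated queue lets the extra workers drain just that queue; add a route in the configuration (the task name below is illustrative) and start the new workers with -Q heavy so they consume only from it:

    # Route the slow task to its own queue
    task_routes = {
        'myapp.tasks.heavy_task': {'queue': 'heavy'},
    }
    Python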

    Final Summary

    This comprehensive guide to Python Celery covers everything from basic concepts to expert-level implementations. It demonstrates:

    1. Complete Learning Path: From installation to production deployment
    2. Real-World Examples: E-commerce order processing, image processing pipelines, and data automation
    3. Production-Ready Code: Security, monitoring, error handling, and scalability
    4. Best Practices: Configuration management, performance optimization, and troubleshooting
    5. Visual Documentation: Mermaid.js diagrams explaining complex architectures and workflows

    The examples provided are production-ready and can serve as templates for building robust, scalable distributed systems using Celery. Each chapter builds upon previous concepts while introducing new advanced techniques, making this a complete reference for developers at all skill levels.

