A Comprehensive Guide to Distributed Task Processing
Table of Contents
Part I: Foundations (Beginner)
- Introduction to Celery
- What is Celery?
- Why Use Celery?
- When to Use Celery
- Celery Architecture Overview
- Installation and Setup
- Prerequisites
- Installing Celery
- Message Brokers (Redis, RabbitMQ)
- Development Environment Setup
- Your First Celery Application
- Creating a Basic Task
- Running Workers
- Calling Tasks
- Understanding the Workflow
- Basic Celery Concepts
- Tasks and Task Objects
- Workers and Concurrency
- Brokers and Backends
- Serialization Basics
Part II: Building Skills (Intermediate)
- Task Routing and Organization
- Task Discovery
- Routing Tasks to Specific Workers
- Task Naming and Organization
- Multiple Queues
- Configuration and Settings
- Celery Configuration
- Broker Settings
- Result Backend Configuration
- Worker Configuration
- Error Handling and Retries
- Exception Handling in Tasks
- Automatic Retries
- Custom Retry Logic
- Dead Letter Queues
- Monitoring and Debugging
- Celery Events
- Flower Monitoring
- Logging Best Practices
- Debugging Failed Tasks
- Workflows and Chains
- Task Signatures
- Chains and Groups
- Chords and Maps
- Complex Workflows
Part III: Advanced Techniques (Advanced)
- Performance Optimization
- Worker Optimization
- Memory Management
- Concurrency Models
- Bulk Operations
- Security and Authentication
- Message Encryption
- SSL/TLS Configuration
- Authentication Mechanisms
- Security Best Practices
- Custom Components
- Custom Serializers
- Custom Task Classes
- Custom Result Backends
- Middleware and Signals
- Scaling and Distribution
- Horizontal Scaling
- Load Balancing
- Geographic Distribution
- Container Orchestration
Part IV: Expert Level
- Production Deployment
- Deployment Strategies
- Process Management
- High Availability Setup
- Disaster Recovery
- Advanced Patterns
- Event-Driven Architecture
- Saga Pattern
- Circuit Breakers
- Rate Limiting
- Integration and Ecosystem
- Django Integration
- Flask Integration
- FastAPI Integration
- Third-party Extensions
- Troubleshooting and Maintenance
- Common Issues and Solutions
- Performance Profiling
- Memory Leaks
- Network Issues
- Real-World Case Studies
- E-commerce Order Processing
- Data Pipeline Automation
- Image Processing Service
- Notification System
Appendices
A. Configuration Reference
B. CLI Commands Reference
C. Troubleshooting Guide
D. Resources and Further Reading
Chapter 1: Introduction to Celery
What is Celery?
Celery is a distributed task queue system for Python that enables you to execute tasks asynchronously across multiple machines or processes. It’s designed to handle millions of tasks per minute while providing flexibility, reliability, and ease of use.
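Before looking at the architecture, here is the smallest possible taste of what this looks like in code (a minimal sketch; the module name and Redis URLs are assumptions for illustration, and Chapter 3 walks through a full setup):
# quick_taste.py (hypothetical)
from celery import Celery

app = Celery(
    'quick_taste',
    broker='redis://localhost:6379/0',   # assumed broker URL
    backend='redis://localhost:6379/0',  # assumed result backend
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    result = add.delay(2, 3)       # returns immediately; a worker runs the function
    print(result.get(timeout=10))  # -> 5
The architecture diagram below shows how the pieces that make this work fit together.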
graph TD
A[Client Application] -->|Submit Task| B[Message Broker]
B -->|Distribute| C[Worker 1]
B -->|Distribute| D[Worker 2]
B -->|Distribute| E[Worker N]
C -->|Store Result| F[Result Backend]
D -->|Store Result| F
E -->|Store Result| F
F -->|Retrieve Result| A
style A fill:#e1f5fe
style B fill:#fff3e0
style C fill:#e8f5e8
style D fill:#e8f5e8
style E fill:#e8f5e8
style F fill:#fce4ec
Key Features
- Asynchronous Task Execution: Execute time-consuming tasks in the background
- Distributed Processing: Scale across multiple machines and processes
- Multiple Broker Support: Redis, RabbitMQ, Amazon SQS, and more
- Flexible Routing: Route tasks to specific workers or queues
- Monitoring and Management: Built-in monitoring tools and web interfaces
- Fault Tolerance: Automatic retries and error handling
- Integration Ready: Works seamlessly with Django, Flask, and other frameworks
Why Use Celery?
1. Improved User Experience
Long-running tasks can make web applications unresponsive. Celery allows you to offload these tasks to background workers, keeping your application fast and responsive.
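In code, this usually takes the form of a web view that enqueues the work and returns immediately. A minimal sketch, assuming Flask and a hypothetical process_upload Celery task:
# hypothetical upload endpoint that offloads processing to Celery
from flask import Flask, request, jsonify
from myapp.tasks import process_upload  # hypothetical task module

web = Flask(__name__)

@web.route('/upload', methods=['POST'])
def upload():
    f = request.files['file']
    f.save(f"/tmp/{f.filename}")                 # persist the upload before returning
    result = process_upload.delay(f.filename)    # enqueue; does not block the request
    return jsonify({'task_id': result.id}), 202  # 202 Accepted: a worker finishes the job
The sequence diagram below traces the same flow end to end.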
sequenceDiagram
participant User
participant WebApp
participant Celery
participant Worker
User->>WebApp: Upload large file
WebApp->>Celery: Queue processing task
WebApp->>User: Immediate response (202 Accepted)
Celery->>Worker: Assign task
Worker->>Worker: Process file
Worker->>WebApp: Task complete notification
WebApp->>User: Send completion notification
2. Scalability
Celery makes it easy to scale your application horizontally by adding more worker processes or machines as your workload grows.
3. Reliability
With features like automatic retries, dead letter queues, and result persistence, Celery ensures that important tasks are completed even in the face of failures.
4. Flexibility
Support for multiple message brokers, serialization formats, and result backends gives you the flexibility to choose the best tools for your specific use case.
When to Use Celery
Perfect Use Cases
- Web Application Tasks
- Sending emails
- Image/video processing
- Report generation
- Data imports/exports
- Data Processing
- ETL pipelines
- Machine learning training
- Batch processing
- Data synchronization
- System Integration
- API calls to external services
- File processing
- Backup operations
- Cleanup tasks
- Scheduled Tasks (a beat schedule sketch follows this list)
- Periodic reports
- Database maintenance
- Cache warming
- Health checks
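For the scheduled-task cases above, Celery ships with the beat scheduler. A minimal sketch of a beat schedule (the task names and broker URL are hypothetical):
# hypothetical beat schedule
from celery import Celery
from celery.schedules import crontab

app = Celery('scheduler_demo', broker='redis://localhost:6379/0')  # assumed broker URL

app.conf.beat_schedule = {
    'nightly-sales-report': {
        'task': 'reports.generate_daily',       # hypothetical task name
        'schedule': crontab(hour=2, minute=0),  # every day at 02:00
    },
    'health-check': {
        'task': 'ops.health_check',             # hypothetical task name
        'schedule': 60.0,                       # every 60 seconds
    },
}
# Run the scheduler alongside your workers: celery -A scheduler_demo beat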
When NOT to Use Celery
- Simple Scripts: For one-off scripts or simple applications
- Real-time Requirements: When sub-second response times are critical
- Low Latency Needs: When task overhead exceeds processing time
- Simple Cron Jobs: When basic scheduling is sufficient
Celery Architecture Overview
Celery follows a distributed architecture pattern with several key components:
graph TB
subgraph "Client/Producer"
A[Application Code]
B[Celery Client]
end
subgraph "Message Infrastructure"
C[Message Broker - Redis/RabbitMQ]
D[Result Backend - Redis/Database]
end
subgraph "Workers"
E[Worker Process 1]
F[Worker Process 2]
G[Worker Process N]
end
subgraph "Monitoring"
H[Flower]
I[Celery Events]
J[Management Commands]
end
A --> B
B -->|Send Task| C
C -->|Get Task| E
C -->|Get Task| F
C -->|Get Task| G
E -->|Store Result| D
F -->|Store Result| D
G -->|Store Result| D
D -->|Get Result| B
B --> A
C -.->|Monitor| H
E -.->|Events| I
F -.->|Events| I
G -.->|Events| I
I --> H
J -.->|Control| E
J -.->|Control| F
J -.->|Control| G
style A fill:#e3f2fd
style C fill:#fff8e1
style D fill:#f3e5f5
style E fill:#e8f5e8
style F fill:#e8f5e8
style G fill:#e8f5e8
style H fill:#fce4ec
Core Components
- Producer/Client: Your application that creates and sends tasks
- Message Broker: Stores and routes messages between producers and workers
- Workers: Processes that execute tasks
- Result Backend: Stores task results (optional)
- Monitoring Tools: Tools for observing and managing the system
Chapter 2: Installation and Setup
Prerequisites
Before installing Celery, ensure you have:
- Python 3.8 or higher (required by the Celery 5.3 release pinned later in this chapter)
- A message broker (Redis or RabbitMQ recommended)
- Basic knowledge of Python and command-line usage
Installing Celery
Basic Installation
pip install celery
With Redis Support
pip install celery[redis]
With RabbitMQ Support
pip install celery[librabbitmq]
Complete Installation (All Extras)
pip install celery[redis,auth,msgpack]
Message Brokers
Celery requires a message broker to send and receive messages. Here are the most popular options:
Redis Setup
Redis is often the easiest broker to get started with:
# Install Redis
# Ubuntu/Debian
sudo apt-get install redis-server
# macOS
brew install redis
# Windows (using Docker)
docker run -d -p 6379:6379 redis:latest
Redis Configuration for Celery:
# celeryconfig.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
RabbitMQ Setup
RabbitMQ offers more advanced features for complex scenarios:
# Ubuntu/Debian
sudo apt-get install rabbitmq-server
# macOS
brew install rabbitmq
# Windows (using Docker)
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
RabbitMQ Configuration for Celery:
# celeryconfig.py
broker_url = 'pyamqp://guest@localhost//'
result_backend = 'rpc://'
graph LR
subgraph "Message Broker Options"
A[Redis - Simple, Fast]
B[RabbitMQ - Feature Rich]
C[Amazon SQS - Cloud Native]
D[Apache Kafka - High Throughput]
end
subgraph "Use Cases"
E[Development & Simple Apps]
F[Enterprise Applications]
G[AWS Deployments]
H[Big Data Pipelines]
end
A --> E
B --> F
C --> G
D --> H
style A fill:#ffebee
style B fill:#e8f5e8
style C fill:#fff3e0
style D fill:#e3f2fd
Development Environment Setup
Project Structure
Create a well-organized project structure:
my_celery_project/
├── app/
│ ├── __init__.py
│ ├── celery.py # Celery configuration
│ ├── tasks.py # Task definitions
│ └── config.py # Application config
├── requirements.txt
├── docker-compose.yml # For broker services
└── README.md
Basic Celery App Setup
Create the main Celery application:
# app/celery.py
from celery import Celery
# Create Celery instance
app = Celery('my_celery_project')
# Load configuration
app.config_from_object('app.config')
# Auto-discover tasks
app.autodiscover_tasks(['app'])
if __name__ == '__main__':
app.start()
Configuration File
# app/config.py
from kombu import Queue
# Broker settings
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
# Task settings
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
timezone = 'UTC'
enable_utc = True
# Worker settings
worker_prefetch_multiplier = 1
task_acks_late = True
# Queue configuration
task_routes = {
'app.tasks.send_email': {'queue': 'email'},
'app.tasks.process_image': {'queue': 'images'},
}
task_default_queue = 'default'
task_queues = (
Queue('default'),
Queue('email'),
Queue('images'),
)
Docker Compose for Development
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
flower:
image: mher/flower:0.9.7
ports:
- "5555:5555"
environment:
CELERY_BROKER_URL: redis://redis:6379/0
FLOWER_PORT: 5555
depends_on:
- redis
Requirements File
# requirements.txt
celery[redis]==5.3.4
flower==2.0.1
redis==5.0.1
kombu==5.3.4
Chapter 3: Your First Celery Application
Creating a Basic Task
Let’s create your first Celery task step by step:
# app/tasks.py
from .celery import app
import time
@app.task
def add_numbers(x, y):
"""A simple task that adds two numbers."""
time.sleep(2) # Simulate some work
return x + y
@app.task
def send_notification(message, recipient):
"""Simulate sending a notification."""
print(f"Sending '{message}' to {recipient}")
time.sleep(3) # Simulate network delay
return f"Notification sent to {recipient}"
@app.task(bind=True)
def long_running_task(self, duration):
"""A task that reports its progress."""
for i in range(duration):
time.sleep(1)
self.update_state(
state='PROGRESS',
meta={'current': i + 1, 'total': duration}
)
return {'current': duration, 'total': duration, 'status': 'Complete!'}
Understanding the Task Flow
sequenceDiagram
participant Client as Client Code
participant Broker as Redis Broker
participant Worker as Celery Worker
participant Backend as Result Backend
Client->>Broker: Send task message
Note over Client: Returns AsyncResult immediately
Worker->>Broker: Poll for tasks
Broker->>Worker: Deliver task
Worker->>Worker: Execute task
Worker->>Backend: Store result
Client->>Backend: Check result
Backend->>Client: Return result
Running Workers
Start a Celery worker to process tasks:
# Basic worker
celery -A app.celery worker --loglevel=info
# Worker with specific queues
celery -A app.celery worker --loglevel=info -Q email,images
# Worker with concurrency control
celery -A app.celery worker --loglevel=info --concurrency=4
# Development auto-reload (Celery has no built-in reload flag; one option is watchdog's watchmedo)
watchmedo auto-restart --directory=./ --pattern="*.py" --recursive -- celery -A app.celery worker --loglevel=info
Worker Output Explanation
-------------- celery@hostname v5.3.4 (emerald-rush)
--- ***** -----
-- ******* ---- Linux-5.15.0 2023-10-01 10:00:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: app:0x7f8b8c0a1234
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. app.tasks.add_numbers
. app.tasks.send_notification
. app.tasks.long_running_task
[2023-10-01 10:00:00,000: INFO/MainProcess] Connected to redis://localhost:6379/0
[2023-10-01 10:00:00,000: INFO/MainProcess] mingle: searching for neighbors
[2023-10-01 10:00:00,000: INFO/MainProcess] mingle: all alone
[2023-10-01 10:00:00,000: INFO/MainProcess] celery@hostname ready.
Calling Tasks
There are several ways to call Celery tasks:
1. Basic Task Calling
# client.py
from app.tasks import add_numbers, send_notification
# Asynchronous call (non-blocking)
result = add_numbers.delay(4, 4)
print(f"Task ID: {result.id}")
print(f"Task State: {result.state}")
# Get result (blocking)
answer = result.get(timeout=10)
print(f"Result: {answer}")
# Non-blocking result check
if result.ready():
print(f"Task completed: {result.result}")
else:
print("Task still running...")Python2. Using apply_async for More Control
from app.tasks import send_notification
from datetime import datetime, timedelta
# Schedule task for later
eta = datetime.now() + timedelta(minutes=5)
result = send_notification.apply_async(
args=['Hello!', 'user@example.com'],
eta=eta,
retry=True,
retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
}
)
3. Monitoring Task Progress
from app.tasks import long_running_task
import time
# Start a long-running task
result = long_running_task.delay(10)
# Monitor progress
while not result.ready():
if result.state == 'PROGRESS':
meta = result.info
print(f"Progress: {meta['current']}/{meta['total']}")
time.sleep(1)
print(f"Final result: {result.result}")PythonTask States and Results
stateDiagram-v2
[*] --> PENDING
PENDING --> STARTED : Worker picks up task
STARTED --> PROGRESS : Task reports progress
PROGRESS --> PROGRESS : Multiple progress updates
STARTED --> SUCCESS : Task completes successfully
PROGRESS --> SUCCESS : Task completes successfully
STARTED --> FAILURE : Task raises exception
PROGRESS --> FAILURE : Task raises exception
STARTED --> RETRY : Task requests retry
PROGRESS --> RETRY : Task requests retry
RETRY --> STARTED : Retry attempt
SUCCESS --> [*]
FAILURE --> [*]
SUCCESS : Result available
FAILURE : Exception info available
PROGRESS : Meta info available
Task State Examples
from app.tasks import add_numbers
result = add_numbers.delay(4, 4)
# Check various states
print(f"State: {result.state}")
print(f"Info: {result.info}")
# State-specific information
if result.state == 'PENDING':
print("Task is waiting to be processed")
elif result.state == 'PROGRESS':
print(f"Progress: {result.info}")
elif result.state == 'SUCCESS':
print(f"Result: {result.result}")
elif result.state == 'FAILURE':
print(f"Error: {result.info}")PythonError Handling in Tasks
# app/tasks.py
from celery.exceptions import Retry
import random
@app.task(bind=True)
def unreliable_task(self):
"""A task that sometimes fails."""
if random.random() < 0.5:
raise Exception("Random failure occurred")
return "Task completed successfully"
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
def auto_retry_task(self):
"""A task with automatic retry on exceptions."""
if random.random() < 0.7:
raise Exception("Task failed, will retry automatically")
return "Success after retries"
@app.task(bind=True)
def manual_retry_task(self, data):
"""A task with manual retry logic."""
try:
# Simulate processing
if random.random() < 0.5:
raise Exception("Processing failed")
return f"Processed: {data}"
except Exception as exc:
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
Complete Example Application
Here’s a complete working example:
# complete_example.py
from celery import Celery
import time
import random
# Create Celery app
app = Celery('complete_example')
app.conf.update(
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/0',
task_serializer='json',
accept_content=['json'],
result_serializer='json',
)
@app.task
def process_order(order_id, items):
"""Process an e-commerce order."""
print(f"Processing order {order_id} with {len(items)} items")
# Simulate processing time
time.sleep(2)
# Calculate total
total = sum(item['price'] * item['quantity'] for item in items)
# Simulate occasional failure
if random.random() < 0.1:
raise Exception(f"Payment failed for order {order_id}")
return {
'order_id': order_id,
'total': total,
'status': 'completed',
'processed_at': time.time()
}
@app.task
def send_confirmation_email(order_data):
"""Send order confirmation email."""
print(f"Sending confirmation for order {order_data['order_id']}")
time.sleep(1) # Simulate email sending
return f"Email sent for order {order_data['order_id']}"
# Usage example
if __name__ == '__main__':
# Simulate order processing
order_items = [
{'name': 'Widget A', 'price': 10.00, 'quantity': 2},
{'name': 'Widget B', 'price': 15.00, 'quantity': 1},
]
# Process order asynchronously
order_result = process_order.delay('ORD-001', order_items)
print(f"Order processing started: {order_result.id}")
# Wait for result
try:
order_data = order_result.get(timeout=10)
print(f"Order processed: {order_data}")
# Send confirmation email
email_result = send_confirmation_email.delay(order_data)
email_response = email_result.get(timeout=5)
print(f"Email result: {email_response}")
except Exception as e:
print(f"Order processing failed: {e}")PythonTo run this example:
- Start Redis: redis-server
- Start a worker: celery -A complete_example worker --loglevel=info
- Run the client: python complete_example.py
This completes the foundation chapters of our Celery book. The progression shows how to move from basic concepts to practical applications with proper error handling and monitoring.
Chapter 4: Basic Celery Concepts
Tasks and Task Objects
Understanding Celery Tasks
A Celery task is a Python function decorated with @app.task. When called, it becomes a task object that can be executed asynchronously.
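Concretely, calling a task gives you an AsyncResult, while .s() and .si() build Signatures that can be stored, passed around, and sent later. A quick sketch, assuming the add_numbers task from Chapter 3 is importable:
from app.tasks import add_numbers

async_result = add_numbers.delay(2, 3)  # Task -> AsyncResult
print(async_result.id, async_result.state)

sig = add_numbers.s(2, 3)               # Task -> Signature (arguments bound, nothing sent yet)
immutable = add_numbers.si(2, 3)        # immutable Signature: ignores a parent task's result in chains
later = sig.apply_async(countdown=10)   # Signature -> AsyncResult, delivered after 10 seconds
The class diagram below summarizes how these three objects relate.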
classDiagram
class Task {
+name: str
+request: Context
+retry(): void
+apply_async(): AsyncResult
+delay(): AsyncResult
+si(): Signature
+s(): Signature
}
class AsyncResult {
+id: str
+state: str
+result: Any
+info: dict
+ready(): bool
+get(): Any
+forget(): void
}
class Signature {
+task: str
+args: tuple
+kwargs: dict
+apply_async(): AsyncResult
+clone(): Signature
}
Task --> AsyncResult : creates
Task --> Signature : creates
Task Types and Decorators
# app/task_types.py
from celery import Task
from .celery import app
import time
# Basic task
@app.task
def simple_task(name):
return f"Hello, {name}!"
# Task with custom name
@app.task(name='custom.task.name')
def named_task():
return "Task with custom name"
# Bound task (has access to self)
@app.task(bind=True)
def bound_task(self, x, y):
print(f"Task ID: {self.request.id}")
print(f"Task name: {self.name}")
return x + y
# Task with custom base class
class CallbackTask(Task):
"""Custom task class with callbacks."""
def on_success(self, retval, task_id, args, kwargs):
print(f"Task {task_id} succeeded: {retval}")
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(f"Task {task_id} failed: {exc}")
@app.task(base=CallbackTask)
def callback_task(value):
if value < 0:
raise ValueError("Value must be positive")
return value * 2
# Task with multiple decorators
@app.task(
bind=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 3, 'countdown': 60},
retry_backoff=True,
retry_jitter=True
)
def robust_task(self, data):
# Simulate unreliable operation
import random
if random.random() < 0.3:
raise Exception("Random failure")
return f"Processed: {data}"PythonTask Context and Request Object
# app/context_tasks.py
@app.task(bind=True)
def inspect_context(self):
"""Task that inspects its execution context."""
request = self.request
context_info = {
'task_id': request.id,
'task_name': self.name,
'args': request.args,
'kwargs': request.kwargs,
'retries': request.retries,
'is_eager': request.is_eager,
'eta': request.eta,
'expires': request.expires,
'hostname': request.hostname,
'delivery_info': request.delivery_info,
}
return context_info
# Usage
result = inspect_context.delay()
print(result.get())
Workers and Concurrency
Worker Architecture
graph TB
subgraph "Celery Worker Process"
A[Main Process] --> B[Pool Process]
A --> C[Consumer Thread]
A --> D[Event Loop]
B --> E[Worker 1]
B --> F[Worker 2]
B --> G[Worker N]
C --> H[Message Queue]
D --> I[Periodic Tasks]
D --> J[Monitoring]
end
subgraph "Concurrency Models"
K[Prefork - multiprocessing]
L[Eventlet - green threads]
M[Gevent - green threads]
N[Threads - threading]
O[Solo - single process]
end
B -.-> K
E -.-> L
F -.-> M
G -.-> N
A -.-> O
style A fill:#e3f2fd
style B fill:#e8f5e8
style C fill:#fff3e0
style D fill:#fce4ec
Concurrency Configuration
# worker_config.py
from celery import Celery
app = Celery('worker_demo')
# Prefork (default) - CPU-bound tasks
# Good for: CPU-intensive tasks, fault isolation
app.conf.update(
worker_concurrency=4,
worker_pool='prefork',
worker_prefetch_multiplier=1,
)
# Eventlet - I/O-bound tasks
# Good for: High concurrency I/O operations
app.conf.update(
worker_concurrency=1000,
worker_pool='eventlet',
worker_prefetch_multiplier=1,
)
# Gevent - I/O-bound tasks
# Good for: Network operations, database queries
app.conf.update(
worker_concurrency=1000,
worker_pool='gevent',
worker_prefetch_multiplier=1,
)
# Threads - Balanced approach
# Good for: Mixed workloads
app.conf.update(
worker_concurrency=10,
worker_pool='threads',
worker_prefetch_multiplier=1,
)
Worker Pool Comparison
| Pool Type | Best For | Pros | Cons |
|---|---|---|---|
| Prefork | CPU-bound tasks | Fault isolation, True parallelism | Memory overhead, Slow startup |
| Eventlet | I/O-bound tasks | High concurrency, Low memory | No true parallelism, Library compatibility |
| Gevent | Network operations | Good I/O performance | No true parallelism, C extension issues |
| Threads | Mixed workloads | Balanced approach | GIL limitations, Shared state issues |
| Solo | Development/Testing | Simple debugging | No concurrency |
Starting Workers with Different Configurations
# Default prefork worker
celery -A app worker --loglevel=info
# High concurrency with eventlet
celery -A app worker --pool=eventlet --concurrency=1000
# Gevent worker for I/O operations
celery -A app worker --pool=gevent --concurrency=500
# Thread-based worker
celery -A app worker --pool=threads --concurrency=10
# Single process for debugging
celery -A app worker --pool=solo
# Multiple workers with different pools
celery -A app worker --pool=prefork --concurrency=4 -n worker1@%h &
celery -A app worker --pool=eventlet --concurrency=1000 -n worker2@%h &BashBrokers and Backends
Message Broker Deep Dive
graph TB
subgraph "Producer Side"
A[Application] --> B[Celery Client]
B --> C[Serializer]
C --> D[Message]
end
subgraph "Broker"
E[Queue Manager]
F[Routing Logic]
G[Persistence Layer]
H[Exchange/Topics]
end
subgraph "Consumer Side"
I[Worker]
J[Deserializer]
K[Task Execution]
L[Result Storage]
end
D --> E
E --> F
F --> G
G --> H
H --> I
I --> J
J --> K
K --> L
style E fill:#fff3e0
style F fill:#e8f5e8
style G fill:#fce4ec
style H fill:#e3f2fd
Broker Configuration Comparison
# Redis Configuration
redis_config = {
'broker_url': 'redis://localhost:6379/0',
'result_backend': 'redis://localhost:6379/0',
# Redis-specific settings
'redis_max_connections': 20,
'redis_retry_on_timeout': True,
'redis_socket_connect_timeout': 30,
'redis_socket_timeout': 30,
# Connection pool settings
'broker_pool_limit': 10,
'broker_connection_timeout': 4,
'broker_connection_retry_on_startup': True,
}
# RabbitMQ Configuration
rabbitmq_config = {
'broker_url': 'pyamqp://guest@localhost//',
'result_backend': 'rpc://',
# RabbitMQ-specific settings
'broker_heartbeat': 30,
'broker_connection_timeout': 30,
'broker_connection_retry_on_startup': True,
'broker_connection_max_retries': 3,
# Queue settings
'task_queue_max_priority': 10,
'worker_prefetch_multiplier': 1,
'task_acks_late': True,
}
# Amazon SQS Configuration
sqs_config = {
'broker_url': 'sqs://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY@',
'broker_transport_options': {
'region': 'us-east-1',
'visibility_timeout': 60,
'polling_interval': 1,
# SQS queues are pre-declared via the transport's predefined_queues option
'predefined_queues': {
'celery': {
'url': 'https://sqs.us-east-1.amazonaws.com/123456789012/celery'
}
},
},
'task_default_queue': 'celery',
}
Result Backend Options
# Redis Result Backend
redis_result_config = {
'result_backend': 'redis://localhost:6379/0',
'result_backend_transport_options': {
'master_name': 'mymaster',
'retry_on_timeout': True,
},
'result_expires': 3600, # 1 hour
'result_persistent': True,
}
# Database Result Backend
database_result_config = {
'result_backend': 'db+postgresql://user:pass@localhost/celery',
'result_backend_transport_options': {
'echo': True, # SQL query logging
},
'database_table_names': {
'task': 'custom_task_table',
'group': 'custom_group_table',
}
}
# File System Result Backend
filesystem_result_config = {
'result_backend': 'file:///var/celery/results',
'result_expires': 3600,
}
# Cache Result Backend (Memcached)
cache_result_config = {
'result_backend': 'cache+memcached://127.0.0.1:11211/',
'cache_backend_options': {
'binary': True,
'behaviors': {'tcp_nodelay': True}
}
}
Serialization Basics
Serialization Formats
graph TD
subgraph "Serialization Formats"
A[JSON - Default]
B[Pickle - Python Objects]
C[YAML - Human Readable]
D[MessagePack - Binary]
E[XML - Structured]
end
subgraph "Characteristics"
F[Fast, Secure, Limited Types]
G[Full Python Support, Security Risk]
H[Readable, Slow]
I[Compact, Fast]
J[Verbose, Structured]
end
A --> F
B --> G
C --> H
D --> I
E --> J
style A fill:#e8f5e8
style B fill:#ffebee
style C fill:#e3f2fd
style D fill:#fff3e0
style E fill:#fce4ec
Configuring Serialization
# serialization_config.py
from kombu.serialization import register
import json
# Basic serialization configuration
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
# Enable compression
task_compression='gzip',
result_compression='gzip',
)
# Custom JSON serializer with datetime support
from datetime import datetime
import json
class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
def datetime_decoder(dct):
for key, value in dct.items():
if isinstance(value, str):
try:
dct[key] = datetime.fromisoformat(value)
except ValueError:
pass
return dct
def dumps(obj):
return json.dumps(obj, cls=DateTimeEncoder)
def loads(data):
return json.loads(data, object_hook=datetime_decoder)
# Register custom serializer
register('datetime_json', dumps, loads,
content_type='application/x-datetime-json',
content_encoding='utf-8')
# Use custom serializer
app.conf.update(
task_serializer='datetime_json',
accept_content=['datetime_json'],
result_serializer='datetime_json',
)
Serialization Security
# security_config.py
# NEVER use pickle in production with untrusted data
INSECURE_CONFIG = {
'task_serializer': 'pickle',
'accept_content': ['pickle'],
'result_serializer': 'pickle',
}
# Secure configuration
SECURE_CONFIG = {
'task_serializer': 'json',
'accept_content': ['json'],
'result_serializer': 'json',
# Additional security
'worker_hijack_root_logger': False,
'worker_log_color': False,
'task_reject_on_worker_lost': True,
}
# Content type validation
app.conf.update(
accept_content=['json'],
task_serializer='json',
result_serializer='json',
# Reject unknown content types
task_always_eager=False,
task_eager_propagates=True,
)
Handling Complex Data Types
# complex_serialization.py
from decimal import Decimal
from datetime import datetime, date
import uuid
# Custom serialization for complex types
class ComplexTypeTask:
@staticmethod
def serialize_complex_data(data):
"""Serialize complex Python objects to JSON-compatible format."""
if isinstance(data, Decimal):
return {'__type__': 'Decimal', 'value': str(data)}
elif isinstance(data, datetime):
return {'__type__': 'datetime', 'value': data.isoformat()}
elif isinstance(data, date):
return {'__type__': 'date', 'value': data.isoformat()}
elif isinstance(data, uuid.UUID):
return {'__type__': 'UUID', 'value': str(data)}
elif isinstance(data, set):
return {'__type__': 'set', 'value': list(data)}
return data
@staticmethod
def deserialize_complex_data(data):
"""Deserialize JSON data back to Python objects."""
if isinstance(data, dict) and '__type__' in data:
type_name = data['__type__']
value = data['value']
if type_name == 'Decimal':
return Decimal(value)
elif type_name == 'datetime':
return datetime.fromisoformat(value)
elif type_name == 'date':
return date.fromisoformat(value)
elif type_name == 'UUID':
return uuid.UUID(value)
elif type_name == 'set':
return set(value)
return data
@app.task
def process_complex_data(data):
"""Task that handles complex data types."""
# Deserialize incoming data
processed_data = ComplexTypeTask.deserialize_complex_data(data)
# Process the data
if isinstance(processed_data, Decimal):
result = processed_data * 2
elif isinstance(processed_data, datetime):
result = processed_data.strftime('%Y-%m-%d %H:%M:%S')
else:
result = f"Processed: {processed_data}"
# Return serializable result
return ComplexTypeTask.serialize_complex_data(result)
# Usage example
complex_data = {
'price': Decimal('19.99'),
'created_at': datetime.now(),
'tags': {'python', 'celery', 'async'},
'id': uuid.uuid4(),
}
# Serialize before sending
serialized_data = ComplexTypeTask.serialize_complex_data(complex_data)
result = process_complex_data.delay(serialized_data)
Chapter 5: Task Routing and Organization
Task Discovery
Celery automatically discovers tasks in your application through several mechanisms:
graph TB
subgraph "Task Discovery Methods"
A[Explicit Include] --> B[app.conf.include]
C[Auto Discovery] --> D[app.autodiscover_tasks]
E[Manual Import] --> F[Import modules]
G[Package Discovery] --> H[Scan packages]
end
subgraph "Discovery Flow"
I[Celery App Start] --> J[Load Configuration]
J --> K[Run Discovery]
K --> L[Register Tasks]
L --> M[Workers Ready]
end
A --> I
C --> I
E --> I
G --> I
style A fill:#e8f5e8
style C fill:#e3f2fd
style E fill:#fff3e0
style G fill:#fce4ec
Automatic Task Discovery
# app/__init__.py
from celery import Celery
app = Celery('myapp')
# Method 1: Explicit include
app.conf.include = [
'app.tasks.email',
'app.tasks.image_processing',
'app.tasks.reports',
]
# Method 2: Auto-discovery from installed apps (Django-style)
app.autodiscover_tasks(['app.tasks', 'app.background_jobs'])
# Method 3: Auto-discovery with lambda
app.autodiscover_tasks(lambda: ['app.tasks'])
# Method 4: Scan all Python packages
import pkgutil
def find_task_modules():
"""Find all modules containing tasks."""
task_modules = []
for importer, modname, ispkg in pkgutil.walk_packages(['app']):
if 'tasks' in modname:
task_modules.append(f'app.{modname}')
return task_modules
app.autodiscover_tasks(find_task_modules)
Organizing Tasks by Module
app/
├── __init__.py
├── celery.py
├── tasks/
│ ├── __init__.py
│ ├── email.py # Email-related tasks
│ ├── image.py # Image processing tasks
│ ├── reports.py # Report generation tasks
│ ├── cleanup.py # Maintenance tasks
│ └── integrations/
│ ├── __init__.py
│ ├── social.py # Social media tasks
│ └── payments.py # Payment processing tasks
# app/tasks/email.py
from ..celery import app
@app.task
def send_welcome_email(user_id):
"""Send welcome email to new user."""
pass
@app.task
def send_newsletter(subscriber_list):
"""Send newsletter to subscribers."""
pass
@app.task
def send_password_reset(email, token):
"""Send password reset email."""
pass
# app/tasks/image.py
from ..celery import app
@app.task
def resize_image(image_path, width, height):
"""Resize image to specified dimensions."""
pass
@app.task
def generate_thumbnail(image_path):
"""Generate thumbnail for image."""
pass
@app.task
def process_image_upload(upload_data):
"""Process uploaded image."""
pass
Routing Tasks to Specific Workers
Queue-Based Routing
graph TB
subgraph "Client Application"
A[Email Task] --> B[email queue]
C[Image Task] --> D[image queue]
E[Report Task] --> F[report queue]
G[Default Task] --> H[default queue]
end
subgraph "Workers"
I[Email Worker] --> B
J[Image Worker] --> D
K[Report Worker] --> F
L[General Worker] --> H
L --> B
L --> D
end
style A fill:#e8f5e8
style C fill:#e3f2fd
style E fill:#fff3e0
style G fill:#fce4ec
style I fill:#e8f5e8
style J fill:#e3f2fd
style K fill:#fff3e0
style L fill:#fce4ec
Configuration-Based Routing
# routing_config.py
from kombu import Queue, Exchange
# Define exchanges
default_exchange = Exchange('default', type='direct')
email_exchange = Exchange('email', type='direct')
image_exchange = Exchange('image', type='direct')
# Configure routing
app.conf.update(
task_routes={
# Route by task name
'app.tasks.email.send_welcome_email': {'queue': 'email'},
'app.tasks.email.*': {'queue': 'email'},
# Route by pattern
'app.tasks.image.*': {
'queue': 'image',
'routing_key': 'image.processing',
},
# Route with custom exchange
'app.tasks.reports.*': {
'queue': 'reports',
'exchange': 'reports',
'routing_key': 'reports.generate',
},
# Complex routing logic
'app.tasks.priority.*': {
'queue': 'priority',
'priority': 9,
},
},
# Define queues
task_queues=(
Queue('default', default_exchange, routing_key='default'),
Queue('email', email_exchange, routing_key='email'),
Queue('image', image_exchange, routing_key='image'),
Queue('reports', routing_key='reports'),
Queue('priority', routing_key='priority'),
),
# Default queue
task_default_queue='default',
task_default_exchange='default',
task_default_routing_key='default',
)
Dynamic Routing with Custom Router
# custom_router.py
from celery.routes import MapRoute
class CustomRouter:
"""Custom task router with advanced logic."""
def route_for_task(self, task, args=None, kwargs=None):
"""Route tasks based on custom logic."""
# Route based on task arguments
if task == 'app.tasks.process_user_data':
user_id = args[0] if args else kwargs.get('user_id')
if user_id:
# Route premium users to faster queue
if self.is_premium_user(user_id):
return {'queue': 'premium'}
else:
return {'queue': 'standard'}
# Route based on task load
if task.startswith('app.tasks.heavy_'):
return {
'queue': 'heavy_processing',
'routing_key': 'heavy.processing',
'priority': 5,
}
# Route based on time of day
from datetime import datetime
current_hour = datetime.now().hour
if 9 <= current_hour <= 17: # Business hours
if task.startswith('app.tasks.report'):
return {'queue': 'business_hours'}
# Default routing
return None
def is_premium_user(self, user_id):
"""Check if user is premium (mock implementation)."""
# In real implementation, check database/cache
return user_id % 10 == 0 # Every 10th user is premium
# Configure custom router
app.conf.task_routes = (CustomRouter(),)
Priority-Based Routing
# priority_routing.py
from kombu import Queue
# Configure priority queues
app.conf.update(
task_routes={
'app.tasks.critical.*': {'queue': 'critical', 'priority': 9},
'app.tasks.high.*': {'queue': 'high', 'priority': 7},
'app.tasks.normal.*': {'queue': 'normal', 'priority': 5},
'app.tasks.low.*': {'queue': 'low', 'priority': 3},
'app.tasks.background.*': {'queue': 'background', 'priority': 1},
},
task_queues=(
Queue('critical', routing_key='critical', queue_arguments={'x-max-priority': 10}),
Queue('high', routing_key='high', queue_arguments={'x-max-priority': 10}),
Queue('normal', routing_key='normal', queue_arguments={'x-max-priority': 10}),
Queue('low', routing_key='low', queue_arguments={'x-max-priority': 10}),
Queue('background', routing_key='background', queue_arguments={'x-max-priority': 10}),
),
# Enable priority support
task_queue_max_priority=10,
worker_prefetch_multiplier=1, # Important for priority to work
)
# Usage with explicit priority
@app.task
def urgent_notification(message):
"""Send urgent notification."""
pass
# Call with priority
urgent_notification.apply_async(
args=['Critical system alert!'],
priority=9,
queue='critical'
)
Task Naming and Organization
Naming Conventions
# naming_conventions.py
# Bad naming - unclear and inconsistent
@app.task
def func1(x):
pass
@app.task
def send_mail(email):
pass
@app.task
def img_proc(path):
pass
# Good naming - clear and consistent
@app.task(name='email.send.welcome')
def send_welcome_email(user_id):
"""Send welcome email to new user."""
pass
@app.task(name='email.send.newsletter')
def send_newsletter_email(subscriber_ids):
"""Send newsletter to multiple subscribers."""
pass
@app.task(name='image.process.resize')
def resize_image(image_path, dimensions):
"""Resize image to specified dimensions."""
pass
@app.task(name='image.process.thumbnail')
def generate_image_thumbnail(image_path):
"""Generate thumbnail for uploaded image."""
pass
@app.task(name='report.generate.sales')
def generate_sales_report(start_date, end_date):
"""Generate sales report for date range."""
pass
@app.task(name='maintenance.cleanup.temp_files')
def cleanup_temporary_files():
"""Clean up temporary files older than 24 hours."""
pass
Namespace Organization
# namespace_organization.py
from celery import Celery
app = Celery('enterprise_app')
# Organize tasks by business domain
class TaskNamespaces:
"""Centralized task name definitions."""
# User management tasks
class User:
REGISTER = 'user.register'
ACTIVATE = 'user.activate'
DEACTIVATE = 'user.deactivate'
SEND_WELCOME = 'user.email.welcome'
SEND_VERIFICATION = 'user.email.verification'
# Order processing tasks
class Order:
CREATE = 'order.create'
PROCESS_PAYMENT = 'order.payment.process'
SEND_CONFIRMATION = 'order.email.confirmation'
UPDATE_INVENTORY = 'order.inventory.update'
FULFILL = 'order.fulfillment.process'
# Content management tasks
class Content:
UPLOAD_IMAGE = 'content.image.upload'
RESIZE_IMAGE = 'content.image.resize'
GENERATE_THUMBNAIL = 'content.image.thumbnail'
PROCESS_VIDEO = 'content.video.process'
EXTRACT_METADATA = 'content.metadata.extract'
# Analytics and reporting
class Analytics:
TRACK_EVENT = 'analytics.event.track'
GENERATE_REPORT = 'analytics.report.generate'
UPDATE_METRICS = 'analytics.metrics.update'
PROCESS_BATCH = 'analytics.batch.process'
# Task implementations
@app.task(name=TaskNamespaces.User.REGISTER)
def register_user(user_data):
"""Register a new user account."""
pass
@app.task(name=TaskNamespaces.Order.PROCESS_PAYMENT)
def process_order_payment(order_id, payment_data):
"""Process payment for an order."""
pass
@app.task(name=TaskNamespaces.Content.RESIZE_IMAGE)
def resize_content_image(image_id, width, height):
"""Resize uploaded image content."""
pass
Multiple Queues
Queue Architecture Design
graph TB
subgraph "Application Layer"
A[Web App] --> B[API Layer]
C[Admin Panel] --> B
D[Scheduled Jobs] --> B
end
subgraph "Queue Layer"
E[email Queue]
F[image Queue]
G[reports Queue]
H[background Queue]
I[priority Queue]
J[dlq Queue]
end
subgraph "Worker Layer"
K[Email Workers]
L[Image Workers]
M[Report Workers]
N[Background Workers]
O[Priority Workers]
end
B --> E
B --> F
B --> G
B --> H
B --> I
E --> K
F --> L
G --> M
H --> N
I --> O
E -.-> J
F -.-> J
G -.-> J
style E fill:#e8f5e8
style F fill:#e3f2fd
style G fill:#fff3e0
style H fill:#fce4ec
style I fill:#ffebee
style J fill:#f3e5f5
Advanced Queue Configuration
# advanced_queues.py
from kombu import Queue, Exchange
from datetime import timedelta
# Define exchanges for different message types
email_exchange = Exchange('email', type='direct')
image_exchange = Exchange('image', type='topic')
reports_exchange = Exchange('reports', type='fanout')
priority_exchange = Exchange('priority', type='direct')
app.conf.update(
# Queue definitions with advanced options
task_queues=[
# Email processing queue
Queue(
'email',
email_exchange,
routing_key='email',
queue_arguments={
'x-message-ttl': 3600000, # 1 hour TTL
'x-max-length': 10000, # Max 10k messages
'x-overflow': 'reject-publish',
}
),
# Image processing with different priorities
Queue(
'image.high',
image_exchange,
routing_key='image.high',
queue_arguments={
'x-max-priority': 10,
'x-message-ttl': 1800000, # 30 minutes TTL
}
),
Queue(
'image.normal',
image_exchange,
routing_key='image.normal',
queue_arguments={'x-max-priority': 5}
),
# Reports with delayed processing
Queue(
'reports.immediate',
reports_exchange,
routing_key='reports.immediate'
),
Queue(
'reports.scheduled',
reports_exchange,
routing_key='reports.scheduled',
queue_arguments={
'x-message-ttl': 86400000, # 24 hours TTL
}
),
# Dead letter queue for failed tasks
Queue(
'failed',
Exchange('failed', type='direct'),
routing_key='failed',
queue_arguments={
'x-message-ttl': 604800000, # 7 days TTL
}
),
# Priority queue for critical tasks
Queue(
'critical',
priority_exchange,
routing_key='critical',
queue_arguments={
'x-max-priority': 10,
'x-expires': 3600000, # Queue expires if unused for 1 hour
}
),
],
# Routing configuration
task_routes={
# Email tasks
'app.tasks.email.*': {
'queue': 'email',
'exchange': 'email',
'routing_key': 'email',
},
# Image tasks with conditional routing
'app.tasks.image.resize': {
'queue': 'image.high',
'exchange': 'image',
'routing_key': 'image.high',
},
'app.tasks.image.thumbnail': {
'queue': 'image.normal',
'exchange': 'image',
'routing_key': 'image.normal',
},
# Report tasks
'app.tasks.reports.realtime': {
'queue': 'reports.immediate',
'exchange': 'reports',
},
'app.tasks.reports.batch': {
'queue': 'reports.scheduled',
'exchange': 'reports',
},
# Critical tasks
'app.tasks.critical.*': {
'queue': 'critical',
'exchange': 'priority',
'routing_key': 'critical',
'priority': 9,
},
},
# Default configurations
task_default_queue='default',
task_default_exchange='default',
task_default_routing_key='default',
# Queue-specific settings
task_queue_max_priority=10,
worker_prefetch_multiplier=1,
task_acks_late=True,
)
Starting Specialized Workers
#!/bin/bash
# worker_startup.sh
# Email processing workers (I/O bound)
celery -A app worker -Q email -n email_worker@%h \
--pool=eventlet --concurrency=100 --loglevel=info &
# Image processing workers (CPU bound)
celery -A app worker -Q image.high,image.normal -n image_worker@%h \
--pool=prefork --concurrency=4 --loglevel=info &
# Report generation workers (mixed workload)
celery -A app worker -Q reports.immediate,reports.scheduled -n reports_worker@%h \
--pool=threads --concurrency=8 --loglevel=info &
# Critical task worker (dedicated)
celery -A app worker -Q critical -n critical_worker@%h \
--pool=prefork --concurrency=2 --loglevel=info &
# Background task worker (low priority)
celery -A app worker -Q background,cleanup -n background_worker@%h \
--pool=prefork --concurrency=2 --loglevel=warning &
# General purpose worker (handles multiple queues)
celery -A app worker -Q default,email,image.normal -n general_worker@%h \
--pool=prefork --concurrency=4 --loglevel=info &
echo "All workers started"BashQueue Monitoring and Management
# queue_management.py
from celery import Celery
from kombu import Connection
app = Celery('queue_manager')
class QueueManager:
"""Utility class for queue management."""
def __init__(self, broker_url):
self.broker_url = broker_url
def get_queue_info(self, queue_name):
"""Get information about a specific queue."""
with Connection(self.broker_url) as conn:
queue = conn.SimpleQueue(queue_name)
try:
return {
'name': queue_name,
'size': queue.qsize(),
'consumer_count': getattr(queue, 'consumer_count', 0),
}
finally:
queue.close()
def purge_queue(self, queue_name):
"""Remove all messages from a queue."""
with Connection(self.broker_url) as conn:
queue = conn.SimpleQueue(queue_name)
try:
purged = queue.clear()
return purged
finally:
queue.close()
def move_messages(self, source_queue, dest_queue, max_messages=None):
"""Move messages from one queue to another."""
with Connection(self.broker_url) as conn:
source = conn.SimpleQueue(source_queue)
dest = conn.SimpleQueue(dest_queue)
moved = 0
try:
while True:
if max_messages and moved >= max_messages:
break
try:
message = source.get(block=False)
dest.put(message.payload)
message.ack()
moved += 1
except:
break # No more messages
return moved
finally:
source.close()
dest.close()
# Usage
manager = QueueManager('redis://localhost:6379/0')
# Check queue status
info = manager.get_queue_info('email')
print(f"Email queue has {info['size']} messages")
# Move failed messages back to processing queue
moved = manager.move_messages('failed', 'email', max_messages=100)
print(f"Moved {moved} messages from failed to email queue")PythonThis completes the intermediate topics chapters covering task routing, organization, and queue management with comprehensive examples and diagrams.
Chapter 6: Configuration and Settings
Celery Configuration Architecture
graph TB
subgraph "Configuration Sources"
A[Environment Variables] --> E[Final Config]
B[Configuration Files] --> E
C[Command Line Args] --> E
D[Code Configuration] --> E
end
subgraph "Configuration Categories"
F[Broker Settings]
G[Worker Settings]
H[Task Settings]
I[Result Backend]
J[Security Settings]
K[Monitoring Settings]
end
E --> F
E --> G
E --> H
E --> I
E --> J
E --> K
style A fill:#e8f5e8
style B fill:#e3f2fd
style C fill:#fff3e0
style D fill:#fce4ec
Comprehensive Configuration System
# config/base.py
"""Base configuration for all environments."""
import os
from datetime import timedelta
from kombu import Queue, Exchange
class BaseConfig:
"""Base configuration class."""
# Broker Configuration
broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
# Serialization
task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
timezone = 'UTC'
enable_utc = True
# Task Configuration
task_always_eager = False
task_eager_propagates = True
task_ignore_result = False
task_store_eager_result = True
# Worker Configuration
worker_prefetch_multiplier = 1
worker_max_tasks_per_child = 1000
worker_disable_rate_limits = False
worker_log_color = True
# Security
worker_hijack_root_logger = False
accept_content = ['json']
task_reject_on_worker_lost = True
# Monitoring
worker_send_task_events = True
task_send_sent_event = True
# Result Configuration
result_expires = 3600 # 1 hour
result_persistent = True
result_compression = 'gzip'
# Error Handling
task_annotations = {
'*': {
'rate_limit': '100/s',
'time_limit': 300, # 5 minutes
'soft_time_limit': 240, # 4 minutes
}
}
# config/development.py
"""Development environment configuration."""
from kombu import Queue
from .base import BaseConfig
class DevelopmentConfig(BaseConfig):
"""Development configuration."""
# Development-specific settings
worker_log_level = 'DEBUG'
worker_reload = True
task_always_eager = False # set to True to run tasks synchronously in tests
# Reduced limits for development
worker_max_tasks_per_child = 100
result_expires = 300 # 5 minutes
# Development queues
task_routes = {
'app.tasks.test.*': {'queue': 'test'},
'app.tasks.dev.*': {'queue': 'development'},
}
task_queues = (
Queue('default'),
Queue('test'),
Queue('development'),
)
# config/production.py
"""Production environment configuration."""
import os
from .base import BaseConfig
from kombu import Queue, Exchange
class ProductionConfig(BaseConfig):
"""Production configuration."""
# Production broker with connection pooling
broker_url = os.environ.get('CELERY_BROKER_URL')
broker_pool_limit = 10
broker_connection_timeout = 30
broker_connection_retry_on_startup = True
broker_connection_max_retries = 3
# Redis-specific production settings
redis_max_connections = 20
redis_retry_on_timeout = True
redis_socket_connect_timeout = 30
redis_socket_timeout = 30
# Worker optimizations
worker_prefetch_multiplier = 4
worker_max_tasks_per_child = 10000
worker_log_level = 'INFO'
worker_proc_alive_timeout = 60
# Task timeouts and limits
task_time_limit = 1800 # 30 minutes
task_soft_time_limit = 1500 # 25 minutes
# Result backend optimization
result_backend_transport_options = {
'retry_on_timeout': True,
'retry_policy': {
'timeout': 5.0
}
}
# Production queues with exchanges
default_exchange = Exchange('default', type='direct')
email_exchange = Exchange('email', type='direct')
image_exchange = Exchange('image', type='topic')
reports_exchange = Exchange('reports', type='fanout')
task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('email', email_exchange, routing_key='email'),
Queue('image_high', image_exchange, routing_key='image.high'),
Queue('image_normal', image_exchange, routing_key='image.normal'),
Queue('reports', reports_exchange),
Queue('background', routing_key='background'),
)
# Complex routing
task_routes = {
'app.tasks.email.*': {
'queue': 'email',
'exchange': 'email',
'routing_key': 'email',
},
'app.tasks.image.priority.*': {
'queue': 'image_high',
'exchange': 'image',
'routing_key': 'image.high',
'priority': 8,
},
'app.tasks.image.*': {
'queue': 'image_normal',
'exchange': 'image',
'routing_key': 'image.normal',
},
'app.tasks.reports.*': {
'queue': 'reports',
'exchange': 'reports',
},
}
# Security settings
worker_hijack_root_logger = False
task_reject_on_worker_lost = True
# Monitoring
worker_send_task_events = True
task_send_sent_event = True
# Performance annotations
task_annotations = {
'app.tasks.email.*': {
'rate_limit': '200/s',
'time_limit': 300,
},
'app.tasks.image.*': {
'rate_limit': '50/s',
'time_limit': 1800,
'soft_time_limit': 1500,
},
'app.tasks.reports.*': {
'rate_limit': '10/s',
'time_limit': 3600,
'soft_time_limit': 3300,
},
}
# config/__init__.py
"""Configuration module."""
import os
config_mapping = {
'development': 'config.development.DevelopmentConfig',
'production': 'config.production.ProductionConfig',
'testing': 'config.testing.TestingConfig',
}
def get_config():
"""Get configuration based on environment."""
env = os.environ.get('CELERY_ENV', 'development')
config_class = config_mapping.get(env)
if not config_class:
raise ValueError(f"Unknown environment: {env}")
module_name, class_name = config_class.rsplit('.', 1)
module = __import__(module_name, fromlist=[class_name])
return getattr(module, class_name)()
Advanced Worker Configuration
# worker_config.py
"""Advanced worker configuration and optimization."""
import multiprocessing
import os
class WorkerConfig:
"""Advanced worker configuration."""
@staticmethod
def get_optimal_concurrency():
"""Calculate optimal worker concurrency."""
cpu_count = multiprocessing.cpu_count()
# For CPU-bound tasks
cpu_bound_concurrency = cpu_count
# For I/O-bound tasks
io_bound_concurrency = cpu_count * 4
# For mixed workloads
mixed_concurrency = cpu_count * 2
return {
'cpu_bound': cpu_bound_concurrency,
'io_bound': io_bound_concurrency,
'mixed': mixed_concurrency,
}
@staticmethod
def get_memory_optimized_config():
"""Get memory-optimized configuration."""
return {
'worker_max_memory_per_child': 200000, # 200MB
'worker_prefetch_multiplier': 1,
'worker_max_tasks_per_child': 1000,
'task_acks_late': True,
'task_reject_on_worker_lost': True,
}
@staticmethod
def get_performance_config():
"""Get performance-optimized configuration."""
return {
'worker_prefetch_multiplier': 4,
'worker_max_tasks_per_child': 10000,
'worker_disable_rate_limits': True,
'task_compression': 'gzip',
'result_compression': 'gzip',
}
# Dynamic configuration based on environment
def configure_worker_dynamically():
"""Configure worker based on system resources."""
import psutil
# Get system information
memory_gb = psutil.virtual_memory().total / (1024**3)
cpu_count = psutil.cpu_count()
config = {}
# Memory-based configuration
if memory_gb < 2:
config.update({
'worker_max_tasks_per_child': 100,
'worker_prefetch_multiplier': 1,
})
elif memory_gb < 8:
config.update({
'worker_max_tasks_per_child': 1000,
'worker_prefetch_multiplier': 2,
})
else:
config.update({
'worker_max_tasks_per_child': 10000,
'worker_prefetch_multiplier': 4,
})
# CPU-based configuration
if cpu_count <= 2:
config['worker_concurrency'] = cpu_count
else:
config['worker_concurrency'] = cpu_count * 2
return config
Environment-Specific Settings
# environments.py
"""Environment-specific configuration management."""
import os
from typing import Dict, Any
class EnvironmentManager:
"""Manage environment-specific configurations."""
def __init__(self):
self.env = os.environ.get('CELERY_ENV', 'development')
def get_broker_config(self) -> Dict[str, Any]:
"""Get broker configuration for current environment."""
configs = {
'development': {
'broker_url': 'redis://localhost:6379/0',
'broker_connection_retry_on_startup': True,
},
'testing': {
'broker_url': 'memory://',
'task_always_eager': True,
'task_eager_propagates': True,
},
'staging': {
'broker_url': os.environ.get('REDIS_URL'),
'broker_pool_limit': 5,
'broker_connection_timeout': 10,
},
'production': {
'broker_url': os.environ.get('REDIS_URL'),
'broker_pool_limit': 20,
'broker_connection_timeout': 30,
'broker_connection_retry_on_startup': True,
'broker_connection_max_retries': 5,
}
}
return configs.get(self.env, configs['development'])
def get_result_backend_config(self) -> Dict[str, Any]:
"""Get result backend configuration."""
configs = {
'development': {
'result_backend': 'redis://localhost:6379/0',
'result_expires': 300, # 5 minutes
},
'testing': {
'result_backend': 'cache+memory://',
'result_expires': 60,
},
'production': {
'result_backend': os.environ.get('REDIS_URL'),
'result_expires': 3600, # 1 hour
'result_persistent': True,
'result_backend_transport_options': {
'retry_on_timeout': True,
}
}
}
return configs.get(self.env, configs['development'])
def get_logging_config(self) -> Dict[str, Any]:
"""Get logging configuration."""
configs = {
'development': {
'worker_log_level': 'DEBUG',
'worker_log_color': True,
'worker_redirect_stdouts_level': 'DEBUG',
},
'production': {
'worker_log_level': 'INFO',
'worker_log_color': False,
'worker_redirect_stdouts_level': 'WARNING',
'worker_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s',
}
}
return configs.get(self.env, configs['development'])
# Configuration factory
def create_app_config():
"""Create application configuration."""
env_manager = EnvironmentManager()
config = {}
config.update(env_manager.get_broker_config())
config.update(env_manager.get_result_backend_config())
config.update(env_manager.get_logging_config())
# Common settings
config.update({
'task_serializer': 'json',
'accept_content': ['json'],
'result_serializer': 'json',
'timezone': 'UTC',
'enable_utc': True,
})
return config
Chapter 7: Error Handling and Retries
Exception Handling Architecture
graph TB
subgraph "Task Execution Flow"
A[Task Starts] --> B{Task Executes}
B -->|Success| C[Return Result]
B -->|Exception| D[Exception Handler]
end
subgraph "Error Handling Options"
D --> E{Retry Logic}
E -->|Auto Retry| F[Automatic Retry]
E -->|Manual Retry| G[Custom Retry Logic]
E -->|No Retry| H[Mark as Failed]
F --> I[Exponential Backoff]
F --> J[Fixed Delay]
F --> K[Custom Strategy]
G --> L[Business Logic Check]
G --> M[Conditional Retry]
G --> N[Circuit Breaker]
end
subgraph "Final States"
C --> O[SUCCESS]
H --> P[FAILURE]
I --> Q{Max Retries?}
J --> Q
K --> Q
L --> Q
M --> Q
N --> Q
Q -->|No| B
Q -->|Yes| P
end
style A fill:#e8f5e8
style C fill:#e8f5e8
style D fill:#ffebee
style H fill:#ffebee
style P fill:#ffebee
Comprehensive Error Handling
# error_handling.py
"""Comprehensive error handling examples."""
from celery import Celery
from celery.exceptions import Retry, Ignore, WorkerLostError
import random
import time
import logging
from typing import Optional, Dict, Any
app = Celery('error_handling_demo')
# Custom exceptions
class BusinessLogicError(Exception):
"""Exception for business logic failures."""
pass
class ExternalServiceError(Exception):
"""Exception for external service failures."""
pass
class ValidationError(Exception):
"""Exception for input validation failures."""
pass
# Basic exception handling
@app.task(bind=True)
def basic_error_handling(self, data):
"""Basic task with exception handling."""
try:
# Simulate processing
if not data:
raise ValidationError("Data cannot be empty")
if data.get('fail'):
raise BusinessLogicError("Simulated business logic failure")
return f"Processed: {data}"
except ValidationError as exc:
# Don't retry validation errors
logging.error(f"Validation error in task {self.request.id}: {exc}")
raise Ignore()
except BusinessLogicError as exc:
# Log business logic errors but don't retry
logging.error(f"Business logic error in task {self.request.id}: {exc}")
raise
except Exception as exc:
# Unexpected errors - log and re-raise
logging.error(f"Unexpected error in task {self.request.id}: {exc}")
raise
# Automatic retry configuration
@app.task(
bind=True,
autoretry_for=(ExternalServiceError, ConnectionError),
retry_kwargs={'max_retries': 3, 'countdown': 60},
retry_backoff=True,
retry_backoff_max=700,
retry_jitter=True
)
def auto_retry_task(self, url):
"""Task with automatic retry configuration."""
try:
# Simulate external service call
import requests
response = requests.get(url, timeout=10)
if response.status_code >= 500:
raise ExternalServiceError(f"Server error: {response.status_code}")
return response.json()
except requests.RequestException as exc:
logging.warning(f"Request failed, will retry: {exc}")
raise ExternalServiceError(f"Request failed: {exc}")
# Manual retry with custom logic
@app.task(bind=True)
def manual_retry_task(self, operation_data):
"""Task with manual retry logic."""
try:
# Simulate operation
success_rate = operation_data.get('success_rate', 0.5)
if random.random() > success_rate:
raise ExternalServiceError("Operation failed")
return f"Operation succeeded: {operation_data}"
except ExternalServiceError as exc:
# Custom retry logic
current_retry = self.request.retries
max_retries = 5
if current_retry < max_retries:
# Calculate exponential backoff with jitter
base_delay = 2 ** current_retry
jitter = random.uniform(0.5, 1.5)
countdown = base_delay * jitter
logging.warning(
f"Task failed (attempt {current_retry + 1}/{max_retries + 1}). "
f"Retrying in {countdown:.2f} seconds. Error: {exc}"
)
raise self.retry(countdown=countdown, exc=exc)
else:
logging.error(f"Task failed after {max_retries + 1} attempts: {exc}")
raise
# Conditional retry logic
@app.task(bind=True)
def conditional_retry_task(self, data):
"""Task with conditional retry based on error type."""
try:
# Simulate different types of failures
error_type = data.get('error_type')
if error_type == 'temporary':
raise ExternalServiceError("Temporary service unavailable")
elif error_type == 'permanent':
raise BusinessLogicError("Invalid data format")
elif error_type == 'network':
raise ConnectionError("Network timeout")
return f"Success: {data}"
except ExternalServiceError as exc:
# Retry external service errors
if self.request.retries < 3:
raise self.retry(countdown=60, exc=exc)
raise
except ConnectionError as exc:
# Retry network errors with exponential backoff
if self.request.retries < 5:
countdown = (2 ** self.request.retries) * 60
raise self.retry(countdown=countdown, exc=exc)
raise
except BusinessLogicError as exc:
# Don't retry business logic errors
logging.error(f"Business logic error: {exc}")
raise
# Circuit breaker pattern implementation
class CircuitBreaker:
"""Simple circuit breaker implementation."""
def __init__(self, failure_threshold=5, recovery_timeout=300):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker protection."""
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = 'HALF_OPEN'
else:
raise ExternalServiceError("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as exc:
self.on_failure()
raise
def on_success(self):
"""Handle successful call."""
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
"""Handle failed call."""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
# Global circuit breaker instance
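# Note: this breaker lives in each worker process's memory, so its state is
# not shared across workers or hosts; a shared breaker would need external
# storage such as Redis.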
external_service_breaker = CircuitBreaker()
@app.task(bind=True)
def circuit_breaker_task(self, service_url):
"""Task using circuit breaker pattern."""
def external_service_call():
import requests
response = requests.get(service_url, timeout=10)
if response.status_code >= 500:
raise ExternalServiceError(f"Service error: {response.status_code}")
return response.json()
try:
return external_service_breaker.call(external_service_call)
except ExternalServiceError as exc:
if self.request.retries < 3:
raise self.retry(countdown=300, exc=exc) # Wait 5 minutes
raise
# Dead letter queue implementation
@app.task(bind=True)
def dead_letter_handler(self, original_task_data, error_info):
"""Handle tasks that have exhausted all retries."""
logging.error(
f"Task moved to dead letter queue: "
f"Task: {original_task_data.get('task_name')}, "
f"Args: {original_task_data.get('args')}, "
f"Error: {error_info}"
)
# Store in database for manual review
# Send notification to administrators
# Implement recovery logic if needed
return f"Dead letter processed: {original_task_data['task_id']}"
@app.task(bind=True)
def task_with_dead_letter(self, data):
"""Task that sends failed attempts to dead letter queue."""
try:
# Simulate unreliable operation
if random.random() < 0.8: # 80% failure rate
raise ExternalServiceError("Simulated failure")
return f"Success: {data}"
except ExternalServiceError as exc:
if self.request.retries < 3:
raise self.retry(countdown=60, exc=exc)
else:
# Send to dead letter queue
dead_letter_data = {
'task_id': self.request.id,
'task_name': self.name,
'args': self.request.args,
'kwargs': self.request.kwargs,
'retries': self.request.retries,
}
error_info = {
'error_type': type(exc).__name__,
'error_message': str(exc),
'final_attempt': True,
}
dead_letter_handler.delay(dead_letter_data, error_info)
# Mark task as failed
            raise

Advanced Retry Strategies
# retry_strategies.py
"""Advanced retry strategies and patterns."""
import time
import random
import math
import logging
from datetime import datetime, timedelta
from celery import Celery
from typing import Optional, Callable, Any
from error_handling import ExternalServiceError  # defined earlier in error_handling.py
app = Celery('retry_strategies')
class RetryStrategy:
"""Base class for retry strategies."""
def get_delay(self, retry_count: int) -> float:
"""Get delay in seconds for given retry count."""
raise NotImplementedError
class ExponentialBackoffStrategy(RetryStrategy):
"""Exponential backoff with optional jitter."""
def __init__(self, base_delay=1, max_delay=300, jitter=True, backoff_factor=2):
self.base_delay = base_delay
self.max_delay = max_delay
self.jitter = jitter
self.backoff_factor = backoff_factor
def get_delay(self, retry_count: int) -> float:
"""Calculate exponential backoff delay."""
delay = self.base_delay * (self.backoff_factor ** retry_count)
delay = min(delay, self.max_delay)
if self.jitter:
# Add ±25% jitter
jitter_factor = random.uniform(0.75, 1.25)
delay *= jitter_factor
return delay
class LinearBackoffStrategy(RetryStrategy):
"""Linear backoff strategy."""
def __init__(self, base_delay=60, increment=30, max_delay=600):
self.base_delay = base_delay
self.increment = increment
self.max_delay = max_delay
def get_delay(self, retry_count: int) -> float:
"""Calculate linear backoff delay."""
delay = self.base_delay + (self.increment * retry_count)
return min(delay, self.max_delay)
class FibonacciBackoffStrategy(RetryStrategy):
"""Fibonacci sequence backoff strategy."""
def __init__(self, base_delay=1, max_delay=600):
self.base_delay = base_delay
self.max_delay = max_delay
def get_delay(self, retry_count: int) -> float:
"""Calculate Fibonacci backoff delay."""
def fibonacci(n):
if n <= 1:
return n
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return b
delay = self.base_delay * fibonacci(retry_count + 1)
return min(delay, self.max_delay)
class AdaptiveRetryStrategy(RetryStrategy):
"""Adaptive retry strategy based on recent success rates."""
def __init__(self, base_delay=60, min_delay=30, max_delay=900):
self.base_delay = base_delay
self.min_delay = min_delay
self.max_delay = max_delay
self.recent_attempts = []
self.window_size = 100
def record_attempt(self, success: bool):
"""Record the result of an attempt."""
self.recent_attempts.append({
'success': success,
'timestamp': time.time()
})
# Keep only recent attempts
if len(self.recent_attempts) > self.window_size:
self.recent_attempts.pop(0)
def get_success_rate(self) -> float:
"""Calculate recent success rate."""
if not self.recent_attempts:
return 0.5 # Default assumption
successes = sum(1 for attempt in self.recent_attempts if attempt['success'])
return successes / len(self.recent_attempts)
def get_delay(self, retry_count: int) -> float:
"""Calculate adaptive delay based on success rate."""
success_rate = self.get_success_rate()
# Lower success rate = longer delay
failure_factor = 1 / (success_rate + 0.1) # Avoid division by zero
delay = self.base_delay * failure_factor * (retry_count + 1)
return max(self.min_delay, min(delay, self.max_delay))
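# A quick, illustrative comparison of the delay schedules the strategies above
# produce (jitter disabled so the numbers are deterministic).
def print_delay_schedules(max_retries=5):
    strategies = {
        'exponential': ExponentialBackoffStrategy(base_delay=2, jitter=False),
        'linear': LinearBackoffStrategy(base_delay=60, increment=30),
        'fibonacci': FibonacciBackoffStrategy(base_delay=5),
    }
    for name, strategy in strategies.items():
        delays = [strategy.get_delay(retry) for retry in range(max_retries)]
        print(f"{name:12s}: {[round(d, 1) for d in delays]}")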
# Task with custom retry strategy
@app.task(bind=True)
def task_with_custom_retry(self, data, strategy_name='exponential'):
"""Task using custom retry strategies."""
strategies = {
'exponential': ExponentialBackoffStrategy(base_delay=2, max_delay=300),
'linear': LinearBackoffStrategy(base_delay=60, increment=30),
'fibonacci': FibonacciBackoffStrategy(base_delay=5, max_delay=600),
'adaptive': AdaptiveRetryStrategy(base_delay=60),
}
strategy = strategies.get(strategy_name, strategies['exponential'])
try:
# Simulate unreliable operation
if random.random() < 0.7: # 70% failure rate
raise ExternalServiceError("Simulated failure")
# Record success for adaptive strategy
if isinstance(strategy, AdaptiveRetryStrategy):
strategy.record_attempt(True)
return f"Success with {strategy_name} strategy: {data}"
except ExternalServiceError as exc:
# Record failure for adaptive strategy
if isinstance(strategy, AdaptiveRetryStrategy):
strategy.record_attempt(False)
if self.request.retries < 5:
delay = strategy.get_delay(self.request.retries)
logging.warning(
f"Task failed (retry {self.request.retries + 1}/6) "
f"using {strategy_name} strategy. "
f"Retrying in {delay:.2f} seconds."
)
raise self.retry(countdown=delay, exc=exc)
raise
# Time-based retry limits
@app.task(bind=True)
def time_limited_retry_task(self, data, max_retry_time=3600, start_time=None):
    """Task with a time-based retry limit (default: 1 hour)."""
    # The request context is rebuilt for every retry, so the original start
    # time is carried along explicitly as a keyword argument.
    if start_time is None:
        start_time = time.time()
    try:
        # Simulate operation
        if random.random() < 0.6:
            raise ExternalServiceError("Simulated failure")
        return f"Success: {data}"
    except ExternalServiceError as exc:
        elapsed_time = time.time() - start_time
        if elapsed_time < max_retry_time:
            # Calculate delay with exponential backoff
            delay = min(60 * (2 ** self.request.retries), 900)  # Max 15 minutes
            logging.warning(
                f"Task failed after {elapsed_time:.0f}s. "
                f"Retrying in {delay}s. "
                f"Time limit: {max_retry_time}s"
            )
            raise self.retry(
                countdown=delay,
                exc=exc,
                max_retries=None,  # the elapsed-time check is the real limit
                args=(data,),
                kwargs={'max_retry_time': max_retry_time, 'start_time': start_time},
            )
        else:
            logging.error(
                f"Task exceeded time limit of {max_retry_time}s. "
                f"Total elapsed: {elapsed_time:.0f}s"
            )
            raise
# Conditional retry based on error analysis
@app.task(bind=True)
def smart_retry_task(self, data):
"""Task with intelligent retry logic based on error analysis."""
def analyze_error(error):
"""Analyze error to determine retry strategy."""
error_str = str(error).lower()
if 'timeout' in error_str or 'connection' in error_str:
return {'should_retry': True, 'delay': 30, 'max_retries': 5}
elif 'rate limit' in error_str:
return {'should_retry': True, 'delay': 300, 'max_retries': 3}
elif 'server error' in error_str or '500' in error_str:
return {'should_retry': True, 'delay': 120, 'max_retries': 4}
elif 'authentication' in error_str or '401' in error_str:
return {'should_retry': False, 'reason': 'Authentication error'}
elif 'not found' in error_str or '404' in error_str:
return {'should_retry': False, 'reason': 'Resource not found'}
else:
return {'should_retry': True, 'delay': 60, 'max_retries': 3}
try:
# Simulate various types of failures
failure_types = [
"Connection timeout",
"Rate limit exceeded",
"Internal server error",
"Authentication failed",
"Resource not found",
"Unknown error"
]
if random.random() < 0.7:
error_type = random.choice(failure_types)
raise ExternalServiceError(error_type)
return f"Success: {data}"
except ExternalServiceError as exc:
retry_info = analyze_error(exc)
if not retry_info['should_retry']:
logging.error(f"Not retrying: {retry_info.get('reason', 'Unknown')}")
raise
max_retries = retry_info['max_retries']
if self.request.retries < max_retries:
delay = retry_info['delay']
logging.warning(
f"Retrying task due to: {exc}. "
f"Attempt {self.request.retries + 1}/{max_retries + 1}. "
f"Delay: {delay}s"
)
raise self.retry(countdown=delay, exc=exc)
logging.error(f"Task failed after {max_retries + 1} attempts: {exc}")
        raise
Chapter 8: Monitoring and Debugging
Monitoring Architecture
graph TB
subgraph "Monitoring Components"
A[Celery Events] --> B[Event Monitor]
C[Worker Status] --> B
D[Task States] --> B
E[Queue Metrics] --> B
end
subgraph "Monitoring Tools"
B --> F[Flower Web UI]
B --> G[Custom Dashboard]
B --> H[Prometheus/Grafana]
B --> I[Application Logs]
end
subgraph "Alerting"
F --> J[Email Alerts]
G --> K[Slack Notifications]
H --> L[PagerDuty]
I --> M[Log Aggregation]
end
subgraph "Metrics"
N[Task Throughput]
O[Worker Utilization]
P[Queue Depth]
Q[Error Rates]
R[Response Times]
end
B --> N
B --> O
B --> P
B --> Q
B --> R
style A fill:#e8f5e8
style F fill:#e3f2fd
style G fill:#fff3e0
style H fill:#fce4ec

Comprehensive Monitoring Setup
# monitoring.py
"""Comprehensive monitoring and metrics collection."""
import time
import logging
import json
from datetime import datetime, timedelta
from collections import defaultdict, deque
from celery import Celery
from celery.events import EventReceiver
from celery.signals import (
    after_task_publish, task_prerun, task_success,
    task_failure, task_retry, worker_ready, worker_shutdown
)
import threading
app = Celery('monitoring_demo')
class TaskMetrics:
"""Collect and track task metrics."""
def __init__(self):
self.task_counts = defaultdict(int)
self.task_times = defaultdict(list)
self.task_errors = defaultdict(list)
self.recent_tasks = deque(maxlen=1000)
self.worker_stats = defaultdict(dict)
self.queue_stats = defaultdict(dict)
self.lock = threading.Lock()
def record_task_sent(self, task_id, task_name, queue):
"""Record when a task is sent."""
with self.lock:
self.task_counts[f"{task_name}.sent"] += 1
self.recent_tasks.append({
'task_id': task_id,
'task_name': task_name,
'queue': queue,
'event': 'sent',
'timestamp': time.time()
})
def record_task_started(self, task_id, task_name, worker):
"""Record when a task starts executing."""
with self.lock:
self.task_counts[f"{task_name}.started"] += 1
self.worker_stats[worker]['current_tasks'] = self.worker_stats[worker].get('current_tasks', 0) + 1
def record_task_success(self, task_id, task_name, runtime, worker):
"""Record successful task completion."""
with self.lock:
self.task_counts[f"{task_name}.success"] += 1
self.task_times[task_name].append(runtime)
self.worker_stats[worker]['current_tasks'] = max(0, self.worker_stats[worker].get('current_tasks', 1) - 1)
self.worker_stats[worker]['total_completed'] = self.worker_stats[worker].get('total_completed', 0) + 1
def record_task_failure(self, task_id, task_name, exception, worker):
"""Record task failure."""
with self.lock:
self.task_counts[f"{task_name}.failure"] += 1
self.task_errors[task_name].append({
'timestamp': time.time(),
'exception': str(exception),
'worker': worker
})
self.worker_stats[worker]['current_tasks'] = max(0, self.worker_stats[worker].get('current_tasks', 1) - 1)
self.worker_stats[worker]['total_failed'] = self.worker_stats[worker].get('total_failed', 0) + 1
def get_task_stats(self, task_name):
"""Get statistics for a specific task."""
with self.lock:
sent = self.task_counts.get(f"{task_name}.sent", 0)
started = self.task_counts.get(f"{task_name}.started", 0)
success = self.task_counts.get(f"{task_name}.success", 0)
failure = self.task_counts.get(f"{task_name}.failure", 0)
times = self.task_times.get(task_name, [])
avg_time = sum(times) / len(times) if times else 0
recent_errors = [
err for err in self.task_errors.get(task_name, [])
if time.time() - err['timestamp'] < 3600 # Last hour
]
return {
'sent': sent,
'started': started,
'success': success,
'failure': failure,
'success_rate': success / max(1, success + failure),
'avg_runtime': avg_time,
'recent_errors': len(recent_errors),
'pending': sent - started,
'executing': started - success - failure
}
def get_worker_stats(self):
"""Get worker statistics."""
with self.lock:
return dict(self.worker_stats)
def get_system_overview(self):
"""Get system-wide overview."""
with self.lock:
total_tasks = sum(
count for key, count in self.task_counts.items()
if key.endswith('.sent')
)
total_success = sum(
count for key, count in self.task_counts.items()
if key.endswith('.success')
)
total_failure = sum(
count for key, count in self.task_counts.items()
if key.endswith('.failure')
)
return {
'total_tasks': total_tasks,
'total_success': total_success,
'total_failure': total_failure,
'overall_success_rate': total_success / max(1, total_success + total_failure),
'active_workers': len(self.worker_stats),
'recent_activity': len(self.recent_tasks)
}
# Global metrics collector
metrics = TaskMetrics()
# Signal handlers for automatic metrics collection.
# Note: Celery 4+ removed the old task_sent signal and has no task_started
# signal, so after_task_publish and task_prerun are used instead.
_task_start_times = {}

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwds):
    """Handle task publish signal (sender is the task name)."""
    task_id = headers.get('id') if headers else None
    queue = kwds.get('routing_key', 'default')
    metrics.record_task_sent(task_id, sender, queue)

@task_prerun.connect
def task_started_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Handle task prerun signal (task is the executing task instance)."""
    _task_start_times[task_id] = time.time()
    worker = getattr(task.request, 'hostname', 'unknown') if task else 'unknown'
    metrics.record_task_started(task_id, task.name if task else 'unknown', worker)

@task_success.connect
def task_success_handler(sender=None, result=None, **kwds):
    """Handle task success signal (sender is the task instance)."""
    task_id = sender.request.id if sender else None
    runtime = time.time() - _task_start_times.pop(task_id, time.time())
    worker = getattr(sender.request, 'hostname', 'unknown') if sender else 'unknown'
    metrics.record_task_success(task_id, sender.name if sender else 'unknown', runtime, worker)

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwds):
    """Handle task failure signal (sender is the task instance)."""
    _task_start_times.pop(task_id, None)
    worker = getattr(sender.request, 'hostname', 'unknown') if sender else 'unknown'
    task_name = sender.name if sender else 'unknown'
    metrics.record_task_failure(task_id, task_name, exception, worker)
# Custom monitoring tasks
@app.task
def system_health_check():
"""Perform system health check."""
stats = metrics.get_system_overview()
worker_stats = metrics.get_worker_stats()
health_report = {
'timestamp': datetime.now().isoformat(),
'system_stats': stats,
'worker_count': len(worker_stats),
'workers': worker_stats,
'alerts': []
}
# Check for alerts
if stats['overall_success_rate'] < 0.9:
health_report['alerts'].append({
'level': 'warning',
'message': f"Low success rate: {stats['overall_success_rate']:.2%}"
})
if len(worker_stats) == 0:
health_report['alerts'].append({
'level': 'critical',
'message': "No active workers detected"
})
# Check individual worker health
for worker, stats in worker_stats.items():
current_tasks = stats.get('current_tasks', 0)
if current_tasks > 100: # Assuming high load threshold
health_report['alerts'].append({
'level': 'warning',
'message': f"Worker {worker} has high load: {current_tasks} current tasks"
})
return health_report
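# A sketch of running the health check periodically with Celery beat; the
# 60-second interval is an assumption to tune per environment.
app.conf.beat_schedule = {
    'system-health-check': {
        'task': system_health_check.name,
        'schedule': 60.0,
    },
}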
@app.task
def generate_performance_report():
"""Generate detailed performance report."""
report = {
'timestamp': datetime.now().isoformat(),
'period': 'last_hour',
'task_performance': {},
'worker_performance': metrics.get_worker_stats(),
'system_overview': metrics.get_system_overview()
}
# Get performance data for each task type
task_names = set()
for key in metrics.task_counts.keys():
task_name = key.rsplit('.', 1)[0]
task_names.add(task_name)
for task_name in task_names:
report['task_performance'][task_name] = metrics.get_task_stats(task_name)
    return report

Flower Integration and Custom Dashboards
# flower_integration.py
"""Enhanced Flower integration and custom monitoring."""
import json
import time
import threading
import logging
from datetime import datetime
from celery.events import EventReceiver
from kombu import Connection
from monitoring import TaskMetrics  # metrics collector defined in monitoring.py
class CeleryEventMonitor:
"""Real-time Celery event monitor."""
def __init__(self, broker_url):
self.broker_url = broker_url
self.running = False
self.event_handlers = {}
self.metrics = TaskMetrics()
def register_handler(self, event_type, handler):
"""Register event handler."""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
def start_monitoring(self):
"""Start event monitoring in a separate thread."""
def monitor():
with Connection(self.broker_url) as connection:
recv = EventReceiver(connection, handlers={
'task-sent': self.on_task_sent,
'task-received': self.on_task_received,
'task-started': self.on_task_started,
'task-succeeded': self.on_task_succeeded,
'task-failed': self.on_task_failed,
'task-retried': self.on_task_retried,
'worker-online': self.on_worker_online,
'worker-offline': self.on_worker_offline,
'worker-heartbeat': self.on_worker_heartbeat,
})
self.running = True
logging.info("Celery event monitoring started")
try:
recv.capture(limit=None, timeout=None, wakeup=True)
except KeyboardInterrupt:
logging.info("Event monitoring stopped")
finally:
self.running = False
thread = threading.Thread(target=monitor, daemon=True)
thread.start()
return thread
def on_task_sent(self, event):
"""Handle task-sent event."""
for handler in self.event_handlers.get('task-sent', []):
handler(event)
def on_task_received(self, event):
"""Handle task-received event."""
for handler in self.event_handlers.get('task-received', []):
handler(event)
def on_task_started(self, event):
"""Handle task-started event."""
for handler in self.event_handlers.get('task-started', []):
handler(event)
def on_task_succeeded(self, event):
"""Handle task-succeeded event."""
for handler in self.event_handlers.get('task-succeeded', []):
handler(event)
def on_task_failed(self, event):
"""Handle task-failed event."""
for handler in self.event_handlers.get('task-failed', []):
handler(event)
def on_task_retried(self, event):
"""Handle task-retried event."""
for handler in self.event_handlers.get('task-retried', []):
handler(event)
def on_worker_online(self, event):
"""Handle worker-online event."""
logging.info(f"Worker came online: {event.get('hostname')}")
for handler in self.event_handlers.get('worker-online', []):
handler(event)
def on_worker_offline(self, event):
"""Handle worker-offline event."""
logging.warning(f"Worker went offline: {event.get('hostname')}")
for handler in self.event_handlers.get('worker-offline', []):
handler(event)
def on_worker_heartbeat(self, event):
"""Handle worker-heartbeat event."""
for handler in self.event_handlers.get('worker-heartbeat', []):
handler(event)
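# A usage sketch: log every failed task as it is reported. Workers must be
# started with events enabled (the -E flag, or worker_send_task_events = True)
# for these events to flow; the broker URL is an assumption.
def run_failure_logger(broker_url='redis://localhost:6379/0'):
    """Start the monitor and log task-failed events as they arrive."""
    event_monitor = CeleryEventMonitor(broker_url)
    event_monitor.register_handler(
        'task-failed',
        lambda event: logging.error(
            f"Task failed: {event.get('uuid')} - {event.get('exception')}"
        )
    )
    return event_monitor.start_monitoring()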
# Custom dashboard implementation
from flask import Flask, jsonify, render_template
import json
dashboard_app = Flask(__name__)
monitor = CeleryEventMonitor('redis://localhost:6379/0')
@dashboard_app.route('/api/metrics')
def get_metrics():
"""Get current metrics."""
return jsonify({
'system_overview': monitor.metrics.get_system_overview(),
'worker_stats': monitor.metrics.get_worker_stats(),
'timestamp': time.time()
})
@dashboard_app.route('/api/task/<task_name>')
def get_task_metrics(task_name):
"""Get metrics for specific task."""
return jsonify(monitor.metrics.get_task_stats(task_name))
@dashboard_app.route('/api/health')
def health_check():
"""Health check endpoint."""
stats = monitor.metrics.get_system_overview()
is_healthy = (
stats['overall_success_rate'] > 0.95 and
stats['active_workers'] > 0
)
return jsonify({
'healthy': is_healthy,
'stats': stats,
'timestamp': time.time()
})
@dashboard_app.route('/')
def dashboard():
"""Main dashboard page."""
return render_template('dashboard.html')
# Dashboard HTML template (save this as templates/dashboard.html so render_template can find it)
DASHBOARD_HTML = """
<!DOCTYPE html>
<html>
<head>
<title>Celery Monitoring Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.metric-card {
border: 1px solid #ddd;
border-radius: 8px;
padding: 15px;
margin: 10px;
display: inline-block;
min-width: 200px;
}
.metric-value { font-size: 2em; font-weight: bold; }
.metric-label { color: #666; }
.alert { background-color: #f8d7da; border: 1px solid #f5c6cb; padding: 10px; margin: 10px 0; border-radius: 4px; }
.success { color: #28a745; }
.warning { color: #ffc107; }
.danger { color: #dc3545; }
</style>
</head>
<body>
<h1>Celery Monitoring Dashboard</h1>
<div id="overview">
<div class="metric-card">
<div class="metric-value" id="total-tasks">-</div>
<div class="metric-label">Total Tasks</div>
</div>
<div class="metric-card">
<div class="metric-value" id="success-rate">-</div>
<div class="metric-label">Success Rate</div>
</div>
<div class="metric-card">
<div class="metric-value" id="active-workers">-</div>
<div class="metric-label">Active Workers</div>
</div>
</div>
<div id="alerts"></div>
<canvas id="taskChart" width="400" height="200"></canvas>
<script>
function updateDashboard() {
fetch('/api/metrics')
.then(response => response.json())
.then(data => {
const overview = data.system_overview;
document.getElementById('total-tasks').textContent = overview.total_tasks;
document.getElementById('success-rate').textContent =
(overview.overall_success_rate * 100).toFixed(1) + '%';
document.getElementById('active-workers').textContent = overview.active_workers;
// Update chart
updateChart(overview);
})
.catch(error => console.error('Error:', error));
}
function updateChart(data) {
// Chart update logic here
}
// Update every 5 seconds
setInterval(updateDashboard, 5000);
updateDashboard();
</script>
</body>
</html>
"""
# Alerting system
class AlertManager:
"""Manage alerts and notifications."""
def __init__(self):
self.alert_rules = []
self.notification_channels = []
def add_rule(self, name, condition, severity='warning'):
"""Add an alert rule."""
self.alert_rules.append({
'name': name,
'condition': condition,
'severity': severity
})
def add_notification_channel(self, channel):
"""Add notification channel."""
self.notification_channels.append(channel)
def check_alerts(self, metrics):
"""Check all alert rules against current metrics."""
alerts = []
for rule in self.alert_rules:
try:
if rule['condition'](metrics):
alert = {
'name': rule['name'],
'severity': rule['severity'],
'timestamp': time.time(),
                        # Store a JSON-serializable snapshot, not the collector object itself
                        'metrics': metrics.get_system_overview() if hasattr(metrics, 'get_system_overview') else metrics
}
alerts.append(alert)
self.send_alert(alert)
except Exception as e:
logging.error(f"Error checking alert rule {rule['name']}: {e}")
return alerts
def send_alert(self, alert):
"""Send alert through all notification channels."""
for channel in self.notification_channels:
try:
channel.send(alert)
except Exception as e:
logging.error(f"Error sending alert: {e}")
# Email notification channel
import smtplib
from email.mime.text import MIMEText
class EmailNotificationChannel:
"""Email notification channel."""
def __init__(self, smtp_host, smtp_port, username, password, recipients):
self.smtp_host = smtp_host
self.smtp_port = smtp_port
self.username = username
self.password = password
self.recipients = recipients
def send(self, alert):
"""Send alert via email."""
subject = f"Celery Alert: {alert['name']} ({alert['severity']})"
body = f"""
Alert: {alert['name']}
Severity: {alert['severity']}
Time: {datetime.fromtimestamp(alert['timestamp'])}
Metrics:
{json.dumps(alert['metrics'], indent=2)}
"""
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.username
msg['To'] = ', '.join(self.recipients)
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.starttls()
server.login(self.username, self.password)
server.send_message(msg)
# Setup example
alert_manager = AlertManager()
# Add alert rules
alert_manager.add_rule(
'Low Success Rate',
lambda m: m.get_system_overview()['overall_success_rate'] < 0.9,
'warning'
)
alert_manager.add_rule(
'No Active Workers',
lambda m: m.get_system_overview()['active_workers'] == 0,
'critical'
)
# Add notification channels
email_channel = EmailNotificationChannel(
smtp_host='smtp.gmail.com',
smtp_port=587,
username='alerts@company.com',
password='password',
recipients=['admin@company.com']
)
alert_manager.add_notification_channel(email_channel)

Chapter 9: Workflows and Chains
Workflow Architecture
graph TB
subgraph "Workflow Patterns"
A[Simple Chain] --> B[Task 1 → Task 2 → Task 3]
C[Group] --> D[Task 1, Task 2, Task 3 in parallel]
E[Chord] --> F[Group → Callback Task]
G[Map] --> H[Same Task with different args]
I[Starmap] --> J[Task with argument unpacking]
K[Chain of Groups] --> L[Complex Workflow]
end
subgraph "Workflow Control"
M[Conditional Execution]
N[Error Handling]
O[Partial Failures]
P[Workflow State]
Q[Result Aggregation]
end
B --> M
D --> N
F --> O
H --> P
J --> Q
L --> M
style A fill:#e8f5e8
style C fill:#e3f2fd
style E fill:#fff3e0
style G fill:#fce4ec
style I fill:#f3e5f5
style K fill:#e1f5fe

Advanced Workflow Implementation
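Before walking through the full workflow module below, here is a minimal, self-contained sketch of the canvas primitives named in the diagram. The add and tsum tasks and the Redis URLs are assumptions used purely for illustration; executing the chord at the bottom requires a worker consuming from the same broker.
# canvas_sketch.py
"""Minimal sketch of Celery canvas primitives: chain, group, chord, starmap."""
from celery import Celery, chain, chord, group
app = Celery('canvas_sketch',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')
@app.task
def add(x, y):
    return x + y
@app.task
def tsum(numbers):
    return sum(numbers)
# Chain: each result feeds the next signature -> ((2 + 2) + 4) + 8
pipeline = chain(add.s(2, 2), add.s(4), add.s(8))
# Group: independent signatures executed in parallel
parallel = group(add.s(i, i) for i in range(10))
# Chord: a group whose results are passed to a callback once all finish
summed = chord((add.s(i, i) for i in range(10)), tsum.s())
# Starmap: one task applied over argument tuples, unpacked per call
pairs = add.starmap([(1, 2), (3, 4), (5, 6)])
if __name__ == '__main__':
    print(summed.apply_async().get(timeout=30))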
# workflows.py
"""Advanced workflow patterns and implementations."""
from celery import Celery, group, chain, chord, chunks, signature
from celery.result import GroupResult, ResultSet
import time
import random
import logging
from typing import List, Dict, Any, Optional
app = Celery('workflow_demo')
# Basic workflow tasks
@app.task
def download_data(url: str) -> Dict[str, Any]:
"""Download data from URL."""
time.sleep(random.uniform(1, 3)) # Simulate download
return {
'url': url,
'size': random.randint(1000, 10000),
'data': f"content_from_{url.split('/')[-1]}",
'timestamp': time.time()
}
@app.task
def process_data(data: Dict[str, Any]) -> Dict[str, Any]:
"""Process downloaded data."""
time.sleep(random.uniform(0.5, 2)) # Simulate processing
processed = {
'original_size': data['size'],
'processed_size': data['size'] * random.uniform(0.7, 1.3),
'url': data['url'],
'processed_at': time.time(),
'processing_time': random.uniform(0.5, 2)
}
return processed
@app.task
def validate_data(data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate processed data."""
time.sleep(random.uniform(0.2, 0.8))
# Simulate validation logic
is_valid = random.random() > 0.1 # 90% success rate
result = {
'url': data['url'],
'is_valid': is_valid,
'validation_time': time.time(),
'errors': [] if is_valid else ['Invalid data format'],
'original_data': data
}
if not is_valid:
raise ValueError(f"Data validation failed for {data['url']}")
return result
@app.task
def aggregate_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Aggregate multiple results."""
time.sleep(random.uniform(0.5, 1))
valid_results = [r for r in results if r.get('is_valid', False)]
total_size = sum(r['original_data']['processed_size'] for r in valid_results)
return {
'total_items': len(results),
'valid_items': len(valid_results),
'total_size': total_size,
'success_rate': len(valid_results) / len(results) if results else 0,
'aggregated_at': time.time()
}
@app.task
def send_notification(result: Dict[str, Any], recipient: str) -> str:
"""Send notification about workflow completion."""
time.sleep(random.uniform(0.3, 0.7))
message = f"""
Workflow completed for {recipient}
Total items: {result['total_items']}
Valid items: {result['valid_items']}
Success rate: {result['success_rate']:.2%}
Total size: {result['total_size']:.2f}
"""
return f"Notification sent to {recipient}: {message.strip()}"
# Simple workflow chains
def create_simple_chain(urls: List[str]) -> chain:
"""Create a simple sequential chain."""
# Download → Process → Validate for each URL
return chain(
download_data.s(urls[0]),
process_data.s(),
validate_data.s()
)
def create_parallel_processing(urls: List[str]) -> group:
"""Create parallel processing workflow."""
# Process multiple URLs in parallel
return group(
chain(
download_data.s(url),
process_data.s(),
validate_data.s()
) for url in urls
)
def create_chord_workflow(urls: List[str], recipient: str) -> chord:
"""Create chord workflow (parallel + callback)."""
# Process all URLs in parallel, then aggregate and notify
parallel_tasks = group(
chain(
download_data.s(url),
process_data.s(),
validate_data.s()
) for url in urls
)
return chord(
parallel_tasks,
chain(
aggregate_results.s(),
send_notification.s(recipient)
)
)
# Advanced workflow patterns
class WorkflowBuilder:
"""Build complex workflows with error handling."""
def __init__(self):
self.tasks = []
self.error_handlers = {}
def add_task(self, task_sig, error_handler=None):
"""Add task to workflow."""
self.tasks.append(task_sig)
if error_handler:
self.error_handlers[len(self.tasks) - 1] = error_handler
return self
def add_parallel_group(self, task_sigs, error_handler=None):
"""Add parallel group to workflow."""
group_sig = group(task_sigs)
return self.add_task(group_sig, error_handler)
def build_chain(self):
"""Build chain from added tasks."""
if not self.tasks:
raise ValueError("No tasks added to workflow")
return chain(*self.tasks)
def build_resilient_chain(self):
"""Build chain with error handling."""
# Implementation would include try/catch logic
# This is a simplified version
return self.build_chain()
# Conditional workflow execution
@app.task(bind=True)
def conditional_task(self, data: Dict[str, Any], condition_func: str) -> Dict[str, Any]:
"""Execute task based on condition."""
# Define condition functions
conditions = {
'size_check': lambda d: d.get('size', 0) > 5000,
'url_check': lambda d: 'important' in d.get('url', ''),
'time_check': lambda d: time.time() - d.get('timestamp', 0) < 3600,
}
condition = conditions.get(condition_func)
if not condition:
raise ValueError(f"Unknown condition: {condition_func}")
if condition(data):
# Execute additional processing
time.sleep(random.uniform(1, 2))
data['conditional_processing'] = True
data['processed_at'] = time.time()
else:
data['conditional_processing'] = False
data['skipped_reason'] = f"Failed condition: {condition_func}"
return data
def create_conditional_workflow(urls: List[str]) -> chain:
"""Create workflow with conditional execution."""
return chain(
# Download all data in parallel
group(download_data.s(url) for url in urls),
# Process each item conditionally
conditional_task.s('size_check'),
# Final aggregation
aggregate_results.s()
)
# Workflow with retry and error handling
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
def resilient_download(self, url: str) -> Dict[str, Any]:
"""Download with automatic retry."""
if random.random() < 0.2: # 20% failure rate
raise ConnectionError(f"Failed to download from {url}")
return download_data(url)
@app.task(bind=True)
def error_recovery_task(self, failed_results: List[Any], backup_urls: List[str]) -> List[Dict[str, Any]]:
"""Recover from failed tasks using backup sources."""
recovered_results = []
for i, result in enumerate(failed_results):
if isinstance(result, Exception) and i < len(backup_urls):
# Try backup URL
try:
backup_result = download_data(backup_urls[i])
recovered_results.append(backup_result)
except Exception as e:
logging.error(f"Backup also failed: {e}")
recovered_results.append({'error': str(e), 'url': backup_urls[i]})
else:
recovered_results.append(result)
return recovered_results
# Dynamic workflow generation
class DynamicWorkflowGenerator:
"""Generate workflows based on runtime conditions."""
@staticmethod
def generate_etl_workflow(config: Dict[str, Any]) -> Any:
"""Generate ETL workflow based on configuration."""
source_type = config.get('source_type', 'url')
processing_steps = config.get('processing_steps', [])
output_format = config.get('output_format', 'json')
workflow_tasks = []
# Extract phase
if source_type == 'url':
extract_task = download_data.s(config['source'])
elif source_type == 'file':
extract_task = read_file.s(config['source'])
else:
raise ValueError(f"Unknown source type: {source_type}")
workflow_tasks.append(extract_task)
# Transform phase
for step in processing_steps:
if step == 'validate':
workflow_tasks.append(validate_data.s())
elif step == 'process':
workflow_tasks.append(process_data.s())
elif step == 'conditional':
workflow_tasks.append(conditional_task.s(config.get('condition', 'size_check')))
# Load phase
if output_format == 'json':
workflow_tasks.append(save_as_json.s(config.get('output_path')))
elif output_format == 'csv':
workflow_tasks.append(save_as_csv.s(config.get('output_path')))
return chain(*workflow_tasks)
@staticmethod
def generate_fan_out_fan_in(data_sources: List[str], aggregation_func: str) -> chord:
"""Generate fan-out/fan-in pattern."""
# Fan-out: process each source independently
fan_out_tasks = group(
chain(
download_data.s(source),
process_data.s(),
validate_data.s()
) for source in data_sources
)
# Fan-in: aggregate results
if aggregation_func == 'sum':
fan_in_task = aggregate_results.s()
elif aggregation_func == 'average':
fan_in_task = calculate_average.s()
else:
fan_in_task = aggregate_results.s()
return chord(fan_out_tasks, fan_in_task)
# Workflow state management
class WorkflowState:
"""Manage workflow state and progress."""
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id
self.start_time = time.time()
self.tasks = {}
self.current_step = 0
self.total_steps = 0
self.status = 'running'
def add_task(self, task_id: str, task_name: str, status: str = 'pending'):
"""Add task to workflow state."""
self.tasks[task_id] = {
'name': task_name,
'status': status,
'start_time': None,
'end_time': None,
'result': None,
'error': None
}
self.total_steps += 1
def update_task(self, task_id: str, status: str, result: Any = None, error: str = None):
"""Update task status."""
if task_id in self.tasks:
task = self.tasks[task_id]
task['status'] = status
if status == 'started':
task['start_time'] = time.time()
elif status in ['success', 'failure']:
task['end_time'] = time.time()
if result is not None:
task['result'] = result
if error:
task['error'] = error
if status == 'success':
self.current_step += 1
def get_progress(self) -> Dict[str, Any]:
"""Get workflow progress."""
completed_tasks = sum(1 for task in self.tasks.values() if task['status'] in ['success', 'failure'])
return {
'workflow_id': self.workflow_id,
'status': self.status,
'progress': completed_tasks / max(1, self.total_steps),
'current_step': self.current_step,
'total_steps': self.total_steps,
'elapsed_time': time.time() - self.start_time,
'tasks': self.tasks
}
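# A small usage sketch for WorkflowState. In practice the update_task calls
# would be driven from task callbacks or signal handlers; here they are made
# by hand to show the progress bookkeeping.
def demo_workflow_state():
    state = WorkflowState('wf-demo-001')
    for task_id, name in [('t1', 'download'), ('t2', 'process'), ('t3', 'validate')]:
        state.add_task(task_id, name)
    state.update_task('t1', 'started')
    state.update_task('t1', 'success', result={'size': 1024})
    state.update_task('t2', 'started')
    return state.get_progress()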
# Workflow execution examples
def execute_simple_workflow():
"""Execute simple workflow example."""
urls = [
'https://api.example.com/data/1',
'https://api.example.com/data/2',
'https://api.example.com/data/3'
]
# Simple chain
workflow = create_simple_chain(urls[:1])
result = workflow.apply_async()
print(f"Simple workflow started: {result.id}")
return result
def execute_parallel_workflow():
"""Execute parallel workflow example."""
urls = [
'https://api.example.com/data/1',
'https://api.example.com/data/2',
'https://api.example.com/data/3',
'https://api.example.com/data/4'
]
# Parallel processing
workflow = create_parallel_processing(urls)
result = workflow.apply_async()
print(f"Parallel workflow started: {result.id}")
return result
def execute_chord_workflow():
"""Execute chord workflow example."""
urls = [
'https://api.example.com/data/1',
'https://api.example.com/data/2',
'https://api.example.com/data/3',
'https://api.example.com/data/4',
'https://api.example.com/data/5'
]
# Chord workflow (parallel + callback)
workflow = create_chord_workflow(urls, 'admin@example.com')
result = workflow.apply_async()
print(f"Chord workflow started: {result.id}")
return result
# Workflow monitoring
def monitor_workflow(result):
"""Monitor workflow execution."""
while not result.ready():
if hasattr(result, 'children') and result.children:
# Group or chord result
completed = sum(1 for child in result.children if child.ready())
total = len(result.children)
print(f"Progress: {completed}/{total} tasks completed")
else:
# Single task result
print(f"Status: {result.status}")
time.sleep(2)
if result.successful():
print(f"Workflow completed successfully: {result.result}")
else:
print(f"Workflow failed: {result.result}")
return result.result
# Example usage
if __name__ == '__main__':
# Execute different workflow patterns
print("=== Simple Workflow ===")
simple_result = execute_simple_workflow()
monitor_workflow(simple_result)
print("\n=== Parallel Workflow ===")
parallel_result = execute_parallel_workflow()
monitor_workflow(parallel_result)
print("\n=== Chord Workflow ===")
chord_result = execute_chord_workflow()
    monitor_workflow(chord_result)
Chapter 10: Performance Optimization
Performance Analysis Framework
graph TB
subgraph "Performance Metrics"
A[Throughput] --> E[Tasks/Second]
B[Latency] --> F[Task Duration]
C[Resource Usage] --> G[CPU/Memory]
D[Queue Depth] --> H[Pending Tasks]
end
subgraph "Optimization Areas"
I[Worker Optimization]
J[Task Optimization]
K[Broker Optimization]
L[Network Optimization]
M[Memory Optimization]
end
subgraph "Optimization Techniques"
N[Connection Pooling]
O[Batch Processing]
P[Prefetching Control]
Q[Serialization Tuning]
R[Concurrency Tuning]
end
E --> I
F --> J
G --> K
H --> L
I --> N
J --> O
K --> P
L --> Q
M --> R
style A fill:#e8f5e8
style B fill:#e3f2fd
style C fill:#fff3e0
style D fill:#fce4ec

Comprehensive Performance Optimization
# performance_optimization.py
"""Comprehensive performance optimization techniques."""
import time
import psutil
import threading
import multiprocessing
from collections import deque, defaultdict
from contextlib import contextmanager
from celery import Celery
from celery.signals import task_prerun, task_postrun
import logging
app = Celery('performance_optimized')
class PerformanceProfiler:
"""Profile task and worker performance."""
def __init__(self):
self.task_metrics = defaultdict(lambda: {
'count': 0,
'total_time': 0,
'max_time': 0,
'min_time': float('inf'),
'memory_usage': deque(maxlen=100),
'cpu_usage': deque(maxlen=100)
})
self.system_metrics = {
'peak_memory': 0,
'peak_cpu': 0,
'total_tasks': 0
}
self.lock = threading.Lock()
@contextmanager
def profile_task(self, task_name):
"""Context manager for profiling task execution."""
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
start_cpu = psutil.cpu_percent()
try:
yield
finally:
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
end_cpu = psutil.cpu_percent()
duration = end_time - start_time
memory_used = end_memory - start_memory
cpu_used = end_cpu - start_cpu
with self.lock:
metrics = self.task_metrics[task_name]
metrics['count'] += 1
metrics['total_time'] += duration
metrics['max_time'] = max(metrics['max_time'], duration)
metrics['min_time'] = min(metrics['min_time'], duration)
metrics['memory_usage'].append(memory_used)
metrics['cpu_usage'].append(cpu_used)
self.system_metrics['peak_memory'] = max(
self.system_metrics['peak_memory'], end_memory
)
self.system_metrics['peak_cpu'] = max(
self.system_metrics['peak_cpu'], end_cpu
)
self.system_metrics['total_tasks'] += 1
def get_task_stats(self, task_name):
"""Get performance statistics for a task."""
with self.lock:
metrics = self.task_metrics[task_name]
if metrics['count'] == 0:
return None
avg_time = metrics['total_time'] / metrics['count']
avg_memory = sum(metrics['memory_usage']) / len(metrics['memory_usage']) if metrics['memory_usage'] else 0
avg_cpu = sum(metrics['cpu_usage']) / len(metrics['cpu_usage']) if metrics['cpu_usage'] else 0
return {
'count': metrics['count'],
'avg_time': avg_time,
'max_time': metrics['max_time'],
'min_time': metrics['min_time'] if metrics['min_time'] != float('inf') else 0,
'avg_memory_mb': avg_memory,
'avg_cpu_percent': avg_cpu,
'throughput': metrics['count'] / metrics['total_time'] if metrics['total_time'] > 0 else 0
}
def get_system_stats(self):
"""Get system-wide performance statistics."""
return self.system_metrics.copy()
# Global profiler instance
profiler = PerformanceProfiler()
# Optimized task implementations
@app.task(bind=True)
def optimized_data_processing(self, data_batch, chunk_size=1000):
"""Optimized task for processing large datasets."""
task_name = self.name
with profiler.profile_task(task_name):
results = []
# Process in chunks to manage memory
for i in range(0, len(data_batch), chunk_size):
chunk = data_batch[i:i + chunk_size]
# Efficient processing logic
chunk_results = [
process_single_item(item) for item in chunk
]
results.extend(chunk_results)
# Periodic garbage collection for large datasets
if i % (chunk_size * 10) == 0:
import gc
gc.collect()
return {
'processed_count': len(results),
'results': results[:100], # Return only first 100 for memory efficiency
'total_batches': len(data_batch) // chunk_size + 1
}
def process_single_item(item):
"""Optimized single item processing."""
# Simulate processing with minimal memory allocation
return {
'id': item.get('id'),
'processed': True,
'value': item.get('value', 0) * 2
}
# Connection pooling optimization
class OptimizedCeleryConfig:
"""Optimized Celery configuration for high performance."""
# Broker optimizations
broker_url = 'redis://localhost:6379/0'
broker_pool_limit = 20 # Increase connection pool
broker_connection_timeout = 30
broker_connection_retry_on_startup = True
broker_connection_max_retries = 5
# Result backend optimizations
result_backend = 'redis://localhost:6379/0'
result_backend_transport_options = {
'retry_on_timeout': True,
'retry_policy': {'timeout': 5.0}
}
# Worker optimizations
worker_prefetch_multiplier = 4 # Optimize based on task duration
worker_max_tasks_per_child = 10000 # Prevent memory leaks
worker_max_memory_per_child = 200000 # 200MB limit
# Task optimizations
task_compression = 'gzip'
result_compression = 'gzip'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# Performance settings
task_acks_late = True # Better fault tolerance
task_reject_on_worker_lost = True
worker_disable_rate_limits = True # For high throughput
# Monitoring
worker_send_task_events = True
task_send_sent_event = True
app.config_from_object(OptimizedCeleryConfig)
# Batch processing optimization
@app.task(bind=True)
def batch_processor(self, items, batch_size=100):
"""Process items in optimized batches."""
task_name = self.name
with profiler.profile_task(task_name):
        start_time = time.time()
        total_items = len(items)
processed_count = 0
results = []
for i in range(0, total_items, batch_size):
batch = items[i:i + batch_size]
# Process batch efficiently
batch_results = process_batch_optimized(batch)
results.extend(batch_results)
processed_count += len(batch)
# Update progress
progress = processed_count / total_items
self.update_state(
state='PROGRESS',
meta={'current': processed_count, 'total': total_items, 'progress': progress}
)
# Yield control periodically
if i % (batch_size * 10) == 0:
time.sleep(0.001) # Micro-sleep to yield
return {
'processed_count': processed_count,
            'success_rate': len([r for r in results if r.get('success')]) / max(1, processed_count),
            'total_time': time.time() - start_time
}
def process_batch_optimized(batch):
"""Optimized batch processing function."""
# Use list comprehension for efficiency
return [
{
'id': item['id'],
'success': True,
'result': item.get('value', 0) ** 2
}
for item in batch
if item.get('value') is not None
]
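# A sketch of consuming the custom PROGRESS state reported by batch_processor;
# the poll interval is an arbitrary choice.
def wait_with_progress(async_result, poll_interval=1.0):
    """Wait for a task while printing the PROGRESS metadata set by update_state."""
    while not async_result.ready():
        if async_result.state == 'PROGRESS' and isinstance(async_result.info, dict):
            info = async_result.info
            print(f"Progress: {info.get('current', 0)}/{info.get('total', '?')} items")
        time.sleep(poll_interval)
    return async_result.get()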
# Memory-efficient task for large files
@app.task(bind=True)
def process_large_file(self, file_path, chunk_size=8192):
"""Process large files with memory efficiency."""
task_name = self.name
with profiler.profile_task(task_name):
processed_lines = 0
processed_size = 0
try:
with open(file_path, 'r', buffering=chunk_size) as file:
while True:
chunk = file.read(chunk_size)
if not chunk:
break
# Process chunk
lines = chunk.count('\n')
processed_lines += lines
processed_size += len(chunk.encode('utf-8'))
# Update progress
self.update_state(
state='PROGRESS',
meta={
'lines_processed': processed_lines,
'bytes_processed': processed_size
}
)
except IOError as e:
raise Exception(f"Error processing file {file_path}: {e}")
return {
'file_path': file_path,
'lines_processed': processed_lines,
'bytes_processed': processed_size
}
# CPU-intensive task optimization
@app.task(bind=True)
def cpu_intensive_task(self, data, use_multiprocessing=True):
"""CPU-intensive task with multiprocessing optimization."""
task_name = self.name
with profiler.profile_task(task_name):
if use_multiprocessing and len(data) > 1000:
# Use multiprocessing for large datasets
num_processes = min(multiprocessing.cpu_count(), 8)
chunk_size = len(data) // num_processes
with multiprocessing.Pool(processes=num_processes) as pool:
chunks = [
data[i:i + chunk_size]
for i in range(0, len(data), chunk_size)
]
results = pool.map(cpu_intensive_function, chunks)
# Flatten results
final_result = []
for chunk_result in results:
final_result.extend(chunk_result)
return {
'processed_count': len(final_result),
'processes_used': num_processes,
'results': final_result
}
else:
# Single-threaded processing for smaller datasets
result = cpu_intensive_function(data)
return {
'processed_count': len(result),
'processes_used': 1,
'results': result
}
def cpu_intensive_function(data_chunk):
"""CPU-intensive processing function."""
results = []
for item in data_chunk:
# Simulate CPU-intensive computation
value = item.get('value', 1)
computed = sum(i ** 2 for i in range(value % 1000))
results.append({
'id': item.get('id'),
'computed_value': computed,
'original_value': value
})
return results
# I/O optimization with connection pooling
import redis
from contextlib import contextmanager
class RedisConnectionPool:
"""Optimized Redis connection pool."""
def __init__(self, host='localhost', port=6379, max_connections=20):
self.pool = redis.ConnectionPool(
host=host,
port=port,
max_connections=max_connections,
retry_on_timeout=True,
socket_keepalive=True,
socket_keepalive_options={}
)
@contextmanager
def get_connection(self):
"""Get Redis connection from pool."""
client = redis.Redis(connection_pool=self.pool)
try:
yield client
finally:
client.close()
# Global connection pool
redis_pool = RedisConnectionPool()
@app.task(bind=True)
def io_optimized_task(self, keys_to_process):
"""I/O optimized task using connection pooling."""
task_name = self.name
with profiler.profile_task(task_name):
results = []
with redis_pool.get_connection() as redis_client:
# Use pipeline for bulk operations
pipe = redis_client.pipeline(transaction=False)
# Batch Redis operations
for key in keys_to_process:
pipe.get(key)
# Execute all operations at once
redis_results = pipe.execute()
# Process results
for i, value in enumerate(redis_results):
if value:
results.append({
'key': keys_to_process[i],
'value': value.decode('utf-8') if isinstance(value, bytes) else value,
'processed': True
})
return {
'processed_count': len(results),
'total_keys': len(keys_to_process),
'success_rate': len(results) / len(keys_to_process)
}
# Performance monitoring task
@app.task
def performance_report():
"""Generate performance report."""
system_stats = profiler.get_system_stats()
# Get stats for all tracked tasks
task_stats = {}
for task_name in profiler.task_metrics.keys():
stats = profiler.get_task_stats(task_name)
if stats:
task_stats[task_name] = stats
# System resource usage
current_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
current_cpu = psutil.cpu_percent(interval=1)
return {
'timestamp': time.time(),
'system_stats': system_stats,
'current_memory_mb': current_memory,
'current_cpu_percent': current_cpu,
'task_performance': task_stats
}
# Performance optimization utilities
class PerformanceOptimizer:
"""Utilities for performance optimization."""
@staticmethod
def optimize_worker_settings(task_type='mixed'):
"""Get optimized worker settings based on task type."""
settings = {
'cpu_bound': {
'pool': 'prefork',
'concurrency': multiprocessing.cpu_count(),
'prefetch_multiplier': 1,
'max_tasks_per_child': 1000,
},
'io_bound': {
'pool': 'eventlet',
'concurrency': 500,
'prefetch_multiplier': 10,
'max_tasks_per_child': 10000,
},
'mixed': {
'pool': 'threads',
'concurrency': multiprocessing.cpu_count() * 2,
'prefetch_multiplier': 4,
'max_tasks_per_child': 5000,
}
}
return settings.get(task_type, settings['mixed'])
@staticmethod
def calculate_optimal_batch_size(item_size_bytes, available_memory_mb=1000):
"""Calculate optimal batch size based on memory constraints."""
available_bytes = available_memory_mb * 1024 * 1024
safety_factor = 0.8 # Use 80% of available memory
optimal_batch_size = int((available_bytes * safety_factor) / item_size_bytes)
# Ensure reasonable bounds
return max(10, min(optimal_batch_size, 10000))
@staticmethod
def monitor_memory_usage():
"""Monitor current memory usage."""
process = psutil.Process()
memory_info = process.memory_info()
return {
'rss_mb': memory_info.rss / 1024 / 1024,
'vms_mb': memory_info.vms / 1024 / 1024,
'percent': process.memory_percent(),
'available_mb': psutil.virtual_memory().available / 1024 / 1024
}
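# A sketch of applying the suggested settings when launching a worker
# programmatically; app.worker_main mirrors the `celery ... worker` CLI.
# The chosen task_type and log level are assumptions to adjust per workload.
def start_optimized_worker(task_type='io_bound'):
    """Start a worker using settings suggested by PerformanceOptimizer."""
    settings = PerformanceOptimizer.optimize_worker_settings(task_type)
    app.worker_main(argv=[
        'worker',
        f"--pool={settings['pool']}",
        f"--concurrency={settings['concurrency']}",
        f"--prefetch-multiplier={settings['prefetch_multiplier']}",
        f"--max-tasks-per-child={settings['max_tasks_per_child']}",
        '--loglevel=INFO',
    ])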
# Example optimization implementations
def run_performance_comparison():
"""Compare performance of different implementations."""
# Test data
test_data = [{'id': i, 'value': i * 2} for i in range(10000)]
print("=== Performance Comparison ===")
# Test batch processing
start_time = time.time()
result1 = batch_processor.delay(test_data, batch_size=100)
batch_time = time.time() - start_time
print(f"Batch processing (100 items/batch): {batch_time:.2f}s")
# Test larger batches
start_time = time.time()
result2 = batch_processor.delay(test_data, batch_size=1000)
large_batch_time = time.time() - start_time
print(f"Batch processing (1000 items/batch): {large_batch_time:.2f}s")
# Wait for results and compare
batch_result = result1.get()
large_batch_result = result2.get()
print(f"Small batches processed: {batch_result['processed_count']}")
print(f"Large batches processed: {large_batch_result['processed_count']}")
# Generate performance report
report = performance_report.delay()
print("Performance report:")
print(report.get())
if __name__ == '__main__':
    run_performance_comparison()

Chapter 11: Security and Authentication
Security Architecture
graph TB
subgraph "Security Layers"
A[Transport Security] --> B[TLS/SSL Encryption]
C[Message Security] --> D[Message Signing/Encryption]
E[Access Control] --> F[Authentication & Authorization]
G[Network Security] --> H[Firewalls & VPNs]
end
subgraph "Authentication Methods"
I[Username/Password]
J[Certificate-based]
K[Token-based]
L[SASL Authentication]
end
subgraph "Security Threats"
M[Message Tampering]
N[Unauthorized Access]
O[Data Interception]
P[Privilege Escalation]
end
subgraph "Mitigation Strategies"
Q[Message Encryption]
R[Access Controls]
S[Secure Channels]
T[Principle of Least Privilege]
end
B --> S
D --> Q
F --> R
H --> S
M --> Q
N --> R
O --> S
P --> T
style A fill:#e8f5e8
style C fill:#e3f2fd
style E fill:#fff3e0
style G fill:#fce4ec

Comprehensive Security Implementation
# security.py
"""Comprehensive security implementation for Celery."""
import os
import ssl
import hmac
import hashlib
import secrets
import base64
import json
from datetime import datetime, timedelta
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from celery import Celery
from kombu.serialization import register
import logging
# Secure configuration class
class SecureCeleryConfig:
"""Secure Celery configuration with authentication and encryption."""
# Secure broker configuration with SSL
broker_use_ssl = {
'keyfile': '/path/to/client-key.pem',
'certfile': '/path/to/client-cert.pem',
'ca_certs': '/path/to/ca-cert.pem',
'cert_reqs': ssl.CERT_REQUIRED,
'ssl_version': ssl.PROTOCOL_TLSv1_2,
'ciphers': 'ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS'
}
# Redis SSL configuration
redis_backend_use_ssl = {
'ssl_keyfile': '/path/to/client-key.pem',
'ssl_certfile': '/path/to/client-cert.pem',
'ssl_ca_certs': '/path/to/ca-cert.pem',
'ssl_cert_reqs': ssl.CERT_REQUIRED,
}
# Secure serialization
task_serializer = 'auth_json'
accept_content = ['auth_json']
result_serializer = 'auth_json'
# Security settings
worker_hijack_root_logger = False
task_reject_on_worker_lost = True
# Authentication
broker_url = 'rediss://username:password@redis.example.com:6380/0'
result_backend = 'rediss://username:password@redis.example.com:6380/0'
class MessageSecurity:
"""Handle message encryption and authentication."""
def __init__(self, secret_key=None):
if secret_key is None:
secret_key = os.environ.get('CELERY_SECRET_KEY')
if not secret_key:
raise ValueError("CELERY_SECRET_KEY environment variable required")
self.secret_key = secret_key.encode() if isinstance(secret_key, str) else secret_key
self.fernet = self._create_cipher()
def _create_cipher(self):
"""Create Fernet cipher from secret key."""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=b'celery_salt', # In production, use a random salt
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(self.secret_key))
return Fernet(key)
def encrypt_message(self, data):
"""Encrypt message data."""
try:
json_data = json.dumps(data, sort_keys=True).encode()
encrypted_data = self.fernet.encrypt(json_data)
return base64.urlsafe_b64encode(encrypted_data).decode()
except Exception as e:
raise ValueError(f"Encryption failed: {e}")
def decrypt_message(self, encrypted_data):
"""Decrypt message data."""
try:
encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode())
decrypted_data = self.fernet.decrypt(encrypted_bytes)
return json.loads(decrypted_data.decode())
except Exception as e:
raise ValueError(f"Decryption failed: {e}")
def sign_message(self, data):
"""Create HMAC signature for message."""
message = json.dumps(data, sort_keys=True).encode()
signature = hmac.new(
self.secret_key,
message,
hashlib.sha256
).hexdigest()
return signature
def verify_signature(self, data, signature):
"""Verify HMAC signature."""
expected_signature = self.sign_message(data)
return hmac.compare_digest(signature, expected_signature)
# Global security instance
message_security = MessageSecurity()
# Custom serializer with authentication and encryption
def secure_dumps(obj):
"""Serialize with encryption and authentication."""
timestamp = datetime.utcnow().isoformat()
# Add timestamp to prevent replay attacks
message_data = {
'payload': obj,
'timestamp': timestamp,
'version': '1.0'
}
# Create signature
signature = message_security.sign_message(message_data)
# Encrypt the entire message
encrypted_message = message_security.encrypt_message(message_data)
# Final message structure
final_message = {
'encrypted_data': encrypted_message,
'signature': signature,
        'algorithm': 'Fernet (AES-128-CBC)',  # Fernet uses AES-128-CBC, not AES-256-GCM
'auth': 'HMAC-SHA256'
}
return json.dumps(final_message)
def secure_loads(data):
"""Deserialize with decryption and authentication."""
try:
message = json.loads(data)
# Decrypt the message
decrypted_data = message_security.decrypt_message(message['encrypted_data'])
# Verify signature
if not message_security.verify_signature(decrypted_data, message['signature']):
raise ValueError("Message signature verification failed")
# Check timestamp to prevent replay attacks (optional)
timestamp = datetime.fromisoformat(decrypted_data['timestamp'])
age = datetime.utcnow() - timestamp
if age > timedelta(hours=1): # 1 hour timeout
raise ValueError("Message too old, possible replay attack")
return decrypted_data['payload']
except Exception as e:
logging.error(f"Message deserialization failed: {e}")
raise
# Register custom secure serializer
register('auth_json', secure_dumps, secure_loads,
content_type='application/x-auth-json',
content_encoding='utf-8')
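# Illustrative addition: a quick round-trip check of the secure serializer defined above.
def _secure_serializer_demo():
    """Encrypt and sign a payload, then verify and decrypt it again."""
    payload = {'user_id': 42, 'action': 'export_report'}
    wire_format = secure_dumps(payload)
    assert secure_loads(wire_format) == payload
    return wire_format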
# Secure Celery app
app = Celery('secure_app')
app.config_from_object(SecureCeleryConfig)
# Role-based access control
class RoleBasedAccessControl:
"""Implement role-based access control for tasks."""
def __init__(self):
self.roles = {
'admin': {
'permissions': ['*'], # All permissions
'task_patterns': ['*']
},
'operator': {
'permissions': ['execute', 'monitor'],
'task_patterns': ['app.tasks.operations.*', 'app.tasks.monitoring.*']
},
'user': {
'permissions': ['execute'],
'task_patterns': ['app.tasks.user.*']
},
'readonly': {
'permissions': ['monitor'],
'task_patterns': ['app.tasks.monitoring.*']
}
}
self.user_roles = {} # Store user -> role mappings
def assign_role(self, user_id, role):
"""Assign role to user."""
if role not in self.roles:
raise ValueError(f"Unknown role: {role}")
self.user_roles[user_id] = role
def check_permission(self, user_id, task_name, permission='execute'):
"""Check if user has permission to perform action on task."""
user_role = self.user_roles.get(user_id)
if not user_role:
return False
role_config = self.roles[user_role]
# Check permission
if '*' not in role_config['permissions'] and permission not in role_config['permissions']:
return False
# Check task pattern
task_patterns = role_config['task_patterns']
if '*' in task_patterns:
return True
for pattern in task_patterns:
if pattern.endswith('*'):
if task_name.startswith(pattern[:-1]):
return True
elif pattern == task_name:
return True
return False
# Global access control
access_control = RoleBasedAccessControl()
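# Illustrative addition: how the pattern matching above behaves (call manually; not wired in).
def _rbac_illustration():
    """Demonstrate permission and task-pattern checks for the 'operator' role."""
    access_control.assign_role('alice', 'operator')
    # 'operator' may execute tasks under app.tasks.operations.*
    assert access_control.check_permission('alice', 'app.tasks.operations.restart', 'execute')
    # ...but not tasks outside its patterns
    assert not access_control.check_permission('alice', 'app.tasks.user.export_data', 'execute')
    # ...and may monitor app.tasks.monitoring.* because 'monitor' is in its permissions
    assert access_control.check_permission('alice', 'app.tasks.monitoring.heartbeat', 'monitor')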
# Secure task decorator
def secure_task(*args, **kwargs):
    """Decorator for secure tasks with access control.
    Supports both bare @secure_task and parameterized @secure_task(...) usage.
    """
    def decorator(func):
        def wrapper(self, *task_args, **task_kwargs):
            # Extract user context (in a real implementation this comes from authentication)
            user_id = task_kwargs.pop('_user_id', 'anonymous')
            # Check permissions
            if not access_control.check_permission(user_id, self.name, 'execute'):
                raise PermissionError(f"User {user_id} not authorized to execute {self.name}")
            # Log access
            logging.info(f"User {user_id} executing task {self.name}")
            try:
                return func(self, *task_args, **task_kwargs)
            except Exception as e:
                logging.error(f"Task {self.name} failed for user {user_id}: {e}")
                raise
        # Preserve the original name so the task is registered under it
        wrapper.__name__ = func.__name__
        wrapper.__doc__ = func.__doc__
        # Apply the Celery task decorator
        return app.task(bind=True, **kwargs)(wrapper)
    # Allow bare @secure_task usage (no parentheses)
    if len(args) == 1 and callable(args[0]) and not kwargs:
        return decorator(args[0])
    return decorator
# Example secure tasks
@secure_task
def secure_user_task(self, user_data):
"""Secure task for user operations."""
return f"Processing user data: {user_data.get('name', 'Unknown')}"
@secure_task
def secure_admin_task(self, admin_operation):
"""Secure task for admin operations."""
return f"Executing admin operation: {admin_operation}"
@secure_task
def secure_financial_task(self, transaction_data):
"""Secure task for financial operations."""
# Additional security for sensitive operations
if not transaction_data.get('verified'):
raise ValueError("Transaction not verified")
return f"Processing transaction: {transaction_data['amount']}"
# Certificate-based authentication
class CertificateAuth:
"""Certificate-based authentication for workers and clients."""
def __init__(self, ca_cert_path, client_cert_path, client_key_path):
self.ca_cert_path = ca_cert_path
self.client_cert_path = client_cert_path
self.client_key_path = client_key_path
def verify_certificate(self, cert_path):
"""Verify certificate against CA."""
try:
# Load CA certificate
with open(self.ca_cert_path, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read())
# Load client certificate
with open(cert_path, 'rb') as f:
client_cert = x509.load_pem_x509_certificate(f.read())
# Verify certificate chain (simplified)
ca_public_key = ca_cert.public_key()
try:
ca_public_key.verify(
client_cert.signature,
client_cert.tbs_certificate_bytes,
padding.PKCS1v15(),
client_cert.signature_hash_algorithm
)
return True
except Exception:
return False
except Exception as e:
logging.error(f"Certificate verification failed: {e}")
return False
def get_ssl_context(self):
"""Create SSL context for secure connections."""
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
context.check_hostname = False # For self-signed certificates
context.verify_mode = ssl.CERT_REQUIRED
# Load client certificate
context.load_cert_chain(self.client_cert_path, self.client_key_path)
# Load CA certificate
context.load_verify_locations(self.ca_cert_path)
return context
# Audit logging
class SecurityAuditLogger:
"""Security audit logging system."""
def __init__(self, log_file='security_audit.log'):
self.logger = logging.getLogger('security_audit')
self.logger.setLevel(logging.INFO)
# Create file handler
handler = logging.FileHandler(log_file)
handler.setLevel(logging.INFO)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_task_execution(self, user_id, task_name, status, details=None):
"""Log task execution for audit."""
log_entry = {
'event_type': 'task_execution',
'user_id': user_id,
'task_name': task_name,
'status': status,
'timestamp': datetime.utcnow().isoformat(),
'details': details or {}
}
self.logger.info(json.dumps(log_entry))
def log_authentication(self, user_id, method, success, ip_address=None):
"""Log authentication attempts."""
log_entry = {
'event_type': 'authentication',
'user_id': user_id,
'method': method,
'success': success,
'ip_address': ip_address,
'timestamp': datetime.utcnow().isoformat()
}
self.logger.info(json.dumps(log_entry))
def log_security_violation(self, user_id, violation_type, details):
"""Log security violations."""
log_entry = {
'event_type': 'security_violation',
'user_id': user_id,
'violation_type': violation_type,
'details': details,
'timestamp': datetime.utcnow().isoformat(),
'severity': 'HIGH'
}
self.logger.warning(json.dumps(log_entry))
# Global audit logger
audit_logger = SecurityAuditLogger()
# Security middleware
from celery.signals import before_task_publish, task_prerun, task_postrun
@before_task_publish.connect
def secure_task_publish(sender=None, headers=None, body=None, properties=None, **kwargs):
    """Security check before task publish."""
    # Add security headers; headers is supplied by Celery, so mutate it in place
    # (reassigning a local None would have no effect on the published message).
    if headers is not None:
        headers['security_version'] = '1.0'
        headers['published_at'] = datetime.utcnow().isoformat()
@task_prerun.connect
def secure_task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Security check before task execution."""
    # Extract user context
    user_id = kwargs.get('_user_id', 'system') if kwargs else 'system'
    # Log task start (log the task's registered name, which is JSON-serializable)
    audit_logger.log_task_execution(user_id, task.name if task else 'unknown', 'started')
@task_postrun.connect
def secure_task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """Security logging after task execution."""
    # Extract user context
    user_id = kwargs.get('_user_id', 'system') if kwargs else 'system'
    # Log task completion
    audit_logger.log_task_execution(
        user_id, task.name if task else 'unknown', state,
        details={'task_id': task_id, 'return_value_type': type(retval).__name__}
    )
# Example usage
def setup_security():
"""Setup security configuration."""
# Setup roles
access_control.assign_role('admin_user', 'admin')
access_control.assign_role('operator_user', 'operator')
access_control.assign_role('regular_user', 'user')
# Log security setup
audit_logger.log_authentication('system', 'setup', True)
print("Security configuration completed")
if __name__ == '__main__':
setup_security()
# Example secure task execution
try:
result = secure_user_task.delay(
{'name': 'John Doe', 'operation': 'profile_update'},
_user_id='regular_user'
)
print(f"Task result: {result.get()}")
except PermissionError as e:
print(f"Permission denied: {e}")PythonThis continues our comprehensive Celery book with advanced performance optimization techniques and comprehensive security implementations. The examples demonstrate real-world production scenarios with proper security measures, authentication, encryption, and audit logging.
Chapter 12: Production Deployment
Production Architecture Design
graph TB
subgraph "Load Balancer Layer"
LB[Load Balancer]
end
subgraph "Application Layer"
A1[App Server 1]
A2[App Server 2]
A3[App Server N]
end
subgraph "Message Infrastructure"
MB[Message Broker Cluster]
RB[Result Backend Cluster]
end
subgraph "Worker Clusters"
WC1[Worker Cluster 1 - CPU Tasks]
WC2[Worker Cluster 2 - I/O Tasks]
WC3[Worker Cluster 3 - Priority Tasks]
end
subgraph "Monitoring & Management"
M1[Flower Monitoring]
M2[Prometheus/Grafana]
M3[Log Aggregation]
M4[Alerting System]
end
subgraph "Storage Layer"
DB[(Primary Database)]
CACHE[(Cache Layer)]
FS[File Storage]
end
LB --> A1
LB --> A2
LB --> A3
A1 --> MB
A2 --> MB
A3 --> MB
MB --> WC1
MB --> WC2
MB --> WC3
WC1 --> RB
WC2 --> RB
WC3 --> RB
WC1 --> DB
WC2 --> CACHE
WC3 --> FS
MB -.-> M1
WC1 -.-> M2
WC2 -.-> M3
WC3 -.-> M4
style LB fill:#e8f5e8
style MB fill:#e3f2fd
style RB fill:#fff3e0
style WC1 fill:#fce4ec
style WC2 fill:#f3e5f5
style WC3 fill:#e1f5fe
Production Deployment Strategy
# production_deployment.py
"""Production deployment configuration and management."""
import os
import socket
import time
import logging
import signal
import sys
from pathlib import Path
from celery import Celery
from kombu import Queue, Exchange
import redis.sentinel
import psutil
class ProductionConfig:
"""Production-ready Celery configuration."""
# Environment-based configuration
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'production')
# Redis Sentinel for high availability
REDIS_SENTINELS = [
('sentinel1.example.com', 26379),
('sentinel2.example.com', 26379),
('sentinel3.example.com', 26379),
]
REDIS_MASTER_NAME = 'mymaster'
REDIS_PASSWORD = os.environ.get('REDIS_PASSWORD')
# Broker configuration with failover
broker_url = f'sentinel://:{REDIS_PASSWORD}@sentinel1.example.com:26379;sentinel://:{REDIS_PASSWORD}@sentinel2.example.com:26379;sentinel://:{REDIS_PASSWORD}@sentinel3.example.com:26379'
broker_transport_options = {
'sentinels': REDIS_SENTINELS,
'password': REDIS_PASSWORD,
        'master_name': REDIS_MASTER_NAME,  # kombu's Redis Sentinel transport expects 'master_name'
        'socket_keepalive': True,
        'socket_keepalive_options': {
            socket.TCP_KEEPIDLE: 1,
            socket.TCP_KEEPINTVL: 3,
            socket.TCP_KEEPCNT: 5,
        },
'retry_on_timeout': True,
'health_check_interval': 30,
}
# Result backend with failover
result_backend = f'sentinel://:{REDIS_PASSWORD}@sentinel1.example.com:26379;sentinel://:{REDIS_PASSWORD}@sentinel2.example.com:26379;sentinel://:{REDIS_PASSWORD}@sentinel3.example.com:26379'
result_backend_transport_options = broker_transport_options
# Connection settings
broker_pool_limit = 20
broker_connection_timeout = 30
broker_connection_retry_on_startup = True
broker_connection_max_retries = 10
# Worker settings
worker_prefetch_multiplier = 1
worker_max_tasks_per_child = 1000
worker_max_memory_per_child = 200000 # 200MB
worker_disable_rate_limits = False
worker_proc_alive_timeout = 60
# Task settings
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
task_compression = 'gzip'
result_compression = 'gzip'
# Reliability settings
task_acks_late = True
task_reject_on_worker_lost = True
task_track_started = True
# Timeout settings
task_time_limit = 3600 # 1 hour
task_soft_time_limit = 3300 # 55 minutes
# Result settings
result_expires = 86400 # 24 hours
result_persistent = True
# Monitoring
worker_send_task_events = True
task_send_sent_event = True
# Security
worker_hijack_root_logger = False
# Queue configuration for different environments
if ENVIRONMENT == 'production':
# Production queues with priority and routing
task_queues = (
Queue('critical', Exchange('critical', type='direct'), routing_key='critical',
queue_arguments={'x-max-priority': 10}),
Queue('high', Exchange('high', type='direct'), routing_key='high',
queue_arguments={'x-max-priority': 8}),
Queue('normal', Exchange('normal', type='direct'), routing_key='normal',
queue_arguments={'x-max-priority': 5}),
Queue('low', Exchange('low', type='direct'), routing_key='low',
queue_arguments={'x-max-priority': 2}),
Queue('email', Exchange('email', type='direct'), routing_key='email'),
Queue('reports', Exchange('reports', type='direct'), routing_key='reports'),
Queue('background', Exchange('background', type='direct'), routing_key='background'),
)
task_routes = {
'app.tasks.critical.*': {'queue': 'critical', 'priority': 9},
'app.tasks.notifications.*': {'queue': 'email'},
'app.tasks.reports.*': {'queue': 'reports'},
'app.tasks.background.*': {'queue': 'background', 'priority': 1},
}
# Logging configuration
worker_log_level = 'INFO'
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
class DeploymentManager:
"""Manage production deployment lifecycle."""
def __init__(self, app_name='celery_app'):
self.app_name = app_name
self.app = Celery(app_name)
self.app.config_from_object(ProductionConfig)
self.logger = self._setup_logging()
def _setup_logging(self):
"""Setup production logging."""
logger = logging.getLogger(self.app_name)
logger.setLevel(logging.INFO)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# File handler
log_file = f'/var/log/{self.app_name}/celery.log'
os.makedirs(os.path.dirname(log_file), exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
# Formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
def health_check(self):
"""Perform health check of the Celery system."""
try:
# Check broker connectivity
inspect = self.app.control.inspect()
stats = inspect.stats()
if not stats:
return {'status': 'unhealthy', 'reason': 'No workers available'}
# Check worker health
unhealthy_workers = []
for worker, worker_stats in stats.items():
if worker_stats.get('pool', {}).get('max-concurrency', 0) == 0:
unhealthy_workers.append(worker)
if unhealthy_workers:
return {
'status': 'degraded',
'reason': f'Unhealthy workers: {unhealthy_workers}'
}
# Check queue lengths
active_queues = inspect.active_queues()
high_queue_lengths = {}
for worker, queues in active_queues.items():
for queue_info in queues:
# In production, implement actual queue length checking
# This is a simplified version
pass
return {
'status': 'healthy',
'workers': len(stats),
                'timestamp': time.time()
}
except Exception as e:
self.logger.error(f"Health check failed: {e}")
return {'status': 'unhealthy', 'reason': str(e)}
def graceful_shutdown(self, signum=None, frame=None):
"""Implement graceful shutdown."""
self.logger.info("Initiating graceful shutdown...")
try:
# Stop accepting new tasks
self.app.control.cancel_consumer('*')
# Wait for current tasks to complete
inspect = self.app.control.inspect()
active_tasks = inspect.active()
if active_tasks:
self.logger.info(f"Waiting for {len(active_tasks)} active tasks to complete...")
# Implementation would wait for tasks to finish
# with a timeout
# Shutdown workers
self.app.control.shutdown()
self.logger.info("Graceful shutdown completed")
except Exception as e:
self.logger.error(f"Error during graceful shutdown: {e}")
finally:
sys.exit(0)
def setup_signal_handlers(self):
"""Setup signal handlers for graceful shutdown."""
signal.signal(signal.SIGTERM, self.graceful_shutdown)
signal.signal(signal.SIGINT, self.graceful_shutdown)
def start_worker(self, queue_config):
"""Start worker with specific configuration."""
self.setup_signal_handlers()
worker_args = [
'--app', self.app_name,
'--loglevel', queue_config.get('loglevel', 'INFO'),
'--concurrency', str(queue_config.get('concurrency', 4)),
'--pool', queue_config.get('pool', 'prefork'),
]
if queue_config.get('queues'):
worker_args.extend(['--queues', ','.join(queue_config['queues'])])
if queue_config.get('hostname'):
worker_args.extend(['--hostname', queue_config['hostname']])
self.logger.info(f"Starting worker with args: {worker_args}")
# In production, this would use the Celery worker
# self.app.worker_main(worker_args)
class DockerDeployment:
"""Docker-based deployment configuration."""
@staticmethod
def generate_dockerfile():
"""Generate Dockerfile for production deployment."""
dockerfile_content = """
FROM python:3.11-slim
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV CELERY_ENV=production
# Install system dependencies
RUN apt-get update && apt-get install -y \\
build-essential \\
libpq-dev \\
&& rm -rf /var/lib/apt/lists/*
# Create app directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN groupadd -r celery && useradd -r -g celery celery
RUN chown -R celery:celery /app
USER celery
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \\
CMD python -c "from health_check import check_health; exit(0 if check_health() else 1)"
# Default command
CMD ["celery", "worker", "--app=app.celery", "--loglevel=info"]
"""
return dockerfile_content
@staticmethod
def generate_docker_compose():
"""Generate docker-compose.yml for production."""
compose_content = """
version: '3.8'
services:
redis-master:
image: redis:7-alpine
command: redis-server --requirepass ${REDIS_PASSWORD}
volumes:
- redis_data:/data
ports:
- "6379:6379"
environment:
- REDIS_PASSWORD=${REDIS_PASSWORD}
healthcheck:
test: ["CMD", "redis-cli", "--raw", "incr", "ping"]
interval: 30s
timeout: 10s
retries: 3
redis-sentinel:
image: redis:7-alpine
command: redis-sentinel /etc/redis/sentinel.conf
volumes:
- ./sentinel.conf:/etc/redis/sentinel.conf
depends_on:
- redis-master
deploy:
replicas: 3
celery-worker-high:
build: .
    command: celery --app=app.celery worker --loglevel=info --queues=critical,high --concurrency=4
environment:
- CELERY_ENV=production
- REDIS_PASSWORD=${REDIS_PASSWORD}
depends_on:
- redis-master
volumes:
- ./logs:/var/log/celery
deploy:
replicas: 2
resources:
limits:
memory: 512M
reservations:
memory: 256M
celery-worker-normal:
build: .
    command: celery --app=app.celery worker --loglevel=info --queues=normal,email --concurrency=8
environment:
- CELERY_ENV=production
- REDIS_PASSWORD=${REDIS_PASSWORD}
depends_on:
- redis-master
volumes:
- ./logs:/var/log/celery
deploy:
replicas: 3
resources:
limits:
memory: 1G
reservations:
memory: 512M
celery-worker-background:
build: .
    command: celery --app=app.celery worker --loglevel=info --queues=background,reports --concurrency=2
environment:
- CELERY_ENV=production
- REDIS_PASSWORD=${REDIS_PASSWORD}
depends_on:
- redis-master
volumes:
- ./logs:/var/log/celery
deploy:
replicas: 1
resources:
limits:
memory: 256M
reservations:
memory: 128M
flower:
build: .
    command: celery --app=app.celery flower --port=5555
ports:
- "5555:5555"
environment:
- CELERY_ENV=production
- REDIS_PASSWORD=${REDIS_PASSWORD}
depends_on:
- redis-master
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
volumes:
- grafana_data:/var/lib/grafana
volumes:
redis_data:
prometheus_data:
grafana_data:
"""
return compose_content
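# Illustrative addition (not referenced elsewhere in this chapter): write the
# generated artifacts to disk so they can be built and deployed.
def write_docker_artifacts(output_dir='.'):
    """Write the generated Dockerfile and docker-compose.yml to output_dir."""
    with open(os.path.join(output_dir, 'Dockerfile'), 'w') as f:
        f.write(DockerDeployment.generate_dockerfile())
    with open(os.path.join(output_dir, 'docker-compose.yml'), 'w') as f:
        f.write(DockerDeployment.generate_docker_compose())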
class KubernetesDeployment:
"""Kubernetes deployment configuration."""
@staticmethod
def generate_deployment_yaml():
"""Generate Kubernetes deployment YAML."""
deployment_yaml = """
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-worker-high
labels:
app: celery-worker
tier: high
spec:
replicas: 3
selector:
matchLabels:
app: celery-worker
tier: high
template:
metadata:
labels:
app: celery-worker
tier: high
spec:
containers:
- name: celery-worker
image: myapp/celery:latest
command: ["celery", "worker"]
args:
- "--app=app.celery"
- "--loglevel=info"
- "--queues=critical,high"
- "--concurrency=4"
env:
- name: CELERY_ENV
value: "production"
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
key: password
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
exec:
command:
- celery
            - -A
            - app.celery
            - inspect
            - ping
initialDelaySeconds: 30
periodSeconds: 60
readinessProbe:
exec:
command:
- celery
            - -A
            - app.celery
            - inspect
            - active
initialDelaySeconds: 15
periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
name: celery-worker-service
spec:
selector:
app: celery-worker
ports:
- protocol: TCP
port: 80
targetPort: 5555
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: celery-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: celery-worker-high
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
"""
return deployment_yaml
# Monitoring and alerting for production
class ProductionMonitoring:
"""Production monitoring and alerting system."""
def __init__(self, deployment_manager):
self.deployment_manager = deployment_manager
self.metrics = {
'task_success_rate': 0.0,
'average_task_duration': 0.0,
'queue_lengths': {},
'worker_count': 0,
'system_resources': {}
}
def collect_metrics(self):
"""Collect system metrics."""
try:
inspect = self.deployment_manager.app.control.inspect()
# Worker statistics
stats = inspect.stats()
self.metrics['worker_count'] = len(stats) if stats else 0
# Queue statistics
active_queues = inspect.active_queues()
if active_queues:
for worker, queues in active_queues.items():
for queue_info in queues:
queue_name = queue_info['name']
# In production, get actual queue lengths from broker
self.metrics['queue_lengths'][queue_name] = 0
# System resources
self.metrics['system_resources'] = {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent
}
return self.metrics
except Exception as e:
self.deployment_manager.logger.error(f"Failed to collect metrics: {e}")
return None
def check_alerts(self):
"""Check for alert conditions."""
alerts = []
# High CPU usage
if self.metrics['system_resources'].get('cpu_percent', 0) > 80:
alerts.append({
'severity': 'warning',
'message': f"High CPU usage: {self.metrics['system_resources']['cpu_percent']:.1f}%"
})
# High memory usage
if self.metrics['system_resources'].get('memory_percent', 0) > 85:
alerts.append({
'severity': 'warning',
'message': f"High memory usage: {self.metrics['system_resources']['memory_percent']:.1f}%"
})
# No workers available
if self.metrics['worker_count'] == 0:
alerts.append({
'severity': 'critical',
'message': "No Celery workers available"
})
# High queue lengths
for queue_name, length in self.metrics['queue_lengths'].items():
if length > 1000: # Threshold
alerts.append({
'severity': 'warning',
'message': f"High queue length for {queue_name}: {length}"
})
return alerts
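# Illustrative addition: a simple loop tying collect_metrics() and check_alerts() together;
# the interval and print-based alerting are placeholders for a real alert channel.
def run_monitoring_loop(monitoring, interval=60, iterations=None):
    """Periodically collect metrics and surface any alerts."""
    count = 0
    while iterations is None or count < iterations:
        if monitoring.collect_metrics():
            for alert in monitoring.check_alerts():
                print(f"[{alert['severity'].upper()}] {alert['message']}")
        time.sleep(interval)
        count += 1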
# Production deployment scripts
def deploy_production():
"""Deploy to production environment."""
print("Deploying Celery application to production...")
# Initialize deployment manager
deployment_manager = DeploymentManager('production_app')
# Perform health check before deployment
health = deployment_manager.health_check()
if health['status'] == 'unhealthy':
print(f"Pre-deployment health check failed: {health['reason']}")
return False
# Deploy workers
worker_configs = [
{
'name': 'critical-worker',
'queues': ['critical'],
'concurrency': 2,
'pool': 'prefork',
'loglevel': 'INFO'
},
{
'name': 'high-worker',
'queues': ['high'],
'concurrency': 4,
'pool': 'prefork',
'loglevel': 'INFO'
},
{
'name': 'normal-worker',
'queues': ['normal', 'email'],
'concurrency': 8,
'pool': 'eventlet',
'loglevel': 'INFO'
},
{
'name': 'background-worker',
'queues': ['background', 'reports'],
'concurrency': 2,
'pool': 'prefork',
'loglevel': 'WARNING'
}
]
for config in worker_configs:
print(f"Starting {config['name']}...")
# In production, this would start actual workers
# deployment_manager.start_worker(config)
# Setup monitoring
monitoring = ProductionMonitoring(deployment_manager)
print("Production deployment completed successfully!")
return True
if __name__ == '__main__':
    deploy_production()
Chapter 13: Real-World Case Studies
Case Study 1: E-commerce Order Processing System
graph TB
subgraph "Order Processing Flow"
A[Customer Order] --> B[Order Validation]
B --> C[Inventory Check]
C --> D[Payment Processing]
D --> E[Order Fulfillment]
E --> F[Shipping Notification]
F --> G[Order Complete]
end
subgraph "Celery Tasks"
H[validate_order_task]
I[check_inventory_task]
J[process_payment_task]
K[fulfill_order_task]
L[send_notification_task]
M[update_analytics_task]
end
subgraph "Error Handling"
N[Payment Failed] --> O[Refund Process]
P[Inventory Shortage] --> Q[Backorder Process]
R[Shipping Error] --> S[Customer Support Alert]
end
B --> H
C --> I
D --> J
E --> K
F --> L
G --> M
J --> N
I --> P
K --> R
style A fill:#e8f5e8
style G fill:#e8f5e8
style N fill:#ffebee
style P fill:#ffebee
style R fill:#ffebee
E-commerce Implementation
# ecommerce_case_study.py
"""
E-commerce order processing system using Celery.
Handles complex workflows with error recovery and monitoring.
"""
from celery import Celery, chain, group, chord, signature
from celery.exceptions import Retry
import time
import random
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import json
# Configuration
app = Celery('ecommerce_orders')
app.conf.update(
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/0',
task_serializer='json',
accept_content=['json'],
result_serializer='json',
task_routes={
'ecommerce.orders.critical.*': {'queue': 'critical'},
'ecommerce.orders.payment.*': {'queue': 'payment'},
'ecommerce.orders.fulfillment.*': {'queue': 'fulfillment'},
'ecommerce.orders.notifications.*': {'queue': 'notifications'},
}
)
@dataclass
class OrderItem:
product_id: str
quantity: int
price: float
variant_id: Optional[str] = None
@dataclass
class Order:
    order_id: str
    customer_id: str
    items: List[OrderItem]
    shipping_address: Dict[str, str]
    payment_method: Dict[str, str]
    total_amount: float
    currency: str = 'USD'
    status: str = 'pending'
    created_at: Optional[str] = None  # ISO-8601 string, since orders travel as JSON
    def __post_init__(self):
        # Orders arrive as JSON, so item entries may be plain dicts; normalize them
        # to OrderItem so the tasks below can use attribute access.
        self.items = [
            OrderItem(**item) if isinstance(item, dict) else item
            for item in self.items
        ]
class OrderProcessingError(Exception):
"""Custom exception for order processing errors."""
pass
class PaymentError(OrderProcessingError):
"""Payment-specific errors."""
pass
class InventoryError(OrderProcessingError):
"""Inventory-specific errors."""
pass
# Core order processing tasks
@app.task(bind=True, name='ecommerce.orders.critical.validate_order')
def validate_order_task(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate order data and customer information."""
try:
order = Order(**order_data)
# Simulate validation logic
time.sleep(random.uniform(0.1, 0.5))
# Validation checks
if order.total_amount <= 0:
raise OrderProcessingError("Invalid order total")
if not order.customer_id:
raise OrderProcessingError("Missing customer information")
if not order.items:
raise OrderProcessingError("Order has no items")
# Customer verification (simulate external API call)
if random.random() < 0.05: # 5% failure rate
raise OrderProcessingError("Customer verification failed")
# Update order status
order.status = 'validated'
logging.info(f"Order {order.order_id} validated successfully")
return {
'order_id': order.order_id,
'status': order.status,
'validated_at': datetime.now().isoformat(),
'order_data': order_data
}
except Exception as exc:
logging.error(f"Order validation failed for {order_data.get('order_id')}: {exc}")
# Retry logic for temporary failures
if isinstance(exc, OrderProcessingError) and "verification failed" in str(exc):
if self.request.retries < 3:
raise self.retry(countdown=60, exc=exc)
raise
@app.task(bind=True, name='ecommerce.orders.fulfillment.check_inventory')
def check_inventory_task(self, validated_order: Dict[str, Any]) -> Dict[str, Any]:
"""Check inventory availability for all order items."""
try:
order_data = validated_order['order_data']
order = Order(**order_data)
inventory_results = []
total_available = True
for item in order.items:
# Simulate inventory check
time.sleep(random.uniform(0.1, 0.3))
# Simulate occasional inventory shortage
available_quantity = random.randint(0, item.quantity + 5)
is_available = available_quantity >= item.quantity
if not is_available:
total_available = False
inventory_results.append({
'product_id': item.product_id,
'requested_quantity': item.quantity,
'available_quantity': available_quantity,
'is_available': is_available
})
if not total_available:
# Handle partial inventory
shortage_items = [
item for item in inventory_results
if not item['is_available']
]
raise InventoryError(f"Insufficient inventory for items: {shortage_items}")
# Reserve inventory
reservation_id = f"res_{order.order_id}_{int(time.time())}"
logging.info(f"Inventory reserved for order {order.order_id}")
return {
'order_id': order.order_id,
'inventory_status': 'reserved',
'reservation_id': reservation_id,
'inventory_results': inventory_results,
'reserved_at': datetime.now().isoformat(),
'order_data': order_data
}
except InventoryError as exc:
logging.warning(f"Inventory shortage for order {validated_order['order_id']}: {exc}")
# Trigger backorder process
handle_inventory_shortage.delay(validated_order, str(exc))
raise
except Exception as exc:
logging.error(f"Inventory check failed for order {validated_order['order_id']}: {exc}")
if self.request.retries < 2:
raise self.retry(countdown=30, exc=exc)
raise
@app.task(bind=True, name='ecommerce.orders.payment.process_payment')
def process_payment_task(self, inventory_result: Dict[str, Any]) -> Dict[str, Any]:
"""Process payment for the order."""
try:
order_data = inventory_result['order_data']
order = Order(**order_data)
# Simulate payment processing
time.sleep(random.uniform(1, 3))
# Simulate payment gateway interaction
payment_success_rate = 0.95 # 95% success rate
if random.random() > payment_success_rate:
error_types = ['card_declined', 'insufficient_funds', 'expired_card', 'gateway_error']
error_type = random.choice(error_types)
raise PaymentError(f"Payment failed: {error_type}")
# Generate transaction details
transaction_id = f"txn_{order.order_id}_{int(time.time())}"
payment_result = {
'transaction_id': transaction_id,
'amount': order.total_amount,
'currency': order.currency,
'payment_method': order.payment_method.get('type', 'card'),
'status': 'completed',
'processed_at': datetime.now().isoformat()
}
logging.info(f"Payment processed for order {order.order_id}: {transaction_id}")
return {
'order_id': order.order_id,
'payment_status': 'completed',
'payment_result': payment_result,
'reservation_id': inventory_result['reservation_id'],
'order_data': order_data
}
except PaymentError as exc:
logging.error(f"Payment failed for order {inventory_result['order_id']}: {exc}")
# Release inventory reservation
release_inventory_reservation.delay(inventory_result['reservation_id'])
# Handle payment failure
handle_payment_failure.delay(inventory_result, str(exc))
raise
except Exception as exc:
logging.error(f"Payment processing error for order {inventory_result['order_id']}: {exc}")
if self.request.retries < 2:
raise self.retry(countdown=60, exc=exc)
raise
@app.task(bind=True, name='ecommerce.orders.fulfillment.fulfill_order')
def fulfill_order_task(self, payment_result: Dict[str, Any]) -> Dict[str, Any]:
"""Fulfill the order by preparing items for shipment."""
try:
order_data = payment_result['order_data']
order = Order(**order_data)
# Simulate fulfillment process
time.sleep(random.uniform(2, 5))
fulfillment_items = []
for item in order.items:
# Simulate item picking and packing
fulfillment_items.append({
'product_id': item.product_id,
'quantity': item.quantity,
'picked_at': datetime.now().isoformat(),
'packed': True
})
# Generate shipment details
tracking_number = f"TRK{order.order_id}{random.randint(1000, 9999)}"
estimated_delivery = datetime.now() + timedelta(days=random.randint(2, 7))
fulfillment_result = {
'tracking_number': tracking_number,
'estimated_delivery': estimated_delivery.isoformat(),
'fulfillment_items': fulfillment_items,
'fulfilled_at': datetime.now().isoformat(),
'carrier': random.choice(['UPS', 'FedEx', 'DHL', 'USPS'])
}
logging.info(f"Order {order.order_id} fulfilled: {tracking_number}")
return {
'order_id': order.order_id,
'fulfillment_status': 'completed',
'fulfillment_result': fulfillment_result,
'payment_result': payment_result['payment_result'],
'order_data': order_data
}
except Exception as exc:
logging.error(f"Fulfillment failed for order {payment_result['order_id']}: {exc}")
if self.request.retries < 2:
raise self.retry(countdown=120, exc=exc)
raise
@app.task(name='ecommerce.orders.notifications.send_confirmation')
def send_order_confirmation(fulfillment_result: Dict[str, Any]) -> Dict[str, Any]:
"""Send order confirmation and shipping notification."""
try:
order_data = fulfillment_result['order_data']
order = Order(**order_data)
fulfillment = fulfillment_result['fulfillment_result']
# Simulate email sending
time.sleep(random.uniform(0.5, 1.5))
notification_data = {
'order_id': order.order_id,
'customer_id': order.customer_id,
'tracking_number': fulfillment['tracking_number'],
'estimated_delivery': fulfillment['estimated_delivery'],
'total_amount': order.total_amount,
'notification_sent_at': datetime.now().isoformat()
}
logging.info(f"Order confirmation sent for {order.order_id}")
return {
'order_id': order.order_id,
'notification_status': 'sent',
'notification_data': notification_data,
'final_status': 'completed'
}
except Exception as exc:
logging.error(f"Failed to send confirmation for order {fulfillment_result['order_id']}: {exc}")
raise
# Error handling tasks
@app.task(name='ecommerce.orders.fulfillment.handle_inventory_shortage')
def handle_inventory_shortage(order_data: Dict[str, Any], error_message: str):
"""Handle inventory shortage by creating backorder."""
try:
order_id = order_data['order_id']
# Create backorder
backorder_id = f"BO_{order_id}_{int(time.time())}"
# Notify customer about backorder
send_backorder_notification.delay(order_data, backorder_id)
# Schedule inventory recheck
recheck_inventory.apply_async(
args=[order_data, backorder_id],
countdown=3600 # Recheck in 1 hour
)
logging.info(f"Backorder created for {order_id}: {backorder_id}")
return {'backorder_id': backorder_id, 'status': 'backordered'}
except Exception as exc:
logging.error(f"Failed to handle inventory shortage: {exc}")
raise
@app.task(name='ecommerce.orders.payment.handle_payment_failure')
def handle_payment_failure(order_data: Dict[str, Any], error_message: str):
"""Handle payment failure."""
try:
order_id = order_data['order_id']
# Send payment failure notification
send_payment_failure_notification.delay(order_data, error_message)
# Log for fraud analysis
log_payment_failure.delay(order_data, error_message)
logging.warning(f"Payment failure handled for order {order_id}")
return {'status': 'payment_failed', 'error': error_message}
except Exception as exc:
logging.error(f"Failed to handle payment failure: {exc}")
raise
@app.task(name='ecommerce.orders.fulfillment.release_inventory_reservation')
def release_inventory_reservation(reservation_id: str):
"""Release inventory reservation."""
try:
# Simulate inventory release
time.sleep(random.uniform(0.1, 0.3))
logging.info(f"Inventory reservation released: {reservation_id}")
return {'reservation_id': reservation_id, 'status': 'released'}
except Exception as exc:
logging.error(f"Failed to release reservation {reservation_id}: {exc}")
raise
# Notification tasks
@app.task(name='ecommerce.orders.notifications.send_backorder_notification')
def send_backorder_notification(order_data: Dict[str, Any], backorder_id: str):
"""Send backorder notification to customer."""
# Implementation would send actual email
logging.info(f"Backorder notification sent for {order_data['order_id']}")
@app.task(name='ecommerce.orders.notifications.send_payment_failure_notification')
def send_payment_failure_notification(order_data: Dict[str, Any], error_message: str):
"""Send payment failure notification."""
# Implementation would send actual email
logging.info(f"Payment failure notification sent for {order_data['order_id']}")
@app.task(name='ecommerce.orders.analytics.log_payment_failure')
def log_payment_failure(order_data: Dict[str, Any], error_message: str):
"""Log payment failure for analysis."""
# Implementation would log to analytics system
logging.info(f"Payment failure logged for analysis: {order_data['order_id']}")
@app.task(name='ecommerce.orders.fulfillment.recheck_inventory')
def recheck_inventory(order_data: Dict[str, Any], backorder_id: str):
"""Recheck inventory for backordered items."""
try:
        # Trigger the inventory check again. Checking result.successful() right after
        # .delay() would almost always be False, so chain the continuation as a callback
        # that runs only once the recheck succeeds.
        check_inventory_task.apply_async(
            args=[order_data],
            link=continue_order_processing.s(backorder_id)
        )
        logging.info(f"Inventory recheck scheduled for backorder {backorder_id}")
except Exception as exc:
logging.error(f"Failed to recheck inventory for {backorder_id}: {exc}")
@app.task(name='ecommerce.orders.fulfillment.continue_order_processing')
def continue_order_processing(inventory_result: Dict[str, Any], backorder_id: str):
"""Continue order processing after inventory becomes available."""
# Continue with payment and fulfillment
order_workflow = chain(
process_payment_task.s(inventory_result),
fulfill_order_task.s(),
send_order_confirmation.s()
)
result = order_workflow.apply_async()
logging.info(f"Continued processing for backorder {backorder_id}")
return result
# Main order processing workflow
def create_order_workflow(order_data: Dict[str, Any]):
"""Create the complete order processing workflow."""
return chain(
validate_order_task.s(order_data),
check_inventory_task.s(),
process_payment_task.s(),
fulfill_order_task.s(),
send_order_confirmation.s()
)
# Batch order processing
@app.task(name='ecommerce.orders.batch.process_orders')
def process_order_batch(order_batch: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Process multiple orders efficiently."""
try:
# Create workflows for all orders
workflows = []
for order_data in order_batch:
workflow = create_order_workflow(order_data)
workflows.append(workflow)
# Execute all workflows in parallel
job = group(workflows)
result = job.apply_async()
# Monitor progress
batch_id = f"batch_{int(time.time())}"
logging.info(f"Processing order batch {batch_id} with {len(order_batch)} orders")
return {
'batch_id': batch_id,
'total_orders': len(order_batch),
'job_id': result.id,
'status': 'processing'
}
except Exception as exc:
logging.error(f"Failed to process order batch: {exc}")
raise
# Example usage and testing
def simulate_order_processing():
"""Simulate order processing for testing."""
# Sample order data
sample_order = {
'order_id': f'ORD-{int(time.time())}',
'customer_id': 'CUST-12345',
'items': [
{
'product_id': 'PROD-001',
'quantity': 2,
'price': 29.99
},
{
'product_id': 'PROD-002',
'quantity': 1,
'price': 49.99
}
],
'shipping_address': {
'street': '123 Main St',
'city': 'Anytown',
'state': 'CA',
'zip': '12345',
'country': 'US'
},
'payment_method': {
'type': 'credit_card',
'last_four': '1234'
},
'total_amount': 109.97,
'currency': 'USD',
'created_at': datetime.now().isoformat()
}
# Process single order
print("Processing single order...")
workflow = create_order_workflow(sample_order)
result = workflow.apply_async()
print(f"Order workflow started: {result.id}")
# Monitor workflow progress
try:
final_result = result.get(timeout=300) # 5 minute timeout
print(f"Order processed successfully: {final_result}")
except Exception as e:
print(f"Order processing failed: {e}")
# Process batch of orders
print("\nProcessing order batch...")
order_batch = [sample_order.copy() for _ in range(5)]
# Update order IDs for batch
for i, order in enumerate(order_batch):
order['order_id'] = f'ORD-BATCH-{int(time.time())}-{i}'
batch_result = process_order_batch.delay(order_batch)
print(f"Batch processing started: {batch_result.get()}")
if __name__ == '__main__':
    simulate_order_processing()
This case study demonstrates a complete e-commerce order processing system with:
- Complex Workflow Management: Multi-step order processing with proper error handling
- Error Recovery: Sophisticated handling of payment failures and inventory shortages
- Batch Processing: Efficient handling of multiple orders
- Real-world Complexity: Realistic business logic with external service integrations
- Monitoring and Logging: Comprehensive logging for business intelligence
The system handles the complete order lifecycle from validation through fulfillment, with error recovery and customer notifications. Once the simulated steps are replaced with real integrations, the same structure can scale to thousands of orders per day; a sketch for monitoring a batch of these workflows follows.
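The batch task above returns a job_id but never shows how to watch the batch finish. A minimal monitoring sketch, assuming the Redis result backend configured earlier and that the case-study module is importable as ecommerce_case_study; the helper name and poll interval are illustrative:
# monitor_order_batch.py (illustrative sketch, not part of the case-study module)
"""Poll a batch of order workflows until every chain has finished."""
import time
from celery import group
from ecommerce_case_study import create_order_workflow
def run_and_monitor_batch(orders, poll_interval=5):
    # One chained workflow per order, executed in parallel as a group
    job = group(create_order_workflow(order) for order in orders)
    group_result = job.apply_async()
    # completed_count() counts chains whose final task has succeeded so far
    while not group_result.ready():
        done = group_result.completed_count()
        print(f"{done}/{len(group_result.results)} orders finished")
        time.sleep(poll_interval)
    # Collect per-order outcomes without raising on individual failures
    return [
        res.result if res.successful() else {'status': 'failed', 'error': str(res.result)}
        for res in group_result.results
    ]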
Case Study 2: Data Pipeline Automation
graph TB
subgraph "Data Sources"
A[API Endpoints]
B[CSV Files]
C[Database Tables]
D[Real-time Streams]
end
subgraph "Extraction Layer"
E[Extract API Data]
F[Extract File Data]
G[Extract DB Data]
H[Extract Stream Data]
end
subgraph "Transformation Layer"
I[Data Validation]
J[Data Cleaning]
K[Data Enrichment]
L[Data Aggregation]
end
subgraph "Loading Layer"
M[Load to Data Warehouse]
N[Load to Analytics DB]
O[Generate Reports]
P[Update Dashboards]
end
A --> E
B --> F
C --> G
D --> H
E --> I
F --> I
G --> I
H --> I
I --> J
J --> K
K --> L
L --> M
L --> N
L --> O
L --> P
style A fill:#e8f5e8
style E fill:#e3f2fd
style I fill:#fff3e0
style M fill:#fce4ec
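Unlike the other case studies, this one is shown only as an architecture diagram, so the sketch below illustrates one way the stages could be wired together with Celery primitives. The task names, sources, and placeholder data are assumptions rather than a prescribed implementation.
# data_pipeline_sketch.py (illustrative only)
"""Minimal ETL pipeline wiring with Celery groups, chords, and chains."""
from celery import Celery, chain, chord, group
app = Celery('data_pipeline',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')
@app.task
def extract(source):
    """Pull raw records from one source (API, file, table, or stream)."""
    return {'source': source, 'records': [{'id': 1, 'value': 42}]}  # placeholder data
@app.task
def validate_and_clean(extracted_batches):
    """Chord callback: receives the list of extraction results."""
    return [
        record
        for batch in extracted_batches
        for record in batch['records']
        if record.get('value') is not None
    ]
@app.task
def enrich(records):
    return [{**record, 'enriched': True} for record in records]
@app.task
def aggregate(records):
    return {'count': len(records), 'records': records}
@app.task
def load_to_warehouse(aggregated):
    """Write the aggregated result to the warehouse (stubbed here)."""
    return {'loaded': aggregated['count']}
def build_pipeline(sources):
    # Extract all sources in parallel, then run the transform/load chain on the combined output
    return chord(
        group(extract.s(src) for src in sources),
        chain(validate_and_clean.s(), enrich.s(), aggregate.s(), load_to_warehouse.s())
    )
if __name__ == '__main__':
    result = build_pipeline(['orders_api', 'events.csv', 'crm_table']).apply_async()
    print(result.get(timeout=60))
The chord ensures the transformation chain starts only after every extraction task has reported back, mirroring the fan-in from the extraction layer in the diagram above.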
Case Study 3: Image Processing Service
Complete Image Processing Pipeline
# image_processing_service.py
"""
Comprehensive image processing service using Celery.
Handles upload, processing, optimization, and CDN distribution.
"""
from celery import Celery, group, chain, chord
from PIL import Image, ImageFilter, ImageEnhance
import boto3
import os
import logging
from typing import Dict, List, Tuple, Any
import time
import hashlib
import json
app = Celery('image_service')
app.conf.update(
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/0',
task_routes={
'image.upload.*': {'queue': 'upload'},
'image.process.*': {'queue': 'processing'},
'image.optimize.*': {'queue': 'optimization'},
'image.distribute.*': {'queue': 'distribution'},
}
)
# AWS S3 client
s3_client = boto3.client('s3')
class ImageProcessingConfig:
"""Configuration for image processing pipeline."""
UPLOAD_BUCKET = 'uploads-bucket'
PROCESSED_BUCKET = 'processed-images-bucket'
CDN_BUCKET = 'cdn-images-bucket'
# Supported formats
SUPPORTED_FORMATS = ['JPEG', 'PNG', 'WEBP', 'GIF']
# Processing variations
THUMBNAIL_SIZES = [(150, 150), (300, 300), (600, 600)]
RESPONSIVE_WIDTHS = [480, 768, 1024, 1200, 1920]
# Quality settings
JPEG_QUALITY = 85
WEBP_QUALITY = 80
PNG_OPTIMIZE = True
@app.task(bind=True, name='image.upload.validate_upload')
def validate_image_upload(self, upload_data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate uploaded image file."""
try:
file_path = upload_data['file_path']
file_size = upload_data['file_size']
content_type = upload_data['content_type']
# Size validation (max 50MB)
if file_size > 50 * 1024 * 1024:
raise ValueError("File size exceeds 50MB limit")
# Content type validation
allowed_types = ['image/jpeg', 'image/png', 'image/webp', 'image/gif']
if content_type not in allowed_types:
raise ValueError(f"Unsupported content type: {content_type}")
# Open and validate image
with Image.open(file_path) as img:
# Check image format
if img.format not in ImageProcessingConfig.SUPPORTED_FORMATS:
raise ValueError(f"Unsupported image format: {img.format}")
# Check dimensions (min 100x100, max 8000x8000)
width, height = img.size
if width < 100 or height < 100:
raise ValueError("Image too small (minimum 100x100)")
if width > 8000 or height > 8000:
raise ValueError("Image too large (maximum 8000x8000)")
# Generate metadata
metadata = {
'original_format': img.format,
'original_size': img.size,
'mode': img.mode,
'has_transparency': img.mode in ['RGBA', 'LA'] or 'transparency' in img.info,
'file_size': file_size
}
# Generate unique image ID
image_id = hashlib.sha256(f"{file_path}{time.time()}".encode()).hexdigest()[:16]
logging.info(f"Image validated: {image_id}")
return {
'image_id': image_id,
'status': 'validated',
'metadata': metadata,
'upload_data': upload_data
}
except Exception as exc:
logging.error(f"Image validation failed: {exc}")
raise
@app.task(bind=True, name='image.upload.upload_to_s3')
def upload_original_to_s3(self, validation_result: Dict[str, Any]) -> Dict[str, Any]:
"""Upload original image to S3."""
try:
image_id = validation_result['image_id']
upload_data = validation_result['upload_data']
file_path = upload_data['file_path']
# Upload to S3
s3_key = f"originals/{image_id}/original.{validation_result['metadata']['original_format'].lower()}"
s3_client.upload_file(
file_path,
ImageProcessingConfig.UPLOAD_BUCKET,
s3_key,
ExtraArgs={
'ContentType': upload_data['content_type'],
'Metadata': {
'image_id': image_id,
'original_filename': upload_data.get('filename', ''),
'upload_timestamp': str(time.time())
}
}
)
# Clean up local file
os.remove(file_path)
logging.info(f"Original image uploaded to S3: {s3_key}")
return {
'image_id': image_id,
'original_s3_key': s3_key,
'bucket': ImageProcessingConfig.UPLOAD_BUCKET,
'metadata': validation_result['metadata'],
'status': 'uploaded'
}
except Exception as exc:
logging.error(f"S3 upload failed for {validation_result['image_id']}: {exc}")
raise
@app.task(bind=True, name='image.process.generate_thumbnails')
def generate_thumbnails(self, upload_result: Dict[str, Any]) -> Dict[str, Any]:
"""Generate thumbnail versions of the image."""
try:
image_id = upload_result['image_id']
s3_key = upload_result['original_s3_key']
bucket = upload_result['bucket']
# Download original from S3
local_path = f"/tmp/{image_id}_original"
s3_client.download_file(bucket, s3_key, local_path)
thumbnail_results = []
with Image.open(local_path) as img:
for size in ImageProcessingConfig.THUMBNAIL_SIZES:
# Create thumbnail
thumbnail = img.copy()
thumbnail.thumbnail(size, Image.Resampling.LANCZOS)
# Save thumbnail
thumb_path = f"/tmp/{image_id}_thumb_{size[0]}x{size[1]}.jpg"
thumbnail.save(
thumb_path,
'JPEG',
quality=ImageProcessingConfig.JPEG_QUALITY,
optimize=True
)
# Upload thumbnail to S3
thumb_s3_key = f"thumbnails/{image_id}/thumb_{size[0]}x{size[1]}.jpg"
s3_client.upload_file(
thumb_path,
ImageProcessingConfig.PROCESSED_BUCKET,
thumb_s3_key,
ExtraArgs={'ContentType': 'image/jpeg'}
)
thumbnail_results.append({
'size': size,
's3_key': thumb_s3_key,
'local_path': thumb_path
})
os.remove(thumb_path)
os.remove(local_path)
logging.info(f"Generated {len(thumbnail_results)} thumbnails for {image_id}")
return {
'image_id': image_id,
'thumbnails': thumbnail_results,
'metadata': upload_result['metadata'],
'status': 'thumbnails_generated'
}
except Exception as exc:
logging.error(f"Thumbnail generation failed for {upload_result['image_id']}: {exc}")
raise
@app.task(bind=True, name='image.process.generate_responsive')
def generate_responsive_images(self, upload_result: Dict[str, Any]) -> Dict[str, Any]:
"""Generate responsive image variations."""
try:
image_id = upload_result['image_id']
s3_key = upload_result['original_s3_key']
bucket = upload_result['bucket']
# Download original from S3
local_path = f"/tmp/{image_id}_original"
s3_client.download_file(bucket, s3_key, local_path)
responsive_results = []
with Image.open(local_path) as img:
original_width, original_height = img.size
for width in ImageProcessingConfig.RESPONSIVE_WIDTHS:
if width >= original_width:
continue # Skip sizes larger than original
# Calculate proportional height
height = int((width * original_height) / original_width)
# Resize image
resized = img.resize((width, height), Image.Resampling.LANCZOS)
# Generate multiple formats
formats = [
('webp', ImageProcessingConfig.WEBP_QUALITY),
('jpg', ImageProcessingConfig.JPEG_QUALITY)
]
for format_name, quality in formats:
if format_name == 'webp':
format_ext = 'webp'
pil_format = 'WEBP'
content_type = 'image/webp'
else:
format_ext = 'jpg'
pil_format = 'JPEG'
content_type = 'image/jpeg'
# Save resized image
resized_path = f"/tmp/{image_id}_responsive_{width}w.{format_ext}"
resized.save(
resized_path,
pil_format,
quality=quality,
optimize=True
)
# Upload to S3
responsive_s3_key = f"responsive/{image_id}/{width}w.{format_ext}"
s3_client.upload_file(
resized_path,
ImageProcessingConfig.PROCESSED_BUCKET,
responsive_s3_key,
ExtraArgs={'ContentType': content_type}
)
responsive_results.append({
'width': width,
'height': height,
'format': format_name,
's3_key': responsive_s3_key,
'file_size': os.path.getsize(resized_path)
})
os.remove(resized_path)
os.remove(local_path)
logging.info(f"Generated {len(responsive_results)} responsive images for {image_id}")
return {
'image_id': image_id,
'responsive_images': responsive_results,
'metadata': upload_result['metadata'],
'status': 'responsive_generated'
}
except Exception as exc:
logging.error(f"Responsive image generation failed for {upload_result['image_id']}: {exc}")
raise
@app.task(bind=True, name='image.optimize.apply_filters')
def apply_image_filters(self, upload_result: Dict[str, Any], filters: List[str]) -> Dict[str, Any]:
"""Apply image filters and enhancements."""
try:
image_id = upload_result['image_id']
s3_key = upload_result['original_s3_key']
bucket = upload_result['bucket']
if not filters:
return {
'image_id': image_id,
'filtered_images': [],
'status': 'no_filters_applied'
}
# Download original from S3
local_path = f"/tmp/{image_id}_original"
s3_client.download_file(bucket, s3_key, local_path)
filtered_results = []
with Image.open(local_path) as img:
for filter_name in filters:
filtered_img = img.copy()
# Apply specific filters
if filter_name == 'blur':
filtered_img = filtered_img.filter(ImageFilter.BLUR)
elif filter_name == 'sharpen':
filtered_img = filtered_img.filter(ImageFilter.SHARPEN)
elif filter_name == 'enhance_contrast':
enhancer = ImageEnhance.Contrast(filtered_img)
filtered_img = enhancer.enhance(1.2)
elif filter_name == 'enhance_brightness':
enhancer = ImageEnhance.Brightness(filtered_img)
filtered_img = enhancer.enhance(1.1)
elif filter_name == 'grayscale':
filtered_img = filtered_img.convert('L').convert('RGB')
# Save filtered image
filtered_path = f"/tmp/{image_id}_filtered_{filter_name}.jpg"
filtered_img.save(
filtered_path,
'JPEG',
quality=ImageProcessingConfig.JPEG_QUALITY,
optimize=True
)
# Upload to S3
filtered_s3_key = f"filtered/{image_id}/{filter_name}.jpg"
s3_client.upload_file(
filtered_path,
ImageProcessingConfig.PROCESSED_BUCKET,
filtered_s3_key,
ExtraArgs={'ContentType': 'image/jpeg'}
)
filtered_results.append({
'filter': filter_name,
's3_key': filtered_s3_key,
'file_size': os.path.getsize(filtered_path)
})
os.remove(filtered_path)
os.remove(local_path)
logging.info(f"Applied {len(filters)} filters to {image_id}")
return {
'image_id': image_id,
'filtered_images': filtered_results,
'status': 'filters_applied'
}
except Exception as exc:
logging.error(f"Filter application failed for {upload_result['image_id']}: {exc}")
raise
@app.task(bind=True, name='image.distribute.update_cdn')
def distribute_to_cdn(self, processing_results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Distribute processed images to CDN."""
try:
# Aggregate all processed images
image_id = processing_results[0]['image_id']
all_images = []
for result in processing_results:
if 'thumbnails' in result:
all_images.extend(result['thumbnails'])
if 'responsive_images' in result:
all_images.extend(result['responsive_images'])
if 'filtered_images' in result:
all_images.extend(result['filtered_images'])
cdn_urls = []
for image_info in all_images:
s3_key = image_info['s3_key']
# Copy to CDN bucket
copy_source = {
'Bucket': ImageProcessingConfig.PROCESSED_BUCKET,
'Key': s3_key
}
cdn_key = f"cdn/{s3_key}"
            s3_client.copy_object(
                CopySource=copy_source,
                Bucket=ImageProcessingConfig.CDN_BUCKET,
                Key=cdn_key,
                # copy_object takes ACL directly; ExtraArgs only applies to the managed copy()/upload helpers
                ACL='public-read'
            )
# Generate CDN URL
cdn_url = f"https://cdn.example.com/{cdn_key}"
cdn_urls.append({
'type': image_info.get('size') or image_info.get('width') or image_info.get('filter'),
'url': cdn_url,
'format': image_info.get('format', 'jpeg')
})
logging.info(f"Distributed {len(cdn_urls)} images to CDN for {image_id}")
return {
'image_id': image_id,
'cdn_urls': cdn_urls,
'status': 'distributed_to_cdn',
'distribution_timestamp': time.time()
}
except Exception as exc:
logging.error(f"CDN distribution failed: {exc}")
raise
@app.task(name='image.process.finalize_processing')
def finalize_image_processing(distribution_result: Dict[str, Any]) -> Dict[str, Any]:
"""Finalize image processing and update database."""
try:
image_id = distribution_result['image_id']
# Update database with processing results
# (In real implementation, update your database)
# Generate image manifest
manifest = {
'image_id': image_id,
'processing_completed_at': time.time(),
'cdn_urls': distribution_result['cdn_urls'],
'status': 'completed'
}
# Store manifest in database or cache
logging.info(f"Image processing completed for {image_id}")
return manifest
except Exception as exc:
logging.error(f"Failed to finalize processing for {distribution_result['image_id']}: {exc}")
raise
# Main image processing workflow
def create_image_processing_workflow(upload_data: Dict[str, Any], filters: List[str] = None):
"""Create complete image processing workflow."""
filters = filters or []
# Validation and upload
validation_chain = chain(
validate_image_upload.s(upload_data),
upload_original_to_s3.s()
)
    # Parallel processing. Only include the filter task when filters were requested;
    # the previous placeholder signature('dummy_task') pointed at a task that does not exist.
    processing_tasks = [
        generate_thumbnails.s(),
        generate_responsive_images.s(),
    ]
    if filters:
        processing_tasks.append(apply_image_filters.s(filters))
    processing_group = group(processing_tasks)
# Distribution and finalization
distribution_chain = chain(
distribute_to_cdn.s(),
finalize_image_processing.s()
)
# Complete workflow
workflow = chain(
validation_chain,
chord(processing_group, distribution_chain)
)
return workflow
# Batch image processing
@app.task(name='image.batch.process_images')
def process_image_batch(upload_batch: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Process multiple images in batch."""
try:
batch_id = f"batch_{int(time.time())}"
workflows = []
for upload_data in upload_batch:
workflow = create_image_processing_workflow(upload_data)
workflows.append(workflow)
# Execute all workflows in parallel
job = group(workflows)
result = job.apply_async()
logging.info(f"Processing image batch {batch_id} with {len(upload_batch)} images")
return {
'batch_id': batch_id,
'total_images': len(upload_batch),
'job_id': result.id,
'status': 'processing'
}
except Exception as exc:
logging.error(f"Failed to process image batch: {exc}")
raise
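# To check on a batch later from the job_id returned above, the group's result can be
# saved and restored (supported by result backends such as Redis). Illustrative helper;
# assumes `result.save()` has been added after `job.apply_async()` above (not shown):
def get_batch_progress(job_id: str) -> Dict[str, Any]:
    """Restore a saved GroupResult and report completion counts."""
    group_result = app.GroupResult.restore(job_id)
    if group_result is None:
        return {'job_id': job_id, 'status': 'unknown'}
    return {
        'job_id': job_id,
        'total': len(group_result.results),
        'completed': group_result.completed_count(),
        'ready': group_result.ready(),
    }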
# Example usage
def simulate_image_processing():
"""Simulate image processing workflow."""
# Sample upload data
sample_upload = {
'file_path': '/tmp/sample_image.jpg',
'file_size': 2048000, # 2MB
'content_type': 'image/jpeg',
'filename': 'sample_image.jpg',
'uploader_id': 'user123'
}
# Create sample image file for testing
# (In real implementation, this would be an actual uploaded file)
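    # A minimal stand-in for a real upload: write a solid-color JPEG to the sample path
    # (assumes PIL's Image class is already imported for the processing tasks above).
    Image.new('RGB', (1920, 1080), color=(120, 40, 200)).save(sample_upload['file_path'], 'JPEG')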
# Single image processing
print("Processing single image...")
filters = ['enhance_contrast', 'sharpen']
workflow = create_image_processing_workflow(sample_upload, filters)
result = workflow.apply_async()
print(f"Image processing workflow started: {result.id}")
# Batch processing
print("Processing image batch...")
batch_uploads = [sample_upload.copy() for _ in range(3)]
batch_result = process_image_batch.delay(batch_uploads)
print(f"Batch processing started: {batch_result.get()}")
if __name__ == '__main__':
    simulate_image_processing()
Appendix A: Configuration Reference
Complete Configuration Options
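Any of the options below can be applied by pointing the application at a configuration class or module with config_from_object(). A minimal sketch, assuming the reference class below is saved as complete_celery_config.py:
# load_config.py (illustrative)
from celery import Celery
from complete_celery_config import CompleteCeleryConfig

app = Celery('myapp')
app.config_from_object(CompleteCeleryConfig)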
# complete_celery_config.py
"""Complete Celery configuration reference with all options."""
import ssl  # for the broker_use_ssl constants below


class CompleteCeleryConfig:
"""Comprehensive Celery configuration with all available options."""
# ========== BROKER SETTINGS ==========
# Broker URL and transport
broker_url = 'redis://localhost:6379/0'
broker_transport = 'redis' # redis, pyamqp, sqs, etc.
# Connection settings
broker_connection_timeout = 30
broker_connection_retry_on_startup = True
broker_connection_max_retries = 100
broker_failover_strategy = 'round-robin'
broker_heartbeat = 120
broker_heartbeat_checkrate = 2.0
# Pool settings
broker_pool_limit = 10
broker_channel_error_retry = False
# Transport options
broker_transport_options = {
'region': 'us-east-1', # For SQS
'predefined_queues': {},
'socket_keepalive': True,
'socket_keepalive_options': {},
'socket_timeout': 30.0,
'retry_on_timeout': True,
'max_connections': 20,
}
# Login method
broker_login_method = 'AMQPLAIN'
# SSL settings
broker_use_ssl = {
'keyfile': '/path/to/key.pem',
'certfile': '/path/to/cert.pem',
'ca_certs': '/path/to/ca.pem',
        'cert_reqs': ssl.CERT_REQUIRED,
        'ssl_version': ssl.PROTOCOL_TLS,
'ciphers': None
}
# ========== TASK SETTINGS ==========
# Serialization
task_serializer = 'json'
task_compression = 'gzip'
task_compression_level = 6
# Task execution
task_always_eager = False
task_eager_propagates = True
task_ignore_result = False
task_store_eager_result = True
# Task routing
task_routes = {}
task_default_queue = 'celery'
task_default_exchange = 'celery'
task_default_exchange_type = 'direct'
task_default_routing_key = 'celery'
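    # Example routing map (illustrative task and queue names):
    # task_routes = {
    #     'myapp.tasks.send_email': {'queue': 'email'},
    #     'myapp.tasks.generate_report': {'queue': 'reports', 'priority': 8},
    # }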
# Task annotations
task_annotations = {
'*': {
'rate_limit': '100/s',
'time_limit': 300,
'soft_time_limit': 240,
}
}
# Task inheritance
task_inherit_parent_priority = True
task_default_priority = 5
task_queue_max_priority = 10
# Task tracking
task_track_started = False
task_send_sent_event = False
# Task time limits
task_time_limit = 60 * 60 # 1 hour
task_soft_time_limit = 60 * 55 # 55 minutes
# Task acknowledgment
task_acks_late = False
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
# Task retry settings
task_publish_retry = True
task_publish_retry_policy = {
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
}
# ========== RESULT BACKEND SETTINGS ==========
# Result backend URL
result_backend = 'redis://localhost:6379/0'
# Result serialization
result_serializer = 'json'
result_compression = 'gzip'
result_compression_level = 6
# Result expiration
result_expires = 3600 # 1 hour
result_persistent = True
# Result caching
result_cache_max = 100
result_backend_always_retry = False
result_backend_max_retries = 3
result_backend_retry_delay = 0
# Result backend transport options
result_backend_transport_options = {
'master_name': 'sentinel-master',
'socket_keepalive': True,
'socket_keepalive_options': {},
'socket_timeout': 30.0,
'retry_on_timeout': True,
}
# ========== WORKER SETTINGS ==========
# Worker concurrency
worker_concurrency = 4
worker_prefetch_multiplier = 4
# Worker processes
worker_max_tasks_per_child = 1000
worker_max_memory_per_child = 200000 # 200MB
worker_disable_rate_limits = False
# Worker behavior
worker_redirect_stdouts = True
worker_redirect_stdouts_level = 'WARNING'
worker_hijack_root_logger = True
worker_log_color = True
# Worker state
worker_state_db = '/var/lib/celery/worker.db'
worker_timer_precision = 1.0
# Worker autoscaling
    worker_autoscaler = 'celery.worker.autoscale:Autoscaler'
    # Autoscale bounds are passed on the command line (--autoscale=max,min),
    # not via dedicated settings.
# Worker process management
worker_proc_alive_timeout = 60.0
worker_lost_wait = 10.0
# Worker consumer settings
worker_consumer = 'celery.worker.consumer:Consumer'
worker_direct = False
worker_enable_remote_control = True
    # Task events are configured in the monitoring section below
# ========== LOGGING SETTINGS ==========
# Worker logging
worker_log_level = 'INFO'
worker_log_format = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
worker_task_log_format = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
# ========== SECURITY SETTINGS ==========
# Content types
accept_content = ['json']
# Security key
    security_key = '/path/to/private-key.pem'  # path to the private key used to sign messages
security_certificate = '/path/to/cert.pem'
security_cert_store = '/path/to/cert-store'
# ========== BEAT SETTINGS ==========
# Beat scheduler
beat_scheduler = 'celery.beat:PersistentScheduler'
beat_schedule_filename = 'celerybeat-schedule'
beat_sync_every = 0
beat_max_loop_interval = 300
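    # Example periodic schedule (illustrative task name):
    # beat_schedule = {
    #     'cleanup-every-hour': {
    #         'task': 'myapp.tasks.cleanup',
    #         'schedule': 3600.0,  # seconds; crontab() objects also work
    #         'options': {'queue': 'low'},
    #     },
    # }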
# Timezone
timezone = 'UTC'
enable_utc = True
# ========== MONITORING SETTINGS ==========
# Events
worker_send_task_events = True
task_send_sent_event = True
# Monitoring
control_queue_ttl = 300.0
control_queue_expires = 10.0
control_exchange = 'celeryctl'
# ========== DATABASE SETTINGS ==========
    # Database (SQLAlchemy) result backend: selected via result_backend using the 'db+' prefix
    # result_backend = 'db+postgresql://user:pass@localhost/celery'
database_engine_options = {
'echo': False,
'pool_recycle': 3600,
'pool_size': 10,
'pool_timeout': 30,
'max_overflow': 20,
}
database_table_names = {
'task': 'celery_taskmeta',
'group': 'celery_tasksetmeta',
}
database_table_schemas = {
'task': 'celery',
'group': 'celery',
}
# ========== CACHE SETTINGS ==========
# Cache backend
cache_backend = 'memcached://127.0.0.1:11211/'
cache_backend_options = {
'binary': True,
'behaviors': {
'tcp_nodelay': True,
'tcp_keepalive': True,
}
}
# ========== ADVANCED SETTINGS ==========
# Include
include = []
# Imports
imports = []
# Late binding
task_create_missing_queues = True
# Queue configuration
task_queues = ()
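    # Example explicit queue declarations (illustrative names; kombu ships with Celery):
    # from kombu import Exchange, Queue
    # task_queues = (
    #     Queue('high', Exchange('high'), routing_key='high'),
    #     Queue('low', Exchange('low'), routing_key='low'),
    # )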
# Result chord settings
result_chord_retry_delay = 1.0
result_chord_retry_max = 3
# Deprecated settings (for reference)
# CELERY_ALWAYS_EAGER = task_always_eager
# CELERY_EAGER_PROPAGATES_EXCEPTIONS = task_eager_propagates
# BROKER_URL = broker_url
    # CELERY_RESULT_BACKEND = result_backend
Appendix B: CLI Commands Reference
Complete Celery CLI Reference
# ========== WORKER COMMANDS ==========
# Start worker
celery -A myapp worker --loglevel=info
# Worker with specific queues
celery -A myapp worker --loglevel=info -Q queue1,queue2
# Worker with concurrency
celery -A myapp worker --loglevel=info --concurrency=8
# Worker with specific pool
celery -A myapp worker --pool=eventlet --concurrency=1000
# Worker with hostname
celery -A myapp worker --hostname=worker1@%h
# Worker with autoscaling
celery -A myapp worker --autoscale=10,3
# Worker with time limits
celery -A myapp worker --time-limit=300 --soft-time-limit=240
# Worker with max tasks per child
celery -A myapp worker --max-tasks-per-child=1000
# Worker with max memory per child
celery -A myapp worker --max-memory-per-child=200000
# ========== BEAT COMMANDS ==========
# Start beat scheduler
celery -A myapp beat --loglevel=info
# Beat with custom schedule file
celery -A myapp beat --schedule=/var/run/celerybeat-schedule
# Beat with pidfile
celery -A myapp beat --pidfile=/var/run/celerybeat.pid
# ========== FLOWER COMMANDS ==========
# Start flower monitoring
celery -A myapp flower
# Flower with custom port
celery -A myapp flower --port=5555
# Flower with authentication
celery -A myapp flower --basic_auth=user1:password1,user2:password2
# ========== MONITORING COMMANDS ==========
# List active tasks
celery -A myapp inspect active
# List scheduled tasks
celery -A myapp inspect scheduled
# List reserved tasks
celery -A myapp inspect reserved
# Worker statistics
celery -A myapp inspect stats
# Registered tasks
celery -A myapp inspect registered
# Worker ping
celery -A myapp inspect ping
# Queues each worker is currently consuming from
celery -A myapp inspect active_queues
# ========== CONTROL COMMANDS ==========
# Enable events
celery -A myapp control enable_events
# Disable events
celery -A myapp control disable_events
# Cancel task
celery -A myapp control revoke task_id
# Cancel task and terminate
celery -A myapp control revoke task_id --terminate
# Shutdown workers
celery -A myapp control shutdown
# Restart worker pool
celery -A myapp control pool_restart
# Add consumer
celery -A myapp control add_consumer queue_name
# Cancel consumer
celery -A myapp control cancel_consumer queue_name
# Set rate limit
celery -A myapp control rate_limit task_name 100/s
# Set time limits (soft, then hard, in seconds)
celery -A myapp control time_limit task_name 240 300
# ========== UTILITY COMMANDS ==========
# Purge all messages
celery -A myapp purge
# Purge specific queue
celery -A myapp purge -Q queue_name
# Declare a queue via the AMQP API (AMQP brokers only)
celery -A myapp amqp queue.declare queue_name
# Migrate tasks between two brokers
celery -A myapp migrate redis://localhost:6379/0 amqp://guest@localhost//
# Shell
celery -A myapp shell
# Check configuration
celery -A myapp report
# Events monitoring
celery -A myapp events
# Bootstep dependency graph
celery -A myapp graph bootsteps
# ========== RESULT COMMANDS ==========
# Get task result
celery -A myapp result task_id
# Query workers for information about a task
celery -A myapp inspect query_task task_id
# ========== MULTI COMMANDS ==========
# Start multiple workers
celery multi start worker1 worker2 -A myapp
# Stop multiple workers
celery multi stop worker1 worker2
# Restart multiple workers
celery multi restart worker1 worker2
# Show worker status
celery multi show worker1 worker2
# Start workers with different configs
celery multi start worker1 worker2 \
-A myapp \
-c 4 \
--loglevel=info \
-Q:worker1 queue1,queue2 \
-Q:worker2 queue3,queue4 \
--pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n.log
# ========== ADVANCED EXAMPLES ==========
# Production worker with full configuration
celery -A myapp worker \
--loglevel=info \
--concurrency=8 \
--pool=prefork \
--queues=high,normal,low \
--hostname=worker1@%h \
--time-limit=3600 \
--soft-time-limit=3300 \
--max-tasks-per-child=1000 \
--max-memory-per-child=200000 \
--pidfile=/var/run/celery/worker1.pid \
--logfile=/var/log/celery/worker1.log \
--autoscale=10,2
# High-performance I/O worker
celery -A myapp worker \
--pool=eventlet \
--concurrency=1000 \
--queues=io_intensive \
--loglevel=warning \
--without-mingle \
--without-gossip
# CPU-intensive worker
celery -A myapp worker \
--pool=prefork \
--concurrency=4 \
--queues=cpu_intensive \
--time-limit=7200 \
--max-tasks-per-child=10
# Development worker with debugging
celery -A myapp worker \
--loglevel=debug \
--pool=solo \
--queues=debug \
--without-heartbeat \
--without-mingle \
--without-gossip
# Monitoring and inspection examples
celery -A myapp inspect active --destination=celery@worker1
celery -A myapp control revoke task_id --destination=celery@worker1
celery -A myapp control rate_limit myapp.tasks.heavy_task 10/m
Appendix C: Troubleshooting Guide
Common Issues and Solutions
Worker Issues
Problem: Workers not starting
# Check configuration
celery -A myapp report
# Check broker connectivity
celery -A myapp inspect ping
# Start worker with debug logging
celery -A myapp worker --loglevel=debug
Problem: High memory usage
# Add to configuration
worker_max_memory_per_child = 200000 # 200MB
worker_max_tasks_per_child = 1000
Problem: Tasks hanging
# Add time limits
task_time_limit = 300
task_soft_time_limit = 240
# Or in task annotation
@app.task(time_limit=300, soft_time_limit=240)
def my_task():
    pass
Broker Issues
Problem: Connection errors
# Increase connection timeout
broker_connection_timeout = 30
broker_connection_retry_on_startup = True
broker_connection_max_retries = 10
Problem: Redis connection pool exhausted
# Configure Redis pool
redis_max_connections = 20
redis_retry_on_timeout = True
Performance Issues
Problem: Low throughput
# Optimize prefetch (pick one)
worker_prefetch_multiplier = 1     # for long-running tasks
# worker_prefetch_multiplier = 10  # for many short tasks
# Use the appropriate pool (pick one)
worker_pool = 'eventlet'           # for I/O-bound work
# worker_pool = 'prefork'          # for CPU-bound work
Problem: Queue backlog
# Check which queues workers are consuming from
celery -A myapp inspect active_queues
# Check queue length directly on the broker (Redis example; default queue is 'celery')
redis-cli -n 0 llen celery
# Add more workers
celery multi start worker1 worker2 worker3 -A myapp
# Use autoscaling
celery -A myapp worker --autoscale=10,2
Final Summary
This comprehensive guide to Python Celery covers everything from basic concepts to expert-level implementations. The book demonstrates:
- Complete Learning Path: From installation to production deployment
- Real-World Examples: E-commerce order processing, image processing pipelines, and data automation
- Production-Ready Code: Security, monitoring, error handling, and scalability
- Best Practices: Configuration management, performance optimization, and troubleshooting
- Visual Documentation: Mermaid.js diagrams explaining complex architectures and workflows
The examples provided are production-ready and can serve as templates for building robust, scalable distributed systems using Celery. Each chapter builds upon previous concepts while introducing new advanced techniques, making this a complete reference for developers at all skill levels.