A comprehensive guide to mastering asynchronous programming in Python with practical examples, best practices, and real-world applications.
Table of Contents
- Prerequisites and Setup
- Introduction to Asynchronous Programming
- Getting Started with Asyncio
- Understanding Coroutines
- Event Loop Deep Dive
- Tasks and Futures
- Synchronization Primitives
- Asyncio Streams
- Error Handling and Best Practices
- Performance and Optimization
- Real-World Applications
- Troubleshooting and Debugging
- Asyncio vs Other Frameworks
- Appendix: Quick Reference
1. Prerequisites and Setup
Required Python Knowledge
Before diving into asyncio, you should be comfortable with:
Essential Python Concepts:
- Functions and generators (`yield`, `yield from`)
- Exception handling (`try`, `except`, `finally`)
- Context managers (`with` statements)
- Decorators (basic understanding)
- Object-oriented programming (classes, methods)
Intermediate Concepts:
- Python’s `import` system and modules
- Basic understanding of threads vs processes
- File I/O operations
- Network programming basics (optional but helpful)
Python Version Requirements
import sys
print(f"Python version: {sys.version}")
# Asyncio is available in Python 3.4+
# For best experience, use Python 3.7+ for asyncio.run()
# Python 3.11+ includes TaskGroup for better error handling
assert sys.version_info >= (3, 7), "Python 3.7+ recommended for this guide"
Development Environment Setup
Required Libraries:
# Core asyncio is built-in, but install these for examples
pip install aiohttp aiofiles asyncpg websockets
# For advanced examples
pip install redis psutil beautifulsoup4 PyJWT
# Development tools
pip install pytest pytest-asyncio black isort mypy
IDE/Editor Recommendations:
- VS Code: Excellent asyncio debugging support
- PyCharm Professional: Advanced async debugging
- Vim/Neovim: With Python LSP support
Mental Model: What Makes Asyncio Different?
graph LR
subgraph "Traditional Synchronous"
A[Request 1] --> B[Wait for Response]
B --> C[Request 2]
C --> D[Wait for Response]
D --> E[Request 3]
end
subgraph "Asynchronous"
F[Request 1] --> G[Switch to Request 2]
G --> H[Switch to Request 3]
H --> I[Response 1 Ready]
I --> J[Response 2 Ready]
J --> K[Response 3 Ready]
end
Key Mental Shifts:
- Cooperative Multitasking: Functions voluntarily yield control
- Event-Driven: React to events (I/O completion, timers)
- Single-Threaded: No preemptive thread races, but blocking operations halt everything (coroutines can still interleave at await points, as Section 7 shows)
- Explicit Concurrency: You control when and how concurrency happens
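To make the cooperative model concrete, here is a minimal sketch (the function names are illustrative): two coroutines interleave on a single thread only because each one voluntarily yields control with await.
import asyncio

async def chatty(name, steps):
    for i in range(steps):
        print(f"{name}: step {i}")
        await asyncio.sleep(0)  # voluntarily yield control to the event loop

async def main():
    # Output interleaves A and B -- cooperative multitasking on one thread
    await asyncio.gather(chatty("A", 3), chatty("B", 3))

asyncio.run(main())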
2. Introduction to Asynchronous Programming
What is Asynchronous Programming?
Asynchronous programming is a paradigm that allows code to run concurrently without blocking on each individual operation. Unlike traditional synchronous programming, where operations execute strictly one after another, async programming enables multiple operations to make progress “simultaneously.”
Key Concept: Concurrency vs Parallelism
- Concurrency: Dealing with multiple tasks at once (not necessarily simultaneously)
- Parallelism: Actually executing multiple tasks simultaneously (requires multiple cores)
- Asyncio: Provides concurrency in a single thread through cooperative multitasking
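A hedged sketch of the distinction (crunch is an illustrative CPU-bound function): asyncio.gather overlaps I/O waits on one thread, while a process pool achieves true parallelism across cores.
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n):
    # CPU-bound work: benefits from parallelism, not from asyncio
    return sum(i * i for i in range(n))

async def main():
    # Concurrency: two 1-second waits overlap, finishing in ~1s total
    await asyncio.gather(asyncio.sleep(1), asyncio.sleep(1))
    # Parallelism: CPU work runs in separate processes, on separate cores
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        results = await asyncio.gather(
            loop.run_in_executor(pool, crunch, 1_000_000),
            loop.run_in_executor(pool, crunch, 1_000_000),
        )
    print(results)

if __name__ == "__main__":  # guard needed for process pools on spawn-based platforms
    asyncio.run(main())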
graph TD
A[Synchronous Programming] --> B[Task 1: 3s]
B --> C[Task 2: 2s]
C --> D[Task 3: 1s]
D --> E[Total: 6s]
F[Asynchronous Programming] --> G[Task 1: 3s]
F --> H[Task 2: 2s]
F --> I[Task 3: 1s]
G --> J[Total: ~3s]
H --> J
I --> J
style E fill:#ffcccc
style J fill:#ccffcc
Understanding Blocking vs Non-Blocking Operations
Blocking Operations (Bad for asyncio):
import time
import requests
# These BLOCK the entire event loop!
time.sleep(1) # ❌ Blocks everything
requests.get('http://...') # ❌ Blocks everything
open('file.txt').read() # ❌ Blocks everything
Non-Blocking Operations (Good for asyncio):
import asyncio
import aiohttp
import aiofiles
# These yield control back to the event loop
await asyncio.sleep(1) # ✅ Cooperative
async with aiohttp.ClientSession() as session:
await session.get('http://...') # ✅ Cooperative
async with aiofiles.open('file.txt') as f:
await f.read() # ✅ Cooperative
Why Use Asyncio?
Perfect For (I/O Bound Tasks):
- 🌐 Network requests: APIs, web scraping, downloads
- 📁 File operations: Reading/writing large files
- 🗄️ Database queries: Multiple database calls
- 🔗 WebSocket connections: Real-time communication
- 🚀 Microservices: Handling many concurrent requests
Not Ideal For (CPU Bound Tasks):
- 🧮 Mathematical computations
- 🖼️ Image/video processing
- 🔐 Cryptographic operations
- 📊 Data analysis (use multiprocessing instead)
Common Misconceptions
❌ “Asyncio makes everything faster”
- Only improves I/O bound operations
- Can be slower for CPU-bound tasks due to overhead
❌ “Asyncio is multithreading”
- Asyncio runs in a single thread
- Uses cooperative multitasking, not preemptive
❌ “Just add async/await everywhere”
- Only use async when you need to wait for I/O
- Overhead exists for function calls
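The second and third misconceptions are easy to demonstrate: calling a coroutine function does not run it, and awaiting is what actually drives execution. A minimal sketch:
import asyncio

async def work():
    await asyncio.sleep(0.1)
    return 42

async def main():
    coro = work()        # nothing has run yet -- this is just a coroutine object
    print(type(coro))    # <class 'coroutine'>
    print(await coro)    # 42 -- awaiting is what executes it

asyncio.run(main())
# Forgetting the await instead produces:
# RuntimeWarning: coroutine 'work' was never awaited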
Synchronous vs Asynchronous Comparison
Example: Fetching Multiple URLs
# Synchronous approach - Operations run one after another
import time
import requests
def fetch_data_sync(urls):
"""Fetch URLs one by one (blocking)"""
results = []
for url in urls:
print(f"⏳ Fetching {url}")
response = requests.get(url) # This blocks!
results.append(response.status_code)
print(f"✅ Got {response.status_code} from {url}")
return results
# Takes ~3 seconds for 3 URLs (1 second each)
start = time.time()
urls = ['http://httpbin.org/delay/1'] * 3
results = fetch_data_sync(urls)
print(f"🐌 Sync took: {time.time() - start:.2f} seconds")Python# Asynchronous approach - Operations run concurrently
import asyncio
import aiohttp
import time
async def fetch_data_async(urls):
"""Fetch URLs concurrently (non-blocking)"""
async with aiohttp.ClientSession() as session:
async def fetch_one(url):
print(f"⏳ Starting {url}")
async with session.get(url) as response:
print(f"✅ Got {response.status} from {url}")
return response.status
# Create tasks for concurrent execution
tasks = [fetch_one(url) for url in urls]
return await asyncio.gather(*tasks)
# Takes ~1 second for 3 URLs (all run concurrently)
async def main():
start = time.time()
urls = ['http://httpbin.org/delay/1'] * 3
results = await fetch_data_async(urls)
print(f"🚀 Async took: {time.time() - start:.2f} seconds")
# Run the async function
# asyncio.run(main())
Performance Comparison:
- Synchronous: 3 URLs × 1 second = ~3 seconds
- Asynchronous: 3 URLs concurrently = ~1 second
- Speedup: ~3x faster for I/O bound operations!
3. Getting Started with Asyncio
Installation and Setup
Asyncio is part of Python’s standard library (Python 3.4+). No additional installation required:
import asyncio
import sys
print(f"Python version: {sys.version}")
print(f"Asyncio version: {asyncio.__version__ if hasattr(asyncio, '__version__') else 'Built-in'}")PythonYour First Async Program
import asyncio
async def hello_world():
"""
A simple async function (coroutine) that demonstrates:
1. async def - declares a coroutine function
2. await - pauses execution and yields control to event loop
"""
print("Hello")
await asyncio.sleep(1) # Non-blocking sleep - other tasks can run
print("World")
# Method 1: Python 3.7+ (recommended)
asyncio.run(hello_world())
# Method 2: Python 3.6 and earlier
# loop = asyncio.get_event_loop()
# loop.run_until_complete(hello_world())
Key Asyncio Components
graph TB
A[async def function] --> B[Coroutine Object]
B --> C[Event Loop]
C --> D[Task]
D --> E[Future]
E --> F[Result]
G[await keyword] --> C
H[asyncio.run] --> C
I[asyncio.create_task] --> D
Basic Async Patterns
import asyncio
import time
# Pattern 1: Simple async function
async def simple_async():
"""Basic async function that waits and returns a value"""
await asyncio.sleep(1) # Simulates I/O operation
return "Done"
# Pattern 2: Running multiple coroutines sequentially
async def sequential_execution():
"""Runs coroutines one after another (not concurrent)"""
print("Sequential execution started")
start_time = time.time()
result1 = await simple_async() # Wait for first to complete
result2 = await simple_async() # Then start second
result3 = await simple_async() # Then start third
end_time = time.time()
print(f"Sequential took: {end_time - start_time:.2f} seconds")
return [result1, result2, result3]
# Pattern 3: Running multiple coroutines concurrently
async def concurrent_execution():
"""Runs coroutines concurrently using asyncio.gather"""
print("Concurrent execution started")
start_time = time.time()
# Start all coroutines at once
results = await asyncio.gather(
simple_async(), # These three run concurrently
simple_async(),
simple_async()
)
end_time = time.time()
print(f"Concurrent took: {end_time - start_time:.2f} seconds")
return results
# Pattern 4: Using tasks for better control
async def task_execution():
"""Using tasks gives you more control over execution"""
print("Task execution started")
start_time = time.time()
# Create tasks (starts execution immediately)
task1 = asyncio.create_task(simple_async())
task2 = asyncio.create_task(simple_async())
task3 = asyncio.create_task(simple_async())
# Tasks are already running, just wait for results
result1 = await task1
result2 = await task2
result3 = await task3
end_time = time.time()
print(f"Task execution took: {end_time - start_time:.2f} seconds")
return [result1, result2, result3]
# Pattern 5: Error handling in async code
async def safe_async():
"""Proper error handling in async functions"""
try:
# This might fail
result = await simple_async()
return result
except asyncio.TimeoutError:
print("Operation timed out")
return None
except Exception as e:
print(f"Unexpected error: {e}")
return None
finally:
print("Cleanup code runs regardless of success/failure")
# Running the examples
async def main():
"""Main function to demonstrate different patterns"""
print("=== Asyncio Patterns Demo ===\n")
# Sequential vs Concurrent comparison
await sequential_execution() # ~3 seconds
print()
await concurrent_execution() # ~1 second
print()
await task_execution() # ~1 second
print()
# Error handling
result = await safe_async()
print(f"Safe async result: {result}")
# Run the demo
if __name__ == "__main__":
asyncio.run(main())
When to Use async/await
# ✅ GOOD: I/O operations that can benefit from concurrency
async def good_async_usage():
# Network requests
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
data = await response.json()
# File operations
async with aiofiles.open('file.txt', 'r') as f:
content = await f.read()
# Database operations
async with asyncpg.connect('postgresql://...') as conn:
result = await conn.fetch('SELECT * FROM users')
return data, content, result
# ❌ BAD: CPU-intensive operations (no I/O to wait for)
async def bad_async_usage():
# This doesn't benefit from async - just adds overhead
total = 0
for i in range(1000000):
total += i * i # Pure computation
return total
# ✅ BETTER: For CPU-intensive work, use regular functions
def cpu_intensive_work():
total = 0
for i in range(1000000):
total += i * i
return total
# Or use asyncio.run_in_executor for CPU work in async context
async def async_with_cpu_work():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, cpu_intensive_work)
return result
Hands-On Exercises
Exercise 1: Basic Async Function
# TODO: Create an async function that simulates downloading a file
# It should take a filename and size, print progress, and return success message
async def download_file(filename: str, size_mb: int):
"""
Exercise: Implement a function that simulates file download
Requirements:
1. Print "Starting download of {filename} ({size_mb}MB)"
2. Use asyncio.sleep() to simulate download time (0.1s per MB)
3. Print progress every 20% completion
4. Return "Downloaded {filename} successfully"
"""
# Your code here
pass
# Test your function
# asyncio.run(download_file("video.mp4", 100))
Exercise 2: Concurrent Downloads
# TODO: Download multiple files concurrently and show total time saved
async def download_multiple_files():
"""
Exercise: Download multiple files concurrently
Requirements:
1. Create a list of files to download: [("file1.zip", 50), ("file2.mp4", 30), ("file3.pdf", 20)]
2. Download them concurrently using asyncio.gather()
3. Measure and print total time taken
4. Compare with sequential download time
"""
# Your code here
pass
# asyncio.run(download_multiple_files())
Exercise 3: Error Handling
# TODO: Implement robust error handling for network operations
async def fetch_with_retry(url: str, max_retries: int = 3):
"""
Exercise: Implement retry logic for failed requests
Requirements:
1. Try to fetch the URL
2. If it fails, retry up to max_retries times
3. Use exponential backoff (1s, 2s, 4s delays)
4. Return result or raise exception after all retries fail
"""
# Your code here
pass
4. Understanding Coroutines
What are Coroutines?
Coroutines are special functions that can be paused and resumed. They’re defined with async def and called with await.
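You can see the pause/resume mechanics by driving a coroutine by hand with send(), much as the event loop does internally (a low-level sketch you would never write in application code):
import asyncio

async def two_steps():
    print("step 1")
    await asyncio.sleep(0)   # suspension point
    print("step 2")
    return "done"

coro = two_steps()
coro.send(None)              # runs until the first suspension point: prints "step 1"
try:
    coro.send(None)          # resumes: prints "step 2", then the coroutine finishes
except StopIteration as e:
    print(e.value)           # "done" -- the return value rides on StopIteration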
stateDiagram-v2
[*] --> Created: async def func()
Created --> Running: await func()
Running --> Suspended: await other_coro()
Suspended --> Running: other_coro() completes
Running --> Completed: return value
Completed --> [*]
Coroutine Lifecycle
async def lifecycle_demo():
print("1. Coroutine started")
# Suspension point
await asyncio.sleep(1)
print("2. Resumed after sleep")
# Another suspension
result = await another_coro()
print(f"3. Got result: {result}")
return "Final result"
async def another_coro():
await asyncio.sleep(0.5)
return "Hello from another coroutine"
# Running the demo
async def main():
result = await lifecycle_demo()
print(f"Final: {result}")
asyncio.run(main())
Coroutine Composition
# Building complex operations from simple coroutines
async def fetch_user(user_id):
await asyncio.sleep(0.1) # Simulate API call
return {"id": user_id, "name": f"User{user_id}"}
async def fetch_posts(user_id):
await asyncio.sleep(0.2) # Simulate API call
return [f"Post {i} by User{user_id}" for i in range(3)]
async def fetch_user_profile(user_id):
# Fetch user and posts concurrently
user, posts = await asyncio.gather(
fetch_user(user_id),
fetch_posts(user_id)
)
return {
"user": user,
"posts": posts
}
# Usage
async def main():
profile = await fetch_user_profile(123)
print(profile)
5. Event Loop Deep Dive
Understanding the Event Loop
The event loop is the heart of asyncio. It schedules and executes coroutines, handles I/O events, and manages callbacks.
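A toy model helps build intuition (this is not asyncio's actual implementation): at its core, a loop is a queue of paused coroutines that it resumes one at a time.
from collections import deque

class Suspend:
    """Toy awaitable that pauses the coroutine once."""
    def __await__(self):
        yield  # hand control back to the scheduler

async def job(name):
    for i in range(2):
        print(f"{name}: step {i}")
        await Suspend()

def toy_loop(coros):
    """Round-robin scheduler: resume each coroutine until all finish."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)     # run until the next suspension point
            ready.append(coro)  # not finished yet: reschedule it
        except StopIteration:
            pass                # coroutine returned

toy_loop([job("A"), job("B")])
# Prints A and B steps interleaved, like a real event loop would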
graph TD
A[Event Loop] --> B[Ready Queue]
A --> C[I/O Selector]
A --> D[Timer Heap]
B --> E[Execute Task]
E --> F{Task Complete?}
F -->|No| G[Suspend Task]
F -->|Yes| H[Remove Task]
G --> I[Schedule Resume]
I --> B
C --> J[I/O Ready]
J --> K[Resume Waiting Task]
K --> B
D --> L[Timer Expired]
L --> M[Resume Delayed Task]
M --> B
Working with Event Loops
import asyncio
# Getting the current event loop
async def loop_info():
loop = asyncio.get_running_loop()
print(f"Loop: {loop}")
print(f"Running: {loop.is_running()}")
print(f"Closed: {loop.is_closed()}")
# Creating and managing loops manually
def manual_loop_management():
# Create a new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Run coroutines
loop.run_until_complete(loop_info())
finally:
loop.close()
# Scheduling callbacks
async def callback_example():
loop = asyncio.get_running_loop()
def callback():
print("Callback executed!")
# Schedule callback
loop.call_soon(callback)
loop.call_later(1.0, callback)
await asyncio.sleep(2)
Event Loop Policies
# Custom event loop policy
class CustomEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self):
loop = super().new_event_loop()
# Customize loop behavior
loop.set_debug(True)
return loop
# Setting custom policy
asyncio.set_event_loop_policy(CustomEventLoopPolicy())
6. Tasks and Futures
Understanding Tasks
Tasks are wrappers around coroutines that allow them to be scheduled and executed by the event loop.
classDiagram
class Future {
+result()
+exception()
+cancelled()
+done()
+add_done_callback()
}
class Task {
+cancel()
+get_coro()
+get_name()
+set_name()
}
Future <|-- Task
Creating and Managing Tasks
import asyncio
async def background_task(name, duration):
print(f"Task {name} started")
await asyncio.sleep(duration)
print(f"Task {name} completed")
return f"Result from {name}"
async def task_management():
# Creating tasks
task1 = asyncio.create_task(background_task("A", 2))
task2 = asyncio.create_task(background_task("B", 1))
task3 = asyncio.create_task(background_task("C", 3))
# Setting task names
task1.set_name("Long Task")
task2.set_name("Quick Task")
task3.set_name("Slow Task")
# Wait for all tasks
results = await asyncio.gather(task1, task2, task3)
print(f"Results: {results}")
# Task cancellation
async def cancellation_example():
task = asyncio.create_task(background_task("Cancelled", 5))
# Cancel after 1 second
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")PythonWorking with Futures
import asyncio
async def future_example():
loop = asyncio.get_running_loop()
# Creating a future
future = loop.create_future()
def set_result():
if not future.done():
future.set_result("Future result!")
# Schedule result setting
loop.call_later(2, set_result)
# Wait for future
result = await future
print(f"Got: {result}")
# Future with exception
async def future_exception_example():
loop = asyncio.get_running_loop()
future = loop.create_future()
def set_exception():
future.set_exception(ValueError("Something went wrong!"))
loop.call_later(1, set_exception)
try:
await future
except ValueError as e:
print(f"Caught exception: {e}")PythonTask Groups (Python 3.11+)
import asyncio
async def task_group_example():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(background_task("Group A", 1))
task2 = tg.create_task(background_task("Group B", 2))
task3 = tg.create_task(background_task("Group C", 1.5))
print("All tasks in group completed")
print(f"Results: {task1.result()}, {task2.result()}, {task3.result()}")Python7. Synchronization Primitives
Why Synchronization in Async Code?
Even though asyncio is single-threaded, we still need synchronization when sharing resources between coroutines.
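The danger is specifically a read-modify-write that awaits in the middle: another coroutine can run between the read and the write. A minimal sketch of the resulting lost-update problem (compare with the locked counter below):
import asyncio

counter = 0

async def unsafe_increment():
    global counter
    current = counter
    await asyncio.sleep(0)  # suspension point -- other coroutines run here
    counter = current + 1   # writes back a stale value

async def main():
    await asyncio.gather(*(unsafe_increment() for _ in range(100)))
    print(f"Expected 100, got {counter}")  # typically far less than 100

asyncio.run(main())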
sequenceDiagram
participant C1 as Coroutine 1
participant R as Shared Resource
participant C2 as Coroutine 2
C1->>R: Acquire Lock
C2->>R: Try Acquire Lock
R-->>C2: Blocked
C1->>R: Modify Resource
C1->>R: Release Lock
R-->>C2: Lock Acquired
C2->>R: Modify Resource
C2->>R: Release Lock
Locks
import asyncio
class AsyncCounter:
def __init__(self):
self._value = 0
self._lock = asyncio.Lock()
async def increment(self):
async with self._lock:
# Critical section
current = self._value
await asyncio.sleep(0.1) # Simulate work
self._value = current + 1
@property
def value(self):
return self._value
async def worker(counter, name):
for i in range(5):
await counter.increment()
print(f"Worker {name}: {counter.value}")
async def lock_example():
counter = AsyncCounter()
# Run workers concurrently
await asyncio.gather(
worker(counter, "A"),
worker(counter, "B"),
worker(counter, "C")
)
print(f"Final value: {counter.value}")PythonSemaphores
import asyncio
async def download_file(semaphore, url, session):
async with semaphore:
print(f"Starting download: {url}")
await asyncio.sleep(2) # Simulate download
print(f"Completed download: {url}")
return f"Content from {url}"
async def semaphore_example():
# Limit to 3 concurrent downloads
semaphore = asyncio.Semaphore(3)
urls = [f"http://example.com/file{i}.zip" for i in range(10)]
tasks = []
for url in urls:
task = asyncio.create_task(
download_file(semaphore, url, None)
)
tasks.append(task)
results = await asyncio.gather(*tasks)
print(f"Downloaded {len(results)} files")PythonEvents
import asyncio
async def waiter(event, name):
print(f"Waiter {name} waiting for event")
await event.wait()
print(f"Waiter {name} got event!")
async def setter(event):
await asyncio.sleep(2)
print("Setting event")
event.set()
async def event_example():
event = asyncio.Event()
# Start waiters
waiters = [
asyncio.create_task(waiter(event, f"W{i}"))
for i in range(3)
]
# Start setter
setter_task = asyncio.create_task(setter(event))
# Wait for all
await asyncio.gather(*waiters, setter_task)
Conditions
import asyncio
async def condition_example():
condition = asyncio.Condition()
items = []
async def consumer(name):
async with condition:
while len(items) == 0:
print(f"Consumer {name} waiting for items")
await condition.wait()
item = items.pop(0)
print(f"Consumer {name} got item: {item}")
async def producer():
for i in range(5):
async with condition:
item = f"item_{i}"
items.append(item)
print(f"Produced: {item}")
condition.notify_all()
await asyncio.sleep(1)
# Run producer and consumers
await asyncio.gather(
producer(),
consumer("A"),
consumer("B")
)
8. Asyncio Streams
Understanding Streams
Asyncio streams provide a high-level interface for network programming.
graph LR
A[Client] --> B[StreamWriter]
B --> C[Network]
C --> D[StreamReader]
D --> E[Server]
E --> F[StreamWriter]
F --> C
C --> G[StreamReader]
G --> A
TCP Server and Client
import asyncio
# TCP Server
async def handle_client(reader, writer):
addr = writer.get_extra_info('peername')
print(f"Client connected: {addr}")
try:
while True:
data = await reader.read(1024)
if not data:
break
message = data.decode()
print(f"Received: {message}")
# Echo back
response = f"Echo: {message}"
writer.write(response.encode())
await writer.drain()
except asyncio.CancelledError:
pass
finally:
print(f"Client disconnected: {addr}")
writer.close()
await writer.wait_closed()
async def start_server():
server = await asyncio.start_server(
handle_client, 'localhost', 8888
)
addr = server.sockets[0].getsockname()
print(f"Server started on {addr}")
async with server:
await server.serve_forever()
# TCP Client
async def tcp_client():
reader, writer = await asyncio.open_connection(
'localhost', 8888
)
try:
# Send messages
messages = ["Hello", "World", "From", "Client"]
for msg in messages:
writer.write(msg.encode())
await writer.drain()
# Read response
data = await reader.read(1024)
print(f"Server response: {data.decode()}")
await asyncio.sleep(1)
finally:
writer.close()
await writer.wait_closed()
HTTP Client with aiohttp
import aiohttp
import asyncio
async def fetch_url(session, url):
try:
async with session.get(url) as response:
return {
'url': url,
'status': response.status,
'content': await response.text()
}
except Exception as e:
return {
'url': url,
'error': str(e)
}
async def http_client_example():
urls = [
'http://httpbin.org/json',
'http://httpbin.org/user-agent',
'http://httpbin.org/headers'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
if 'error' in result:
print(f"Error fetching {result['url']}: {result['error']}")
else:
print(f"Success: {result['url']} - Status: {result['status']}")PythonThis completes the major sections of the comprehensive asyncio book. Would you like me to continue with the remaining sections (Error Handling, Performance, and Real-World Applications)?
I’ll continue with the remaining sections of the comprehensive Python asyncio book, starting with Error Handling and Best Practices:
9. Error Handling and Best Practices
Exception Propagation in Async Code
Understanding how exceptions flow through async code is crucial for robust applications.
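A short sketch of the two paths in the diagram below: a directly awaited coroutine propagates its exception immediately, while an exception inside a background task is stored and re-raised only when the task is awaited.
import asyncio

async def boom():
    raise ValueError("failed")

async def main():
    # Path 1: direct await -- the exception propagates like a normal call
    try:
        await boom()
    except ValueError as e:
        print(f"caught directly: {e}")
    # Path 2: background task -- the exception is stored in the task
    task = asyncio.create_task(boom())
    await asyncio.sleep(0)  # give the task a chance to run (and fail)
    try:
        await task          # the stored exception is re-raised here
    except ValueError as e:
        print(f"caught from task: {e}")

asyncio.run(main())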
graph TD
A[Coroutine A] --> B[await Coroutine B]
B --> C[Exception Raised]
C --> D[Exception Propagates]
D --> E[Caught in A or Propagates Further]
F[Task Creation] --> G[Background Task]
G --> H[Exception in Task]
H --> I[Exception Stored in Task]
I --> J[Retrieved when awaited]
Basic Exception Handling
import asyncio
import aiohttp
import random
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def risky_operation():
"""Simulates an operation that might fail"""
await asyncio.sleep(1)
if random.random() < 0.5:
raise ValueError("Random failure occurred!")
return "Success!"
async def safe_wrapper():
"""Proper exception handling"""
try:
result = await risky_operation()
logger.info(f"Operation succeeded: {result}")
return result
except ValueError as e:
logger.error(f"Operation failed: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise # Re-raise unexpected exceptions
# Multiple exception types
async def http_request_with_retry(url, max_retries=3):
"""HTTP request with retry logic and specific exception handling"""
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
return await response.text()
else:
raise aiohttp.ClientResponseError(
request_info=response.request_info,
history=response.history,
status=response.status
)
except asyncio.TimeoutError:
logger.warning(f"Timeout on attempt {attempt + 1}")
except aiohttp.ClientConnectionError:
logger.warning(f"Connection error on attempt {attempt + 1}")
except aiohttp.ClientResponseError as e:
logger.warning(f"HTTP {e.status} on attempt {attempt + 1}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
raise Exception(f"Failed after {max_retries} attempts")PythonException Handling in Concurrent Operations
import asyncio
async def task_with_exception(task_id, should_fail=False):
"""Task that may raise an exception"""
await asyncio.sleep(1)
if should_fail:
raise ValueError(f"Task {task_id} failed!")
return f"Task {task_id} completed"
# Using asyncio.gather with exception handling
async def gather_with_exceptions():
"""Handle exceptions in asyncio.gather"""
tasks = [
task_with_exception(1, False),
task_with_exception(2, True), # This will fail
task_with_exception(3, False),
]
try:
results = await asyncio.gather(*tasks)
print(f"All results: {results}")
except ValueError as e:
print(f"One task failed: {e}")
# Alternative: Use return_exceptions=True (must recreate the coroutines --
# the first gather already consumed the originals)
retry_tasks = [task_with_exception(1, False), task_with_exception(2, True), task_with_exception(3, False)]
results = await asyncio.gather(*retry_tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i+1} failed: {result}")
else:
print(f"Task {i+1} succeeded: {result}")
# Using asyncio.as_completed for individual exception handling
async def handle_tasks_individually():
"""Handle each task's exceptions separately"""
tasks = [
asyncio.create_task(task_with_exception(i, i == 2))
for i in range(1, 4)
]
for task in asyncio.as_completed(tasks):
try:
result = await task
print(f"Completed: {result}")
except ValueError as e:
print(f"Task failed: {e}")
# Exception handling with TaskGroup (Python 3.11+)
async def task_group_exception_handling():
"""Using TaskGroup for automatic exception propagation"""
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(task_with_exception(1, False))
task2 = tg.create_task(task_with_exception(2, True)) # Will fail
task3 = tg.create_task(task_with_exception(3, False))
except* ValueError as eg:
print(f"Some tasks failed: {eg.exceptions}")
# Handle multiple exceptions
for exc in eg.exceptions:
print(f"Exception: {exc}")PythonCustom Exception Classes
class AsyncIOError(Exception):
"""Base exception for asyncio-related errors"""
pass
class ConnectionPoolExhausted(AsyncIOError):
"""Raised when connection pool is exhausted"""
pass
class RateLimitExceeded(AsyncIOError):
"""Raised when rate limit is exceeded"""
pass
class AsyncResourceManager:
"""Example of custom exception usage"""
def __init__(self, max_connections=10):
self.max_connections = max_connections
self.active_connections = 0
self._semaphore = asyncio.Semaphore(max_connections)
async def acquire_connection(self):
try:
await asyncio.wait_for(self._semaphore.acquire(), timeout=5.0)
self.active_connections += 1
return f"Connection-{self.active_connections}"
except asyncio.TimeoutError:
raise ConnectionPoolExhausted(
f"Could not acquire connection within timeout. "
f"Active: {self.active_connections}/{self.max_connections}"
)
async def release_connection(self, connection_id):
self.active_connections -= 1
self._semaphore.release()
print(f"Released {connection_id}")
async def use_resource_manager():
"""Example usage with custom exceptions"""
manager = AsyncResourceManager(max_connections=2)
try:
# Acquire multiple connections
conn1 = await manager.acquire_connection()
conn2 = await manager.acquire_connection()
# This should timeout and raise ConnectionPoolExhausted
conn3 = await manager.acquire_connection()
except ConnectionPoolExhausted as e:
print(f"Resource exhausted: {e}")
finally:
# Cleanup
if 'conn1' in locals():
await manager.release_connection(conn1)
if 'conn2' in locals():
await manager.release_connection(conn2)
Best Practices for Asyncio
import asyncio
import contextlib
import functools
import random
import signal
import sys
import aiohttp
from typing import Optional, List, Any
# 1. Proper resource management
class AsyncContextManager:
"""Example of proper async context manager"""
async def __aenter__(self):
print("Acquiring async resource")
await asyncio.sleep(0.1) # Simulate setup
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Releasing async resource")
await asyncio.sleep(0.1) # Simulate cleanup
if exc_type:
print(f"Exception occurred: {exc_val}")
return False # Don't suppress exceptions
# 2. Graceful shutdown handling
class GracefulShutdown:
"""Handle graceful shutdown of async applications"""
def __init__(self):
self.shutdown_event = asyncio.Event()
self.tasks: List[asyncio.Task] = []
def signal_handler(self, signum, frame):
"""Handle shutdown signals"""
print(f"\nReceived signal {signum}, initiating graceful shutdown...")
self.shutdown_event.set()
async def worker(self, worker_id: int):
"""Example worker that responds to shutdown"""
try:
while not self.shutdown_event.is_set():
print(f"Worker {worker_id} doing work...")
await asyncio.sleep(2)
except asyncio.CancelledError:
print(f"Worker {worker_id} was cancelled")
raise
finally:
print(f"Worker {worker_id} cleaning up...")
async def start_workers(self, num_workers: int = 3):
"""Start workers and handle shutdown"""
# Register signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
# Start workers
self.tasks = [
asyncio.create_task(self.worker(i))
for i in range(num_workers)
]
try:
# Wait for shutdown signal
await self.shutdown_event.wait()
# Cancel all tasks
print("Cancelling all tasks...")
for task in self.tasks:
task.cancel()
# Wait for tasks to complete
await asyncio.gather(*self.tasks, return_exceptions=True)
except KeyboardInterrupt:
print("Forced shutdown")
# 3. Async function decorators
def async_retry(max_retries: int = 3, delay: float = 1.0):
"""Decorator for automatic retry with exponential backoff"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_retries - 1:
wait_time = delay * (2 ** attempt)
print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
else:
print(f"All {max_retries} attempts failed")
raise last_exception
return wrapper
return decorator
def async_timeout(seconds: float):
"""Decorator to add timeout to async functions"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
except asyncio.TimeoutError:
raise TimeoutError(f"Function {func.__name__} timed out after {seconds}s")
return wrapper
return decorator
# Example usage of decorators
@async_retry(max_retries=3, delay=0.5)
@async_timeout(10.0)
async def unreliable_api_call(url: str):
"""Simulated API call that might fail"""
await asyncio.sleep(1)
if random.random() < 0.7: # 70% chance of failure
raise aiohttp.ClientError("API temporarily unavailable")
return {"data": "success"}
# 4. Performance monitoring
class AsyncPerformanceMonitor:
"""Monitor async function performance"""
def __init__(self):
self.stats = {}
def monitor(self, func_name: Optional[str] = None):
"""Decorator to monitor async function performance"""
def decorator(func):
name = func_name or func.__name__
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start_time = asyncio.get_running_loop().time()
try:
result = await func(*args, **kwargs)
success = True
return result
except Exception as e:
success = False
raise
finally:
duration = asyncio.get_running_loop().time() - start_time
self.record_stats(name, duration, success)
return wrapper
return decorator
def record_stats(self, func_name: str, duration: float, success: bool):
"""Record performance statistics"""
if func_name not in self.stats:
self.stats[func_name] = {
'total_calls': 0,
'total_time': 0.0,
'successful_calls': 0,
'failed_calls': 0,
'avg_time': 0.0
}
stats = self.stats[func_name]
stats['total_calls'] += 1
stats['total_time'] += duration
if success:
stats['successful_calls'] += 1
else:
stats['failed_calls'] += 1
stats['avg_time'] = stats['total_time'] / stats['total_calls']
def get_report(self) -> dict:
"""Get performance report"""
return self.stats.copy()
# 5. Connection pooling best practices
class AsyncHTTPClient:
"""HTTP client with proper connection pooling"""
def __init__(self, max_connections: int = 100, timeout: float = 30.0):
self.connector = aiohttp.TCPConnector(
limit=max_connections,
limit_per_host=20,
keepalive_timeout=30,
enable_cleanup_closed=True
)
self.timeout = aiohttp.ClientTimeout(total=timeout)
self._session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self._session = aiohttp.ClientSession(
connector=self.connector,
timeout=self.timeout
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._session:
await self._session.close()
async def get(self, url: str, **kwargs) -> dict:
"""GET request with error handling"""
if not self._session:
raise RuntimeError("Client not initialized. Use 'async with' statement.")
try:
async with self._session.get(url, **kwargs) as response:
response.raise_for_status()
return {
'status': response.status,
'data': await response.json(),
'headers': dict(response.headers)
}
except aiohttp.ClientError as e:
raise AsyncIOError(f"HTTP request failed: {e}")
# Example usage
async def best_practices_example():
"""Demonstrate best practices"""
# Performance monitoring
monitor = AsyncPerformanceMonitor()
@monitor.monitor("api_call")
async def monitored_api_call():
async with AsyncHTTPClient() as client:
return await client.get("https://httpbin.org/json")
# Resource management
async with AsyncContextManager() as resource:
try:
result = await monitored_api_call()
print(f"API call result: {result['status']}")
except AsyncIOError as e:
print(f"API error: {e}")
# Print performance stats
print("Performance stats:", monitor.get_report())
# 6. Common pitfalls and how to avoid them
async def common_pitfalls_examples():
"""Examples of common pitfalls and their solutions"""
# PITFALL 1: Blocking I/O in async functions
# BAD:
# time.sleep(1) # This blocks the entire event loop!
# GOOD:
await asyncio.sleep(1) # This yields control to other coroutines
# PITFALL 2: Not awaiting coroutines
# BAD:
# result = async_function() # This returns a coroutine object, not the result!
# GOOD:
result = await async_function()
# PITFALL 3: Creating tasks without managing them
# BAD:
# asyncio.create_task(some_background_work()) # Fire and forget - might leak
# GOOD:
task = asyncio.create_task(some_background_work())
try:
await task
except Exception as e:
print(f"Background task failed: {e}")
# PITFALL 4: Not handling CancelledError
# BAD:
async def bad_cleanup():
try:
await long_running_operation()
except:
pass # This swallows CancelledError!
# GOOD:
async def good_cleanup():
try:
await long_running_operation()
except asyncio.CancelledError:
print("Operation was cancelled")
# Perform cleanup
raise # Re-raise to maintain cancellation
except Exception as e:
print(f"Operation failed: {e}")
async def async_function():
return "result"
async def some_background_work():
await asyncio.sleep(2)
return "done"
async def long_running_operation():
await asyncio.sleep(10)
10. Performance and Optimization
Understanding Asyncio Performance
Asyncio performance depends on understanding the event loop behavior and I/O patterns.
graph TD
A[Performance Factors] --> B[Event Loop Efficiency]
A --> C[I/O Wait Time]
A --> D[Task Switching Overhead]
A --> E[Memory Usage]
B --> F[Minimize Blocking Operations]
C --> G[Connection Pooling]
D --> H[Batch Operations]
E --> I[Resource Management]
Profiling Asyncio Applications
import asyncio
import cProfile
import pstats
import time
import functools
from typing import Dict, List
import aiohttp
class AsyncProfiler:
"""Profiler for asyncio applications"""
def __init__(self):
self.call_times: Dict[str, List[float]] = {}
self.start_times: Dict[str, float] = {}
def profile_coroutine(self, name: str = None):
"""Decorator to profile coroutine execution time"""
def decorator(func):
coro_name = name or f"{func.__module__}.{func.__name__}"
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.perf_counter()
try:
result = await func(*args, **kwargs)
return result
finally:
duration = time.perf_counter() - start_time
if coro_name not in self.call_times:
self.call_times[coro_name] = []
self.call_times[coro_name].append(duration)
return wrapper
return decorator
def get_stats(self) -> Dict[str, Dict[str, float]]:
"""Get profiling statistics"""
stats = {}
for name, times in self.call_times.items():
stats[name] = {
'calls': len(times),
'total_time': sum(times),
'avg_time': sum(times) / len(times),
'min_time': min(times),
'max_time': max(times)
}
return stats
def print_stats(self):
"""Print formatted statistics"""
stats = self.get_stats()
print("\n" + "="*60)
print("ASYNCIO PERFORMANCE PROFILE")
print("="*60)
print(f"{'Function':<30} {'Calls':<8} {'Total':<10} {'Avg':<10} {'Min':<10} {'Max':<10}")
print("-"*60)
for name, data in sorted(stats.items(), key=lambda x: x[1]['total_time'], reverse=True):
print(f"{name:<30} {data['calls']:<8} {data['total_time']:<10.4f} "
f"{data['avg_time']:<10.4f} {data['min_time']:<10.4f} {data['max_time']:<10.4f}")
# Usage example
profiler = AsyncProfiler()
@profiler.profile_coroutine("fetch_url")
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
@profiler.profile_coroutine("process_data")
async def process_data(data):
# Simulate CPU-bound work
await asyncio.sleep(0.1)
return len(data)
async def profiling_example():
urls = ['http://httpbin.org/delay/1'] * 5
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
content_task = fetch_url(session, url)
tasks.append(content_task)
contents = await asyncio.gather(*tasks)
# Process results
process_tasks = [process_data(content) for content in contents]
results = await asyncio.gather(*process_tasks)
profiler.print_stats()
return results
Memory Optimization
import asyncio
import weakref
import gc
from typing import Any, List, Optional, Set
import psutil
import os
class MemoryMonitor:
"""Monitor memory usage in asyncio applications"""
def __init__(self):
self.process = psutil.Process(os.getpid())
self.baseline_memory = self.get_memory_usage()
def get_memory_usage(self) -> float:
"""Get current memory usage in MB"""
return self.process.memory_info().rss / 1024 / 1024
def print_memory_info(self, label: str = ""):
"""Print current memory information"""
current = self.get_memory_usage()
diff = current - self.baseline_memory
print(f"Memory {label}: {current:.2f} MB (Δ {diff:+.2f} MB)")
async def monitor_coroutine(self, coro, label: str = ""):
"""Monitor memory usage of a coroutine"""
before = self.get_memory_usage()
result = await coro
after = self.get_memory_usage()
print(f"Memory {label}: {before:.2f} → {after:.2f} MB (Δ {after-before:+.2f} MB)")
return result
class AsyncObjectPool:
"""Object pool to reduce memory allocation"""
def __init__(self, factory, max_size: int = 100):
self.factory = factory
self.max_size = max_size
self.pool: List[Any] = []
self.in_use: Set[Any] = set()
async def acquire(self):
"""Acquire object from pool"""
if self.pool:
obj = self.pool.pop()
else:
obj = await self.factory()
self.in_use.add(obj)
return obj
async def release(self, obj):
"""Release object back to pool"""
if obj in self.in_use:
self.in_use.remove(obj)
if len(self.pool) < self.max_size:
# Reset object state if needed
if hasattr(obj, 'reset'):
await obj.reset()
self.pool.append(obj)
class AsyncResource:
"""Example resource for pooling"""
def __init__(self, resource_id: str):
self.resource_id = resource_id
self.data = bytearray(1024) # Some memory allocation
async def reset(self):
"""Reset resource state"""
self.data = bytearray(1024)
async def use(self):
"""Simulate resource usage"""
await asyncio.sleep(0.1)
return f"Used resource {self.resource_id}"
async def create_resource():
"""Factory function for resources"""
import uuid
return AsyncResource(str(uuid.uuid4())[:8])
# Memory optimization techniques
async def memory_optimization_examples():
"""Demonstrate memory optimization techniques"""
monitor = MemoryMonitor()
monitor.print_memory_info("baseline")
# 1. Object pooling
pool = AsyncObjectPool(create_resource, max_size=10)
async def use_pooled_resource():
resource = await pool.acquire()
try:
result = await resource.use()
return result
finally:
await pool.release(resource)
# Test object pooling
tasks = [use_pooled_resource() for _ in range(50)]
results = await asyncio.gather(*tasks)
monitor.print_memory_info("after pooling")
# 2. Lazy loading and cleanup
class LazyDataLoader:
def __init__(self):
self._data: Optional[bytes] = None
self._refs = weakref.WeakSet()
async def get_data(self):
if self._data is None:
print("Loading large dataset...")
self._data = b"x" * (1024 * 1024) # 1MB of data
return self._data
def register_user(self, user):
self._refs.add(user)
def cleanup_if_unused(self):
if not self._refs:
print("Cleaning up unused data...")
self._data = None
gc.collect()
loader = LazyDataLoader()
# Simulate users
class DataUser:
def __init__(self, loader):
self.loader = loader
loader.register_user(self)
async def process(self):
data = await self.loader.get_data()
await asyncio.sleep(0.1)
return len(data)
# Create users and process data
users = [DataUser(loader) for _ in range(5)]
tasks = [user.process() for user in users]
results = await asyncio.gather(*tasks)
monitor.print_memory_info("after lazy loading")
# Cleanup
del users
loader.cleanup_if_unused()
monitor.print_memory_info("after cleanup")
# 3. Streaming data processing
async def memory_efficient_data_processing():
"""Process large datasets without loading everything into memory"""
async def data_generator():
"""Generate data chunks"""
for i in range(100):
# Simulate reading chunks from file/network
chunk = f"data_chunk_{i}" * 100
yield chunk
await asyncio.sleep(0.01)
async def process_chunk(chunk):
"""Process individual chunk"""
await asyncio.sleep(0.01)
return len(chunk)
total_processed = 0
async for chunk in data_generator():
size = await process_chunk(chunk)
total_processed += size
# Optional: yield control periodically
if total_processed % 10000 == 0:
await asyncio.sleep(0)
return total_processed
result = await monitor.monitor_coroutine(
memory_efficient_data_processing(),
"streaming processing"
)
print(f"Processed {result} bytes")PythonConcurrency Optimization
import asyncio
import aiohttp
from asyncio import Semaphore, Queue
from typing import List, Callable, Any
import time
class OptimizedConcurrencyManager:
"""Manage concurrent operations with optimization"""
def __init__(self, max_concurrent: int = 10, rate_limit: float = 0.1):
self.semaphore = Semaphore(max_concurrent)
self.rate_limit = rate_limit
self.last_request_time = 0
async def execute_with_limits(self, coro):
"""Execute coroutine with concurrency and rate limiting"""
async with self.semaphore:
# Rate limiting
now = time.time()
time_since_last = now - self.last_request_time
if time_since_last < self.rate_limit:
await asyncio.sleep(self.rate_limit - time_since_last)
self.last_request_time = time.time()
return await coro
class BatchProcessor:
"""Process items in batches for better performance"""
def __init__(self, batch_size: int = 10, max_concurrent_batches: int = 3):
self.batch_size = batch_size
self.semaphore = Semaphore(max_concurrent_batches)
async def process_batch(self, items: List[Any], processor: Callable):
"""Process a batch of items"""
async with self.semaphore:
tasks = [processor(item) for item in items]
return await asyncio.gather(*tasks, return_exceptions=True)
async def process_all(self, items: List[Any], processor: Callable):
"""Process all items in batches"""
batches = [
items[i:i + self.batch_size]
for i in range(0, len(items), self.batch_size)
]
batch_tasks = [
self.process_batch(batch, processor)
for batch in batches
]
batch_results = await asyncio.gather(*batch_tasks)
# Flatten results
results = []
for batch_result in batch_results:
results.extend(batch_result)
return results
class AsyncQueue:
"""Optimized async queue with producer-consumer pattern"""
def __init__(self, maxsize: int = 0):
self.queue = Queue(maxsize=maxsize)
self.producers_finished = asyncio.Event()
self.active_producers = 0
async def producer(self, items: List[Any]):
"""Producer coroutine"""
self.active_producers += 1
try:
for item in items:
await self.queue.put(item)
await asyncio.sleep(0) # Yield control
finally:
self.active_producers -= 1
if self.active_producers == 0:
self.producers_finished.set()
async def consumer(self, processor: Callable):
"""Consumer coroutine"""
results = []
while True:
try:
# Wait for item with timeout
item = await asyncio.wait_for(self.queue.get(), timeout=1.0)
result = await processor(item)
results.append(result)
self.queue.task_done()
except asyncio.TimeoutError:
# Check if producers are done
if self.producers_finished.is_set() and self.queue.empty():
break
return results
async def process_with_queue(self, all_items: List[Any], processor: Callable, num_consumers: int = 3):
"""Process items using producer-consumer pattern"""
# Split items among producers
chunk_size = len(all_items) // 2 + 1
chunks = [all_items[i:i + chunk_size] for i in range(0, len(all_items), chunk_size)]
# Start producers
producer_tasks = [
asyncio.create_task(self.producer(chunk))
for chunk in chunks
]
# Start consumers
consumer_tasks = [
asyncio.create_task(self.consumer(processor))
for _ in range(num_consumers)
]
# Wait for all producers to finish
await asyncio.gather(*producer_tasks)
# Wait for all consumers to finish
consumer_results = await asyncio.gather(*consumer_tasks)
# Flatten results
all_results = []
for results in consumer_results:
all_results.extend(results)
return all_results
# Performance comparison examples
async def performance_comparison():
"""Compare different concurrency approaches"""
async def simulate_io_task(item_id: int):
"""Simulate I/O bound task"""
await asyncio.sleep(0.1) # Simulate network delay
return f"processed_{item_id}"
items = list(range(100))
# 1. Sequential processing (baseline)
print("Testing sequential processing...")
start_time = time.time()
sequential_results = []
for item in items:
result = await simulate_io_task(item)
sequential_results.append(result)
sequential_time = time.time() - start_time
print(f"Sequential: {sequential_time:.2f}s")
# 2. Unlimited concurrency
print("Testing unlimited concurrency...")
start_time = time.time()
unlimited_tasks = [simulate_io_task(item) for item in items]
unlimited_results = await asyncio.gather(*unlimited_tasks)
unlimited_time = time.time() - start_time
print(f"Unlimited concurrency: {unlimited_time:.2f}s")
# 3. Limited concurrency
print("Testing limited concurrency...")
start_time = time.time()
manager = OptimizedConcurrencyManager(max_concurrent=10)
limited_tasks = [manager.execute_with_limits(simulate_io_task(item)) for item in items]
limited_results = await asyncio.gather(*limited_tasks)
limited_time = time.time() - start_time
print(f"Limited concurrency: {limited_time:.2f}s")
# 4. Batch processing
print("Testing batch processing...")
start_time = time.time()
processor = BatchProcessor(batch_size=10, max_concurrent_batches=5)
batch_results = await processor.process_all(items, simulate_io_task)
batch_time = time.time() - start_time
print(f"Batch processing: {batch_time:.2f}s")
# 5. Queue-based processing
print("Testing queue-based processing...")
start_time = time.time()
queue_processor = AsyncQueue()
queue_results = await queue_processor.process_with_queue(items, simulate_io_task, num_consumers=5)
queue_time = time.time() - start_time
print(f"Queue-based: {queue_time:.2f}s")
# Performance summary
print("\n" + "="*50)
print("PERFORMANCE COMPARISON")
print("="*50)
approaches = [
("Sequential", sequential_time),
("Unlimited Concurrency", unlimited_time),
("Limited Concurrency", limited_time),
("Batch Processing", batch_time),
("Queue-based", queue_time)
]
for name, time_taken in approaches:
speedup = sequential_time / time_taken
print(f"{name:<20}: {time_taken:>6.2f}s (speedup: {speedup:>5.1f}x)")
# Connection pooling optimization
class OptimizedHTTPClient:
"""HTTP client optimized for performance"""
def __init__(self):
# Optimized connector settings
self.connector = aiohttp.TCPConnector(
limit=100, # Total connection pool size
limit_per_host=20, # Connections per host
keepalive_timeout=300, # Keep connections alive longer
enable_cleanup_closed=True,
use_dns_cache=True,
ttl_dns_cache=300,
family=0, # IPv4 and IPv6
ssl=False
)
# Optimized timeout settings
self.timeout = aiohttp.ClientTimeout(
total=30,
connect=5,
sock_read=5
)
async def __aenter__(self):
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=self.timeout
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def fetch_multiple(self, urls: List[str], max_concurrent: int = 20):
"""Fetch multiple URLs with optimized concurrency"""
semaphore = Semaphore(max_concurrent)
async def fetch_one(url):
async with semaphore:
try:
async with self.session.get(url) as response:
return {
'url': url,
'status': response.status,
'size': len(await response.text()),
'time': response.headers.get('X-Response-Time', 'unknown')
}
except Exception as e:
return {'url': url, 'error': str(e)}
tasks = [fetch_one(url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
async def optimization_demo():
"""Demonstrate various optimization techniques"""
print("Running optimization demonstrations...")
# Performance comparison
await performance_comparison()
# HTTP client optimization
urls = [f'http://httpbin.org/delay/0.1?id={i}' for i in range(20)]
async with OptimizedHTTPClient() as client:
print("\nTesting optimized HTTP client...")
start_time = time.time()
results = await client.fetch_multiple(urls, max_concurrent=10)
http_time = time.time() - start_time
successful = len([r for r in results if 'error' not in r])
print(f"HTTP requests: {successful}/{len(urls)} successful in {http_time:.2f}s")
if __name__ == "__main__":
asyncio.run(optimization_demo())
11. Real-World Applications
Web Scraping at Scale
Building a high-performance web scraper that can handle thousands of URLs efficiently.
graph TD
A[URL Queue] --> B[Scraper Pool]
B --> C[Rate Limiter]
C --> D[HTTP Client]
D --> E[Parser]
E --> F[Data Pipeline]
F --> G[Storage]
H[Retry Logic] --> D
I[Error Handler] --> B
J[Progress Monitor] --> A
import asyncio
import aiohttp
import aiofiles
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import json
import time
from typing import List, Dict, Set, Optional
from dataclasses import dataclass
import logging
@dataclass
class ScrapingJob:
url: str
depth: int = 0
priority: int = 1
retries: int = 0
metadata: Optional[Dict] = None
class AsyncWebScraper:
"""Production-ready async web scraper"""
def __init__(self,
max_concurrent: int = 50,
max_retries: int = 3,
delay_between_requests: float = 0.1,
timeout: int = 30):
self.max_concurrent = max_concurrent
self.max_retries = max_retries
self.delay_between_requests = delay_between_requests
# Setup HTTP client with optimized settings
self.connector = aiohttp.TCPConnector(
limit=max_concurrent * 2,
limit_per_host=20,
keepalive_timeout=30,
enable_cleanup_closed=True
)
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.session: Optional[aiohttp.ClientSession] = None
# Concurrency control
self.semaphore = asyncio.Semaphore(max_concurrent)
self.rate_limiter = asyncio.Semaphore(1)
self.last_request_time = 0
# State tracking
self.visited_urls: Set[str] = set()
self.failed_urls: Set[str] = set()
self.results: List[Dict] = []
# Logging
self.logger = logging.getLogger(__name__)
async def __aenter__(self):
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=self.timeout
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def rate_limit(self):
"""Implement rate limiting"""
async with self.rate_limiter:
now = time.time()
time_since_last = now - self.last_request_time
if time_since_last < self.delay_between_requests:
await asyncio.sleep(self.delay_between_requests - time_since_last)
self.last_request_time = time.time()
async def fetch_page(self, job: ScrapingJob) -> Optional[Dict]:
"""Fetch and parse a single page"""
if job.url in self.visited_urls:
return None
async with self.semaphore:
await self.rate_limit()
try:
async with self.session.get(job.url) as response:
if response.status == 200:
content = await response.text()
self.visited_urls.add(job.url)
return {
'url': job.url,
'status': response.status,
'content': content,
'headers': dict(response.headers),
'depth': job.depth,
'timestamp': time.time()
}
else:
self.logger.warning(f"HTTP {response.status} for {job.url}")
return None
except Exception as e:
self.logger.error(f"Error fetching {job.url}: {e}")
if job.retries < self.max_retries:
job.retries += 1
await asyncio.sleep(2 ** job.retries) # Exponential backoff
return await self.fetch_page(job)
else:
self.failed_urls.add(job.url)
return None
def extract_links(self, content: str, base_url: str) -> List[str]:
"""Extract links from HTML content"""
try:
soup = BeautifulSoup(content, 'html.parser')
links = []
for link in soup.find_all('a', href=True):
href = link['href']
full_url = urljoin(base_url, href)
# Filter valid HTTP URLs
parsed = urlparse(full_url)
if parsed.scheme in ('http', 'https'):
links.append(full_url)
return links
except Exception as e:
self.logger.error(f"Error extracting links: {e}")
return []
def parse_content(self, page_data: Dict) -> Dict:
"""Parse content and extract structured data"""
try:
soup = BeautifulSoup(page_data['content'], 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Extract structured data
parsed_data = {
'url': page_data['url'],
'title': soup.find('title').get_text().strip() if soup.find('title') else '',
'meta_description': '',
'headings': [],
'text_content': soup.get_text(),
'images': [],
'links': self.extract_links(page_data['content'], page_data['url']),
'depth': page_data['depth'],
'timestamp': page_data['timestamp']
}
# Extract meta description
meta_desc = soup.find('meta', attrs={'name': 'description'})
if meta_desc:
parsed_data['meta_description'] = meta_desc.get('content', '')
# Extract headings
for i in range(1, 7):
headings = soup.find_all(f'h{i}')
for heading in headings:
parsed_data['headings'].append({
'level': i,
'text': heading.get_text().strip()
})
# Extract images
for img in soup.find_all('img', src=True):
img_url = urljoin(page_data['url'], img['src'])
parsed_data['images'].append({
'src': img_url,
'alt': img.get('alt', ''),
'title': img.get('title', '')
})
return parsed_data
except Exception as e:
self.logger.error(f"Error parsing content for {page_data['url']}: {e}")
return None
async def scrape_urls(self,
urls: List[str],
max_depth: int = 2,
follow_links: bool = True) -> List[Dict]:
"""Scrape multiple URLs with optional link following"""
# Initialize job queue
job_queue = asyncio.Queue()
# Add initial URLs to queue
for url in urls:
await job_queue.put(ScrapingJob(url=url, depth=0))
# Worker coroutine
async def worker():
worker_results = []
while True:
try:
job = await asyncio.wait_for(job_queue.get(), timeout=1.0)
except asyncio.TimeoutError:
break
# Fetch page
page_data = await self.fetch_page(job)
if page_data:
# Parse content
parsed_data = self.parse_content(page_data)
if parsed_data:
worker_results.append(parsed_data)
# Add child links to queue if following links
if follow_links and job.depth < max_depth:
for link in parsed_data['links'][:5]: # Limit links per page
if link not in self.visited_urls:
child_job = ScrapingJob(
url=link,
depth=job.depth + 1
)
await job_queue.put(child_job)
job_queue.task_done()
return worker_results
        # Start workers
        workers = [asyncio.create_task(worker()) for _ in range(self.max_concurrent)]
        # Wait for all jobs to complete
        await job_queue.join()
        # Let each worker hit its queue timeout and return normally;
        # cancelling the tasks here would discard their return values
        worker_results = await asyncio.gather(*workers, return_exceptions=True)
all_results = []
for result in worker_results:
if isinstance(result, list):
all_results.extend(result)
return all_results
async def save_results(self, results: List[Dict], filename: str):
"""Save results to JSON file"""
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
await f.write(json.dumps(results, indent=2, ensure_ascii=False))
self.logger.info(f"Saved {len(results)} results to {filename}")
# Usage example
async def scraping_example():
"""Demonstrate web scraping"""
urls_to_scrape = [
'https://httpbin.org/html',
'https://httpbin.org/links/10',
'https://example.com'
]
async with AsyncWebScraper(max_concurrent=10) as scraper:
results = await scraper.scrape_urls(
urls_to_scrape,
max_depth=1,
follow_links=True
)
await scraper.save_results(results, 'scraping_results.json')
print(f"Scraped {len(results)} pages")
print(f"Visited URLs: {len(scraper.visited_urls)}")
print(f"Failed URLs: {len(scraper.failed_urls)}")PythonReal-Time Chat Application
Real-Time Chat Application
Building a WebSocket-based chat application with asyncio.
import asyncio
import websockets
import json
import logging
from typing import Set, Dict, List, Optional
from datetime import datetime, timezone
import jwt
import hashlib
class ChatRoom:
"""Manages a chat room with multiple users"""
def __init__(self, room_id: str):
self.room_id = room_id
self.clients: Set[websockets.WebSocketServerProtocol] = set()
self.message_history: List[Dict] = []
self.max_history = 100
async def add_client(self, websocket, username: str):
"""Add a client to the room"""
self.clients.add(websocket)
websocket.username = username
websocket.room_id = self.room_id
# Send chat history to new client
for message in self.message_history[-10:]: # Last 10 messages
await websocket.send(json.dumps(message))
# Notify others about new user
await self.broadcast({
'type': 'user_joined',
'username': username,
'timestamp': datetime.now().isoformat(),
'user_count': len(self.clients)
}, exclude=websocket)
async def remove_client(self, websocket):
"""Remove a client from the room"""
if websocket in self.clients:
self.clients.remove(websocket)
username = getattr(websocket, 'username', 'Unknown')
# Notify others about user leaving
await self.broadcast({
'type': 'user_left',
'username': username,
'timestamp': datetime.now().isoformat(),
'user_count': len(self.clients)
})
async def broadcast(self, message: Dict, exclude: Optional[websockets.WebSocketServerProtocol] = None):
"""Broadcast message to all clients in room"""
if not self.clients:
return
message_json = json.dumps(message)
# Send to all clients except excluded one
clients_to_send = self.clients - {exclude} if exclude else self.clients
# Handle disconnected clients
disconnected = set()
for client in clients_to_send:
try:
await client.send(message_json)
except websockets.exceptions.ConnectionClosed:
disconnected.add(client)
# Remove disconnected clients
for client in disconnected:
await self.remove_client(client)
async def add_message(self, message: Dict):
"""Add message to history and broadcast"""
# Add timestamp and store in history
message['timestamp'] = datetime.now().isoformat()
self.message_history.append(message)
# Limit history size
if len(self.message_history) > self.max_history:
self.message_history = self.message_history[-self.max_history:]
# Broadcast to all clients
await self.broadcast(message)
class ChatServer:
"""WebSocket chat server"""
def __init__(self, secret_key: str = "your-secret-key"):
self.rooms: Dict[str, ChatRoom] = {}
self.secret_key = secret_key
self.logger = logging.getLogger(__name__)
def get_or_create_room(self, room_id: str) -> ChatRoom:
"""Get existing room or create new one"""
if room_id not in self.rooms:
self.rooms[room_id] = ChatRoom(room_id)
return self.rooms[room_id]
def authenticate_user(self, token: str) -> Optional[str]:
"""Authenticate user with JWT token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload.get('username')
except jwt.InvalidTokenError:
return None
def create_token(self, username: str) -> str:
"""Create JWT token for user"""
payload = {
'username': username,
            'exp': datetime.now(timezone.utc).timestamp() + 3600  # 1 hour expiry
}
return jwt.encode(payload, self.secret_key, algorithm='HS256')
    async def handle_client(self, websocket, path):
        """Handle an individual client connection.
        Note: this two-argument signature matches older releases of the
        websockets library; recent versions pass only the connection object.
        """
username = None
room = None
try:
# Wait for authentication message
auth_message = await asyncio.wait_for(websocket.recv(), timeout=10.0)
auth_data = json.loads(auth_message)
            if auth_data['type'] != 'auth':
                # Guard: without this, unauthenticated clients would fall
                # through to the message loop with room still set to None
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': 'Expected an auth message'
                }))
                return
            username = self.authenticate_user(auth_data['token'])
            if not username:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': 'Authentication failed'
                }))
                return
            room_id = auth_data.get('room_id', 'general')
            room = self.get_or_create_room(room_id)
            await room.add_client(websocket, username)
            await websocket.send(json.dumps({
                'type': 'auth_success',
                'message': f'Welcome to room {room_id}!'
            }))
            self.logger.info(f"User {username} joined room {room_id}")
# Handle messages
async for message in websocket:
try:
data = json.loads(message)
await self.process_message(data, websocket, room)
except json.JSONDecodeError:
await websocket.send(json.dumps({
'type': 'error',
'message': 'Invalid JSON format'
}))
except Exception as e:
self.logger.error(f"Error processing message: {e}")
except asyncio.TimeoutError:
await websocket.send(json.dumps({
'type': 'error',
'message': 'Authentication timeout'
}))
except websockets.exceptions.ConnectionClosed:
pass
except Exception as e:
self.logger.error(f"Error handling client: {e}")
finally:
if room and websocket in room.clients:
await room.remove_client(websocket)
if username:
self.logger.info(f"User {username} left room {room.room_id}")
async def process_message(self, data: Dict, websocket, room: ChatRoom):
"""Process different types of messages"""
message_type = data.get('type')
username = getattr(websocket, 'username', 'Unknown')
if message_type == 'chat':
# Regular chat message
await room.add_message({
'type': 'chat',
'username': username,
'message': data['message'],
'room_id': room.room_id
})
elif message_type == 'typing':
# Typing indicator
await room.broadcast({
'type': 'typing',
'username': username,
'is_typing': data.get('is_typing', False)
}, exclude=websocket)
elif message_type == 'ping':
# Ping/pong for connection health
await websocket.send(json.dumps({'type': 'pong'}))
else:
await websocket.send(json.dumps({
'type': 'error',
'message': f'Unknown message type: {message_type}'
}))
async def start_server(self, host: str = 'localhost', port: int = 8765):
"""Start the WebSocket server"""
self.logger.info(f"Starting chat server on {host}:{port}")
async with websockets.serve(self.handle_client, host, port):
await asyncio.Future() # Run forever
# Chat client example
class ChatClient:
"""WebSocket chat client"""
def __init__(self, username: str, server_url: str = "ws://localhost:8765"):
self.username = username
self.server_url = server_url
self.websocket = None
self.token = None
async def connect(self, room_id: str = 'general'):
"""Connect to chat server"""
try:
self.websocket = await websockets.connect(self.server_url)
# Create token (in real app, get from auth server)
server = ChatServer()
self.token = server.create_token(self.username)
# Send authentication
auth_message = {
'type': 'auth',
'token': self.token,
'room_id': room_id
}
await self.websocket.send(json.dumps(auth_message))
# Wait for auth response
response = await self.websocket.recv()
auth_response = json.loads(response)
if auth_response['type'] == 'auth_success':
print(f"Connected: {auth_response['message']}")
return True
else:
print(f"Connection failed: {auth_response['message']}")
return False
except Exception as e:
print(f"Connection error: {e}")
return False
async def send_message(self, message: str):
"""Send chat message"""
if self.websocket:
await self.websocket.send(json.dumps({
'type': 'chat',
'message': message
}))
async def listen_for_messages(self):
"""Listen for incoming messages"""
try:
async for message in self.websocket:
data = json.loads(message)
self.handle_message(data)
except websockets.exceptions.ConnectionClosed:
print("Connection closed by server")
except Exception as e:
print(f"Error receiving messages: {e}")
def handle_message(self, data: Dict):
"""Handle incoming message"""
msg_type = data.get('type')
if msg_type == 'chat':
timestamp = data.get('timestamp', '')
username = data.get('username', '')
message = data.get('message', '')
print(f"[{timestamp}] {username}: {message}")
elif msg_type == 'user_joined':
username = data.get('username', '')
count = data.get('user_count', 0)
print(f"*** {username} joined the chat (users online: {count}) ***")
elif msg_type == 'user_left':
username = data.get('username', '')
count = data.get('user_count', 0)
print(f"*** {username} left the chat (users online: {count}) ***")
elif msg_type == 'typing':
username = data.get('username', '')
is_typing = data.get('is_typing', False)
if is_typing:
print(f"*** {username} is typing... ***")
elif msg_type == 'error':
print(f"Error: {data.get('message', 'Unknown error')}")
async def disconnect(self):
"""Disconnect from server"""
if self.websocket:
await self.websocket.close()
# Usage examples
async def run_chat_server():
"""Run the chat server"""
server = ChatServer()
await server.start_server()
async def run_chat_client():
"""Run a chat client"""
client = ChatClient("TestUser")
if await client.connect("general"):
# Start listening for messages in background
listen_task = asyncio.create_task(client.listen_for_messages())
# Send some test messages
await client.send_message("Hello, everyone!")
await asyncio.sleep(1)
await client.send_message("How is everyone doing?")
# Wait a bit then disconnect
await asyncio.sleep(5)
await client.disconnect()
# Cancel listening task
listen_task.cancel()
try:
await listen_task
except asyncio.CancelledError:
            pass
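To smoke-test both halves locally, one can start the server as a background task and run the demo client against it. A sketch (the 0.5 s startup pause is a simplification, not a synchronization guarantee):

```python
# Run the demo server and client together in one process (sketch)
async def chat_demo():
    server_task = asyncio.create_task(run_chat_server())
    await asyncio.sleep(0.5)  # give the server a moment to start listening
    try:
        await run_chat_client()
    finally:
        server_task.cancel()

# asyncio.run(chat_demo())
```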
API Rate Limiter
Building a sophisticated rate limiting system for APIs.
import asyncio
import time
from typing import Dict, Optional, Tuple, List
from dataclasses import dataclass
from enum import Enum
import redis.asyncio as redis
import json
import logging
import hashlib
class RateLimitStrategy(Enum):
FIXED_WINDOW = "fixed_window"
SLIDING_WINDOW = "sliding_window"
TOKEN_BUCKET = "token_bucket"
LEAKY_BUCKET = "leaky_bucket"
@dataclass
class RateLimitRule:
requests_per_window: int
window_size_seconds: int
strategy: RateLimitStrategy = RateLimitStrategy.SLIDING_WINDOW
burst_allowance: int = 0
class AsyncRateLimiter:
"""Advanced async rate limiter with multiple strategies"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self.redis_client: Optional[redis.Redis] = None
self.local_cache: Dict[str, Dict] = {}
async def __aenter__(self):
self.redis_client = redis.from_url(self.redis_url)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.redis_client:
await self.redis_client.close()
async def check_rate_limit(self,
key: str,
rule: RateLimitRule) -> Tuple[bool, Dict]:
"""Check if request is within rate limit"""
current_time = time.time()
if rule.strategy == RateLimitStrategy.SLIDING_WINDOW:
return await self._sliding_window_check(key, rule, current_time)
elif rule.strategy == RateLimitStrategy.FIXED_WINDOW:
return await self._fixed_window_check(key, rule, current_time)
elif rule.strategy == RateLimitStrategy.TOKEN_BUCKET:
return await self._token_bucket_check(key, rule, current_time)
elif rule.strategy == RateLimitStrategy.LEAKY_BUCKET:
return await self._leaky_bucket_check(key, rule, current_time)
else:
raise ValueError(f"Unknown strategy: {rule.strategy}")
async def _sliding_window_check(self,
key: str,
rule: RateLimitRule,
current_time: float) -> Tuple[bool, Dict]:
"""Sliding window rate limiting"""
window_start = current_time - rule.window_size_seconds
# Lua script for atomic operations
lua_script = """
local key = KEYS[1]
local window_start = tonumber(ARGV[1])
local current_time = tonumber(ARGV[2])
local max_requests = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
-- Remove old entries
redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
-- Count current requests in window
local current_count = redis.call('ZCARD', key)
if current_count < max_requests then
-- Add current request
redis.call('ZADD', key, current_time, current_time)
redis.call('EXPIRE', key, window_size + 1)
return {1, current_count + 1, max_requests - current_count - 1}
else
return {0, current_count, 0}
end
"""
result = await self.redis_client.eval(
lua_script,
1,
key,
window_start,
current_time,
rule.requests_per_window,
rule.window_size_seconds
)
allowed = bool(result[0])
current_count = result[1]
remaining = result[2]
return allowed, {
'allowed': allowed,
'current_count': current_count,
'remaining': remaining,
'reset_time': current_time + rule.window_size_seconds,
'strategy': rule.strategy.value
}
async def _fixed_window_check(self,
key: str,
rule: RateLimitRule,
current_time: float) -> Tuple[bool, Dict]:
"""Fixed window rate limiting"""
window_id = int(current_time // rule.window_size_seconds)
window_key = f"{key}:window:{window_id}"
# Lua script for atomic increment and check
lua_script = """
local key = KEYS[1]
local max_requests = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])
local current_count = redis.call('INCR', key)
if current_count == 1 then
redis.call('EXPIRE', key, window_size)
end
if current_count <= max_requests then
return {1, current_count, max_requests - current_count}
else
return {0, current_count, 0}
end
"""
result = await self.redis_client.eval(
lua_script,
1,
window_key,
rule.requests_per_window,
rule.window_size_seconds
)
allowed = bool(result[0])
current_count = result[1]
remaining = result[2]
reset_time = (window_id + 1) * rule.window_size_seconds
return allowed, {
'allowed': allowed,
'current_count': current_count,
'remaining': remaining,
'reset_time': reset_time,
'strategy': rule.strategy.value
}
async def _token_bucket_check(self,
key: str,
rule: RateLimitRule,
current_time: float) -> Tuple[bool, Dict]:
"""Token bucket rate limiting"""
bucket_key = f"{key}:bucket"
# Lua script for token bucket
lua_script = """
local key = KEYS[1]
local current_time = tonumber(ARGV[1])
local max_tokens = tonumber(ARGV[2])
local refill_rate = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
local bucket_data = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket_data[1]) or max_tokens
local last_refill = tonumber(bucket_data[2]) or current_time
-- Calculate tokens to add based on time elapsed
local time_elapsed = current_time - last_refill
local tokens_to_add = math.floor(time_elapsed * refill_rate / window_size)
tokens = math.min(max_tokens, tokens + tokens_to_add)
if tokens >= 1 then
tokens = tokens - 1
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', current_time)
redis.call('EXPIRE', key, window_size * 2)
return {1, tokens, max_tokens}
else
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', current_time)
redis.call('EXPIRE', key, window_size * 2)
return {0, tokens, max_tokens}
end
"""
result = await self.redis_client.eval(
lua_script,
1,
bucket_key,
current_time,
rule.requests_per_window + rule.burst_allowance,
rule.requests_per_window,
rule.window_size_seconds
)
allowed = bool(result[0])
tokens_remaining = result[1]
max_tokens = result[2]
return allowed, {
'allowed': allowed,
'tokens_remaining': tokens_remaining,
'max_tokens': max_tokens,
'strategy': rule.strategy.value
}
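    # check_rate_limit above also dispatches to LEAKY_BUCKET, but the original
    # listing never defines that method; a minimal sketch follows (same Redis
    # hash layout as the token bucket, draining at a constant rate).
    async def _leaky_bucket_check(self,
                                  key: str,
                                  rule: RateLimitRule,
                                  current_time: float) -> Tuple[bool, Dict]:
        """Leaky bucket rate limiting (sketch)"""
        bucket_key = f"{key}:leaky"
        lua_script = """
        local key = KEYS[1]
        local current_time = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local leak_rate = tonumber(ARGV[3])
        local data = redis.call('HMGET', key, 'level', 'last_leak')
        local level = tonumber(data[1]) or 0
        local last_leak = tonumber(data[2]) or current_time
        -- Drain the bucket according to the time elapsed
        level = math.max(0, level - (current_time - last_leak) * leak_rate)
        local allowed = 0
        if level + 1 <= capacity then
            level = level + 1
            allowed = 1
        end
        redis.call('HMSET', key, 'level', level, 'last_leak', current_time)
        redis.call('EXPIRE', key, math.ceil(capacity / leak_rate) + 1)
        return {allowed, level, capacity}
        """
        result = await self.redis_client.eval(
            lua_script,
            1,
            bucket_key,
            current_time,
            rule.requests_per_window,
            rule.requests_per_window / rule.window_size_seconds  # leak rate/sec
        )
        allowed = bool(result[0])
        return allowed, {
            'allowed': allowed,
            'bucket_level': result[1],
            'capacity': result[2],
            'strategy': rule.strategy.value
        }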
class RateLimitMiddleware:
"""Rate limiting middleware for web applications"""
def __init__(self, limiter: AsyncRateLimiter):
self.limiter = limiter
self.rules: Dict[str, RateLimitRule] = {}
self.default_rule = RateLimitRule(
requests_per_window=100,
window_size_seconds=60,
strategy=RateLimitStrategy.SLIDING_WINDOW
)
def add_rule(self, pattern: str, rule: RateLimitRule):
"""Add rate limiting rule for specific pattern"""
self.rules[pattern] = rule
def get_client_key(self, request) -> str:
"""Extract client identifier from request"""
# In real implementation, use IP, user ID, API key, etc.
client_ip = getattr(request, 'remote_addr', 'unknown')
user_id = getattr(request, 'user_id', None)
if user_id:
return f"user:{user_id}"
else:
return f"ip:{client_ip}"
def get_rule_for_endpoint(self, endpoint: str) -> RateLimitRule:
"""Get rate limiting rule for endpoint"""
for pattern, rule in self.rules.items():
if pattern in endpoint:
return rule
return self.default_rule
async def process_request(self, request) -> Tuple[bool, Dict]:
"""Process request through rate limiter"""
client_key = self.get_client_key(request)
endpoint = getattr(request, 'path', '/')
rule = self.get_rule_for_endpoint(endpoint)
rate_limit_key = f"rate_limit:{client_key}:{endpoint}"
return await self.limiter.check_rate_limit(rate_limit_key, rule)
# Example usage with different web frameworks
class AsyncAPIServer:
"""Example async API server with rate limiting"""
def __init__(self):
self.limiter = None
self.middleware = None
async def __aenter__(self):
self.limiter = AsyncRateLimiter()
await self.limiter.__aenter__()
self.middleware = RateLimitMiddleware(self.limiter)
# Configure rate limiting rules
self.middleware.add_rule('/api/upload', RateLimitRule(
requests_per_window=5,
window_size_seconds=60,
strategy=RateLimitStrategy.TOKEN_BUCKET,
burst_allowance=2
))
self.middleware.add_rule('/api/search', RateLimitRule(
requests_per_window=50,
window_size_seconds=60,
strategy=RateLimitStrategy.SLIDING_WINDOW
))
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.limiter:
await self.limiter.__aexit__(exc_type, exc_val, exc_tb)
async def handle_request(self, request_data: Dict) -> Dict:
"""Handle API request with rate limiting"""
# Mock request object
class MockRequest:
def __init__(self, data):
self.remote_addr = data.get('client_ip', '127.0.0.1')
self.user_id = data.get('user_id')
self.path = data.get('path', '/')
request = MockRequest(request_data)
# Check rate limit
allowed, rate_info = await self.middleware.process_request(request)
if not allowed:
return {
'status': 429,
'error': 'Rate limit exceeded',
'rate_limit_info': rate_info
}
# Process request (mock)
await asyncio.sleep(0.1) # Simulate processing
return {
'status': 200,
'data': 'Request processed successfully',
'rate_limit_info': rate_info
}
# Distributed rate limiter for microservices
class DistributedRateLimiter:
"""Distributed rate limiter for microservices"""
def __init__(self, redis_cluster_nodes: List[str]):
self.redis_nodes = redis_cluster_nodes
self.redis_client = None
async def __aenter__(self):
# In real implementation, use Redis Cluster
self.redis_client = redis.from_url(self.redis_nodes[0])
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.redis_client:
await self.redis_client.close()
async def check_global_rate_limit(self,
service_name: str,
resource_type: str,
identifier: str,
rule: RateLimitRule) -> Tuple[bool, Dict]:
"""Check rate limit across all service instances"""
global_key = f"global_rate_limit:{service_name}:{resource_type}:{identifier}"
        # Route each key to a node by a stable hash (the built-in hash() is
        # randomized per process, so it must not be used for routing)
        key_hash = int(hashlib.md5(global_key.encode()).hexdigest(), 16) % len(self.redis_nodes)
        # For this example we open a fresh limiter per check; a real service
        # would keep one client per node
        limiter = AsyncRateLimiter(self.redis_nodes[key_hash])
async with limiter:
return await limiter.check_rate_limit(global_key, rule)
# Usage examples
async def rate_limiter_examples():
"""Demonstrate rate limiting functionality"""
print("=== Rate Limiter Examples ===\n")
# Example 1: Basic rate limiting
async with AsyncRateLimiter() as limiter:
rule = RateLimitRule(
requests_per_window=5,
window_size_seconds=10,
strategy=RateLimitStrategy.SLIDING_WINDOW
)
print("Testing sliding window rate limiting:")
for i in range(8):
allowed, info = await limiter.check_rate_limit("user:123", rule)
print(f"Request {i+1}: {'✓' if allowed else '✗'} - {info}")
await asyncio.sleep(1)
print()
# Example 2: API server with rate limiting
print("Testing API server with rate limiting:")
async with AsyncAPIServer() as server:
test_requests = [
{'client_ip': '192.168.1.1', 'path': '/api/search', 'user_id': 'user1'},
{'client_ip': '192.168.1.1', 'path': '/api/upload', 'user_id': 'user1'},
{'client_ip': '192.168.1.2', 'path': '/api/search', 'user_id': 'user2'},
{'client_ip': '192.168.1.1', 'path': '/api/upload', 'user_id': 'user1'},
]
for i, req_data in enumerate(test_requests):
response = await server.handle_request(req_data)
print(f"Request {i+1} ({req_data['path']}): Status {response['status']}")
if response['status'] == 429:
print(f" Rate limited: {response['rate_limit_info']}")
else:
print(f" Success: {response['rate_limit_info']}")
print()
if __name__ == "__main__":
# Configure logging
logging.basicConfig(level=logging.INFO)
# Run examples
    asyncio.run(rate_limiter_examples())
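These examples assume a Redis server at `redis://localhost:6379`. A quick availability check before running them might look like this (a sketch):

```python
# Check that a local Redis server is reachable before running the examples
import asyncio
import redis.asyncio as redis

async def redis_available(url: str = "redis://localhost:6379") -> bool:
    client = redis.from_url(url)
    try:
        return bool(await client.ping())
    except Exception:
        return False
    finally:
        await client.close()

# print(asyncio.run(redis_available()))
```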
Database Connection Pool
Building an efficient async database connection pool.
import asyncio
import asyncpg
import aiosqlite
from typing import Optional, Dict, Any, List, Protocol
from contextlib import asynccontextmanager
import logging
import time
from dataclasses import dataclass
from enum import Enum
class DatabaseType(Enum):
POSTGRESQL = "postgresql"
SQLITE = "sqlite"
MYSQL = "mysql"
@dataclass
class ConnectionConfig:
database_type: DatabaseType
host: str = "localhost"
port: int = 5432
database: str = "testdb"
username: str = "user"
password: str = "password"
min_connections: int = 5
max_connections: int = 20
connection_timeout: float = 10.0
idle_timeout: float = 300.0
max_lifetime: float = 3600.0
class AsyncConnection(Protocol):
"""Protocol for async database connections"""
async def execute(self, query: str, *args) -> Any:
"""Execute a query"""
...
async def fetch(self, query: str, *args) -> List[Dict]:
"""Fetch multiple rows"""
...
async def fetchone(self, query: str, *args) -> Optional[Dict]:
"""Fetch single row"""
...
async def close(self) -> None:
"""Close connection"""
...
class PooledConnection:
"""Wrapper for pooled database connection"""
def __init__(self, connection, pool, connection_id: str):
self.connection = connection
self.pool = pool
self.connection_id = connection_id
self.created_at = time.time()
self.last_used = time.time()
self.in_use = False
self.is_closed = False
    async def execute(self, query: str, *args):
        """Execute query through pooled connection"""
        self.last_used = time.time()
        # asyncpg takes positional args; aiosqlite expects a parameter sequence
        if hasattr(self.connection, 'fetchrow'):  # asyncpg-style connection
            return await self.connection.execute(query, *args)
        return await self.connection.execute(query, args)
    async def fetch(self, query: str, *args):
        """Fetch multiple rows"""
        self.last_used = time.time()
        if hasattr(self.connection, 'fetchrow'):  # asyncpg
            return await self.connection.fetch(query, *args)
        cursor = await self.connection.execute(query, args)  # aiosqlite
        return await cursor.fetchall()
    async def fetchone(self, query: str, *args):
        """Fetch single row"""
        self.last_used = time.time()
        if hasattr(self.connection, 'fetchrow'):  # asyncpg
            return await self.connection.fetchrow(query, *args)
        cursor = await self.connection.execute(query, args)  # aiosqlite
        return await cursor.fetchone()
async def close(self):
"""Return connection to pool"""
if not self.is_closed:
await self.pool.release_connection(self)
async def force_close(self):
"""Force close the underlying connection"""
if not self.is_closed:
await self.connection.close()
self.is_closed = True
class AsyncConnectionPool:
"""Async database connection pool"""
def __init__(self, config: ConnectionConfig):
self.config = config
self.connections: List[PooledConnection] = []
self.available_connections: asyncio.Queue = asyncio.Queue()
self.semaphore = asyncio.Semaphore(config.max_connections)
self.lock = asyncio.Lock()
self.connection_counter = 0
self.logger = logging.getLogger(__name__)
self._closed = False
async def __aenter__(self):
await self.initialize()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def initialize(self):
"""Initialize the connection pool"""
self.logger.info(f"Initializing connection pool with {self.config.min_connections} connections")
# Create minimum number of connections
for _ in range(self.config.min_connections):
connection = await self._create_connection()
if connection:
await self.available_connections.put(connection)
async def _create_connection(self) -> Optional[PooledConnection]:
"""Create a new database connection"""
try:
if self.config.database_type == DatabaseType.POSTGRESQL:
conn = await asyncpg.connect(
host=self.config.host,
port=self.config.port,
database=self.config.database,
user=self.config.username,
password=self.config.password,
timeout=self.config.connection_timeout
)
elif self.config.database_type == DatabaseType.SQLITE:
conn = await aiosqlite.connect(self.config.database)
else:
raise ValueError(f"Unsupported database type: {self.config.database_type}")
self.connection_counter += 1
connection_id = f"conn_{self.connection_counter}"
pooled_conn = PooledConnection(conn, self, connection_id)
self.connections.append(pooled_conn)
self.logger.debug(f"Created connection {connection_id}")
return pooled_conn
except Exception as e:
self.logger.error(f"Failed to create connection: {e}")
return None
async def acquire_connection(self, timeout: Optional[float] = None) -> PooledConnection:
"""Acquire a connection from the pool"""
if self._closed:
raise RuntimeError("Connection pool is closed")
timeout = timeout or self.config.connection_timeout
async with self.semaphore:
try:
# Try to get an available connection
connection = await asyncio.wait_for(
self.available_connections.get(),
timeout=timeout
)
# Check if connection is still valid
if await self._is_connection_valid(connection):
connection.in_use = True
return connection
else:
# Connection is invalid, create a new one
await connection.force_close()
self.connections.remove(connection)
except asyncio.TimeoutError:
pass
# No available connections, try to create a new one
if len(self.connections) < self.config.max_connections:
connection = await self._create_connection()
if connection:
connection.in_use = True
return connection
raise asyncio.TimeoutError("Could not acquire connection within timeout")
async def _is_connection_valid(self, connection: PooledConnection) -> bool:
"""Check if connection is still valid"""
if connection.is_closed:
return False
# Check connection age
if time.time() - connection.created_at > self.config.max_lifetime:
return False
# Check idle timeout
if time.time() - connection.last_used > self.config.idle_timeout:
return False
try:
# Test connection with a simple query
if self.config.database_type == DatabaseType.POSTGRESQL:
await connection.connection.execute("SELECT 1")
elif self.config.database_type == DatabaseType.SQLITE:
await connection.connection.execute("SELECT 1")
return True
except Exception:
return False
async def release_connection(self, connection: PooledConnection):
"""Release connection back to pool"""
if connection.is_closed or self._closed:
return
async with self.lock:
connection.in_use = False
if await self._is_connection_valid(connection):
await self.available_connections.put(connection)
else:
# Remove invalid connection
await connection.force_close()
if connection in self.connections:
self.connections.remove(connection)
async def cleanup_idle_connections(self):
"""Clean up idle connections periodically"""
while not self._closed:
await asyncio.sleep(60) # Check every minute
async with self.lock:
to_remove = []
for conn in self.connections:
if not conn.in_use and not await self._is_connection_valid(conn):
to_remove.append(conn)
for conn in to_remove:
await conn.force_close()
self.connections.remove(conn)
self.logger.debug(f"Removed idle connection {conn.connection_id}")
@asynccontextmanager
async def transaction(self):
"""Context manager for database transactions"""
connection = await self.acquire_connection()
if self.config.database_type == DatabaseType.POSTGRESQL:
transaction = connection.connection.transaction()
await transaction.start()
try:
yield connection
await transaction.commit()
except Exception:
await transaction.rollback()
raise
finally:
await connection.close()
else:
# SQLite and other databases
try:
yield connection
await connection.connection.commit()
except Exception:
await connection.connection.rollback()
raise
finally:
await connection.close()
async def execute_many(self, query: str, parameters: List[tuple]):
"""Execute query with multiple parameter sets"""
async with self.transaction() as conn:
if self.config.database_type == DatabaseType.POSTGRESQL:
return await conn.connection.executemany(query, parameters)
else:
results = []
for params in parameters:
result = await conn.execute(query, *params)
results.append(result)
return results
async def get_pool_stats(self) -> Dict[str, Any]:
"""Get connection pool statistics"""
active_connections = len([c for c in self.connections if c.in_use])
available_connections = self.available_connections.qsize()
total_connections = len(self.connections)
return {
'total_connections': total_connections,
'active_connections': active_connections,
'available_connections': available_connections,
'max_connections': self.config.max_connections,
'min_connections': self.config.min_connections
}
async def close(self):
"""Close all connections in the pool"""
if self._closed:
return
self._closed = True
self.logger.info("Closing connection pool")
# Close all connections
close_tasks = []
for connection in self.connections:
close_tasks.append(connection.force_close())
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True)
self.connections.clear()
class DatabaseManager:
"""High-level database manager with connection pooling"""
def __init__(self, config: ConnectionConfig):
self.config = config
self.pool: Optional[AsyncConnectionPool] = None
self.logger = logging.getLogger(__name__)
    async def __aenter__(self):
        self.pool = AsyncConnectionPool(self.config)
        await self.pool.__aenter__()
        # Keep a reference to the cleanup task so it isn't garbage-collected,
        # and so it can be cancelled on exit
        self._cleanup_task = asyncio.create_task(self.pool.cleanup_idle_connections())
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._cleanup_task.cancel()
        if self.pool:
            await self.pool.__aexit__(exc_type, exc_val, exc_tb)
async def execute(self, query: str, *args):
"""Execute a query"""
connection = await self.pool.acquire_connection()
try:
return await connection.execute(query, *args)
finally:
await connection.close()
async def fetch(self, query: str, *args) -> List[Dict]:
"""Fetch multiple rows"""
connection = await self.pool.acquire_connection()
try:
return await connection.fetch(query, *args)
finally:
await connection.close()
async def fetchone(self, query: str, *args) -> Optional[Dict]:
"""Fetch single row"""
connection = await self.pool.acquire_connection()
try:
return await connection.fetchone(query, *args)
finally:
await connection.close()
async def execute_transaction(self, operations: List[tuple]):
"""Execute multiple operations in a transaction"""
async with self.pool.transaction() as conn:
results = []
for query, args in operations:
result = await conn.execute(query, *args)
results.append(result)
return results
async def get_stats(self) -> Dict[str, Any]:
"""Get database pool statistics"""
return await self.pool.get_pool_stats()
# Usage examples
async def database_examples():
"""Demonstrate database connection pooling"""
print("=== Database Connection Pool Examples ===\n")
# PostgreSQL example
pg_config = ConnectionConfig(
database_type=DatabaseType.POSTGRESQL,
host="localhost",
port=5432,
database="testdb",
username="postgres",
password="password",
min_connections=3,
max_connections=10
)
# SQLite example (more practical for demo)
    sqlite_config = ConnectionConfig(
        database_type=DatabaseType.SQLITE,
        database=":memory:",  # In-memory database
        # Every sqlite :memory: connection is a separate database, so the
        # demo pool must stay at a single shared connection
        min_connections=1,
        max_connections=1
    )
print("Testing SQLite connection pool:")
async with DatabaseManager(sqlite_config) as db:
# Create test table
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE
)
""")
# Insert test data
await db.execute_transaction([
("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com")),
("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "bob@example.com")),
("INSERT INTO users (name, email) VALUES (?, ?)", ("Charlie", "charlie@example.com")),
])
# Fetch data
users = await db.fetch("SELECT * FROM users")
print(f"Users in database: {len(users)}")
for user in users:
print(f" {user}")
# Get pool statistics
stats = await db.get_stats()
print(f"\nPool statistics: {stats}")
# Test concurrent operations
print("\nTesting concurrent operations:")
async def concurrent_query(query_id: int):
user = await db.fetchone("SELECT * FROM users WHERE id = ?", query_id)
print(f"Query {query_id}: {user}")
return user
# Run multiple concurrent queries
tasks = [concurrent_query(i) for i in range(1, 4)]
results = await asyncio.gather(*tasks)
print(f"Concurrent query results: {len(results)}")
# Final pool statistics
final_stats = await db.get_stats()
print(f"Final pool statistics: {final_stats}")
if __name__ == "__main__":
# Configure logging
logging.basicConfig(level=logging.INFO)
# Run examples
    asyncio.run(database_examples())
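The DatabaseManager wrapper is optional; the pool and its transaction helper can also be used directly. A sketch reusing the classes above:

```python
# Direct pool usage without the DatabaseManager wrapper (sketch)
async def direct_pool_usage():
    config = ConnectionConfig(
        database_type=DatabaseType.SQLITE,
        database=":memory:",
        min_connections=1,
        max_connections=1
    )
    async with AsyncConnectionPool(config) as pool:
        # The transaction context commits on success, rolls back on error
        async with pool.transaction() as conn:
            await conn.execute("CREATE TABLE notes (body TEXT)")
            await conn.execute("INSERT INTO notes (body) VALUES (?)", "hello")
        print(await pool.get_pool_stats())

# asyncio.run(direct_pool_usage())
```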
12. Troubleshooting and Debugging
Common Asyncio Issues and Solutions
Issue 1: “RuntimeError: asyncio.run() cannot be called from a running event loop”
Problem:
# This will fail in Jupyter notebooks or when already in async context
import asyncio
async def main():
return "Hello"
# ❌ Fails if event loop is already running
asyncio.run(main())
Solutions:
# Solution 1: Check if loop is running
import asyncio
async def main():
return "Hello"
try:
    # Try to get the running loop
    loop = asyncio.get_running_loop()
    # A loop is already running, so we are inside an async context and can
    # await directly (the await below is only valid inside a coroutine)
    result = await main()
except RuntimeError:
    # No running loop, safe to use asyncio.run()
    result = asyncio.run(main())
# Solution 2: Use nest_asyncio for Jupyter
# pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()
asyncio.run(main()) # Now works in Jupyter
# Solution 3: Create task in existing loop
async def run_in_existing_loop():
task = asyncio.create_task(main())
    return await task
Issue 2: Forgetting to await coroutines
Problem:
async def fetch_data():
await asyncio.sleep(1)
return "data"
async def main():
# ❌ Returns coroutine object, not the result
result = fetch_data()
    print(result)  # <coroutine object fetch_data at 0x...>
Solution:
async def main():
# ✅ Properly await the coroutine
result = await fetch_data()
print(result) # "data"PythonIssue 3: Blocking the event loop
Problem:
import time
async def blocking_example():
# ❌ This blocks the entire event loop!
time.sleep(2) # Nothing else can run during this time
return "done"PythonSolution:
async def non_blocking_example():
# ✅ This yields control to other tasks
await asyncio.sleep(2) # Other tasks can run during this time
return "done"
# For CPU-intensive work, use run_in_executor
async def cpu_intensive_work():
    def heavy_computation():
        return sum(i * i for i in range(1000000))
    loop = asyncio.get_running_loop()
    # None selects the default ThreadPoolExecutor, which keeps the event loop
    # responsive but does not escape the GIL; pass a ProcessPoolExecutor for
    # true CPU parallelism
    result = await loop.run_in_executor(None, heavy_computation)
    return result
Debugging Asyncio Applications
Enable Debug Mode
import asyncio
import logging
# Enable asyncio debug mode
asyncio.run(main(), debug=True)
# Or set environment variable
# PYTHONASYNCIODEBUG=1 python your_script.py
# Configure logging for asyncio
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('asyncio')
Common Debugging Techniques
import asyncio
import traceback
import sys
async def debug_example():
"""Example showing various debugging techniques"""
# 1. Add timing information
import time
start_time = time.time()
try:
# 2. Use asyncio.wait_for to catch hanging operations
result = await asyncio.wait_for(some_async_operation(), timeout=5.0)
except asyncio.TimeoutError:
print("Operation timed out!")
# 3. Print current tasks to see what's running
for task in asyncio.all_tasks():
print(f"Task: {task.get_name()}, Done: {task.done()}")
if not task.done():
print(f" Stack: {task.get_stack()}")
except Exception as e:
# 4. Print full traceback for async exceptions
print(f"Error: {e}")
traceback.print_exc()
finally:
elapsed = time.time() - start_time
print(f"Operation took: {elapsed:.2f} seconds")
async def some_async_operation():
# Simulate long-running operation
await asyncio.sleep(10)
return "done"PythonMemory Debugging
import asyncio
import gc
import tracemalloc
async def memory_debug_example():
"""Debug memory usage in async applications"""
# Start memory tracing
tracemalloc.start()
# Take snapshot before operations
snapshot1 = tracemalloc.take_snapshot()
# Run your async operations
tasks = []
for i in range(1000):
task = asyncio.create_task(asyncio.sleep(0.1))
tasks.append(task)
await asyncio.gather(*tasks)
# Take snapshot after operations
snapshot2 = tracemalloc.take_snapshot()
# Compare memory usage
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("Top 10 memory allocations:")
for stat in top_stats[:10]:
print(stat)
# Force garbage collection
collected = gc.collect()
print(f"Garbage collected: {collected} objects")PythonPerformance Profiling
import asyncio
import cProfile
import pstats
from functools import wraps
def async_profile(func):
"""Decorator to profile async functions"""
@wraps(func)
async def wrapper(*args, **kwargs):
pr = cProfile.Profile()
pr.enable()
try:
result = await func(*args, **kwargs)
return result
finally:
pr.disable()
stats = pstats.Stats(pr)
stats.sort_stats('cumulative')
stats.print_stats(10) # Top 10 functions
return wrapper
@async_profile
async def example_to_profile():
"""Example function to profile"""
tasks = []
for i in range(100):
task = asyncio.create_task(asyncio.sleep(0.01))
tasks.append(task)
await asyncio.gather(*tasks)
return "completed"
# Usage
# asyncio.run(example_to_profile())
Best Practices for Debugging
1. Use meaningful task names:
# ✅ Good: Named tasks are easier to debug
task = asyncio.create_task(fetch_data(), name="fetch_user_data")
# ❌ Bad: Anonymous tasks are hard to track
task = asyncio.create_task(fetch_data())
2. Implement proper logging:
import logging
logger = logging.getLogger(__name__)
async def well_logged_function():
logger.info("Starting operation")
try:
result = await some_operation()
logger.info(f"Operation successful: {result}")
return result
except Exception as e:
logger.error(f"Operation failed: {e}", exc_info=True)
        raise
3. Use context managers for resources:
# ✅ Good: Automatic cleanup
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.text()
# ❌ Bad: Manual cleanup (error-prone)
session = aiohttp.ClientSession()
response = await session.get(url)
data = await response.text()
await response.close()
await session.close()
13. Asyncio vs Other Frameworks
Asyncio vs Threading
# Threading approach
import threading
import time
import requests
def fetch_url_threaded(url):
response = requests.get(url)
return response.status_code
def threaded_example():
urls = ['http://httpbin.org/delay/1'] * 5
threads = []
results = []
def worker(url):
result = fetch_url_threaded(url)
results.append(result)
# Create threads
for url in urls:
thread = threading.Thread(target=worker, args=(url,))
threads.append(thread)
thread.start()
# Wait for completion
for thread in threads:
thread.join()
return results
# Asyncio approach
import asyncio
import aiohttp
async def fetch_url_async(session, url):
async with session.get(url) as response:
return response.status
async def asyncio_example():
urls = ['http://httpbin.org/delay/1'] * 5
async with aiohttp.ClientSession() as session:
tasks = [fetch_url_async(session, url) for url in urls]
results = await asyncio.gather(*tasks)
        return results
Comparison:
| Aspect | Threading | Asyncio |
|---|---|---|
| Concurrency Model | Preemptive multitasking | Cooperative multitasking |
| Memory Usage | Higher (each thread ~8MB) | Lower (single thread) |
| Context Switching | OS-managed (expensive) | Event loop (cheap) |
| Shared State | Requires locks/synchronization | No locks needed |
| Debugging | Complex (race conditions) | Easier (single thread) |
| CPU-bound Tasks | Good (true parallelism) | Poor (single thread) |
| I/O-bound Tasks | Good | Excellent |
| Scalability | Limited (~1000 threads) | High (~10,000+ tasks) |
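To put the table's I/O-bound rows in concrete terms, here is a rough timing harness over the two examples above (numbers vary with network latency; both should take roughly one second for five one-second requests, versus about five seconds sequentially):

```python
# Rough timing comparison of the two approaches above (illustrative only)
import time

def compare_approaches():
    start = time.perf_counter()
    threaded_example()
    print(f"Threading: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    asyncio.run(asyncio_example())
    print(f"Asyncio:   {time.perf_counter() - start:.2f}s")
```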
Asyncio vs Multiprocessing
# Multiprocessing approach
import multiprocessing as mp
import time
def cpu_intensive_task(n):
"""CPU-bound task - benefits from multiprocessing"""
result = 0
for i in range(n):
result += i * i
return result
def multiprocessing_example():
with mp.Pool() as pool:
tasks = [1000000] * 4
results = pool.map(cpu_intensive_task, tasks)
return results
# Asyncio with executor
from concurrent.futures import ProcessPoolExecutor

async def asyncio_with_executor():
    loop = asyncio.get_running_loop()
    tasks = [1000000] * 4
    # run_in_executor requires a concurrent.futures executor, not an mp.Pool
    with ProcessPoolExecutor() as pool:
        # Run each CPU-bound call in the process pool
        futures = [loop.run_in_executor(pool, cpu_intensive_task, n) for n in tasks]
        results = await asyncio.gather(*futures)
    return results
Asyncio vs Other Async Frameworks
Comparison with Node.js
// Node.js (JavaScript)
const fs = require('fs').promises;
const http = require('http');
async function nodeExample() {
const files = ['file1.txt', 'file2.txt', 'file3.txt'];
const contents = await Promise.all(
files.map(file => fs.readFile(file, 'utf8'))
);
return contents;
}
# Python Asyncio equivalent
import asyncio
import aiofiles
async def python_example():
files = ['file1.txt', 'file2.txt', 'file3.txt']
async def read_file(filename):
async with aiofiles.open(filename, 'r') as f:
return await f.read()
contents = await asyncio.gather(*[read_file(f) for f in files])
    return contents
Comparison with Go Goroutines
// Go
package main
import (
"fmt"
"sync"
"time"
)
func goExample() {
var wg sync.WaitGroup
results := make(chan string, 3)
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Second)
results <- fmt.Sprintf("Result %d", id)
}(i)
}
wg.Wait()
close(results)
for result := range results {
fmt.Println(result)
}
}
# Python Asyncio equivalent
async def python_equivalent():
async def worker(worker_id):
await asyncio.sleep(1)
return f"Result {worker_id}"
tasks = [worker(i) for i in range(3)]
results = await asyncio.gather(*tasks)
for result in results:
        print(result)
When to Choose What
Use Asyncio when:
- I/O-bound operations (network, file, database)
- Need to handle many concurrent connections
- Building web servers, APIs, or real-time applications
- Memory efficiency is important
- Single-machine applications
Use Threading when:
- Mixed I/O and CPU-bound tasks
- Need to integrate with blocking libraries
- Simpler concurrency model preferred
- Limited concurrent operations
Use Multiprocessing when:
- CPU-bound tasks dominate
- Need true parallelism
- Tasks are independent
- Have multiple CPU cores available
Use other frameworks when:
- Twisted: Legacy async Python applications
- Tornado: Web applications with WebSocket support
- Gevent: Monkey-patching existing sync code
- Trio: More structured async programming
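Trio popularized structured concurrency; since Python 3.11, `asyncio.TaskGroup` offers a similar nursery-style API in the standard library. A minimal sketch:

```python
# Structured concurrency with the standard library (Python 3.11+):
# if any task fails, the TaskGroup cancels its siblings and re-raises
import asyncio

async def fetch(n: int) -> str:
    await asyncio.sleep(0.1)
    return f"result {n}"

async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(i)) for i in range(3)]
    # The group only exits once every task has finished
    print([t.result() for t in tasks])

asyncio.run(main())
```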
14. Appendix: Quick Reference
Common Asyncio Patterns Cheat Sheet
# Basic async function
async def my_func():
await asyncio.sleep(1)
return "result"
# Running async code
asyncio.run(my_func())
# Concurrent execution
await asyncio.gather(func1(), func2(), func3())
# Creating tasks
task = asyncio.create_task(my_func())
result = await task
# Timeout
try:
result = await asyncio.wait_for(my_func(), timeout=5.0)
except asyncio.TimeoutError:
print("Timed out")
# Error handling
try:
results = await asyncio.gather(
func1(), func2(), func3(),
return_exceptions=True
)
except Exception as e:
print(f"Error: {e}")PythonEvent Loop Methods
Event Loop Methods
# Get current loop
loop = asyncio.get_running_loop()
# Schedule callback
loop.call_soon(callback)
loop.call_later(delay, callback)
loop.call_at(when, callback)
# Run in executor
result = await loop.run_in_executor(None, blocking_func)
# Create future
future = loop.create_future()
future.set_result("value")
Synchronization Primitives
# Lock
lock = asyncio.Lock()
async with lock:
# Critical section
pass
# Semaphore
semaphore = asyncio.Semaphore(5)
async with semaphore:
# Limited concurrency
pass
# Event
event = asyncio.Event()
await event.wait() # Wait for event
event.set() # Set event
# Queue
queue = asyncio.Queue()
await queue.put(item)
item = await queue.get()
HTTP Client Example
import aiohttp
async def http_example():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
data = await response.json()
            return data
Common Gotchas and Solutions
- Forgetting await: use `result = await async_func()`, not `result = async_func()`
- Blocking operations: use `asyncio.sleep()`, not `time.sleep()`
- Event loop issues: use `asyncio.run()`, or check for a running loop first
- Resource cleanup: always use `async with` for resources
- Exception handling: use `return_exceptions=True` in `gather()`
A minimal skeleton applying all five rules follows.
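Putting the checklist together (a sketch using aiohttp):

```python
# A minimal skeleton that applies the five rules above
import asyncio
import aiohttp

async def fetch_status(session: aiohttp.ClientSession, url: str) -> int:
    async with session.get(url) as response:  # async with: automatic cleanup
        return response.status

async def main():
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(  # await results, never bare calls
            fetch_status(session, 'https://example.com'),
            fetch_status(session, 'https://httpbin.org/get'),
            return_exceptions=True,  # failures surface as values
        )
    print(results)

if __name__ == "__main__":
    asyncio.run(main())  # one entry point, no nested event loops
```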
Conclusion
This comprehensive asyncio guide covers:
- Fundamentals: Basic concepts, coroutines, event loops
- Advanced Features: Tasks, futures, synchronization primitives
- Networking: Streams, WebSockets, HTTP clients
- Error Handling: Exception management and best practices
- Performance: Optimization techniques and profiling
- Real-World Applications: Web scraping, chat servers, rate limiting, database pooling
- Troubleshooting: Common issues and debugging techniques
- Comparisons: How asyncio relates to other concurrency models
Key takeaways:
- Asyncio excels at I/O-bound operations
- Proper error handling is crucial for production applications
- Connection pooling and rate limiting are essential for scalable systems
- Understanding the event loop is key to effective asyncio programming
- Always use proper resource management with async context managers
- Debug mode and logging are essential for development
This guide provides both theoretical understanding and practical implementations that can be adapted for production use cases.