A comprehensive guide to mastering asynchronous programming in Python with practical examples, best practices, and real-world applications.

    Table of Contents

    1. Prerequisites and Setup
    2. Introduction to Asynchronous Programming
    3. Getting Started with Asyncio
    4. Understanding Coroutines
    5. Event Loop Deep Dive
    6. Tasks and Futures
    7. Synchronization Primitives
    8. Asyncio Streams
    9. Error Handling and Best Practices
    10. Performance and Optimization
    11. Real-World Applications
    12. Troubleshooting and Debugging
    13. Asyncio vs Other Frameworks
    14. Appendix: Quick Reference

    1. Prerequisites and Setup

    Required Python Knowledge

    Before diving into asyncio, you should be comfortable with:

    Essential Python Concepts:

    • Functions and generators (yield, yield from)
    • Exception handling (try, except, finally)
    • Context managers (with statements)
    • Decorators (basic understanding)
    • Object-oriented programming (classes, methods)

    Intermediate Concepts:

    • Python’s import system and modules
    • Basic understanding of threads vs processes
    • File I/O operations
    • Network programming basics (optional but helpful)

    Python Version Requirements

    import sys
    print(f"Python version: {sys.version}")
    
    # Asyncio is available in Python 3.4+
    # For best experience, use Python 3.7+ for asyncio.run()
    # Python 3.11+ includes TaskGroup for better error handling
    
    assert sys.version_info >= (3, 7), "Python 3.7+ recommended for this guide"
    Python

    Development Environment Setup

    Required Libraries:

    # Core asyncio is built-in, but install these for examples
    pip install aiohttp aiofiles asyncpg websockets
    
    # For advanced examples
    pip install redis psutil beautifulsoup4 jwt
    
    # Development tools
    pip install pytest pytest-asyncio black isort mypy
    Bash

    IDE/Editor Recommendations:

    • VS Code: Excellent asyncio debugging support
    • PyCharm Professional: Advanced async debugging
    • Vim/Neovim: With Python LSP support

    Mental Model: What Makes Asyncio Different?

    graph LR
        subgraph "Traditional Synchronous"
            A[Request 1] --> B[Wait for Response]
            B --> C[Request 2]
            C --> D[Wait for Response]
            D --> E[Request 3]
        end
    
        subgraph "Asynchronous"
            F[Request 1] --> G[Switch to Request 2]
            G --> H[Switch to Request 3]
            H --> I[Response 1 Ready]
            I --> J[Response 2 Ready]
            J --> K[Response 3 Ready]
        end

    Key Mental Shifts:

    1. Cooperative Multitasking: Functions voluntarily yield control
    2. Event-Driven: React to events (I/O completion, timers)
    3. Single-Threaded: No data races between threads, but one blocking call halts every task, and shared state can still change between await points
    4. Explicit Concurrency: You control when and how concurrency happens
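
    The cost of a blocking call is easiest to see by counting how often another task gets to run. The following is a minimal sketch, not a benchmark; the helper names (ticker, blocking_job, cooperative_job, count_ticks) are made up for illustration. A background ticker records each time the event loop lets it run while a second coroutine either blocks with time.sleep() or yields with asyncio.sleep().

```python
import asyncio
import time

async def ticker(ticks, interval=0.01):
    """Record a timestamp every time the event loop lets this task run."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)

async def blocking_job():
    time.sleep(0.2)  # blocks the whole thread; the loop cannot switch tasks

async def cooperative_job():
    await asyncio.sleep(0.2)  # suspends only this coroutine

async def count_ticks(job):
    """Run `job` next to a ticker and return how many times the ticker ran."""
    ticks = []
    ticker_task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # give the ticker its first turn
    await job()
    ticker_task.cancel()
    try:
        await ticker_task
    except asyncio.CancelledError:
        pass
    return len(ticks)

async def main():
    blocked = await count_ticks(blocking_job)
    cooperative = await count_ticks(cooperative_job)
    return blocked, cooperative

if __name__ == "__main__":
    blocked, cooperative = asyncio.run(main())
    print(f"ticks while blocking: {blocked}")
    print(f"ticks while cooperating: {cooperative}")
```

    The ticker barely runs during the blocking job, because nothing hands control back to the loop, while the cooperative job leaves room for many ticks. Exact counts depend on the platform's timer resolution.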

    2. Introduction to Asynchronous Programming

    What is Asynchronous Programming?

    Asynchronous programming is a paradigm in which a program starts an operation and, instead of blocking while it waits for the result, moves on to other work. Unlike traditional synchronous programming, where each operation must finish before the next begins, async programming lets the waiting periods of many operations overlap, so they appear to run “simultaneously.”

    Key Concept: Concurrency vs Parallelism

    • Concurrency: Dealing with multiple tasks at once (not necessarily simultaneously)
    • Parallelism: Actually executing multiple tasks simultaneously (requires multiple cores)
    • Asyncio: Provides concurrency in a single thread through cooperative multitasking

    graph TD
        A[Synchronous Programming] --> B[Task 1: 3s]
        B --> C[Task 2: 2s] 
        C --> D[Task 3: 1s]
        D --> E[Total: 6s]
    
        F[Asynchronous Programming] --> G[Task 1: 3s]
        F --> H[Task 2: 2s]
        F --> I[Task 3: 1s]
        G --> J[Total: ~3s]
        H --> J
        I --> J
    
        style E fill:#ffcccc
        style J fill:#ccffcc
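
    The timings in the diagram can be reproduced with a small experiment. This is a sketch under stated assumptions: asyncio.sleep() stands in for real I/O, the durations are the diagram's values divided by ten so the demo finishes quickly, and the function names are illustrative.

```python
import asyncio
import time

async def io_task(name, seconds):
    await asyncio.sleep(seconds)  # stands in for an I/O wait
    return name

async def run_sequentially(durations):
    start = time.perf_counter()
    for name, seconds in durations:
        await io_task(name, seconds)  # each wait finishes before the next starts
    return time.perf_counter() - start

async def run_concurrently(durations):
    start = time.perf_counter()
    await asyncio.gather(*(io_task(n, s) for n, s in durations))  # waits overlap
    return time.perf_counter() - start

async def main():
    # Diagram durations (3s, 2s, 1s) scaled down by a factor of 10
    durations = [("Task 1", 0.3), ("Task 2", 0.2), ("Task 3", 0.1)]
    sequential = await run_sequentially(durations)
    concurrent = await run_concurrently(durations)
    return sequential, concurrent

if __name__ == "__main__":
    sequential, concurrent = asyncio.run(main())
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

    The sequential total is roughly the sum of the waits, while the concurrent total is roughly the longest single wait.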

    Understanding Blocking vs Non-Blocking Operations

    Blocking Operations (Bad for asyncio):

    import time
    import requests
    
    # These BLOCK the entire event loop!
    time.sleep(1)              # ❌ Blocks everything
    requests.get('http://...')  # ❌ Blocks everything
    open('file.txt').read()    # ❌ Blocks everything
    Python

    Non-Blocking Operations (Good for asyncio):

    import asyncio
    import aiohttp
    import aiofiles
    
    # These yield control back to the event loop.
    # await is only valid inside a coroutine, so the calls are wrapped in one.
    async def non_blocking_examples():
        await asyncio.sleep(1)                  # ✅ Cooperative
        async with aiohttp.ClientSession() as session:
            await session.get('http://...')     # ✅ Cooperative
        async with aiofiles.open('file.txt') as f:
            await f.read()                      # ✅ Cooperative
    Python

    Why Use Asyncio?

    Perfect For (I/O Bound Tasks):

    • 🌐 Network requests: APIs, web scraping, downloads
    • 📁 File operations: Reading/writing large files
    • 🗄️ Database queries: Multiple database calls
    • 🔗 WebSocket connections: Real-time communication
    • 🚀 Microservices: Handling many concurrent requests

    Not Ideal For (CPU Bound Tasks):

    • 🧮 Mathematical computations
    • 🖼️ Image/video processing
    • 🔐 Cryptographic operations
    • 📊 Data analysis (use multiprocessing instead)

    Common Misconceptions

    “Asyncio makes everything faster”

    • Only improves I/O bound operations
    • Can be slower for CPU-bound tasks due to overhead

    “Asyncio is multithreading”

    • Asyncio runs in a single thread
    • Uses cooperative multitasking, not preemptive

    “Just add async/await everywhere”

    • Only use async when you need to wait for I/O
    • Overhead exists for function calls
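
    The first and third misconceptions can be checked by logging when coroutines start and finish. The sketch below uses hypothetical helpers (cpu_bound, io_bound) and only illustrates scheduling order; it does not measure performance. A coroutine with no await inside never yields, so asyncio.gather() runs such coroutines back to back.

```python
import asyncio

async def cpu_bound(name, log):
    log.append(f"{name} start")
    total = sum(i * i for i in range(100_000))  # pure computation, never yields
    log.append(f"{name} end")
    return total

async def io_bound(name, log):
    log.append(f"{name} start")
    await asyncio.sleep(0.01)  # yields, so the other coroutine can start
    log.append(f"{name} end")

async def main():
    cpu_log, io_log = [], []
    await asyncio.gather(cpu_bound("A", cpu_log), cpu_bound("B", cpu_log))
    await asyncio.gather(io_bound("A", io_log), io_bound("B", io_log))
    return cpu_log, io_log

if __name__ == "__main__":
    cpu_log, io_log = asyncio.run(main())
    print("CPU-bound order:", cpu_log)
    print("I/O-bound order:", io_log)
```

    In the CPU-bound log, A finishes before B even starts; in the I/O-bound log, both coroutines start before either finishes. Marking a function async adds scheduling overhead without adding any overlap unless the function actually awaits something.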

    Synchronous vs Asynchronous Comparison

    Example: Fetching Multiple URLs

    # Synchronous approach - Operations run one after another
    import time
    import requests
    
    def fetch_data_sync(urls):
        """Fetch URLs one by one (blocking)"""
        results = []
        for url in urls:
            print(f"⏳ Fetching {url}")
            response = requests.get(url)  # This blocks!
            results.append(response.status_code)
            print(f"✅ Got {response.status_code} from {url}")
        return results
    
    # Takes ~3 seconds for 3 URLs (1 second each)
    start = time.time()
    urls = ['http://httpbin.org/delay/1'] * 3
    results = fetch_data_sync(urls)
    print(f"🐌 Sync took: {time.time() - start:.2f} seconds")
    Python

    # Asynchronous approach - Operations run concurrently
    import asyncio
    import aiohttp
    import time
    
    async def fetch_data_async(urls):
        """Fetch URLs concurrently (non-blocking)"""
        async with aiohttp.ClientSession() as session:
            async def fetch_one(url):
                print(f"⏳ Starting {url}")
                async with session.get(url) as response:
                    print(f"✅ Got {response.status} from {url}")
                    return response.status
    
            # Build one coroutine per URL; gather() wraps them in tasks and runs them concurrently
            coros = [fetch_one(url) for url in urls]
            return await asyncio.gather(*coros)
    
    # Takes ~1 second for 3 URLs (all run concurrently)
    async def main():
        start = time.time()
        urls = ['http://httpbin.org/delay/1'] * 3
        results = await fetch_data_async(urls)
        print(f"🚀 Async took: {time.time() - start:.2f} seconds")
    
    # Run the async function
    # asyncio.run(main())
    Python

    Performance Comparison:

    • Synchronous: 3 URLs × 1 second = ~3 seconds
    • Asynchronous: 3 URLs concurrently = ~1 second
    • Speedup: ~3x faster for I/O bound operations!

    3. Getting Started with Asyncio

    Installation and Setup

    Asyncio is part of Python’s standard library (Python 3.4+). No additional installation required:

    import asyncio
    import sys
    
    print(f"Python version: {sys.version}")
    # asyncio has no separate version number; it ships with the interpreter
    print(f"asyncio loaded from: {asyncio.__file__}")
    Python

    Your First Async Program

    import asyncio
    
    async def hello_world():
        """
        A simple async function (coroutine) that demonstrates:
        1. async def - declares a coroutine function
        2. await - pauses execution and yields control to event loop
        """
        print("Hello")
        await asyncio.sleep(1)  # Non-blocking sleep - other tasks can run
        print("World")
    
    # Method 1: Python 3.7+ (recommended)
    asyncio.run(hello_world())
    
    # Method 2: Python 3.6 and earlier
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(hello_world())
    Python

    Key Asyncio Components

    graph TB
        A[async def function] --> B[Coroutine Object]
        B --> C[Event Loop]
        C --> D[Task]
        D --> E[Future]
        E --> F[Result]
    
        G[await keyword] --> C
        H[asyncio.run] --> C
        I[asyncio.create_task] --> D

    Basic Async Patterns

    import asyncio
    import time
    
    # Pattern 1: Simple async function
    async def simple_async():
        """Basic async function that waits and returns a value"""
        await asyncio.sleep(1)  # Simulates I/O operation
        return "Done"
    
    # Pattern 2: Running multiple coroutines sequentially
    async def sequential_execution():
        """Runs coroutines one after another (not concurrent)"""
        print("Sequential execution started")
        start_time = time.time()
    
        result1 = await simple_async()  # Wait for first to complete
        result2 = await simple_async()  # Then start second
        result3 = await simple_async()  # Then start third
    
        end_time = time.time()
        print(f"Sequential took: {end_time - start_time:.2f} seconds")
        return [result1, result2, result3]
    
    # Pattern 3: Running multiple coroutines concurrently
    async def concurrent_execution():
        """Runs coroutines concurrently using asyncio.gather"""
        print("Concurrent execution started")
        start_time = time.time()
    
        # Start all coroutines at once
        results = await asyncio.gather(
            simple_async(),  # These three run concurrently
            simple_async(),
            simple_async()
        )
    
        end_time = time.time()
        print(f"Concurrent took: {end_time - start_time:.2f} seconds")
        return results
    
    # Pattern 4: Using tasks for better control
    async def task_execution():
        """Using tasks gives you more control over execution"""
        print("Task execution started")
        start_time = time.time()
    
        # Create tasks (scheduled right away; they start running at the next await)
        task1 = asyncio.create_task(simple_async())
        task2 = asyncio.create_task(simple_async())
        task3 = asyncio.create_task(simple_async())
    
        # Tasks are already running, just wait for results
        result1 = await task1
        result2 = await task2
        result3 = await task3
    
        end_time = time.time()
        print(f"Task execution took: {end_time - start_time:.2f} seconds")
        return [result1, result2, result3]
    
    # Pattern 5: Error handling in async code
    async def safe_async():
        """Proper error handling in async functions"""
        try:
            # wait_for raises asyncio.TimeoutError if the call takes longer than the timeout
            result = await asyncio.wait_for(simple_async(), timeout=2.0)
            return result
        except asyncio.TimeoutError:
            print("Operation timed out")
            return None
        except Exception as e:
            print(f"Unexpected error: {e}")
            return None
        finally:
            print("Cleanup code runs regardless of success/failure")
    
    # Running the examples
    async def main():
        """Main function to demonstrate different patterns"""
        print("=== Asyncio Patterns Demo ===\n")
    
        # Sequential vs Concurrent comparison
        await sequential_execution()  # ~3 seconds
        print()
        await concurrent_execution()  # ~1 second
        print()
        await task_execution()        # ~1 second
        print()
    
        # Error handling
        result = await safe_async()
        print(f"Safe async result: {result}")
    
    # Run the demo
    if __name__ == "__main__":
        asyncio.run(main())
    Python

    When to Use async/await

    import asyncio
    import aiohttp
    import aiofiles
    import asyncpg
    
    # ✅ GOOD: I/O operations that can benefit from concurrency
    async def good_async_usage():
        # Network requests
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.example.com') as response:
                data = await response.json()
    
        # File operations
        async with aiofiles.open('file.txt', 'r') as f:
            content = await f.read()
    
        # Database operations (asyncpg.connect() is awaited; it is not a context manager)
        conn = await asyncpg.connect('postgresql://...')
        try:
            result = await conn.fetch('SELECT * FROM users')
        finally:
            await conn.close()
    
        return data, content, result
    
    # ❌ BAD: CPU-intensive operations (no I/O to wait for)
    async def bad_async_usage():
        # This doesn't benefit from async - just adds overhead
        total = 0
        for i in range(1000000):
            total += i * i  # Pure computation
        return total
    
    # ✅ BETTER: For CPU-intensive work, use regular functions
    def cpu_intensive_work():
        total = 0
        for i in range(1000000):
            total += i * i
        return total
    
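    # Or offload the work with loop.run_in_executor so the event loop stays responsive.
    # The default executor (None) is a thread pool, so the GIL still limits CPU-bound
    # speedups; pass a concurrent.futures.ProcessPoolExecutor for true parallelism.
    async def async_with_cpu_work():
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, cpu_intensive_work)
        return result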
    Python

    Hands-On Exercises

    Exercise 1: Basic Async Function

    # TODO: Create an async function that simulates downloading a file
    # It should take a filename and size, print progress, and return success message
    
    async def download_file(filename: str, size_mb: int):
        """
        Exercise: Implement a function that simulates file download
        Requirements:
        1. Print "Starting download of {filename} ({size_mb}MB)"
        2. Use asyncio.sleep() to simulate download time (0.1s per MB)
        3. Print progress every 20% completion
        4. Return "Downloaded {filename} successfully"
        """
        # Your code here
        pass
    
    # Test your function
    # asyncio.run(download_file("video.mp4", 100))
    Python

    Exercise 2: Concurrent Downloads

    # TODO: Download multiple files concurrently and show total time saved
    
    async def download_multiple_files():
        """
        Exercise: Download multiple files concurrently
        Requirements:
        1. Create a list of files to download: [("file1.zip", 50), ("file2.mp4", 30), ("file3.pdf", 20)]
        2. Download them concurrently using asyncio.gather()
        3. Measure and print total time taken
        4. Compare with sequential download time
        """
        # Your code here
        pass
    
    # asyncio.run(download_multiple_files())
    Python

    Exercise 3: Error Handling

    # TODO: Implement robust error handling for network operations
    
    async def fetch_with_retry(url: str, max_retries: int = 3):
        """
        Exercise: Implement retry logic for failed requests
        Requirements:
        1. Try to fetch the URL
        2. If it fails, retry up to max_retries times
        3. Use exponential backoff (1s, 2s, 4s delays)
        4. Return result or raise exception after all retries fail
        """
        # Your code here
        pass
    Python

    4. Understanding Coroutines

    What are Coroutines?

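    Coroutines are special functions that can be paused and resumed. They’re defined with async def. Calling one does not run its body; it returns a coroutine object, which executes only when it is awaited or scheduled as a task.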

    stateDiagram-v2
        [*] --> Created: async def func()
        Created --> Running: await func()
        Running --> Suspended: await other_coro()
        Suspended --> Running: other_coro() completes
        Running --> Completed: return value
        Completed --> [*]
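
    The Created and Completed states in the diagram can be observed directly with the standard inspect module. This is a small sketch; greet() is a hypothetical coroutine used only for the demonstration.

```python
import asyncio
import inspect

async def greet(name):
    await asyncio.sleep(0)
    return f"Hello, {name}"

async def main():
    coro = greet("asyncio")  # creates a coroutine object; the body has not run
    is_coroutine = inspect.iscoroutine(coro)
    state_before = inspect.getcoroutinestate(coro)
    result = await coro      # drives the coroutine to completion
    state_after = inspect.getcoroutinestate(coro)
    return is_coroutine, state_before, result, state_after

if __name__ == "__main__":
    is_coroutine, state_before, result, state_after = asyncio.run(main())
    print(f"is coroutine object: {is_coroutine}")
    print(f"state before await: {state_before}")
    print(f"result: {result}")
    print(f"state after await: {state_after}")
```

    Forgetting the await leaves the object in its created state, which is why Python emits a "coroutine ... was never awaited" warning when such an object is discarded.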

    Coroutine Lifecycle

    import asyncio
    
    async def lifecycle_demo():
        print("1. Coroutine started")
    
        # Suspension point
        await asyncio.sleep(1)
        print("2. Resumed after sleep")
    
        # Another suspension
        result = await another_coro()
        print(f"3. Got result: {result}")
    
        return "Final result"
    
    async def another_coro():
        await asyncio.sleep(0.5)
        return "Hello from another coroutine"
    
    # Running the demo
    async def main():
        result = await lifecycle_demo()
        print(f"Final: {result}")
    
    asyncio.run(main())
    Python

    Coroutine Composition

    import asyncio
    
    # Building complex operations from simple coroutines
    async def fetch_user(user_id):
        await asyncio.sleep(0.1)  # Simulate API call
        return {"id": user_id, "name": f"User{user_id}"}
    
    async def fetch_posts(user_id):
        await asyncio.sleep(0.2)  # Simulate API call
        return [f"Post {i} by User{user_id}" for i in range(3)]
    
    async def fetch_user_profile(user_id):
        # Fetch user and posts concurrently
        user, posts = await asyncio.gather(
            fetch_user(user_id),
            fetch_posts(user_id)
        )
    
        return {
            "user": user,
            "posts": posts
        }
    
    # Usage
    async def main():
        profile = await fetch_user_profile(123)
        print(profile)
    
    asyncio.run(main())
    Python

    5. Event Loop Deep Dive

    Understanding the Event Loop

    The event loop is the heart of asyncio. It schedules and executes coroutines, handles I/O events, and manages callbacks.

    graph TD
        A[Event Loop] --> B[Ready Queue]
        A --> C[I/O Selector]
        A --> D[Timer Heap]
    
        B --> E[Execute Task]
        E --> F{Task Complete?}
        F -->|No| G[Suspend Task]
        F -->|Yes| H[Remove Task]
    
        G --> I[Schedule Resume]
        I --> B
    
        C --> J[I/O Ready]
        J --> K[Resume Waiting Task]
        K --> B
    
        D --> L[Timer Expired]
        L --> M[Resume Delayed Task]
        M --> B
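
    The split between the ready queue and the timer heap shows up in the order callbacks run. The sketch below is illustrative and the labels are arbitrary: callbacks registered with call_soon() run first, in registration order, and call_later() callbacks run in deadline order regardless of when they were registered.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    order = []

    # Timers are registered first, and out of deadline order on purpose
    loop.call_later(0.02, order.append, "timer 20ms")
    loop.call_later(0.01, order.append, "timer 10ms")

    # Ready-queue callbacks run on the next loop iteration, first in, first out
    loop.call_soon(order.append, "ready 1")
    loop.call_soon(order.append, "ready 2")

    await asyncio.sleep(0.05)  # suspend main() so the loop can run the callbacks
    return order

if __name__ == "__main__":
    print(asyncio.run(main()))
    # → ['ready 1', 'ready 2', 'timer 10ms', 'timer 20ms']
```

    Note that none of the callbacks run until main() reaches an await; the loop only regains control at suspension points.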

    Working with Event Loops

    import asyncio
    
    # Getting the current event loop
    async def loop_info():
        loop = asyncio.get_running_loop()
        print(f"Loop: {loop}")
        print(f"Running: {loop.is_running()}")
        print(f"Closed: {loop.is_closed()}")
    
    # Creating and managing loops manually
    def manual_loop_management():
        # Create a new event loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
        try:
            # Run coroutines
            loop.run_until_complete(loop_info())
        finally:
            loop.close()
    
    # Scheduling callbacks
    async def callback_example():
        loop = asyncio.get_running_loop()
    
        def callback():
            print("Callback executed!")
    
        # Schedule callback
        loop.call_soon(callback)
        loop.call_later(1.0, callback)
    
        await asyncio.sleep(2)
    Python

    Event Loop Policies

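    import asyncio
    
    # Custom event loop policy
    # Note: the policy API is deprecated as of Python 3.14; if debug mode is all you need,
    # asyncio.run(..., debug=True) achieves the same result without a custom policy.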
    class CustomEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
        def new_event_loop(self):
            loop = super().new_event_loop()
            # Customize loop behavior
            loop.set_debug(True)
            return loop
    
    # Setting custom policy
    asyncio.set_event_loop_policy(CustomEventLoopPolicy())
    Python

    6. Tasks and Futures

    Understanding Tasks

    Tasks are wrappers around coroutines that allow them to be scheduled and executed by the event loop.

    classDiagram
        class Future {
            +result()
            +exception()
            +cancelled()
            +done()
            +add_done_callback()
        }
    
        class Task {
            +cancel()
            +get_coro()
            +get_name()
            +set_name()
        }
    
        Future <|-- Task
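
    Several of the methods in the class diagram can be tried on a single task. This is a minimal sketch; compute() and the task name "compute-task" are placeholders chosen for the example.

```python
import asyncio

async def compute():
    await asyncio.sleep(0.01)
    return 42

async def main():
    finished = []

    task = asyncio.create_task(compute(), name="compute-task")
    # Future API: run a callback once the task is done
    task.add_done_callback(lambda t: finished.append(t.get_name()))

    done_before = task.done()   # the task has not finished yet
    result = await task         # wait for completion and fetch the result
    await asyncio.sleep(0)      # extra loop turn so pending callbacks have run

    return {
        "done_before": done_before,
        "done_after": task.done(),
        "cancelled": task.cancelled(),
        "result": result,
        "result_via_method": task.result(),
        "callback_saw": finished,
    }

if __name__ == "__main__":
    for key, value in asyncio.run(main()).items():
        print(f"{key}: {value}")
```

    Because Task inherits from Future, done(), result(), cancelled(), and add_done_callback() are all available on tasks; get_name() and cancel() are additions specific to Task.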

    Creating and Managing Tasks

    import asyncio
    
    async def background_task(name, duration):
        print(f"Task {name} started")
        await asyncio.sleep(duration)
        print(f"Task {name} completed")
        return f"Result from {name}"
    
    async def task_management():
        # Creating tasks
        task1 = asyncio.create_task(background_task("A", 2))
        task2 = asyncio.create_task(background_task("B", 1))
        task3 = asyncio.create_task(background_task("C", 3))
    
        # Setting task names
        task1.set_name("Long Task")
        task2.set_name("Quick Task")
        task3.set_name("Slow Task")
    
        # Wait for all tasks
        results = await asyncio.gather(task1, task2, task3)
        print(f"Results: {results}")
    
    # Task cancellation
    async def cancellation_example():
        task = asyncio.create_task(background_task("Cancelled", 5))
    
        # Cancel after 1 second
        await asyncio.sleep(1)
        task.cancel()
    
        try:
            await task
        except asyncio.CancelledError:
            print("Task was cancelled")
    Python

    Working with Futures

    import asyncio
    
    async def future_example():
        loop = asyncio.get_running_loop()
    
        # Creating a future
        future = loop.create_future()
    
        def set_result():
            if not future.done():
                future.set_result("Future result!")
    
        # Schedule result setting
        loop.call_later(2, set_result)
    
        # Wait for future
        result = await future
        print(f"Got: {result}")
    
    # Future with exception
    async def future_exception_example():
        loop = asyncio.get_running_loop()
        future = loop.create_future()
    
        def set_exception():
            future.set_exception(ValueError("Something went wrong!"))
    
        loop.call_later(1, set_exception)
    
        try:
            await future
        except ValueError as e:
            print(f"Caught exception: {e}")
    Python

    Task Groups (Python 3.11+)

    import asyncio
    
    # Reuses background_task() from "Creating and Managing Tasks" above
    async def task_group_example():
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(background_task("Group A", 1))
            task2 = tg.create_task(background_task("Group B", 2))
            task3 = tg.create_task(background_task("Group C", 1.5))
    
        print("All tasks in group completed")
        print(f"Results: {task1.result()}, {task2.result()}, {task3.result()}")
    Python

    7. Synchronization Primitives

    Why Synchronization in Async Code?

    Even though asyncio is single-threaded, we still need synchronization when sharing resources between coroutines.

    sequenceDiagram
        participant C1 as Coroutine 1
        participant R as Shared Resource
        participant C2 as Coroutine 2
    
        C1->>R: Acquire Lock
        C2->>R: Try Acquire Lock
        R-->>C2: Blocked
        C1->>R: Modify Resource
        C1->>R: Release Lock
        R-->>C2: Lock Acquired
        C2->>R: Modify Resource
        C2->>R: Release Lock
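
    A lost update is the simplest way to see why a lock is still needed in single-threaded code. In this sketch (the function names and the shared dict are illustrative), ten coroutines each read a value, hit a suspension point, and then write back the value plus one. Without a lock, every coroutine reads the same stale value before any of them writes.

```python
import asyncio

async def unsafe_increment(state):
    current = state["value"]
    await asyncio.sleep(0)        # suspension point: other coroutines run here
    state["value"] = current + 1  # writes back a value computed from a stale read

async def safe_increment(state, lock):
    async with lock:              # only one coroutine at a time in this block
        current = state["value"]
        await asyncio.sleep(0)
        state["value"] = current + 1

async def main():
    unsafe = {"value": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(10)))

    safe = {"value": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(10)))

    return unsafe["value"], safe["value"]

if __name__ == "__main__":
    unsafe_value, safe_value = asyncio.run(main())
    print(f"without lock: {unsafe_value}, with lock: {safe_value}")
```

    The unlocked counter ends up well below 10 because updates overwrite each other, while the locked counter reaches 10. The race exists only because of the await between the read and the write; code with no suspension point in between cannot be interrupted by another coroutine.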

    Locks

    import asyncio
    
    class AsyncCounter:
        def __init__(self):
            self._value = 0
            self._lock = asyncio.Lock()
    
        async def increment(self):
            async with self._lock:
                # Critical section
                current = self._value
                await asyncio.sleep(0.1)  # Simulate work
                self._value = current + 1
    
        @property
        def value(self):
            return self._value
    
    async def worker(counter, name):
        for i in range(5):
            await counter.increment()
            print(f"Worker {name}: {counter.value}")
    
    async def lock_example():
        counter = AsyncCounter()
    
        # Run workers concurrently
        await asyncio.gather(
            worker(counter, "A"),
            worker(counter, "B"),
            worker(counter, "C")
        )
    
        print(f"Final value: {counter.value}")
    Python

    Semaphores

    import asyncio
    
    async def download_file(semaphore, url, session):
        async with semaphore:
            print(f"Starting download: {url}")
            await asyncio.sleep(2)  # Simulate download
            print(f"Completed download: {url}")
            return f"Content from {url}"
    
    async def semaphore_example():
        # Limit to 3 concurrent downloads
        semaphore = asyncio.Semaphore(3)
    
        urls = [f"http://example.com/file{i}.zip" for i in range(10)]
    
        tasks = []
        for url in urls:
            task = asyncio.create_task(
                download_file(semaphore, url, None)
            )
            tasks.append(task)
    
        results = await asyncio.gather(*tasks)
        print(f"Downloaded {len(results)} files")
    Python

    Events

    import asyncio
    
    async def waiter(event, name):
        print(f"Waiter {name} waiting for event")
        await event.wait()
        print(f"Waiter {name} got event!")
    
    async def setter(event):
        await asyncio.sleep(2)
        print("Setting event")
        event.set()
    
    async def event_example():
        event = asyncio.Event()
    
        # Start waiters
        waiters = [
            asyncio.create_task(waiter(event, f"W{i}"))
            for i in range(3)
        ]
    
        # Start setter
        setter_task = asyncio.create_task(setter(event))
    
        # Wait for all
        await asyncio.gather(*waiters, setter_task)
    Python

    Conditions

    import asyncio
    
    async def condition_example():
        condition = asyncio.Condition()
        items = []
    
        async def consumer(name):
            async with condition:
                while len(items) == 0:
                    print(f"Consumer {name} waiting for items")
                    await condition.wait()
    
                item = items.pop(0)
                print(f"Consumer {name} got item: {item}")
    
        async def producer():
            for i in range(5):
                async with condition:
                    item = f"item_{i}"
                    items.append(item)
                    print(f"Produced: {item}")
                    condition.notify_all()
                await asyncio.sleep(1)
    
        # Run producer and consumers
        await asyncio.gather(
            producer(),
            consumer("A"),
            consumer("B")
        )
    Python

    8. Asyncio Streams

    Understanding Streams

    Asyncio streams provide a high-level interface for network programming.

    graph LR
        A[Client] --> B[StreamWriter]
        B --> C[Network]
        C --> D[StreamReader]
        D --> E[Server]
    
        E --> F[StreamWriter]
        F --> C
        C --> G[StreamReader]
        G --> A

    TCP Server and Client

    import asyncio
    
    # TCP Server
    async def handle_client(reader, writer):
        addr = writer.get_extra_info('peername')
        print(f"Client connected: {addr}")
    
        try:
            while True:
                data = await reader.read(1024)
                if not data:
                    break
    
                message = data.decode()
                print(f"Received: {message}")
    
                # Echo back
                response = f"Echo: {message}"
                writer.write(response.encode())
                await writer.drain()
    
        except asyncio.CancelledError:
            raise  # let cancellation propagate; cleanup still runs in finally
        finally:
            print(f"Client disconnected: {addr}")
            writer.close()
            await writer.wait_closed()
    
    async def start_server():
        server = await asyncio.start_server(
            handle_client, 'localhost', 8888
        )
    
        addr = server.sockets[0].getsockname()
        print(f"Server started on {addr}")
    
        async with server:
            await server.serve_forever()
    
    # TCP Client
    async def tcp_client():
        reader, writer = await asyncio.open_connection(
            'localhost', 8888
        )
    
        try:
            # Send messages
            messages = ["Hello", "World", "From", "Client"]
    
            for msg in messages:
                writer.write(msg.encode())
                await writer.drain()
    
                # Read response
                data = await reader.read(1024)
                print(f"Server response: {data.decode()}")
    
                await asyncio.sleep(1)
    
        finally:
            writer.close()
            await writer.wait_closed()
    Python
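
    The server and client above are defined but never started together. The sketch below shows one way to exercise a stream round trip inside a single process; it is a simplified variant, not the same code: the handler echoes one message and closes, and the server binds to port 0 so the operating system picks a free port (both are assumptions made to keep the demo short).

```python
import asyncio

async def echo_once(reader, writer):
    """Echo a single message back with an 'Echo: ' prefix, then close."""
    data = await reader.read(1024)
    writer.write(b"Echo: " + data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    # Port 0 asks the OS for any free port, avoiding clashes with port 8888
    server = await asyncio.start_server(echo_once, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"Hello")
        await writer.drain()
        reply = await reader.read()  # read until the server closes the stream
        writer.close()
        await writer.wait_closed()

    return reply.decode()

if __name__ == "__main__":
    print(asyncio.run(main()))
```

    Running server and client in the same event loop works because both sides yield at every read and write, so neither one starves the other.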

    HTTP Client with aiohttp

    import aiohttp
    import asyncio
    
    async def fetch_url(session, url):
        try:
            async with session.get(url) as response:
                return {
                    'url': url,
                    'status': response.status,
                    'content': await response.text()
                }
        except Exception as e:
            return {
                'url': url,
                'error': str(e)
            }
    
    async def http_client_example():
        urls = [
            'http://httpbin.org/json',
            'http://httpbin.org/user-agent',
            'http://httpbin.org/headers'
        ]
    
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
    
            for result in results:
                if 'error' in result:
                    print(f"Error fetching {result['url']}: {result['error']}")
                else:
                    print(f"Success: {result['url']} - Status: {result['status']}")
    Python


    9. Error Handling and Best Practices

    Exception Propagation in Async Code

    Understanding how exceptions flow through async code is crucial for robust applications.

    graph TD
        A[Coroutine A] --> B[await Coroutine B]
        B --> C[Exception Raised]
        C --> D[Exception Propagates]
        D --> E[Caught in A or Propagates Further]
    
        F[Task Creation] --> G[Background Task]
        G --> H[Exception in Task]
        H --> I[Exception Stored in Task]
        I --> J[Retrieved when awaited]
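
    Both paths in the diagram can be demonstrated in a few lines. This is a sketch with a hypothetical failing() coroutine: awaiting it directly raises into the caller, while wrapping it in a task stores the exception on the task until it is inspected or awaited.

```python
import asyncio

async def failing():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main():
    # Path 1: a directly awaited coroutine raises into its caller
    try:
        await failing()
        direct = None
    except ValueError as exc:
        direct = str(exc)

    # Path 2: a background task stores the exception
    task = asyncio.create_task(failing())
    await asyncio.wait([task])    # waits for completion without raising
    stored = task.exception()     # inspect the stored exception
    try:
        await task                # awaiting the task re-raises it
        awaited = None
    except ValueError as exc:
        awaited = str(exc)

    return direct, repr(stored), awaited

if __name__ == "__main__":
    direct, stored, awaited = asyncio.run(main())
    print(f"raised directly: {direct}")
    print(f"stored on task: {stored}")
    print(f"raised on await: {awaited}")
```

    If a failed task is never awaited and its exception is never retrieved, asyncio logs "Task exception was never retrieved" when the task is garbage collected, so background tasks should always be awaited or given a done callback.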

    Basic Exception Handling

    import asyncio
    import aiohttp
    import logging
    import random
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    async def risky_operation():
        """Simulates an operation that might fail"""
        await asyncio.sleep(1)
        if random.random() < 0.5:
            raise ValueError("Random failure occurred!")
        return "Success!"
    
    async def safe_wrapper():
        """Proper exception handling"""
        try:
            result = await risky_operation()
            logger.info(f"Operation succeeded: {result}")
            return result
        except ValueError as e:
            logger.error(f"Operation failed: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise  # Re-raise unexpected exceptions
    
    # Multiple exception types
    async def http_request_with_retry(url, max_retries=3):
        """HTTP request with retry logic and specific exception handling"""
    
        for attempt in range(max_retries):
            try:
                async with aiohttp.ClientSession() as session:
                    # aiohttp expects a ClientTimeout object rather than a bare number
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
                        if response.status == 200:
                            return await response.text()
                        else:
                            raise aiohttp.ClientResponseError(
                                request_info=response.request_info,
                                history=response.history,
                                status=response.status
                            )
    
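            # ClientTimeout is a configuration object, not an exception;
            # aiohttp signals a timeout by raising asyncio.TimeoutError
            except asyncio.TimeoutError: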
                logger.warning(f"Timeout on attempt {attempt + 1}")
            except aiohttp.ClientConnectionError:
                logger.warning(f"Connection error on attempt {attempt + 1}")
            except aiohttp.ClientResponseError as e:
                logger.warning(f"HTTP {e.status} on attempt {attempt + 1}")
    
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
    
        raise Exception(f"Failed after {max_retries} attempts")
    Python
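
    The retry loop can also be written so that it works for any awaitable operation and surfaces the last real error instead of a generic one. The sketch below is one possible shape under that assumption; `retry_with_backoff`, `demo_retry`, and `flaky` are hypothetical names, and the short `base_delay` is chosen only to keep the demo fast.

```python
import asyncio


async def retry_with_backoff(operation, max_retries=3, base_delay=0.01,
                             retry_on=(ConnectionError, asyncio.TimeoutError)):
    """Call `operation()` until it succeeds or the retries are exhausted."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # Out of retries: surface the last error unchanged
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** attempt)


async def demo_retry():
    calls = 0

    async def flaky():
        """Hypothetical operation that fails twice, then succeeds."""
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("transient failure")
        return "ok"

    result = await retry_with_backoff(flaky)
    return result, calls
```

    `asyncio.run(demo_retry())` returns `('ok', 3)`. Exceptions outside `retry_on` are not retried and propagate immediately.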

    Exception Handling in Concurrent Operations

    import asyncio
    
    async def task_with_exception(task_id, should_fail=False):
        """Task that may raise an exception"""
        await asyncio.sleep(1)
        if should_fail:
            raise ValueError(f"Task {task_id} failed!")
        return f"Task {task_id} completed"
    
    # Using asyncio.gather with exception handling
    async def gather_with_exceptions():
        """Handle exceptions in asyncio.gather"""
    
        def make_tasks():
            # A coroutine object can be awaited only once, so build a
            # fresh set for every gather() call
            return [
                task_with_exception(1, False),
                task_with_exception(2, True),   # This will fail
                task_with_exception(3, False),
            ]
    
        try:
            results = await asyncio.gather(*make_tasks())
            print(f"All results: {results}")
        except ValueError as e:
            print(f"One task failed: {e}")
    
        # Alternative: return_exceptions=True returns failures as results
        results = await asyncio.gather(*make_tasks(), return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i+1} failed: {result}")
            else:
                print(f"Task {i+1} succeeded: {result}")
    
    # Using asyncio.as_completed for individual exception handling
    async def handle_tasks_individually():
        """Handle each task's exceptions separately"""
    
        tasks = [
            asyncio.create_task(task_with_exception(i, i == 2))
            for i in range(1, 4)
        ]
    
        for task in asyncio.as_completed(tasks):
            try:
                result = await task
                print(f"Completed: {result}")
            except ValueError as e:
                print(f"Task failed: {e}")
    
    # Exception handling with TaskGroup (Python 3.11+)
    async def task_group_exception_handling():
        """Using TaskGroup for automatic exception propagation"""
    
        try:
            async with asyncio.TaskGroup() as tg:
                task1 = tg.create_task(task_with_exception(1, False))
                task2 = tg.create_task(task_with_exception(2, True))  # Will fail
                task3 = tg.create_task(task_with_exception(3, False))
    
        except* ValueError as eg:
            print(f"Some tasks failed: {eg.exceptions}")
            # Handle multiple exceptions
            for exc in eg.exceptions:
                print(f"Exception: {exc}")
    Python
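
    One difference between these approaches is easy to miss: when `asyncio.gather()` raises, the other awaitables are not cancelled and keep running, whereas `TaskGroup` cancels the remaining tasks. The sketch below illustrates the `gather()` behavior; all function names in it are hypothetical.

```python
import asyncio


async def demo_gather_siblings():
    finished = []

    async def slow_ok():
        await asyncio.sleep(0.05)
        finished.append("slow_ok")

    async def fast_fail():
        await asyncio.sleep(0.01)
        raise ValueError("fast_fail")

    slow_task = asyncio.create_task(slow_ok())
    try:
        await asyncio.gather(slow_task, fast_fail())
    except ValueError:
        # gather() has raised, but the sibling task was not cancelled.
        still_running = not slow_task.done()

    await slow_task  # Let the sibling finish before returning
    return still_running, finished
```

    `asyncio.run(demo_gather_siblings())` returns `(True, ['slow_ok'])`. If the siblings should stop on the first failure, they have to be cancelled explicitly or run inside a `TaskGroup`.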

    Custom Exception Classes

    import asyncio
    
    class AsyncIOError(Exception):
        """Base exception for asyncio-related errors"""
        pass
    
    class ConnectionPoolExhausted(AsyncIOError):
        """Raised when connection pool is exhausted"""
        pass
    
    class RateLimitExceeded(AsyncIOError):
        """Raised when rate limit is exceeded"""
        pass
    
    class AsyncResourceManager:
        """Example of custom exception usage"""
    
        def __init__(self, max_connections=10):
            self.max_connections = max_connections
            self.active_connections = 0
            self._semaphore = asyncio.Semaphore(max_connections)
    
        async def acquire_connection(self):
            try:
                await asyncio.wait_for(self._semaphore.acquire(), timeout=5.0)
                self.active_connections += 1
                return f"Connection-{self.active_connections}"
            except asyncio.TimeoutError:
                raise ConnectionPoolExhausted(
                    f"Could not acquire connection within timeout. "
                    f"Active: {self.active_connections}/{self.max_connections}"
                )
    
        async def release_connection(self, connection_id):
            self.active_connections -= 1
            self._semaphore.release()
            print(f"Released {connection_id}")
    
    async def use_resource_manager():
        """Example usage with custom exceptions"""
        manager = AsyncResourceManager(max_connections=2)
        connections = []  # Track acquired connections so cleanup is explicit
    
        try:
            # Acquire multiple connections
            connections.append(await manager.acquire_connection())
            connections.append(await manager.acquire_connection())
    
            # This should time out and raise ConnectionPoolExhausted
            connections.append(await manager.acquire_connection())
    
        except ConnectionPoolExhausted as e:
            print(f"Resource exhausted: {e}")
        finally:
            # Cleanup: release only what was actually acquired
            for connection in connections:
                await manager.release_connection(connection)
    Python
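
    Pairing every acquire with a release by hand is easy to get wrong. One alternative is to expose the pair as an async context manager with `contextlib.asynccontextmanager`. This is a sketch of that idea, not the design used above; `TinyPool` and `PoolExhausted` are hypothetical stand-ins for `AsyncResourceManager` and `ConnectionPoolExhausted`, and the short timeout only keeps the demo fast.

```python
import asyncio
import contextlib


class PoolExhausted(Exception):
    """Hypothetical stand-in for ConnectionPoolExhausted."""


class TinyPool:
    def __init__(self, size, timeout=0.05):
        self._semaphore = asyncio.Semaphore(size)
        self._timeout = timeout
        self.active = 0

    @contextlib.asynccontextmanager
    async def connection(self):
        try:
            await asyncio.wait_for(self._semaphore.acquire(), self._timeout)
        except asyncio.TimeoutError as exc:
            # Chain the original error so the traceback shows the cause.
            raise PoolExhausted("no free connection") from exc
        self.active += 1
        try:
            yield f"conn-{self.active}"
        finally:
            # Runs on normal exit, on exceptions, and on cancellation.
            self.active -= 1
            self._semaphore.release()


async def demo_pool():
    pool = TinyPool(size=1)
    async with pool.connection():
        try:
            async with pool.connection():  # Pool is full: this times out
                pass
        except PoolExhausted:
            exhausted = True
    return exhausted, pool.active
```

    `asyncio.run(demo_pool())` returns `(True, 0)`: the second acquisition fails with the custom exception, and the first connection is released automatically when its `async with` block exits.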

    Best Practices for Asyncio

    import asyncio
    import contextlib
    import functools
    import random
    import signal
    import sys
    from typing import Optional, List, Any
    
    import aiohttp
    
    # 1. Proper resource management
    class AsyncContextManager:
        """Example of proper async context manager"""
    
        async def __aenter__(self):
            print("Acquiring async resource")
            await asyncio.sleep(0.1)  # Simulate setup
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            print("Releasing async resource")
            await asyncio.sleep(0.1)  # Simulate cleanup
            if exc_type:
                print(f"Exception occurred: {exc_val}")
            return False  # Don't suppress exceptions
    
    # 2. Graceful shutdown handling
    class GracefulShutdown:
        """Handle graceful shutdown of async applications"""
    
        def __init__(self):
            self.shutdown_event = asyncio.Event()
            self.tasks: List[asyncio.Task] = []
    
        def signal_handler(self, signum, frame):
            """Handle shutdown signals"""
            print(f"\nReceived signal {signum}, initiating graceful shutdown...")
            self.shutdown_event.set()
    
        async def worker(self, worker_id: int):
            """Example worker that responds to shutdown"""
            try:
                while not self.shutdown_event.is_set():
                    print(f"Worker {worker_id} doing work...")
                    await asyncio.sleep(2)
            except asyncio.CancelledError:
                print(f"Worker {worker_id} was cancelled")
                raise
            finally:
                print(f"Worker {worker_id} cleaning up...")
    
        async def start_workers(self, num_workers: int = 3):
            """Start workers and handle shutdown"""
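            # Register signal handlers on the event loop (Unix only). Unlike
            # handlers installed with signal.signal(), these callbacks are
            # allowed to interact with the event loop, e.g. to set an Event.
            loop = asyncio.get_running_loop()
            for sig in (signal.SIGINT, signal.SIGTERM):
                loop.add_signal_handler(sig, self.signal_handler, sig, None)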
    
            # Start workers
            self.tasks = [
                asyncio.create_task(self.worker(i))
                for i in range(num_workers)
            ]
    
            try:
                # Wait for shutdown signal
                await self.shutdown_event.wait()
    
                # Cancel all tasks
                print("Cancelling all tasks...")
                for task in self.tasks:
                    task.cancel()
    
                # Wait for tasks to complete
                await asyncio.gather(*self.tasks, return_exceptions=True)
    
            except KeyboardInterrupt:
                print("Forced shutdown")
    
    # 3. Async function decorators
    def async_retry(max_retries: int = 3, delay: float = 1.0):
        """Decorator for automatic retry with exponential backoff"""
    
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                last_exception = None
    
                for attempt in range(max_retries):
                    try:
                        return await func(*args, **kwargs)
                    except Exception as e:
                        last_exception = e
                        if attempt < max_retries - 1:
                            wait_time = delay * (2 ** attempt)
                            print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s")
                            await asyncio.sleep(wait_time)
                        else:
                            print(f"All {max_retries} attempts failed")
    
                raise last_exception
    
            return wrapper
        return decorator
    
    def async_timeout(seconds: float):
        """Decorator to add timeout to async functions"""
    
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                try:
                    return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
                except asyncio.TimeoutError:
                    raise TimeoutError(f"Function {func.__name__} timed out after {seconds}s")
    
            return wrapper
        return decorator
    
    # Example usage of decorators
    @async_retry(max_retries=3, delay=0.5)
    @async_timeout(10.0)
    async def unreliable_api_call(url: str):
        """Simulated API call that might fail"""
        await asyncio.sleep(1)
        if random.random() < 0.7:  # 70% chance of failure
            raise aiohttp.ClientError("API temporarily unavailable")
        return {"data": "success"}
    
    # 4. Performance monitoring
    class AsyncPerformanceMonitor:
        """Monitor async function performance"""
    
        def __init__(self):
            self.stats = {}
    
        def monitor(self, func_name: Optional[str] = None):
            """Decorator to monitor async function performance"""
    
            def decorator(func):
                name = func_name or func.__name__
    
                @functools.wraps(func)
                async def wrapper(*args, **kwargs):
                    start_time = asyncio.get_running_loop().time()
                    # Set before the try block so it is always bound in
                    # `finally`, even when the call is cancelled
                    success = False
                    try:
                        result = await func(*args, **kwargs)
                        success = True
                        return result
                    finally:
                        duration = asyncio.get_running_loop().time() - start_time
                        self.record_stats(name, duration, success)
    
                return wrapper
            return decorator
    
        def record_stats(self, func_name: str, duration: float, success: bool):
            """Record performance statistics"""
            if func_name not in self.stats:
                self.stats[func_name] = {
                    'total_calls': 0,
                    'total_time': 0.0,
                    'successful_calls': 0,
                    'failed_calls': 0,
                    'avg_time': 0.0
                }
    
            stats = self.stats[func_name]
            stats['total_calls'] += 1
            stats['total_time'] += duration
    
            if success:
                stats['successful_calls'] += 1
            else:
                stats['failed_calls'] += 1
    
            stats['avg_time'] = stats['total_time'] / stats['total_calls']
    
        def get_report(self) -> dict:
            """Get performance report"""
            return self.stats.copy()
    
    # 5. Connection pooling best practices
    class AsyncHTTPClient:
        """HTTP client with proper connection pooling"""
    
        def __init__(self, max_connections: int = 100, timeout: float = 30.0):
            self.connector = aiohttp.TCPConnector(
                limit=max_connections,
                limit_per_host=20,
                keepalive_timeout=30,
                enable_cleanup_closed=True
            )
    
            self.timeout = aiohttp.ClientTimeout(total=timeout)
            self._session: Optional[aiohttp.ClientSession] = None
    
        async def __aenter__(self):
            self._session = aiohttp.ClientSession(
                connector=self.connector,
                timeout=self.timeout
            )
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            if self._session:
                await self._session.close()
    
        async def get(self, url: str, **kwargs) -> dict:
            """GET request with error handling"""
            if not self._session:
                raise RuntimeError("Client not initialized. Use 'async with' statement.")
    
            try:
                async with self._session.get(url, **kwargs) as response:
                    response.raise_for_status()
                    return {
                        'status': response.status,
                        'data': await response.json(),
                        'headers': dict(response.headers)
                    }
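            except aiohttp.ClientError as e:
                # AsyncIOError is the base class from "Custom Exception Classes";
                # `from e` keeps the original aiohttp error in the traceback
                raise AsyncIOError(f"HTTP request failed: {e}") from e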
    
    # Example usage
    async def best_practices_example():
        """Demonstrate best practices"""
    
        # Performance monitoring
        monitor = AsyncPerformanceMonitor()
    
        @monitor.monitor("api_call")
        async def monitored_api_call():
            async with AsyncHTTPClient() as client:
                return await client.get("https://httpbin.org/json")
    
        # Resource management
        async with AsyncContextManager() as resource:
            try:
                result = await monitored_api_call()
                print(f"API call result: {result['status']}")
            except AsyncIOError as e:
                print(f"API error: {e}")
    
        # Print performance stats
        print("Performance stats:", monitor.get_report())
    
    # 6. Common pitfalls and how to avoid them
    async def common_pitfalls_examples():
        """Examples of common pitfalls and their solutions"""
    
        # PITFALL 1: Blocking I/O in async functions
        # BAD:
        # time.sleep(1)  # This blocks the entire event loop!
    
        # GOOD:
        await asyncio.sleep(1)  # This yields control to other coroutines
    
        # PITFALL 2: Not awaiting coroutines
        # BAD:
        # result = async_function()  # This returns a coroutine object, not the result!
    
        # GOOD:
        result = await async_function()
    
        # PITFALL 3: Creating tasks without managing them
        # BAD:
        # asyncio.create_task(some_background_work())  # Fire and forget: the loop keeps only a
        # weak reference, so the task may be garbage-collected before it finishes
    
        # GOOD:
        task = asyncio.create_task(some_background_work())
        try:
            await task
        except Exception as e:
            print(f"Background task failed: {e}")
    
        # PITFALL 4: Not handling CancelledError
        # BAD:
        async def bad_cleanup():
            try:
                await long_running_operation()
            except:
                pass  # This swallows CancelledError!
    
        # GOOD:
        async def good_cleanup():
            try:
                await long_running_operation()
            except asyncio.CancelledError:
                print("Operation was cancelled")
                # Perform cleanup
                raise  # Re-raise to maintain cancellation
            except Exception as e:
                print(f"Operation failed: {e}")
    
    async def async_function():
        return "result"
    
    async def some_background_work():
        await asyncio.sleep(2)
        return "done"
    
    async def long_running_operation():
        await asyncio.sleep(10)
    Python
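
    For pitfall 3, awaiting the task immediately is not always an option, because some work really has to run in the background. The asyncio documentation recommends keeping a strong reference to such tasks until they finish. The sketch below follows that pattern; `spawn`, `background_tasks`, and `demo_spawn` are hypothetical names.

```python
import asyncio

# Strong references to running background tasks.
background_tasks = set()


def spawn(coro):
    """Schedule `coro` and keep a reference to the task until it is done."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference automatically once the task finishes.
    task.add_done_callback(background_tasks.discard)
    return task


async def demo_spawn():
    results = []

    async def work(n):
        await asyncio.sleep(0.01)
        results.append(n)

    for n in range(3):
        spawn(work(n))

    tracked = len(background_tasks)
    # Before shutting down, wait for everything still outstanding.
    await asyncio.gather(*background_tasks)
    await asyncio.sleep(0)  # Give pending done-callbacks a chance to run
    return tracked, sorted(results), len(background_tasks)
```

    `asyncio.run(demo_spawn())` returns `(3, [0, 1, 2], 0)`: three tasks were tracked while running, all completed, and the set emptied itself afterwards.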

    10. Performance and Optimization

    Understanding Asyncio Performance

    Asyncio performance depends mainly on how the event loop is used and on the application's I/O patterns: the loop stays responsive only while no single callback blocks it for long.

    graph TD
        A[Performance Factors] --> B[Event Loop Efficiency]
        A --> C[I/O Wait Time]
        A --> D[Task Switching Overhead]
        A --> E[Memory Usage]
    
        B --> F[Minimize Blocking Operations]
        C --> G[Connection Pooling]
        D --> H[Batch Operations]
        E --> I[Resource Management]
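
    "Minimize Blocking Operations" usually means moving blocking calls off the event loop thread. A minimal sketch using `asyncio.to_thread` (Python 3.9+) follows; `blocking_call` is a hypothetical stand-in for any blocking library function, and the heartbeat task exists only to show that the loop keeps running in the meantime.

```python
import asyncio
import contextlib
import time


def blocking_call(duration):
    """Hypothetical stand-in for a blocking library call."""
    time.sleep(duration)
    return "done"


async def demo_to_thread():
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())

    # The blocking function runs in a worker thread, so the event loop
    # stays free to run the heartbeat while we wait for the result.
    result = await asyncio.to_thread(blocking_call, 0.1)

    beat.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await beat
    return result, ticks
```

    After `asyncio.run(demo_to_thread())`, a non-zero tick count shows that the loop was not blocked. Calling `blocking_call(0.1)` directly inside the coroutine would have frozen the heartbeat for the whole duration.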

    Profiling Asyncio Applications

    import asyncio
    import cProfile
    import pstats
    import time
    import functools
    from typing import Dict, List
    import aiohttp
    
    class AsyncProfiler:
        """Profiler for asyncio applications"""
    
        def __init__(self):
            self.call_times: Dict[str, List[float]] = {}
            self.start_times: Dict[str, float] = {}
    
        def profile_coroutine(self, name: str = None):
            """Decorator to profile coroutine execution time"""
    
            def decorator(func):
                coro_name = name or f"{func.__module__}.{func.__name__}"
    
                @functools.wraps(func)
                async def wrapper(*args, **kwargs):
                    start_time = time.perf_counter()
                    try:
                        result = await func(*args, **kwargs)
                        return result
                    finally:
                        duration = time.perf_counter() - start_time
                        if coro_name not in self.call_times:
                            self.call_times[coro_name] = []
                        self.call_times[coro_name].append(duration)
    
                return wrapper
            return decorator
    
        def get_stats(self) -> Dict[str, Dict[str, float]]:
            """Get profiling statistics"""
            stats = {}
            for name, times in self.call_times.items():
                stats[name] = {
                    'calls': len(times),
                    'total_time': sum(times),
                    'avg_time': sum(times) / len(times),
                    'min_time': min(times),
                    'max_time': max(times)
                }
            return stats
    
        def print_stats(self):
            """Print formatted statistics"""
            stats = self.get_stats()
            print("\n" + "="*60)
            print("ASYNCIO PERFORMANCE PROFILE")
            print("="*60)
            print(f"{'Function':<30} {'Calls':<8} {'Total':<10} {'Avg':<10} {'Min':<10} {'Max':<10}")
            print("-"*60)
    
            for name, data in sorted(stats.items(), key=lambda x: x[1]['total_time'], reverse=True):
                print(f"{name:<30} {data['calls']:<8} {data['total_time']:<10.4f} "
                      f"{data['avg_time']:<10.4f} {data['min_time']:<10.4f} {data['max_time']:<10.4f}")
    
    # Usage example
    profiler = AsyncProfiler()
    
    @profiler.profile_coroutine("fetch_url")
    async def fetch_url(session, url):
        async with session.get(url) as response:
            return await response.text()
    
    @profiler.profile_coroutine("process_data")
    async def process_data(data):
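        # Simulate processing latency with a non-blocking sleep
        # (real CPU-bound work would block the loop and belongs in an executor)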
        await asyncio.sleep(0.1)
        return len(data)
    
    async def profiling_example():
        urls = ['http://httpbin.org/delay/1'] * 5
    
        async with aiohttp.ClientSession() as session:
            tasks = []
            for url in urls:
                content_task = fetch_url(session, url)
                tasks.append(content_task)
    
            contents = await asyncio.gather(*tasks)
    
            # Process results
            process_tasks = [process_data(content) for content in contents]
            results = await asyncio.gather(*process_tasks)
    
        profiler.print_stats()
        return results
    Python
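
    Besides timing individual coroutines, asyncio's built-in debug mode can point at steps that block the event loop: in debug mode the loop logs a warning on the `asyncio` logger for any callback or task step that runs longer than `loop.slow_callback_duration` (0.1 seconds by default). The sketch below captures those warnings; `ListHandler`, `blocks_the_loop`, and `find_slow_steps` are hypothetical names.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect log messages in a list so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocks_the_loop():
    time.sleep(0.2)  # Deliberately blocking: holds the event loop for 0.2s


def find_slow_steps():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        # debug=True enables slow-callback detection, among other checks.
        asyncio.run(blocks_the_loop(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Slow-step warnings report how long the step took.
    return [message for message in handler.messages if "took" in message]
```

    `find_slow_steps()` returns the captured warnings, each naming the offending task and its duration. Debug mode adds overhead, so it is better suited to development than to production.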

    Memory Optimization

    import asyncio
    import weakref
    import gc
    from typing import Any, List, Optional, Set
    import psutil
    import os
    
    class MemoryMonitor:
        """Monitor memory usage in asyncio applications"""
    
        def __init__(self):
            self.process = psutil.Process(os.getpid())
            self.baseline_memory = self.get_memory_usage()
    
        def get_memory_usage(self) -> float:
            """Get current memory usage in MB"""
            return self.process.memory_info().rss / 1024 / 1024
    
        def print_memory_info(self, label: str = ""):
            """Print current memory information"""
            current = self.get_memory_usage()
            diff = current - self.baseline_memory
            print(f"Memory {label}: {current:.2f} MB (Δ {diff:+.2f} MB)")
    
        async def monitor_coroutine(self, coro, label: str = ""):
            """Monitor memory usage of a coroutine"""
            before = self.get_memory_usage()
            result = await coro
            after = self.get_memory_usage()
            print(f"Memory {label}: {before:.2f} → {after:.2f} MB (Δ {after-before:+.2f} MB)")
            return result
    
    class AsyncObjectPool:
        """Object pool to reduce memory allocation"""
    
        def __init__(self, factory, max_size: int = 100):
            self.factory = factory
            self.max_size = max_size
            self.pool: List[Any] = []
            self.in_use: Set[Any] = set()
    
        async def acquire(self):
            """Acquire object from pool"""
            if self.pool:
                obj = self.pool.pop()
            else:
                obj = await self.factory()
    
            self.in_use.add(obj)
            return obj
    
        async def release(self, obj):
            """Release object back to pool"""
            if obj in self.in_use:
                self.in_use.remove(obj)
                if len(self.pool) < self.max_size:
                    # Reset object state if needed
                    if hasattr(obj, 'reset'):
                        await obj.reset()
                    self.pool.append(obj)
    
    class AsyncResource:
        """Example resource for pooling"""
    
        def __init__(self, resource_id: str):
            self.resource_id = resource_id
            self.data = bytearray(1024)  # Some memory allocation
    
        async def reset(self):
            """Reset resource state"""
            self.data = bytearray(1024)
    
        async def use(self):
            """Simulate resource usage"""
            await asyncio.sleep(0.1)
            return f"Used resource {self.resource_id}"
    
    async def create_resource():
        """Factory function for resources"""
        import uuid
        return AsyncResource(str(uuid.uuid4())[:8])
    
    # Memory optimization techniques
    async def memory_optimization_examples():
        """Demonstrate memory optimization techniques"""
    
        monitor = MemoryMonitor()
        monitor.print_memory_info("baseline")
    
        # 1. Object pooling
        pool = AsyncObjectPool(create_resource, max_size=10)
    
        async def use_pooled_resource():
            resource = await pool.acquire()
            try:
                result = await resource.use()
                return result
            finally:
                await pool.release(resource)
    
        # Test object pooling
        tasks = [use_pooled_resource() for _ in range(50)]
        results = await asyncio.gather(*tasks)
        monitor.print_memory_info("after pooling")
    
        # 2. Lazy loading and cleanup
        class LazyDataLoader:
            def __init__(self):
                self._data: Optional[bytes] = None
                self._refs = weakref.WeakSet()
    
            async def get_data(self):
                if self._data is None:
                    print("Loading large dataset...")
                    self._data = b"x" * (1024 * 1024)  # 1MB of data
                return self._data
    
            def register_user(self, user):
                self._refs.add(user)
    
            def cleanup_if_unused(self):
                if not self._refs:
                    print("Cleaning up unused data...")
                    self._data = None
                    gc.collect()
    
        loader = LazyDataLoader()
    
        # Simulate users
        class DataUser:
            def __init__(self, loader):
                self.loader = loader
                loader.register_user(self)
    
            async def process(self):
                data = await self.loader.get_data()
                await asyncio.sleep(0.1)
                return len(data)
    
        # Create users and process data
        users = [DataUser(loader) for _ in range(5)]
        tasks = [user.process() for user in users]
        results = await asyncio.gather(*tasks)
        monitor.print_memory_info("after lazy loading")
    
        # Cleanup
        del users
        loader.cleanup_if_unused()
        monitor.print_memory_info("after cleanup")
    
        # 3. Streaming data processing
        async def memory_efficient_data_processing():
            """Process large datasets without loading everything into memory"""
    
            async def data_generator():
                """Generate data chunks"""
                for i in range(100):
                    # Simulate reading chunks from file/network
                    chunk = f"data_chunk_{i}" * 100
                    yield chunk
                    await asyncio.sleep(0.01)
    
            async def process_chunk(chunk):
                """Process individual chunk"""
                await asyncio.sleep(0.01)
                return len(chunk)
    
            total_processed = 0
            async for chunk in data_generator():
                size = await process_chunk(chunk)
                total_processed += size
    
                # Optional: yield control periodically
                if total_processed % 10000 == 0:
                    await asyncio.sleep(0)
    
            return total_processed
    
        result = await monitor.monitor_coroutine(
            memory_efficient_data_processing(),
            "streaming processing"
        )
        print(f"Processed {result} bytes")
    Python
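
    Process-level RSS, as reported by `psutil`, is coarse and includes allocator effects. To compare the peak Python-level allocations of two approaches, the standard-library `tracemalloc` module is an alternative. The sketch below contrasts materializing all chunks with streaming them; `build_all`, `stream_all`, and `peak_bytes` are hypothetical names and the chunk size is arbitrary.

```python
import asyncio
import tracemalloc


async def build_all(n):
    """Materialize every chunk before summing (high peak memory)."""
    chunks = [bytes(10_000) for _ in range(n)]
    await asyncio.sleep(0)
    return sum(len(chunk) for chunk in chunks)


async def stream_all(n):
    """Handle one chunk at a time (low peak memory)."""
    total = 0
    for _ in range(n):
        total += len(bytes(10_000))
        await asyncio.sleep(0)  # Yield to the event loop between chunks
    return total


def peak_bytes(coro_factory, n):
    """Run the coroutine and return (result, peak traced bytes)."""
    tracemalloc.start()
    try:
        result = asyncio.run(coro_factory(n))
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak
```

    Both variants return the same total, but `peak_bytes(build_all, 200)` reports a noticeably higher peak than `peak_bytes(stream_all, 200)`, because the first keeps all 200 chunks alive at once.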

    Concurrency Optimization

    import asyncio
    import aiohttp
    from asyncio import Semaphore, Queue
    from typing import List, Callable, Any
    import time
    
    class OptimizedConcurrencyManager:
        """Manage concurrent operations with optimization"""
    
        def __init__(self, max_concurrent: int = 10, rate_limit: float = 0.1):
            self.semaphore = Semaphore(max_concurrent)
            self.rate_limit = rate_limit  # Minimum seconds between request starts
            self.last_request_time = 0.0
            self._rate_lock = asyncio.Lock()  # Serializes the rate-limit check
    
        async def execute_with_limits(self, coro):
            """Execute coroutine with concurrency and rate limiting"""
            async with self.semaphore:
                # Rate limiting: hold the lock while waiting so concurrent callers
                # are spaced out instead of all sleeping and starting together
                async with self._rate_lock:
                    now = time.monotonic()
                    time_since_last = now - self.last_request_time
                    if time_since_last < self.rate_limit:
                        await asyncio.sleep(self.rate_limit - time_since_last)
                    self.last_request_time = time.monotonic()
    
                return await coro
    
    class BatchProcessor:
        """Process items in batches for better performance"""
    
        def __init__(self, batch_size: int = 10, max_concurrent_batches: int = 3):
            self.batch_size = batch_size
            self.semaphore = Semaphore(max_concurrent_batches)
    
        async def process_batch(self, items: List[Any], processor: Callable):
            """Process a batch of items"""
            async with self.semaphore:
                tasks = [processor(item) for item in items]
                return await asyncio.gather(*tasks, return_exceptions=True)
    
        async def process_all(self, items: List[Any], processor: Callable):
            """Process all items in batches"""
            batches = [
                items[i:i + self.batch_size]
                for i in range(0, len(items), self.batch_size)
            ]
    
            batch_tasks = [
                self.process_batch(batch, processor)
                for batch in batches
            ]
    
            batch_results = await asyncio.gather(*batch_tasks)
    
            # Flatten results
            results = []
            for batch_result in batch_results:
                results.extend(batch_result)
    
            return results
    
    class AsyncQueue:
        """Optimized async queue with producer-consumer pattern"""
    
        def __init__(self, maxsize: int = 0):
            self.queue = Queue(maxsize=maxsize)
            self.producers_finished = asyncio.Event()
            self.active_producers = 0
    
        async def producer(self, items: List[Any]):
            """Producer coroutine"""
            self.active_producers += 1
            try:
                for item in items:
                    await self.queue.put(item)
                    await asyncio.sleep(0)  # Yield control
            finally:
                self.active_producers -= 1
                if self.active_producers == 0:
                    self.producers_finished.set()
    
        async def consumer(self, processor: Callable):
            """Consumer coroutine"""
            results = []
            while True:
                try:
                    # Wait for item with timeout
                    item = await asyncio.wait_for(self.queue.get(), timeout=1.0)
                    result = await processor(item)
                    results.append(result)
                    self.queue.task_done()
                except asyncio.TimeoutError:
                    # Check if producers are done
                    if self.producers_finished.is_set() and self.queue.empty():
                        break
    
            return results
    
        async def process_with_queue(self, all_items: List[Any], processor: Callable, num_consumers: int = 3):
            """Process items using producer-consumer pattern"""
    
            # Split items among producers
            chunk_size = len(all_items) // 2 + 1
            chunks = [all_items[i:i + chunk_size] for i in range(0, len(all_items), chunk_size)]
    
            # Start producers
            producer_tasks = [
                asyncio.create_task(self.producer(chunk))
                for chunk in chunks
            ]
    
            # Start consumers
            consumer_tasks = [
                asyncio.create_task(self.consumer(processor))
                for _ in range(num_consumers)
            ]
    
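            # Wait for all producers to finish
            await asyncio.gather(*producer_tasks)
            # Signal completion explicitly so consumers also stop when there
            # were no producers at all (empty input)
            self.producers_finished.set()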
    
            # Wait for all consumers to finish
            consumer_results = await asyncio.gather(*consumer_tasks)
    
            # Flatten results
            all_results = []
            for results in consumer_results:
                all_results.extend(results)
    
            return all_results
    
    # Performance comparison examples
    async def performance_comparison():
        """Compare different concurrency approaches"""
    
        async def simulate_io_task(item_id: int):
            """Simulate I/O bound task"""
            await asyncio.sleep(0.1)  # Simulate network delay
            return f"processed_{item_id}"
    
        items = list(range(100))
    
        # 1. Sequential processing (baseline)
        print("Testing sequential processing...")
        start_time = time.time()
        sequential_results = []
        for item in items:
            result = await simulate_io_task(item)
            sequential_results.append(result)
        sequential_time = time.time() - start_time
        print(f"Sequential: {sequential_time:.2f}s")
    
        # 2. Unlimited concurrency
        print("Testing unlimited concurrency...")
        start_time = time.time()
        unlimited_tasks = [simulate_io_task(item) for item in items]
        unlimited_results = await asyncio.gather(*unlimited_tasks)
        unlimited_time = time.time() - start_time
        print(f"Unlimited concurrency: {unlimited_time:.2f}s")
    
        # 3. Limited concurrency
        print("Testing limited concurrency...")
        start_time = time.time()
        manager = OptimizedConcurrencyManager(max_concurrent=10)
        limited_tasks = [manager.execute_with_limits(simulate_io_task(item)) for item in items]
        limited_results = await asyncio.gather(*limited_tasks)
        limited_time = time.time() - start_time
        print(f"Limited concurrency: {limited_time:.2f}s")
    
        # 4. Batch processing
        print("Testing batch processing...")
        start_time = time.time()
        processor = BatchProcessor(batch_size=10, max_concurrent_batches=5)
        batch_results = await processor.process_all(items, simulate_io_task)
        batch_time = time.time() - start_time
        print(f"Batch processing: {batch_time:.2f}s")
    
        # 5. Queue-based processing
        print("Testing queue-based processing...")
        start_time = time.time()
        queue_processor = AsyncQueue()
        queue_results = await queue_processor.process_with_queue(items, simulate_io_task, num_consumers=5)
        queue_time = time.time() - start_time
        print(f"Queue-based: {queue_time:.2f}s")
    
        # Performance summary
        print("\n" + "="*50)
        print("PERFORMANCE COMPARISON")
        print("="*50)
        approaches = [
            ("Sequential", sequential_time),
            ("Unlimited Concurrency", unlimited_time),
            ("Limited Concurrency", limited_time),
            ("Batch Processing", batch_time),
            ("Queue-based", queue_time)
        ]
    
        for name, time_taken in approaches:
            speedup = sequential_time / time_taken
            print(f"{name:<22}: {time_taken:>6.2f}s (speedup: {speedup:>5.1f}x)")
    
    # Connection pooling optimization
    class OptimizedHTTPClient:
        """HTTP client optimized for performance"""
    
        def __init__(self):
            # Optimized connector settings
            self.connector = aiohttp.TCPConnector(
                limit=100,              # Total connection pool size
                limit_per_host=20,      # Connections per host
                keepalive_timeout=300,  # Keep connections alive longer
                enable_cleanup_closed=True,
                use_dns_cache=True,
                ttl_dns_cache=300,
                family=0,               # IPv4 and IPv6
                ssl=False               # Disables certificate verification; demo only
            )
    
            # Optimized timeout settings
            self.timeout = aiohttp.ClientTimeout(
                total=30,
                connect=5,
                sock_read=5
            )
    
        async def __aenter__(self):
            self.session = aiohttp.ClientSession(
                connector=self.connector,
                timeout=self.timeout
            )
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            await self.session.close()
    
        async def fetch_multiple(self, urls: List[str], max_concurrent: int = 20):
            """Fetch multiple URLs with optimized concurrency"""
            semaphore = Semaphore(max_concurrent)
    
            async def fetch_one(url):
                async with semaphore:
                    try:
                        async with self.session.get(url) as response:
                            return {
                                'url': url,
                                'status': response.status,
                                'size': len(await response.text()),
                                'time': response.headers.get('X-Response-Time', 'unknown')
                            }
                    except Exception as e:
                        return {'url': url, 'error': str(e)}
    
            tasks = [fetch_one(url) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=True)
    
    async def optimization_demo():
        """Demonstrate various optimization techniques"""
    
        print("Running optimization demonstrations...")
    
        # Performance comparison
        await performance_comparison()
    
        # HTTP client optimization
        urls = [f'http://httpbin.org/delay/0.1?id={i}' for i in range(20)]
    
        async with OptimizedHTTPClient() as client:
            print("\nTesting optimized HTTP client...")
            start_time = time.time()
            results = await client.fetch_multiple(urls, max_concurrent=10)
            http_time = time.time() - start_time
    
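            # gather(return_exceptions=True) may return exception objects, so check the type first
            successful = sum(1 for r in results if isinstance(r, dict) and 'error' not in r)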
            print(f"HTTP requests: {successful}/{len(urls)} successful in {http_time:.2f}s")
    
    if __name__ == "__main__":
        asyncio.run(optimization_demo())
    Python

    11. Real-World Applications

    Web Scraping at Scale

    Building a high-performance web scraper that can handle thousands of URLs efficiently.

    graph TD
        A[URL Queue] --> B[Scraper Pool]
        B --> C[Rate Limiter]
        C --> D[HTTP Client]
        D --> E[Parser]
        E --> F[Data Pipeline]
        F --> G[Storage]
    
        H[Retry Logic] --> D
        I[Error Handler] --> B
        J[Progress Monitor] --> A

    import asyncio
    import aiohttp
    import aiofiles
    from bs4 import BeautifulSoup
    from urllib.parse import urljoin, urlparse
    import json
    import time
    from typing import List, Dict, Set, Optional
    from dataclasses import dataclass
    import logging
    
    @dataclass
    class ScrapingJob:
        url: str
        depth: int = 0
        priority: int = 1
        retries: int = 0
        metadata: Optional[Dict] = None
    
    class AsyncWebScraper:
        """Production-ready async web scraper"""
    
        def __init__(self, 
                     max_concurrent: int = 50,
                     max_retries: int = 3,
                     delay_between_requests: float = 0.1,
                     timeout: int = 30):
    
            self.max_concurrent = max_concurrent
            self.max_retries = max_retries
            self.delay_between_requests = delay_between_requests
    
            # Setup HTTP client with optimized settings
            self.connector = aiohttp.TCPConnector(
                limit=max_concurrent * 2,
                limit_per_host=20,
                keepalive_timeout=30,
                enable_cleanup_closed=True
            )
    
            self.timeout = aiohttp.ClientTimeout(total=timeout)
            self.session: Optional[aiohttp.ClientSession] = None
    
            # Concurrency control
            self.semaphore = asyncio.Semaphore(max_concurrent)
            self.rate_limiter = asyncio.Lock()  # Serializes access to last_request_time
            self.last_request_time = 0.0
    
            # State tracking
            self.visited_urls: Set[str] = set()
            self.failed_urls: Set[str] = set()
            self.results: List[Dict] = []
    
            # Logging
            self.logger = logging.getLogger(__name__)
    
        async def __aenter__(self):
            self.session = aiohttp.ClientSession(
                connector=self.connector,
                timeout=self.timeout
            )
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            if self.session:
                await self.session.close()
    
        async def rate_limit(self):
            """Implement rate limiting"""
            async with self.rate_limiter:
                now = time.time()
                time_since_last = now - self.last_request_time
                if time_since_last < self.delay_between_requests:
                    await asyncio.sleep(self.delay_between_requests - time_since_last)
                self.last_request_time = time.time()
    
        async def fetch_page(self, job: ScrapingJob) -> Optional[Dict]:
            """Fetch a single page, retrying with exponential backoff"""
    
            if job.url in self.visited_urls:
                return None
            # Claim the URL before the first await so concurrent workers skip it
            self.visited_urls.add(job.url)
    
            while True:
                try:
                    # Hold a concurrency slot only for the duration of the request
                    async with self.semaphore:
                        await self.rate_limit()
    
                        async with self.session.get(job.url) as response:
                            if response.status != 200:
                                self.logger.warning(f"HTTP {response.status} for {job.url}")
                                return None
    
                            content = await response.text()
                            return {
                                'url': job.url,
                                'status': response.status,
                                'content': content,
                                'headers': dict(response.headers),
                                'depth': job.depth,
                                'timestamp': time.time()
                            }
    
                except Exception as e:
                    self.logger.error(f"Error fetching {job.url}: {e}")
                    if job.retries >= self.max_retries:
                        self.failed_urls.add(job.url)
                        return None
                    job.retries += 1
    
                # Exponential backoff, outside the semaphore so the slot is free
                # for other jobs (retrying while holding it can deadlock the pool)
                await asyncio.sleep(2 ** job.retries)
    
        def extract_links(self, content: str, base_url: str) -> List[str]:
            """Extract links from HTML content"""
            try:
                soup = BeautifulSoup(content, 'html.parser')
                links = []
    
                for link in soup.find_all('a', href=True):
                    href = link['href']
                    full_url = urljoin(base_url, href)
    
                    # Filter valid HTTP URLs
                    parsed = urlparse(full_url)
                    if parsed.scheme in ('http', 'https'):
                        links.append(full_url)
    
                return links
            except Exception as e:
                self.logger.error(f"Error extracting links: {e}")
                return []
    
        def parse_content(self, page_data: Dict) -> Optional[Dict]:
            """Parse content and extract structured data"""
            try:
                soup = BeautifulSoup(page_data['content'], 'html.parser')
    
                # Remove script and style elements
                for script in soup(["script", "style"]):
                    script.decompose()
    
                # Extract structured data
                parsed_data = {
                    'url': page_data['url'],
                    'title': soup.find('title').get_text().strip() if soup.find('title') else '',
                    'meta_description': '',
                    'headings': [],
                    'text_content': soup.get_text(),
                    'images': [],
                    'links': self.extract_links(page_data['content'], page_data['url']),
                    'depth': page_data['depth'],
                    'timestamp': page_data['timestamp']
                }
    
                # Extract meta description
                meta_desc = soup.find('meta', attrs={'name': 'description'})
                if meta_desc:
                    parsed_data['meta_description'] = meta_desc.get('content', '')
    
                # Extract headings
                for i in range(1, 7):
                    headings = soup.find_all(f'h{i}')
                    for heading in headings:
                        parsed_data['headings'].append({
                            'level': i,
                            'text': heading.get_text().strip()
                        })
    
                # Extract images
                for img in soup.find_all('img', src=True):
                    img_url = urljoin(page_data['url'], img['src'])
                    parsed_data['images'].append({
                        'src': img_url,
                        'alt': img.get('alt', ''),
                        'title': img.get('title', '')
                    })
    
                return parsed_data
    
            except Exception as e:
                self.logger.error(f"Error parsing content for {page_data['url']}: {e}")
                return None
    
        async def scrape_urls(self, 
                             urls: List[str], 
                             max_depth: int = 2,
                             follow_links: bool = True) -> List[Dict]:
            """Scrape multiple URLs with optional link following"""
    
            # Initialize job queue
            job_queue = asyncio.Queue()
    
            # Add initial URLs to queue
            for url in urls:
                await job_queue.put(ScrapingJob(url=url, depth=0))
    
            # Results go into a shared list so that cancelling idle workers
            # at shutdown cannot discard anything they collected
            all_results: List[Dict] = []
    
            # Worker coroutine: runs until cancelled
            async def worker():
                while True:
                    job = await job_queue.get()
                    try:
                        # Fetch page
                        page_data = await self.fetch_page(job)
                        if not page_data:
                            continue
    
                        # Parse content
                        parsed_data = self.parse_content(page_data)
                        if not parsed_data:
                            continue
                        all_results.append(parsed_data)
    
                        # Add child links to queue if following links
                        if follow_links and job.depth < max_depth:
                            for link in parsed_data['links'][:5]:  # Limit links per page
                                if link not in self.visited_urls:
                                    await job_queue.put(
                                        ScrapingJob(url=link, depth=job.depth + 1)
                                    )
                    finally:
                        # Always mark the job done so job_queue.join() can return
                        job_queue.task_done()
    
            # Start workers
            workers = [asyncio.create_task(worker()) for _ in range(self.max_concurrent)]
    
            # Wait for all jobs to complete
            await job_queue.join()
    
            # Workers are now idle in job_queue.get(); cancel them and wait for exit
            for worker_task in workers:
                worker_task.cancel()
            await asyncio.gather(*workers, return_exceptions=True)
    
            return all_results
    
        async def save_results(self, results: List[Dict], filename: str):
            """Save results to JSON file"""
            async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(results, indent=2, ensure_ascii=False))
    
            self.logger.info(f"Saved {len(results)} results to {filename}")
    
    # Usage example
    async def scraping_example():
        """Demonstrate web scraping"""
    
        urls_to_scrape = [
            'https://httpbin.org/html',
            'https://httpbin.org/links/10',
            'https://example.com'
        ]
    
        async with AsyncWebScraper(max_concurrent=10) as scraper:
            results = await scraper.scrape_urls(
                urls_to_scrape, 
                max_depth=1, 
                follow_links=True
            )
    
            await scraper.save_results(results, 'scraping_results.json')
    
            print(f"Scraped {len(results)} pages")
            print(f"Visited URLs: {len(scraper.visited_urls)}")
            print(f"Failed URLs: {len(scraper.failed_urls)}")
    Python
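
    The scraper above tracks visited pages by raw URL string, so `https://example.com/a` and `https://EXAMPLE.com/a#top` count as different pages. One way to reduce duplicate fetches is to normalize URLs before comparing them. The helpers below are a minimal sketch, not part of AsyncWebScraper: `normalize_url` and `unique_links` are hypothetical names, and the rules (drop the fragment, lowercase the host, drop default ports and any userinfo) are assumptions that will not suit every site.

```python
from typing import Iterable, List, Set
from urllib.parse import urldefrag, urlsplit, urlunsplit

_DEFAULT_PORTS = {"http": 80, "https": 443}


def normalize_url(url: str) -> str:
    """Return a canonical form of url for duplicate detection."""
    url, _fragment = urldefrag(url)        # drop "#section"
    parts = urlsplit(url)
    scheme = parts.scheme.lower()
    host = (parts.hostname or "").lower()  # hostname excludes userinfo and port
    # Keep the port only when it is not the scheme's default
    if parts.port and parts.port != _DEFAULT_PORTS.get(scheme):
        host = f"{host}:{parts.port}"
    path = parts.path or "/"
    return urlunsplit((scheme, host, path, parts.query, ""))


def unique_links(links: Iterable[str], seen: Set[str]) -> List[str]:
    """Return normalized links not yet in seen, recording them in seen."""
    fresh = []
    for link in links:
        key = normalize_url(link)
        if key not in seen:
            seen.add(key)
            fresh.append(key)
    return fresh


if __name__ == "__main__":
    seen: Set[str] = set()
    print(unique_links(
        ["https://example.com/a#top", "https://EXAMPLE.com/a", "https://example.com/b"],
        seen,
    ))
```

    In the scraper, the same normalization could be applied both when a URL is claimed in `fetch_page` and when child links are filtered, so the two checks agree on what counts as the same page.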

    Real-Time Chat Application

    Building a WebSocket-based chat application with asyncio.

    import asyncio
    import websockets
    import json
    import logging
    from typing import Set, Dict, List, Optional
    from datetime import datetime
    import jwt  # Provided by the PyJWT package (pip install PyJWT)
    
    class ChatRoom:
        """Manages a chat room with multiple users"""
    
        def __init__(self, room_id: str):
            self.room_id = room_id
            self.clients: Set[websockets.WebSocketServerProtocol] = set()
            self.message_history: List[Dict] = []
            self.max_history = 100
    
        async def add_client(self, websocket, username: str):
            """Add a client to the room"""
            self.clients.add(websocket)
            websocket.username = username
            websocket.room_id = self.room_id
    
            # Send chat history to new client
            for message in self.message_history[-10:]:  # Last 10 messages
                await websocket.send(json.dumps(message))
    
            # Notify others about new user
            await self.broadcast({
                'type': 'user_joined',
                'username': username,
                'timestamp': datetime.now().isoformat(),
                'user_count': len(self.clients)
            }, exclude=websocket)
    
        async def remove_client(self, websocket):
            """Remove a client from the room"""
            if websocket in self.clients:
                self.clients.remove(websocket)
                username = getattr(websocket, 'username', 'Unknown')
    
                # Notify others about user leaving
                await self.broadcast({
                    'type': 'user_left',
                    'username': username,
                    'timestamp': datetime.now().isoformat(),
                    'user_count': len(self.clients)
                })
    
        async def broadcast(self, message: Dict, exclude: Optional[websockets.WebSocketServerProtocol] = None):
            """Broadcast message to all clients in room"""
            if not self.clients:
                return
    
            message_json = json.dumps(message)
    
            # Snapshot the recipients (minus the excluded one): self.clients can
            # change while we await send(), which would break set iteration
            clients_to_send = [c for c in self.clients if c is not exclude]
    
            # Handle disconnected clients
            disconnected = set()
    
            for client in clients_to_send:
                try:
                    await client.send(message_json)
                except websockets.exceptions.ConnectionClosed:
                    disconnected.add(client)
    
            # Remove disconnected clients
            for client in disconnected:
                await self.remove_client(client)
    
        async def add_message(self, message: Dict):
            """Add message to history and broadcast"""
            # Add timestamp and store in history
            message['timestamp'] = datetime.now().isoformat()
            self.message_history.append(message)
    
            # Limit history size
            if len(self.message_history) > self.max_history:
                self.message_history = self.message_history[-self.max_history:]
    
            # Broadcast to all clients
            await self.broadcast(message)
    
    class ChatServer:
        """WebSocket chat server"""
    
        def __init__(self, secret_key: str = "your-secret-key"):
            self.rooms: Dict[str, ChatRoom] = {}
            self.secret_key = secret_key
            self.logger = logging.getLogger(__name__)
    
        def get_or_create_room(self, room_id: str) -> ChatRoom:
            """Get existing room or create new one"""
            if room_id not in self.rooms:
                self.rooms[room_id] = ChatRoom(room_id)
            return self.rooms[room_id]
    
        def authenticate_user(self, token: str) -> Optional[str]:
            """Authenticate user with JWT token"""
            try:
                payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
                return payload.get('username')
            except jwt.InvalidTokenError:
                return None
    
        def create_token(self, username: str) -> str:
            """Create JWT token for user"""
            payload = {
                'username': username,
                # datetime.utcnow().timestamp() would treat naive UTC as local time
                'exp': int(datetime.now().timestamp()) + 3600  # 1 hour expiry
            }
            return jwt.encode(payload, self.secret_key, algorithm='HS256')
    
        async def handle_client(self, websocket, path=None):
            """Handle individual client connection"""
            username = None
            room = None
    
            try:
                # Wait for authentication message
                auth_message = await asyncio.wait_for(websocket.recv(), timeout=10.0)
                auth_data = json.loads(auth_message)
    
                # Reject anything other than an auth message; otherwise the
                # client would reach the message loop without a room
                if auth_data.get('type') != 'auth':
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': 'Authentication required'
                    }))
                    return
    
                username = self.authenticate_user(auth_data.get('token', ''))
                if not username:
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': 'Authentication failed'
                    }))
                    return
    
                room_id = auth_data.get('room_id', 'general')
                room = self.get_or_create_room(room_id)
                await room.add_client(websocket, username)
    
                await websocket.send(json.dumps({
                    'type': 'auth_success',
                    'message': f'Welcome to room {room_id}!'
                }))
    
                self.logger.info(f"User {username} joined room {room_id}")
    
                # Handle messages
                async for message in websocket:
                    try:
                        data = json.loads(message)
                        await self.process_message(data, websocket, room)
                    except json.JSONDecodeError:
                        await websocket.send(json.dumps({
                            'type': 'error',
                            'message': 'Invalid JSON format'
                        }))
                    except Exception as e:
                        self.logger.error(f"Error processing message: {e}")
    
            except asyncio.TimeoutError:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': 'Authentication timeout'
                }))
            except websockets.exceptions.ConnectionClosed:
                pass
            except Exception as e:
                self.logger.error(f"Error handling client: {e}")
            finally:
                if room and websocket in room.clients:
                    await room.remove_client(websocket)
                    if username:
                        self.logger.info(f"User {username} left room {room.room_id}")
    
        async def process_message(self, data: Dict, websocket, room: ChatRoom):
            """Process different types of messages"""
            message_type = data.get('type')
            username = getattr(websocket, 'username', 'Unknown')
    
            if message_type == 'chat':
                # Regular chat message
                await room.add_message({
                    'type': 'chat',
                    'username': username,
                    'message': data['message'],
                    'room_id': room.room_id
                })
    
            elif message_type == 'typing':
                # Typing indicator
                await room.broadcast({
                    'type': 'typing',
                    'username': username,
                    'is_typing': data.get('is_typing', False)
                }, exclude=websocket)
    
            elif message_type == 'ping':
                # Ping/pong for connection health
                await websocket.send(json.dumps({'type': 'pong'}))
    
            else:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': f'Unknown message type: {message_type}'
                }))
    
        async def start_server(self, host: str = 'localhost', port: int = 8765):
            """Start the WebSocket server"""
            self.logger.info(f"Starting chat server on {host}:{port}")
    
            async with websockets.serve(self.handle_client, host, port):
                await asyncio.Future()  # Run forever
    
    # Chat client example
    class ChatClient:
        """WebSocket chat client"""
    
        def __init__(self, username: str, server_url: str = "ws://localhost:8765"):
            self.username = username
            self.server_url = server_url
            self.websocket = None
            self.token = None
    
        async def connect(self, room_id: str = 'general'):
            """Connect to chat server"""
            try:
                self.websocket = await websockets.connect(self.server_url)
    
                # Create token (in real app, get from auth server)
                server = ChatServer()
                self.token = server.create_token(self.username)
    
                # Send authentication
                auth_message = {
                    'type': 'auth',
                    'token': self.token,
                    'room_id': room_id
                }
                await self.websocket.send(json.dumps(auth_message))
    
                # Wait for auth response
                response = await self.websocket.recv()
                auth_response = json.loads(response)
    
                if auth_response['type'] == 'auth_success':
                    print(f"Connected: {auth_response['message']}")
                    return True
                else:
                    print(f"Connection failed: {auth_response['message']}")
                    return False
    
            except Exception as e:
                print(f"Connection error: {e}")
                return False
    
        async def send_message(self, message: str):
            """Send chat message"""
            if self.websocket:
                await self.websocket.send(json.dumps({
                    'type': 'chat',
                    'message': message
                }))
    
        async def listen_for_messages(self):
            """Listen for incoming messages"""
            try:
                async for message in self.websocket:
                    data = json.loads(message)
                    self.handle_message(data)
            except websockets.exceptions.ConnectionClosed:
                print("Connection closed by server")
            except Exception as e:
                print(f"Error receiving messages: {e}")
    
        def handle_message(self, data: Dict):
            """Handle incoming message"""
            msg_type = data.get('type')
    
            if msg_type == 'chat':
                timestamp = data.get('timestamp', '')
                username = data.get('username', '')
                message = data.get('message', '')
                print(f"[{timestamp}] {username}: {message}")
    
            elif msg_type == 'user_joined':
                username = data.get('username', '')
                count = data.get('user_count', 0)
                print(f"*** {username} joined the chat (users online: {count}) ***")
    
            elif msg_type == 'user_left':
                username = data.get('username', '')
                count = data.get('user_count', 0)
                print(f"*** {username} left the chat (users online: {count}) ***")
    
            elif msg_type == 'typing':
                username = data.get('username', '')
                is_typing = data.get('is_typing', False)
                if is_typing:
                    print(f"*** {username} is typing... ***")
    
            elif msg_type == 'error':
                print(f"Error: {data.get('message', 'Unknown error')}")
    
        async def disconnect(self):
            """Disconnect from server"""
            if self.websocket:
                await self.websocket.close()
    
    # Usage examples
    async def run_chat_server():
        """Run the chat server"""
        server = ChatServer()
        await server.start_server()
    
    async def run_chat_client():
        """Run a chat client"""
        client = ChatClient("TestUser")
    
        if await client.connect("general"):
            # Start listening for messages in background
            listen_task = asyncio.create_task(client.listen_for_messages())
    
            # Send some test messages
            await client.send_message("Hello, everyone!")
            await asyncio.sleep(1)
            await client.send_message("How is everyone doing?")
    
            # Wait a bit then disconnect
            await asyncio.sleep(5)
            await client.disconnect()
    
            # Cancel listening task
            listen_task.cancel()
            try:
                await listen_task
            except asyncio.CancelledError:
                pass
    Python

    API Rate Limiter

    Building a sophisticated rate limiting system for APIs.
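
    Before bringing Redis into the picture, the sliding-window strategy can be sketched in memory for a single process. This is a simplified illustration under stated assumptions, not the limiter built in this section: `InMemorySlidingWindow` and its `allow` method are hypothetical names, and state is lost on restart and not shared between processes.

```python
import time
from collections import deque
from typing import Deque, Dict, Optional


class InMemorySlidingWindow:
    """Single-process sliding-window limiter (hypothetical helper, no Redis)."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits: Dict[str, Deque[float]] = {}

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        """Record a request for key and return True if it fits in the window."""
        now = time.monotonic() if now is None else now
        hits = self._hits.setdefault(key, deque())
        # Drop timestamps that have slid out of the window
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        if len(hits) < self.max_requests:
            hits.append(now)
            return True
        return False


if __name__ == "__main__":
    limiter = InMemorySlidingWindow(max_requests=2, window_seconds=10.0)
    # Explicit timestamps make the behavior easy to follow
    print([limiter.allow("client-1", now=t) for t in (0.0, 1.0, 2.0, 10.0)])
```

    Because `allow` contains no `await`, it runs atomically with respect to other coroutines on the same event loop. Sharing limits across processes or machines is what calls for an external store and an atomic script.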

    import asyncio
    import time
    from typing import Dict, Optional, Tuple
    from dataclasses import dataclass
    from enum import Enum
    import redis.asyncio as redis
    import json
    
    class RateLimitStrategy(Enum):
        FIXED_WINDOW = "fixed_window"
        SLIDING_WINDOW = "sliding_window"
        TOKEN_BUCKET = "token_bucket"
        LEAKY_BUCKET = "leaky_bucket"
    
    @dataclass
    class RateLimitRule:
        requests_per_window: int
        window_size_seconds: int
        strategy: RateLimitStrategy = RateLimitStrategy.SLIDING_WINDOW
        burst_allowance: int = 0
    
    class AsyncRateLimiter:
        """Advanced async rate limiter with multiple strategies"""
    
        def __init__(self, redis_url: str = "redis://localhost:6379"):
            self.redis_url = redis_url
            self.redis_client: Optional[redis.Redis] = None
            self.local_cache: Dict[str, Dict] = {}
    
        async def __aenter__(self):
            self.redis_client = redis.from_url(self.redis_url)
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            if self.redis_client:
                await self.redis_client.close()
    
        async def check_rate_limit(self, 
                                  key: str, 
                                  rule: RateLimitRule) -> Tuple[bool, Dict]:
            """Check if request is within rate limit"""
    
            current_time = time.time()
    
            if rule.strategy == RateLimitStrategy.SLIDING_WINDOW:
                return await self._sliding_window_check(key, rule, current_time)
            elif rule.strategy == RateLimitStrategy.FIXED_WINDOW:
                return await self._fixed_window_check(key, rule, current_time)
            elif rule.strategy == RateLimitStrategy.TOKEN_BUCKET:
                return await self._token_bucket_check(key, rule, current_time)
            elif rule.strategy == RateLimitStrategy.LEAKY_BUCKET:
                # No leaky bucket check is implemented in this example
                raise NotImplementedError("Leaky bucket strategy is not implemented")
            else:
                raise ValueError(f"Unknown strategy: {rule.strategy}")
    
        async def _sliding_window_check(self, 
                                       key: str, 
                                       rule: RateLimitRule, 
                                       current_time: float) -> Tuple[bool, Dict]:
            """Sliding window rate limiting"""
    
            window_start = current_time - rule.window_size_seconds
    
            # Lua script for atomic operations
            lua_script = """
            local key = KEYS[1]
            local window_start = tonumber(ARGV[1])
            local current_time = tonumber(ARGV[2])
            local max_requests = tonumber(ARGV[3])
            local window_size = tonumber(ARGV[4])
    
            -- Remove old entries
            redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
    
            -- Count current requests in window
            local current_count = redis.call('ZCARD', key)
    
            if current_count < max_requests then
                -- Add current request
                redis.call('ZADD', key, current_time, current_time)
                redis.call('EXPIRE', key, window_size + 1)
                return {1, current_count + 1, max_requests - current_count - 1}
            else
                return {0, current_count, 0}
            end
            """
    
            result = await self.redis_client.eval(
                lua_script, 
                1, 
                key,
                window_start,
                current_time,
                rule.requests_per_window,
                rule.window_size_seconds
            )
    
            allowed = bool(result[0])
            current_count = result[1]
            remaining = result[2]
    
            return allowed, {
                'allowed': allowed,
                'current_count': current_count,
                'remaining': remaining,
                'reset_time': current_time + rule.window_size_seconds,
                'strategy': rule.strategy.value
            }
    
        async def _fixed_window_check(self, 
                                     key: str, 
                                     rule: RateLimitRule, 
                                     current_time: float) -> Tuple[bool, Dict]:
            """Fixed window rate limiting"""
    
            window_id = int(current_time // rule.window_size_seconds)
            window_key = f"{key}:window:{window_id}"
    
            # Lua script for atomic increment and check
            lua_script = """
            local key = KEYS[1]
            local max_requests = tonumber(ARGV[1])
            local window_size = tonumber(ARGV[2])
    
            local current_count = redis.call('INCR', key)
    
            if current_count == 1 then
                redis.call('EXPIRE', key, window_size)
            end
    
            if current_count <= max_requests then
                return {1, current_count, max_requests - current_count}
            else
                return {0, current_count, 0}
            end
            """
    
            result = await self.redis_client.eval(
                lua_script,
                1,
                window_key,
                rule.requests_per_window,
                rule.window_size_seconds
            )
    
            allowed = bool(result[0])
            current_count = result[1]
            remaining = result[2]
    
            reset_time = (window_id + 1) * rule.window_size_seconds
    
            return allowed, {
                'allowed': allowed,
                'current_count': current_count,
                'remaining': remaining,
                'reset_time': reset_time,
                'strategy': rule.strategy.value
            }
    
        async def _token_bucket_check(self, 
                                     key: str, 
                                     rule: RateLimitRule, 
                                     current_time: float) -> Tuple[bool, Dict]:
            """Token bucket rate limiting"""
    
            bucket_key = f"{key}:bucket"
    
            # Lua script for token bucket
            lua_script = """
            local key = KEYS[1]
            local current_time = tonumber(ARGV[1])
            local max_tokens = tonumber(ARGV[2])
            local refill_rate = tonumber(ARGV[3])
            local window_size = tonumber(ARGV[4])
    
            local bucket_data = redis.call('HMGET', key, 'tokens', 'last_refill')
            local tokens = tonumber(bucket_data[1]) or max_tokens
            local last_refill = tonumber(bucket_data[2]) or current_time
    
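            -- Calculate tokens to add based on time elapsed. Keep the
            -- fractional part: flooring here while resetting last_refill on
            -- every call would discard partial refills under steady traffic.
            local time_elapsed = current_time - last_refill
            local tokens_to_add = time_elapsed * refill_rate / window_size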
    
            tokens = math.min(max_tokens, tokens + tokens_to_add)
    
            if tokens >= 1 then
                tokens = tokens - 1
                redis.call('HMSET', key, 'tokens', tokens, 'last_refill', current_time)
                redis.call('EXPIRE', key, window_size * 2)
                return {1, tokens, max_tokens}
            else
                redis.call('HMSET', key, 'tokens', tokens, 'last_refill', current_time)
                redis.call('EXPIRE', key, window_size * 2)
                return {0, tokens, max_tokens}
            end
            """
    
            result = await self.redis_client.eval(
                lua_script,
                1,
                bucket_key,
                current_time,
                rule.requests_per_window + rule.burst_allowance,
                rule.requests_per_window,
                rule.window_size_seconds
            )
    
            allowed = bool(result[0])
            tokens_remaining = result[1]
            max_tokens = result[2]
    
            return allowed, {
                'allowed': allowed,
                'tokens_remaining': tokens_remaining,
                'max_tokens': max_tokens,
                'strategy': rule.strategy.value
            }
    
    class RateLimitMiddleware:
        """Rate limiting middleware for web applications"""
    
        def __init__(self, limiter: AsyncRateLimiter):
            self.limiter = limiter
            self.rules: Dict[str, RateLimitRule] = {}
            self.default_rule = RateLimitRule(
                requests_per_window=100,
                window_size_seconds=60,
                strategy=RateLimitStrategy.SLIDING_WINDOW
            )
    
        def add_rule(self, pattern: str, rule: RateLimitRule):
            """Add rate limiting rule for specific pattern"""
            self.rules[pattern] = rule
    
        def get_client_key(self, request) -> str:
            """Extract client identifier from request"""
            # In real implementation, use IP, user ID, API key, etc.
            client_ip = getattr(request, 'remote_addr', 'unknown')
            user_id = getattr(request, 'user_id', None)
    
            if user_id:
                return f"user:{user_id}"
            else:
                return f"ip:{client_ip}"
    
        def get_rule_for_endpoint(self, endpoint: str) -> RateLimitRule:
            """Get rate limiting rule for endpoint"""
            for pattern, rule in self.rules.items():
                if pattern in endpoint:
                    return rule
            return self.default_rule
    
        async def process_request(self, request) -> Tuple[bool, Dict]:
            """Process request through rate limiter"""
            client_key = self.get_client_key(request)
            endpoint = getattr(request, 'path', '/')
            rule = self.get_rule_for_endpoint(endpoint)
    
            rate_limit_key = f"rate_limit:{client_key}:{endpoint}"
    
            return await self.limiter.check_rate_limit(rate_limit_key, rule)
    
    # Example usage with a mock API server (adapt the middleware call to your web framework)
    class AsyncAPIServer:
        """Example async API server with rate limiting"""
    
        def __init__(self):
            self.limiter = None
            self.middleware = None
    
        async def __aenter__(self):
            self.limiter = AsyncRateLimiter()
            await self.limiter.__aenter__()
    
            self.middleware = RateLimitMiddleware(self.limiter)
    
            # Configure rate limiting rules
            self.middleware.add_rule('/api/upload', RateLimitRule(
                requests_per_window=5,
                window_size_seconds=60,
                strategy=RateLimitStrategy.TOKEN_BUCKET,
                burst_allowance=2
            ))
    
            self.middleware.add_rule('/api/search', RateLimitRule(
                requests_per_window=50,
                window_size_seconds=60,
                strategy=RateLimitStrategy.SLIDING_WINDOW
            ))
    
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            if self.limiter:
                await self.limiter.__aexit__(exc_type, exc_val, exc_tb)
    
        async def handle_request(self, request_data: Dict) -> Dict:
            """Handle API request with rate limiting"""
    
            # Mock request object
            class MockRequest:
                def __init__(self, data):
                    self.remote_addr = data.get('client_ip', '127.0.0.1')
                    self.user_id = data.get('user_id')
                    self.path = data.get('path', '/')
    
            request = MockRequest(request_data)
    
            # Check rate limit
            allowed, rate_info = await self.middleware.process_request(request)
    
            if not allowed:
                return {
                    'status': 429,
                    'error': 'Rate limit exceeded',
                    'rate_limit_info': rate_info
                }
    
            # Process request (mock)
            await asyncio.sleep(0.1)  # Simulate processing
    
            return {
                'status': 200,
                'data': 'Request processed successfully',
                'rate_limit_info': rate_info
            }
    
    # Distributed rate limiter for microservices
    class DistributedRateLimiter:
        """Distributed rate limiter for microservices"""
    
        def __init__(self, redis_cluster_nodes: List[str]):
            self.redis_nodes = redis_cluster_nodes
            self.redis_client = None
    
        async def __aenter__(self):
            # In real implementation, use Redis Cluster
            self.redis_client = redis.from_url(self.redis_nodes[0])
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            if self.redis_client:
                await self.redis_client.close()
    
        async def check_global_rate_limit(self, 
                                         service_name: str,
                                         resource_type: str,
                                         identifier: str,
                                         rule: RateLimitRule) -> Tuple[bool, Dict]:
            """Check rate limit across all service instances"""
    
            global_key = f"global_rate_limit:{service_name}:{resource_type}:{identifier}"
    
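            # A real implementation would map the key to a node with a stable
            # hash (e.g. hashlib.sha256). The built-in hash() is salted per
            # process for strings, so service instances would disagree on the node.
            # For this example, we use a single Redis instance.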
            limiter = AsyncRateLimiter(self.redis_nodes[0])
            async with limiter:
                return await limiter.check_rate_limit(global_key, rule)
    
    # Usage examples
    async def rate_limiter_examples():
        """Demonstrate rate limiting functionality"""
    
        print("=== Rate Limiter Examples ===\n")
    
        # Example 1: Basic rate limiting
        async with AsyncRateLimiter() as limiter:
            rule = RateLimitRule(
                requests_per_window=5,
                window_size_seconds=10,
                strategy=RateLimitStrategy.SLIDING_WINDOW
            )
    
            print("Testing sliding window rate limiting:")
            for i in range(8):
                allowed, info = await limiter.check_rate_limit("user:123", rule)
                print(f"Request {i+1}: {'✅' if allowed else '❌'} - {info}")
                await asyncio.sleep(1)
    
            print()
    
        # Example 2: API server with rate limiting
        print("Testing API server with rate limiting:")
        async with AsyncAPIServer() as server:
            test_requests = [
                {'client_ip': '192.168.1.1', 'path': '/api/search', 'user_id': 'user1'},
                {'client_ip': '192.168.1.1', 'path': '/api/upload', 'user_id': 'user1'},
                {'client_ip': '192.168.1.2', 'path': '/api/search', 'user_id': 'user2'},
                {'client_ip': '192.168.1.1', 'path': '/api/upload', 'user_id': 'user1'},
            ]
    
            for i, req_data in enumerate(test_requests):
                response = await server.handle_request(req_data)
                print(f"Request {i+1} ({req_data['path']}): Status {response['status']}")
                if response['status'] == 429:
                    print(f"  Rate limited: {response['rate_limit_info']}")
                else:
                    print(f"  Success: {response['rate_limit_info']}")
    
            print()
    
    if __name__ == "__main__":
        # Configure logging
        logging.basicConfig(level=logging.INFO)
    
        # Run examples
        asyncio.run(rate_limiter_examples())
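
The `RateLimitStrategy` enum above lists `LEAKY_BUCKET`, but the listing shows no check for it. The following is a minimal in-memory sketch of the idea, not a Redis-backed equivalent: the class name `InMemoryLeakyBucket`, its fields, and the injectable `now` argument are assumptions made for illustration. Each request adds one unit to the bucket, the bucket drains at a constant rate, and a request is rejected when it would overflow.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class InMemoryLeakyBucket:
    """Hypothetical single-process leaky bucket (not part of the listing above)."""
    capacity: int        # maximum number of units the bucket can hold
    leak_rate: float     # units drained per second
    _levels: Dict[str, Tuple[float, float]] = field(default_factory=dict)

    def check(self, key: str, now: Optional[float] = None) -> Tuple[bool, Dict]:
        """Return (allowed, info) for one request identified by key."""
        now = time.monotonic() if now is None else now
        level, last = self._levels.get(key, (0.0, now))

        # Drain the bucket for the time elapsed since the previous request
        level = max(0.0, level - (now - last) * self.leak_rate)

        allowed = level + 1 <= self.capacity
        if allowed:
            level += 1  # this request adds one unit

        self._levels[key] = (level, now)
        return allowed, {
            'allowed': allowed,
            'level': level,
            'capacity': self.capacity,
            'strategy': 'leaky_bucket',
        }


bucket = InMemoryLeakyBucket(capacity=2, leak_rate=1.0)
results = [
    bucket.check("user:123", now=0.0)[0],  # level 0 -> 1
    bucket.check("user:123", now=0.0)[0],  # level 1 -> 2 (full)
    bucket.check("user:123", now=0.0)[0],  # would overflow: rejected
    bucket.check("user:123", now=1.0)[0],  # one unit has drained: accepted
]
print(results)
```

Passing `now` explicitly keeps the example deterministic; in real use the default monotonic clock would be used. Sharing state between processes would need the same logic moved into a Redis script, as the other strategies do.
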

    Database Connection Pool

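    This example builds an async database connection pool for PostgreSQL (asyncpg) and SQLite (aiosqlite), with connection validation, idle cleanup, and transaction support.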

    import asyncio
    import asyncpg
    import aiosqlite
    from typing import Optional, Dict, Any, List, Protocol
    from contextlib import asynccontextmanager
    import logging
    import time
    from dataclasses import dataclass
    from enum import Enum
    
    class DatabaseType(Enum):
        POSTGRESQL = "postgresql"
        SQLITE = "sqlite"
        MYSQL = "mysql"
    
    @dataclass
    class ConnectionConfig:
        database_type: DatabaseType
        host: str = "localhost"
        port: int = 5432
        database: str = "testdb"
        username: str = "user"
        password: str = "password"
        min_connections: int = 5
        max_connections: int = 20
        connection_timeout: float = 10.0
        idle_timeout: float = 300.0
        max_lifetime: float = 3600.0
    
    class AsyncConnection(Protocol):
        """Protocol for async database connections"""
    
        async def execute(self, query: str, *args) -> Any:
            """Execute a query"""
            ...
    
        async def fetch(self, query: str, *args) -> List[Dict]:
            """Fetch multiple rows"""
            ...
    
        async def fetchone(self, query: str, *args) -> Optional[Dict]:
            """Fetch single row"""
            ...
    
        async def close(self) -> None:
            """Close connection"""
            ...
    
    class PooledConnection:
        """Wrapper for pooled database connection"""
    
        def __init__(self, connection, pool, connection_id: str):
            self.connection = connection
            self.pool = pool
            self.connection_id = connection_id
            self.created_at = time.time()
            self.last_used = time.time()
            self.in_use = False
            self.is_closed = False
    
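        async def execute(self, query: str, *args):
            """Execute query through pooled connection"""
            self.last_used = time.time()
            if hasattr(self.connection, "fetchrow"):  # asyncpg takes varargs
                return await self.connection.execute(query, *args)
            # aiosqlite takes the parameters as a single sequence
            return await self.connection.execute(query, args)
    
        async def fetch(self, query: str, *args):
            """Fetch multiple rows"""
            self.last_used = time.time()
            if hasattr(self.connection, "fetchrow"):  # asyncpg
                return await self.connection.fetch(query, *args)
            async with self.connection.execute(query, args) as cursor:  # aiosqlite
                return await cursor.fetchall()
    
        async def fetchone(self, query: str, *args):
            """Fetch single row"""
            self.last_used = time.time()
            if hasattr(self.connection, "fetchrow"):  # asyncpg
                return await self.connection.fetchrow(query, *args)
            async with self.connection.execute(query, args) as cursor:  # aiosqlite
                return await cursor.fetchone()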
    
        async def close(self):
            """Return connection to pool"""
            if not self.is_closed:
                await self.pool.release_connection(self)
    
        async def force_close(self):
            """Force close the underlying connection"""
            if not self.is_closed:
                await self.connection.close()
                self.is_closed = True
    
    class AsyncConnectionPool:
        """Async database connection pool"""
    
        def __init__(self, config: ConnectionConfig):
            self.config = config
            self.connections: List[PooledConnection] = []
            self.available_connections: asyncio.Queue = asyncio.Queue()
            self.semaphore = asyncio.Semaphore(config.max_connections)
            self.lock = asyncio.Lock()
            self.connection_counter = 0
            self.logger = logging.getLogger(__name__)
            self._closed = False
    
        async def __aenter__(self):
            await self.initialize()
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            await self.close()
    
        async def initialize(self):
            """Initialize the connection pool"""
            self.logger.info(f"Initializing connection pool with {self.config.min_connections} connections")
    
            # Create minimum number of connections
            for _ in range(self.config.min_connections):
                connection = await self._create_connection()
                if connection:
                    await self.available_connections.put(connection)
    
        async def _create_connection(self) -> Optional[PooledConnection]:
            """Create a new database connection"""
            try:
                if self.config.database_type == DatabaseType.POSTGRESQL:
                    conn = await asyncpg.connect(
                        host=self.config.host,
                        port=self.config.port,
                        database=self.config.database,
                        user=self.config.username,
                        password=self.config.password,
                        timeout=self.config.connection_timeout
                    )
                elif self.config.database_type == DatabaseType.SQLITE:
                    conn = await aiosqlite.connect(self.config.database)
                else:
                    raise ValueError(f"Unsupported database type: {self.config.database_type}")
    
                self.connection_counter += 1
                connection_id = f"conn_{self.connection_counter}"
    
                pooled_conn = PooledConnection(conn, self, connection_id)
                self.connections.append(pooled_conn)
    
                self.logger.debug(f"Created connection {connection_id}")
                return pooled_conn
    
            except Exception as e:
                self.logger.error(f"Failed to create connection: {e}")
                return None
    
        async def acquire_connection(self, timeout: Optional[float] = None) -> PooledConnection:
            """Acquire a connection from the pool"""
            if self._closed:
                raise RuntimeError("Connection pool is closed")
    
            timeout = timeout or self.config.connection_timeout
    
            async with self.lock:
                # Reuse idle connections first, discarding any that went stale
                while not self.available_connections.empty():
                    connection = self.available_connections.get_nowait()
                    if await self._is_connection_valid(connection):
                        connection.in_use = True
                        return connection
                    await connection.force_close()
                    if connection in self.connections:
                        self.connections.remove(connection)
    
                # Nothing idle: grow the pool while below max_connections
                if len(self.connections) < self.config.max_connections:
                    connection = await self._create_connection()
                    if connection:
                        connection.in_use = True
                        return connection
    
            # Pool is at capacity: wait for another task to release a connection
            try:
                connection = await asyncio.wait_for(
                    self.available_connections.get(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                raise asyncio.TimeoutError(
                    "Could not acquire connection within timeout"
                ) from None
    
            connection.in_use = True
            return connection
    
        async def _is_connection_valid(self, connection: PooledConnection) -> bool:
            """Check if connection is still valid"""
            if connection.is_closed:
                return False
    
            # Check connection age
            if time.time() - connection.created_at > self.config.max_lifetime:
                return False
    
            # Check idle timeout
            if time.time() - connection.last_used > self.config.idle_timeout:
                return False
    
            try:
                # Test connection with a simple query
                if self.config.database_type == DatabaseType.POSTGRESQL:
                    await connection.connection.execute("SELECT 1")
                elif self.config.database_type == DatabaseType.SQLITE:
                    await connection.connection.execute("SELECT 1")
                return True
            except Exception:
                return False
    
        async def release_connection(self, connection: PooledConnection):
            """Release connection back to pool"""
            if connection.is_closed or self._closed:
                return
    
            async with self.lock:
                connection.in_use = False
    
                if await self._is_connection_valid(connection):
                    await self.available_connections.put(connection)
                else:
                    # Remove invalid connection
                    await connection.force_close()
                    if connection in self.connections:
                        self.connections.remove(connection)
    
        async def cleanup_idle_connections(self):
            """Clean up idle connections periodically"""
            while not self._closed:
                await asyncio.sleep(60)  # Check every minute
    
                async with self.lock:
                    to_remove = []
    
                    for conn in self.connections:
                        if not conn.in_use and not await self._is_connection_valid(conn):
                            to_remove.append(conn)
    
                    for conn in to_remove:
                        await conn.force_close()
                        self.connections.remove(conn)
                        self.logger.debug(f"Removed idle connection {conn.connection_id}")
    
        @asynccontextmanager
        async def transaction(self):
            """Context manager for database transactions"""
            connection = await self.acquire_connection()
    
            if self.config.database_type == DatabaseType.POSTGRESQL:
                transaction = connection.connection.transaction()
                await transaction.start()
    
                try:
                    yield connection
                    await transaction.commit()
                except Exception:
                    await transaction.rollback()
                    raise
                finally:
                    await connection.close()
            else:
                # SQLite and other databases
                try:
                    yield connection
                    await connection.connection.commit()
                except Exception:
                    await connection.connection.rollback()
                    raise
                finally:
                    await connection.close()
    
        async def execute_many(self, query: str, parameters: List[tuple]):
            """Execute query with multiple parameter sets"""
            async with self.transaction() as conn:
                if self.config.database_type == DatabaseType.POSTGRESQL:
                    return await conn.connection.executemany(query, parameters)
                else:
                    results = []
                    for params in parameters:
                        result = await conn.execute(query, *params)
                        results.append(result)
                    return results
    
        async def get_pool_stats(self) -> Dict[str, Any]:
            """Get connection pool statistics"""
            active_connections = len([c for c in self.connections if c.in_use])
            available_connections = self.available_connections.qsize()
            total_connections = len(self.connections)
    
            return {
                'total_connections': total_connections,
                'active_connections': active_connections,
                'available_connections': available_connections,
                'max_connections': self.config.max_connections,
                'min_connections': self.config.min_connections
            }
    
        async def close(self):
            """Close all connections in the pool"""
            if self._closed:
                return
    
            self._closed = True
            self.logger.info("Closing connection pool")
    
            # Close all connections
            close_tasks = []
            for connection in self.connections:
                close_tasks.append(connection.force_close())
    
            if close_tasks:
                await asyncio.gather(*close_tasks, return_exceptions=True)
    
            self.connections.clear()
    
    class DatabaseManager:
        """High-level database manager with connection pooling"""
    
        def __init__(self, config: ConnectionConfig):
            self.config = config
            self.pool: Optional[AsyncConnectionPool] = None
            self.logger = logging.getLogger(__name__)
    
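        async def __aenter__(self):
            self.pool = AsyncConnectionPool(self.config)
            await self.pool.__aenter__()
    
            # Start cleanup task; keep a reference so it is not garbage-collected
            # and can be cancelled on exit
            self._cleanup_task = asyncio.create_task(self.pool.cleanup_idle_connections())
    
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            if self.pool:
                # Stop the background cleanup loop before closing the pool
                self._cleanup_task.cancel()
                try:
                    await self._cleanup_task
                except asyncio.CancelledError:
                    pass
                await self.pool.__aexit__(exc_type, exc_val, exc_tb)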
    
        async def execute(self, query: str, *args):
            """Execute a query"""
            connection = await self.pool.acquire_connection()
            try:
                return await connection.execute(query, *args)
            finally:
                await connection.close()
    
        async def fetch(self, query: str, *args) -> List[Dict]:
            """Fetch multiple rows"""
            connection = await self.pool.acquire_connection()
            try:
                return await connection.fetch(query, *args)
            finally:
                await connection.close()
    
        async def fetchone(self, query: str, *args) -> Optional[Dict]:
            """Fetch single row"""
            connection = await self.pool.acquire_connection()
            try:
                return await connection.fetchone(query, *args)
            finally:
                await connection.close()
    
        async def execute_transaction(self, operations: List[tuple]):
            """Execute multiple operations in a transaction"""
            async with self.pool.transaction() as conn:
                results = []
                for query, args in operations:
                    result = await conn.execute(query, *args)
                    results.append(result)
                return results
    
        async def get_stats(self) -> Dict[str, Any]:
            """Get database pool statistics"""
            return await self.pool.get_pool_stats()
    
    # Usage examples
    async def database_examples():
        """Demonstrate database connection pooling"""
    
        print("=== Database Connection Pool Examples ===\n")
    
        # PostgreSQL example
        pg_config = ConnectionConfig(
            database_type=DatabaseType.POSTGRESQL,
            host="localhost",
            port=5432,
            database="testdb",
            username="postgres",
            password="password",
            min_connections=3,
            max_connections=10
        )
    
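        # SQLite example (more practical for demo). A file-backed database is
        # used because each connection to ":memory:" opens its own private
        # database, so pooled connections would not see each other's tables.
        import os
        import tempfile
        sqlite_config = ConnectionConfig(
            database_type=DatabaseType.SQLITE,
            database=os.path.join(tempfile.mkdtemp(), "pool_demo.db"),
            min_connections=2,
            max_connections=5
        )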
    
        print("Testing SQLite connection pool:")
        async with DatabaseManager(sqlite_config) as db:
            # Create test table
            await db.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE
                )
            """)
    
            # Insert test data
            await db.execute_transaction([
                ("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com")),
                ("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "bob@example.com")),
                ("INSERT INTO users (name, email) VALUES (?, ?)", ("Charlie", "charlie@example.com")),
            ])
    
            # Fetch data
            users = await db.fetch("SELECT * FROM users")
            print(f"Users in database: {len(users)}")
    
            for user in users:
                print(f"  {user}")
    
            # Get pool statistics
            stats = await db.get_stats()
            print(f"\nPool statistics: {stats}")
    
            # Test concurrent operations
            print("\nTesting concurrent operations:")
    
            async def concurrent_query(query_id: int):
                user = await db.fetchone("SELECT * FROM users WHERE id = ?", query_id)
                print(f"Query {query_id}: {user}")
                return user
    
            # Run multiple concurrent queries
            tasks = [concurrent_query(i) for i in range(1, 4)]
            results = await asyncio.gather(*tasks)
    
            print(f"Concurrent query results: {len(results)}")
    
            # Final pool statistics
            final_stats = await db.get_stats()
            print(f"Final pool statistics: {final_stats}")
    
    if __name__ == "__main__":
        # Configure logging
        logging.basicConfig(level=logging.INFO)
    
        # Run examples
        asyncio.run(database_examples())
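
The pool above combines a queue, a semaphore, and a lock. To isolate the core acquire/release cycle, here is a minimal sketch in which the capacity slot stays held for as long as a connection is checked out. `FakeConnection`, `TinyPool`, and `factory` are hypothetical names used only for this illustration, and no real database driver is involved.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Hypothetical stand-in for a real driver connection."""

    def __init__(self, conn_id: int):
        self.conn_id = conn_id


class TinyPool:
    """Hypothetical minimal pool that lazily creates up to max_size connections."""

    def __init__(self, factory, max_size: int):
        self._factory = factory                    # async callable returning a connection
        self._idle: asyncio.Queue = asyncio.Queue()
        self._slots = asyncio.Semaphore(max_size)  # one slot per checked-out connection
        self.created = 0

    @asynccontextmanager
    async def connection(self):
        # The slot is held until the caller leaves the `async with` block,
        # so at most max_size connections are in use at any moment.
        async with self._slots:
            if self._idle.empty():
                self.created += 1
                conn = await self._factory(self.created)
            else:
                conn = self._idle.get_nowait()
            try:
                yield conn
            finally:
                self._idle.put_nowait(conn)        # hand the connection back


async def demo():
    async def factory(conn_id: int) -> FakeConnection:
        await asyncio.sleep(0)                     # pretend to open a socket
        return FakeConnection(conn_id)

    pool = TinyPool(factory, max_size=2)
    in_use = 0
    peak = 0

    async def worker():
        nonlocal in_use, peak
        async with pool.connection():
            in_use += 1
            peak = max(peak, in_use)
            await asyncio.sleep(0.01)              # simulate a query
            in_use -= 1

    await asyncio.gather(*(worker() for _ in range(6)))
    return pool.created, peak


created, peak = asyncio.run(demo())
print(f"connections created: {created}, peak in use: {peak}")
```

Because the connection is returned to the idle queue before the slot is released, a waiting task always finds an idle connection and the six workers share two connections. Validation, timeouts, and lifetime limits are left out on purpose.
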

    12. Troubleshooting and Debugging

    Common Asyncio Issues and Solutions

    Issue 1: “RuntimeError: asyncio.run() cannot be called from a running event loop”

    Problem:

    # This will fail in Jupyter notebooks or when already in async context
    import asyncio
    
    async def main():
        return "Hello"
    
    # ❌ Fails if event loop is already running
    asyncio.run(main())
    Python

    Solutions:

    # Solution 1: Check if loop is running
    import asyncio
    
    async def main():
        return "Hello"
    
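    def run_main():
        try:
            # Raises RuntimeError if no loop is running in this thread
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop, safe to use asyncio.run()
            return asyncio.run(main())
        # A loop is already running (Jupyter, async framework): schedule
        # main() as a task; the caller must `await` the returned task
        return asyncio.create_task(main())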
    
    # Solution 2: Use nest_asyncio for Jupyter
    # pip install nest_asyncio
    import nest_asyncio
    nest_asyncio.apply()
    asyncio.run(main())  # Now works in Jupyter
    
    # Solution 3: Create task in existing loop
    async def run_in_existing_loop():
        task = asyncio.create_task(main())
        return await task
    Python

    Issue 2: Forgetting to await coroutines

    Problem:

    async def fetch_data():
        await asyncio.sleep(1)
        return "data"
    
    async def main():
        # ❌ Returns coroutine object, not the result
        result = fetch_data()
        print(result)  # <coroutine object fetch_data at 0x...>
    Python

    Solution:

    async def main():
        # ✅ Properly await the coroutine
        result = await fetch_data()
        print(result)  # "data"
    Python
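
    A quick way to see the difference is to inspect the object before awaiting it. The sketch below redefines `fetch_data` so it runs on its own; `demonstrate_missing_await` is a hypothetical helper name used only for illustration. If a coroutine object is discarded without ever being awaited, Python emits `RuntimeWarning: coroutine 'fetch_data' was never awaited`.

```python
import asyncio

async def fetch_data():
    await asyncio.sleep(0.01)
    return "data"

async def demonstrate_missing_await():
    pending = fetch_data()                        # no await: nothing has run yet
    was_coroutine = asyncio.iscoroutine(pending)  # it is only a coroutine object
    result = await pending                        # awaiting runs the coroutine body
    return was_coroutine, result

if __name__ == "__main__":
    print(asyncio.run(demonstrate_missing_await()))
```

    Running the script prints a tuple: the first element confirms that the un-awaited call produced a coroutine object, the second is the value obtained once it was awaited.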

    Issue 3: Blocking the event loop

    Problem:

    import time
    
    async def blocking_example():
        # ❌ This blocks the entire event loop!
        time.sleep(2)  # Nothing else can run during this time
        return "done"
    Python

    Solution:

    async def non_blocking_example():
        # ✅ This yields control to other tasks
        await asyncio.sleep(2)  # Other tasks can run during this time
        return "done"
    
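    # For blocking or CPU-intensive work, use run_in_executor.
    # Passing None uses the default thread pool, which keeps the loop responsive;
    # pure-Python CPU work still contends for the GIL, so pass a
    # concurrent.futures.ProcessPoolExecutor (with a module-level function)
    # when you need real parallelism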
    async def cpu_intensive_work():
        def heavy_computation():
            return sum(i * i for i in range(1000000))
    
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, heavy_computation)
        return result
    Python
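
    To make the effect of blocking measurable, the following sketch runs a small "heartbeat" task next to a blocking call and counts how many heartbeats get through. The names `blocking_call`, `heartbeat`, and `count_ticks` are hypothetical, and `time.sleep` stands in for any blocking library call.

```python
import asyncio
import time

def blocking_call(seconds: float) -> str:
    """Stand-in for a blocking library call (hypothetical)."""
    time.sleep(seconds)
    return "done"

async def heartbeat(ticks: list, interval: float = 0.01) -> None:
    """Record a timestamp every `interval` seconds while the loop is free."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)

async def count_ticks(offload: bool, seconds: float = 0.2) -> int:
    """Return how many heartbeats ran while the blocking call was in progress."""
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)          # let the heartbeat record its first tick
    before = len(ticks)
    if offload:
        loop = asyncio.get_running_loop()
        # Runs in the default thread pool; the loop keeps serving other tasks
        await loop.run_in_executor(None, blocking_call, seconds)
    else:
        blocking_call(seconds)      # runs on the loop thread: nothing else can run
    after = len(ticks)
    beat.cancel()
    try:
        await beat                  # wait for the cancellation to complete
    except asyncio.CancelledError:
        pass
    return after - before

if __name__ == "__main__":
    print("heartbeats while blocked:  ", asyncio.run(count_ticks(offload=False)))
    print("heartbeats while offloaded:", asyncio.run(count_ticks(offload=True)))
```

    The blocked run reports zero heartbeats because the loop never regains control during the call; the offloaded run reports several, since the call happens on a worker thread.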

    Debugging Asyncio Applications

    Enable Debug Mode

    import asyncio
    import logging
    
    # Enable asyncio debug mode
    asyncio.run(main(), debug=True)
    
    # Or set environment variable
    # PYTHONASYNCIODEBUG=1 python your_script.py
    
    # Configure logging for asyncio
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('asyncio')
    Python
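
    One practical benefit of debug mode is that the loop logs a warning on the `asyncio` logger whenever a single step takes longer than `loop.slow_callback_duration` (0.1 seconds by default). The sketch below lowers that threshold, blocks on purpose, and collects the resulting warnings. `ListHandler`, `slow_step`, and `collect_slow_callback_warnings` are hypothetical names for this illustration.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collect formatted log messages in a list (illustrative helper)."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def slow_step():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05   # report steps slower than 50 ms
    time.sleep(0.2)                      # deliberately block the event loop
    await asyncio.sleep(0)

def collect_slow_callback_warnings():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(slow_step(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Debug mode reports slow steps as "Executing <...> took N seconds"
    return [msg for msg in handler.messages if "took" in msg]

if __name__ == "__main__":
    for message in collect_slow_callback_warnings():
        print(message)
```

    In a real application the same warnings appear in the normal log output once logging is configured, which makes accidental blocking calls easy to spot during development.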

    Common Debugging Techniques

    import asyncio
    import time
    import traceback
    
    async def debug_example():
        """Example showing various debugging techniques"""
    
        # 1. Add timing information
        start_time = time.time()
    
        try:
            # 2. Use asyncio.wait_for to catch hanging operations
            result = await asyncio.wait_for(some_async_operation(), timeout=5.0)
    
        except asyncio.TimeoutError:
            print("Operation timed out!")
            # 3. Print current tasks to see what's running
            for task in asyncio.all_tasks():
                print(f"Task: {task.get_name()}, Done: {task.done()}")
                if not task.done():
                    print(f"  Stack: {task.get_stack()}")
    
        except Exception as e:
            # 4. Print full traceback for async exceptions
            print(f"Error: {e}")
            traceback.print_exc()
    
        finally:
            elapsed = time.time() - start_time
            print(f"Operation took: {elapsed:.2f} seconds")
    
    async def some_async_operation():
        # Simulate long-running operation
        await asyncio.sleep(10)
        return "done"
    Python

    Memory Debugging

    import asyncio
    import gc
    import tracemalloc
    
    async def memory_debug_example():
        """Debug memory usage in async applications"""
    
        # Start memory tracing
        tracemalloc.start()
    
        # Take snapshot before operations
        snapshot1 = tracemalloc.take_snapshot()
    
        # Run your async operations
        tasks = []
        for i in range(1000):
            task = asyncio.create_task(asyncio.sleep(0.1))
            tasks.append(task)
    
        await asyncio.gather(*tasks)
    
        # Take snapshot after operations
        snapshot2 = tracemalloc.take_snapshot()
    
        # Compare memory usage
        top_stats = snapshot2.compare_to(snapshot1, 'lineno')
    
        print("Top 10 memory allocations:")
        for stat in top_stats[:10]:
            print(stat)
    
        # Force garbage collection
        collected = gc.collect()
        print(f"Garbage collected: {collected} objects")
    Python

    Performance Profiling

    import asyncio
    import cProfile
    import pstats
    from functools import wraps
    
    def async_profile(func):
        """Decorator to profile async functions"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            pr = cProfile.Profile()
            pr.enable()
    
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                pr.disable()
                stats = pstats.Stats(pr)
                stats.sort_stats('cumulative')
                stats.print_stats(10)  # Top 10 functions
    
        return wrapper
    
    @async_profile
    async def example_to_profile():
        """Example function to profile"""
        tasks = []
        for i in range(100):
            task = asyncio.create_task(asyncio.sleep(0.01))
            tasks.append(task)
    
        await asyncio.gather(*tasks)
        return "completed"
    
    # Usage
    # asyncio.run(example_to_profile())
    Python

    Best Practices for Debugging

    1. Use meaningful task names:

    # ✅ Good: Named tasks are easier to debug (name= requires Python 3.8+)
    task = asyncio.create_task(fetch_data(), name="fetch_user_data")
    
    # ❌ Bad: Anonymous tasks are hard to track
    task = asyncio.create_task(fetch_data())
    Python

    2. Implement proper logging:

    import logging
    
    logger = logging.getLogger(__name__)
    
    async def well_logged_function():
        logger.info("Starting operation")
        try:
            result = await some_operation()
            logger.info(f"Operation successful: {result}")
            return result
        except Exception as e:
            logger.error(f"Operation failed: {e}", exc_info=True)
            raise
    Python

    3. Use context managers for resources:

    # ✅ Good: Automatic cleanup
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.text()
    
    # ❌ Bad: Manual cleanup (error-prone; skipped entirely if an exception is raised)
    session = aiohttp.ClientSession()
    response = await session.get(url)
    data = await response.text()
    response.close()  # close() on the response is a regular method, not a coroutine
    await session.close()
    Python

    13. Asyncio vs Other Frameworks

    Asyncio vs Threading

    # Threading approach
    import threading
    import time
    import requests
    
    def fetch_url_threaded(url):
        response = requests.get(url)
        return response.status_code
    
    def threaded_example():
        urls = ['http://httpbin.org/delay/1'] * 5
        threads = []
        results = []
    
        def worker(url):
            result = fetch_url_threaded(url)
            results.append(result)
    
        # Create threads
        for url in urls:
            thread = threading.Thread(target=worker, args=(url,))
            threads.append(thread)
            thread.start()
    
        # Wait for completion
        for thread in threads:
            thread.join()
    
        return results
    
    # Asyncio approach
    import asyncio
    import aiohttp
    
    async def fetch_url_async(session, url):
        async with session.get(url) as response:
            return response.status
    
    async def asyncio_example():
        urls = ['http://httpbin.org/delay/1'] * 5
    
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url_async(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
    
        return results
    Python

    Comparison:

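    | Aspect | Threading | Asyncio |
    |---|---|---|
    | Concurrency model | Preemptive multitasking | Cooperative multitasking |
    | Memory usage | Higher (each thread reserves its own stack, typically up to ~8 MB on Linux) | Lower (single thread; tasks are small objects) |
    | Context switching | OS-managed (expensive) | Event loop (cheap) |
    | Shared state | Requires locks/synchronization | Fewer locks (state can only change at an await) |
    | Debugging | Complex (race conditions) | Easier (single thread) |
    | CPU-bound tasks | Limited (the GIL serializes Python bytecode in standard CPython) | Poor (single thread) |
    | I/O-bound tasks | Good | Excellent |
    | Scalability | Limited (~1000 threads) | High (~10,000+ tasks) |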
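
    The "Shared State" row deserves one caveat: asyncio tasks cannot be interrupted between awaits, but a read-modify-write that spans an `await` can still lose updates. A minimal sketch, using the hypothetical helpers `unsafe_increment`, `safe_increment`, and `run_counters`, shows the effect and how `asyncio.Lock` avoids it:

```python
import asyncio

async def unsafe_increment(state: dict) -> None:
    current = state["count"]
    await asyncio.sleep(0)           # other tasks may run here and read the same value
    state["count"] = current + 1

async def safe_increment(state: dict, lock: asyncio.Lock) -> None:
    async with lock:                 # the whole read-modify-write is one critical section
        current = state["count"]
        await asyncio.sleep(0)
        state["count"] = current + 1

async def run_counters(n: int = 50):
    unsafe = {"count": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))

    safe = {"count": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))
    return unsafe["count"], safe["count"]

if __name__ == "__main__":
    unsafe_total, safe_total = asyncio.run(run_counters())
    print(f"without lock: {unsafe_total}, with lock: {safe_total}")
```

    In the unlocked version the tasks read the counter before the others have written it back, so increments are lost; the locked version counts every task.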

    Asyncio vs Multiprocessing

    # Multiprocessing approach
    import multiprocessing as mp
    import time
    
    def cpu_intensive_task(n):
        """CPU-bound task - benefits from multiprocessing"""
        result = 0
        for i in range(n):
            result += i * i
        return result
    
    def multiprocessing_example():
        with mp.Pool() as pool:
            tasks = [1000000] * 4
            results = pool.map(cpu_intensive_task, tasks)
        return results
    
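    # Asyncio with executor
    import asyncio
    from concurrent.futures import ProcessPoolExecutor
    
    async def asyncio_with_executor():
        loop = asyncio.get_running_loop()
        tasks = [1000000] * 4
        # run_in_executor needs a concurrent.futures executor (mp.Pool is not one),
        # and the callable must be picklable, so pass the function itself, not a lambda
        with ProcessPoolExecutor() as pool:
            futures = [loop.run_in_executor(pool, cpu_intensive_task, n) for n in tasks]
            results = await asyncio.gather(*futures)
        return results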
    Python

    Asyncio vs Other Async Frameworks

    Comparison with Node.js

    // Node.js (JavaScript)
    const fs = require('fs').promises;
    const http = require('http');
    
    async function nodeExample() {
        const files = ['file1.txt', 'file2.txt', 'file3.txt'];
        const contents = await Promise.all(
            files.map(file => fs.readFile(file, 'utf8'))
        );
        return contents;
    }
    JavaScript

    # Python Asyncio equivalent
    import asyncio
    import aiofiles
    
    async def python_example():
        files = ['file1.txt', 'file2.txt', 'file3.txt']
    
        async def read_file(filename):
            async with aiofiles.open(filename, 'r') as f:
                return await f.read()
    
        contents = await asyncio.gather(*[read_file(f) for f in files])
        return contents
    Python

    Comparison with Go Goroutines

    // Go
    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func goExample() {
        var wg sync.WaitGroup
        results := make(chan string, 3)
    
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                time.Sleep(time.Second)
                results <- fmt.Sprintf("Result %d", id)
            }(i)
        }
    
        wg.Wait()
        close(results)
    
        for result := range results {
            fmt.Println(result)
        }
    }
    Go

    # Python Asyncio equivalent
    async def python_equivalent():
        async def worker(worker_id):
            await asyncio.sleep(1)
            return f"Result {worker_id}"
    
        tasks = [worker(i) for i in range(3)]
        results = await asyncio.gather(*tasks)
    
        for result in results:
            print(result)
    Python

    When to Choose What

    Use Asyncio when:

    • I/O-bound operations (network, file, database)
    • Need to handle many concurrent connections
    • Building web servers, APIs, or real-time applications
    • Memory efficiency is important
    • Single-machine applications

    Use Threading when:

    • Mixed I/O and CPU-bound tasks
    • Need to integrate with blocking libraries
    • Simpler concurrency model preferred
    • Limited concurrent operations

    Use Multiprocessing when:

    • CPU-bound tasks dominate
    • Need true parallelism
    • Tasks are independent
    • Have multiple CPU cores available

    Use other frameworks when:

    • Twisted: Legacy async Python applications
    • Tornado: Web applications with WebSocket support
    • Gevent: Monkey-patching existing sync code
    • Trio: More structured async programming

    14. Appendix: Quick Reference

    Common Asyncio Patterns Cheat Sheet

    # Basic async function
    async def my_func():
        await asyncio.sleep(1)
        return "result"
    
    # Running async code
    asyncio.run(my_func())
    
    # Concurrent execution
    await asyncio.gather(func1(), func2(), func3())
    
    # Creating tasks
    task = asyncio.create_task(my_func())
    result = await task
    
    # Timeout
    try:
        result = await asyncio.wait_for(my_func(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Timed out")
    
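    # Error handling: with return_exceptions=True, failures are returned
    # in the results list instead of being raised
    results = await asyncio.gather(
        func1(), func2(), func3(),
        return_exceptions=True
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Error: {result}")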
    Python

    Event Loop Methods

    # Get current loop
    loop = asyncio.get_running_loop()
    
    # Schedule callback
    loop.call_soon(callback)
    loop.call_later(delay, callback)
    loop.call_at(when, callback)
    
    # Run in executor
    result = await loop.run_in_executor(None, blocking_func)
    
    # Create future
    future = loop.create_future()
    future.set_result("value")
    Python
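
    These loop methods are most useful in combination, for example to bridge callback-style code into `await`-able code. The following sketch, with the hypothetical function name `wait_for_callback`, schedules a callback with `call_later` that completes a future the coroutine is awaiting:

```python
import asyncio

async def wait_for_callback(delay: float = 0.05) -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # After `delay` seconds the loop calls future.set_result("value")
    loop.call_later(delay, future.set_result, "value")
    # The coroutine suspends here until the callback resolves the future
    return await future

if __name__ == "__main__":
    print(asyncio.run(wait_for_callback()))
```

    The same shape works for any library that reports completion through a callback: create the future, hand its `set_result` to the callback, and await the future.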

    Synchronization Primitives

    # Lock
    lock = asyncio.Lock()
    async with lock:
        # Critical section
        pass
    
    # Semaphore
    semaphore = asyncio.Semaphore(5)
    async with semaphore:
        # Limited concurrency
        pass
    
    # Event
    event = asyncio.Event()
    event.set()         # Set event (usually from another task)
    await event.wait()  # Returns once the event has been set
    
    # Queue
    queue = asyncio.Queue()
    await queue.put(item)
    item = await queue.get()
    Python
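
    As a concrete illustration of the semaphore pattern, the sketch below starts 20 jobs behind a `Semaphore(5)` and records the highest number that were inside the guarded section at once. `bounded_worker` and `measure_peak` are hypothetical names, and `asyncio.sleep` stands in for real I/O.

```python
import asyncio

async def bounded_worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:                          # at most `limit` workers get past this line
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)            # simulated I/O
        state["active"] -= 1

async def measure_peak(limit: int = 5, jobs: int = 20) -> int:
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(bounded_worker(sem, state) for _ in range(jobs)))
    return state["peak"]

if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(measure_peak()))
```

    The reported peak equals the semaphore limit, even though all 20 coroutines were started together.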

    HTTP Client Example

    import aiohttp
    
    async def http_example():
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.example.com') as response:
                data = await response.json()
                return data
    Python

    Common Gotchas and Solutions

    1. Forgetting await: result = await async_func() not result = async_func()
    2. Blocking operations: Use asyncio.sleep() not time.sleep()
    3. Event loop issues: Use asyncio.run() or check for running loop
    4. Resource cleanup: Always use async with for resources
    5. Exception handling: Use return_exceptions=True in gather() and check each result for exception instances

    Conclusion

    This comprehensive asyncio guide covers:

    1. Fundamentals: Basic concepts, coroutines, event loops
    2. Advanced Features: Tasks, futures, synchronization primitives
    3. Networking: Streams, WebSockets, HTTP clients
    4. Error Handling: Exception management and best practices
    5. Performance: Optimization techniques and profiling
    6. Real-World Applications: Web scraping, chat servers, rate limiting, database pooling
    7. Troubleshooting: Common issues and debugging techniques
    8. Comparisons: How asyncio relates to other concurrency models

    Key takeaways:

    • Asyncio excels at I/O-bound operations
    • Proper error handling is crucial for production applications
    • Connection pooling and rate limiting are essential for scalable systems
    • Understanding the event loop is key to effective asyncio programming
    • Always use proper resource management with async context managers
    • Debug mode and logging are essential for development

    This guide provides both theoretical understanding and practical implementations that can be adapted for production use cases.

