Python Azure Cosmos DB

    A Complete Guide to Building Scalable Applications with Azure Cosmos DB and Python


    Table of Contents

    1. Introduction to Azure Cosmos DB
    2. Setting Up Your Development Environment
    3. Understanding Cosmos DB Core Concepts
    4. Getting Started with Python SDK
    5. Database and Container Operations
    6. CRUD Operations in Detail
    7. Querying Data
    8. Partitioning Strategies
    9. Performance Optimization
    10. Consistency Levels
    11. Change Feed Processing
    12. Stored Procedures and Triggers
    13. Security and Authentication
    14. Monitoring and Troubleshooting
    15. Integration Patterns
    16. Best Practices and Production Deployment
    17. Advanced Scenarios

    1. Introduction to Azure Cosmos DB

    What is Azure Cosmos DB?

    Azure Cosmos DB is Microsoft’s globally distributed, multi-model database service designed for modern applications that require low latency, elastic scale, and high availability. It provides comprehensive SLAs for throughput, latency, availability, and consistency.

    graph TB
        A[Azure Cosmos DB] --> B[Multi-Model Support]
        A --> C[Global Distribution]
        A --> D[Elastic Scale]
        A --> E[Multiple Consistency Levels]
    
        B --> F[SQL API]
        B --> G[MongoDB API]
        B --> H[Cassandra API]
        B --> I[Gremlin API]
        B --> J[Table API]
    
        C --> K[Multi-Region Writes]
        C --> L[Automatic Failover]
        C --> M[Data Replication]
    
        D --> N[Horizontal Scaling]
        D --> O[Auto-scaling]
        D --> P[Reserved Capacity]

    Key Features

    1. Turnkey Global Distribution: Replicate your data across any number of Azure regions
    2. Multi-Model Support: Work with document, key-value, graph, and column-family data
    3. Elastic Scale: Scale throughput and storage independently and elastically
    4. Low Latency: Single-digit millisecond latencies at the 99th percentile
    5. Five Consistency Levels: Choose from strong, bounded staleness, session, consistent prefix, and eventual consistency
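
    The account's default consistency level can be relaxed on a per-client basis. Below is a minimal sketch of requesting Session consistency when constructing the SDK client; the endpoint and key are placeholders, and the requested level must be the same as or weaker than the account default.

    # consistency_example.py
    from azure.cosmos import CosmosClient

    client = CosmosClient(
        url="https://your-account.documents.azure.com:443/",
        credential="your-primary-key-here",
        consistency_level="Session"  # same as or weaker than the account default
    )
    Python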

    Use Cases

    mindmap
      root((Azure Cosmos DB Use Cases))
        Web Applications
          E-commerce platforms
          Content management
          User profiles
        IoT Applications
          Telemetry data
          Real-time analytics
          Device management
        Gaming
          Leaderboards
          Player profiles
          Game state
        Mobile Apps
          Offline sync
          Real-time features
          User data
        Financial Services
          Transaction processing
          Risk management
          Compliance reporting

    2. Setting Up Your Development Environment

    Prerequisites

    Before we begin, ensure you have the following:

    1. Python 3.7 or higher
    2. An Azure subscription
    3. Visual Studio Code (recommended) or your preferred IDE
    4. Git for version control

    Installing Required Packages

    # Install the Azure Cosmos DB Python SDK
    pip install azure-cosmos
    
    # Additional useful packages
    pip install python-dotenv  # For environment variables
    pip install azure-identity  # For authentication
    pip install azure-keyvault-secrets  # For secret management
    pip install pytest  # For testing
    pip install jupyter  # For interactive development
    Bash

    Creating an Azure Cosmos DB Account

    sequenceDiagram
        participant User
        participant AzurePortal
        participant CosmosDB
        participant Python
    
        User->>AzurePortal: Create Cosmos DB Account
        AzurePortal->>CosmosDB: Provision Resources
        CosmosDB->>AzurePortal: Return Connection Details
        AzurePortal->>User: Provide Endpoint & Keys
        User->>Python: Configure SDK with credentials
        Python->>CosmosDB: Establish Connection

    Environment Configuration

    Create a .env file to store your configuration:

    # .env file
    COSMOS_ENDPOINT=https://your-account.documents.azure.com:443/
    COSMOS_KEY=your-primary-key-here
    COSMOS_DATABASE_NAME=SampleDB
    COSMOS_CONTAINER_NAME=SampleContainer
    Bash

    Basic Configuration Module

    # config.py
    import os
    from dotenv import load_dotenv
    
    load_dotenv()
    
    class CosmosConfig:
        def __init__(self):
            self.endpoint = os.getenv('COSMOS_ENDPOINT')
            self.key = os.getenv('COSMOS_KEY')
            self.database_name = os.getenv('COSMOS_DATABASE_NAME')
            self.container_name = os.getenv('COSMOS_CONTAINER_NAME')
    
        def validate(self):
            """Validate that all required configuration is present"""
            required_vars = [
                self.endpoint, 
                self.key, 
                self.database_name, 
                self.container_name
            ]
            if not all(required_vars):
                raise ValueError("Missing required configuration variables")
            return True
    
    # Usage
    config = CosmosConfig()
    config.validate()
    Python

    3. Understanding Cosmos DB Core Concepts

    Hierarchy of Resources

    graph TD
        A[Cosmos DB Account] --> B[Database]
        B --> C[Container]
        C --> D[Items/Documents]
        C --> E[Stored Procedures]
        C --> F[User Defined Functions]
        C --> G[Triggers]
    
        A --> H[Account Settings]
        H --> I[Consistency Level]
        H --> J[Geo-Replication]
        H --> K[Failover Policy]
    
        C --> L[Container Settings]
        L --> M[Partition Key]
        L --> N[Throughput]
        L --> O[Indexing Policy]

    Request Units (RUs)

    Request Units represent the cost of database operations in Cosmos DB. Understanding RUs is crucial for performance and cost optimization.

    graph LR
        A[Database Operation] --> B[Request Units Consumed]
    
        B --> C[Point Read: 1 RU per 1KB]
        B --> D[Point Write: ~5 RUs per 1KB]
        B --> E[Query: Variable RUs]
        B --> F[Stored Procedure: Variable RUs]
    
        G[Factors Affecting RU Consumption] --> H[Item Size]
        G --> I[Operation Type]
        G --> J[Indexing]
        G --> K[Consistency Level]
        G --> L[Query Complexity]
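
    The SDK reports the RU charge of every request in the response headers, which is the easiest way to see what an operation costs during development. The sketch below assumes the synchronous SDK, placeholder credentials, and a container partitioned on /category that already holds an item with id "item-1".

    # ru_charge_example.py
    from azure.cosmos import CosmosClient

    client = CosmosClient(
        url="https://your-account.documents.azure.com:443/",
        credential="your-primary-key-here"
    )
    container = client.get_database_client("SampleDB").get_container_client("SampleContainer")

    # Point read: roughly 1 RU for a 1 KB item
    item = container.read_item(item="item-1", partition_key="Electronics")

    # The charge of the most recent request is surfaced in the response headers
    headers = container.client_connection.last_response_headers
    print(f"Request charge: {headers.get('x-ms-request-charge')} RUs")
    Python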

    Partitioning

    graph TB
        A[Logical Partitions] --> B[Partition Key Value: 'Electronics']
        A --> C[Partition Key Value: 'Clothing']
        A --> D[Partition Key Value: 'Books']
    
        B --> E[Physical Partition 1]
        C --> F[Physical Partition 2]
        D --> G[Physical Partition 3]
    
        E --> H[Items with Electronics category]
        F --> I[Items with Clothing category]
        G --> J[Items with Books category]
    
        K[Partition Key Selection] --> L[High Cardinality]
        K --> M[Even Distribution]
        K --> N[Query Efficiency]
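
    In practice, the logical partition an item belongs to is determined entirely by the value of its partition key property. The following sketch (using placeholder credentials and names) creates a container partitioned on /category and writes two items that land in different logical partitions.

    # logical_partition_example.py
    from azure.cosmos import CosmosClient, PartitionKey

    client = CosmosClient(
        url="https://your-account.documents.azure.com:443/",
        credential="your-primary-key-here"
    )
    database = client.create_database_if_not_exists(id="SampleDB")
    container = database.create_container_if_not_exists(
        id="Products",
        partition_key=PartitionKey(path="/category"),
        offer_throughput=400
    )

    # Items with different /category values live in different logical partitions
    container.create_item(body={"id": "item-1", "category": "Electronics", "name": "Laptop"})
    container.create_item(body={"id": "item-2", "category": "Books", "name": "Novel"})

    # Supplying both id and partition key routes the read to a single logical partition
    item = container.read_item(item="item-1", partition_key="Electronics")
    Python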

    4. Getting Started with Python SDK

    Basic Connection

    # cosmos_client.py
    from azure.cosmos import CosmosClient, PartitionKey, exceptions
    from config import CosmosConfig
    import logging
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    class CosmosDBClient:
        def __init__(self, config: CosmosConfig):
            """Initialize Cosmos DB client with configuration"""
            self.config = config
            self.client = None
            self.database = None
            self.container = None
    
        def connect(self):
            """Establish connection to Cosmos DB"""
            try:
                self.client = CosmosClient(
                    url=self.config.endpoint,
                    credential=self.config.key
                )
                logger.info("Successfully connected to Cosmos DB")
                return self
            except Exception as e:
                logger.error(f"Failed to connect to Cosmos DB: {e}")
                raise
    
        def get_database(self, create_if_not_exists=True):
            """Get or create database"""
            try:
                if create_if_not_exists:
                    self.database = self.client.create_database_if_not_exists(
                        id=self.config.database_name
                    )
                else:
                    self.database = self.client.get_database_client(
                        database=self.config.database_name
                    )
                logger.info(f"Database '{self.config.database_name}' ready")
                return self.database
            except exceptions.CosmosResourceNotFoundError:
                logger.error(f"Database '{self.config.database_name}' not found")
                raise
            except Exception as e:
                logger.error(f"Error accessing database: {e}")
                raise
    
        def get_container(self, partition_key_path="/id", create_if_not_exists=True):
            """Get or create container"""
            if not self.database:
                self.get_database()
    
            try:
                if create_if_not_exists:
                    self.container = self.database.create_container_if_not_exists(
                        id=self.config.container_name,
                        partition_key=PartitionKey(path=partition_key_path),
                        offer_throughput=400  # Minimum throughput
                    )
                else:
                    self.container = self.database.get_container_client(
                        container=self.config.container_name
                    )
                logger.info(f"Container '{self.config.container_name}' ready")
                return self.container
            except Exception as e:
                logger.error(f"Error accessing container: {e}")
                raise
    
    # Usage example
    if __name__ == "__main__":
        config = CosmosConfig()
    
        # Initialize and connect
        cosmos_client = CosmosDBClient(config)
        cosmos_client.connect()
    
        # Get database and container
        database = cosmos_client.get_database()
        container = cosmos_client.get_container(partition_key_path="/category")
    
        print("Cosmos DB setup complete!")
    Python

    Error Handling Best Practices

    # error_handling.py
    import functools
    import logging
    import random
    import time

    from azure.cosmos import exceptions

    logger = logging.getLogger(__name__)

    def handle_cosmos_exceptions(func):
        """Decorator for handling common Cosmos DB exceptions, with retry on throttling"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            max_retries = 3
            retry_delay = 1
    
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions.CosmosResourceNotFoundError as e:
                    logger.error(f"Resource not found: {e}")
                    raise
                except exceptions.CosmosResourceExistsError as e:
                    logger.warning(f"Resource already exists: {e}")
                    raise
                except exceptions.CosmosHttpResponseError as e:
                    if e.status_code == 429:  # Rate limited
                        if attempt < max_retries - 1:
                            wait_time = retry_delay * (2 ** attempt) + random.uniform(0, 1)
                            logger.warning(f"Rate limited. Retrying in {wait_time:.2f} seconds...")
                            time.sleep(wait_time)
                            continue
                    logger.error(f"HTTP error: {e.status_code} - {e.message}")
                    raise
                except Exception as e:
                    logger.error(f"Unexpected error: {e}")
                    raise
    
            raise Exception(f"Max retries ({max_retries}) exceeded")
    
        return wrapper
    Python

    5. Database and Container Operations

    Database Management

    # database_operations.py
    import logging

    from azure.cosmos import exceptions
    from error_handling import handle_cosmos_exceptions
    
    logger = logging.getLogger(__name__)
    
    class DatabaseManager:
        def __init__(self, cosmos_client):
            self.client = cosmos_client
    
        @handle_cosmos_exceptions
        def create_database(self, database_id, throughput=None):
            """Create a new database"""
            try:
                database = self.client.create_database(
                    id=database_id,
                    offer_throughput=throughput
                )
                logger.info(f"Created database: {database_id}")
                return database
            except exceptions.CosmosResourceExistsError:
                logger.warning(f"Database {database_id} already exists")
                return self.client.get_database_client(database_id)
    
        @handle_cosmos_exceptions
        def list_databases(self):
            """List all databases in the account"""
            databases = list(self.client.list_databases())
            logger.info(f"Found {len(databases)} databases")
            return databases
    
        @handle_cosmos_exceptions
        def delete_database(self, database_id):
            """Delete a database"""
            self.client.delete_database(database_id)
            logger.info(f"Deleted database: {database_id}")
    
        @handle_cosmos_exceptions
        def get_database_properties(self, database_id):
            """Get database properties and throughput information"""
            database = self.client.get_database_client(database_id)
            properties = database.read()
    
            try:
                throughput = database.get_throughput()
                properties['throughput'] = throughput.offer_throughput
            except exceptions.CosmosResourceNotFoundError:
                properties['throughput'] = None  # Serverless or container-level throughput
    
            return properties
    Python

    Container Management

    # container_operations.py
    import logging

    from azure.cosmos import PartitionKey, exceptions
    from error_handling import handle_cosmos_exceptions

    logger = logging.getLogger(__name__)
    
    class ContainerManager:
        def __init__(self, database_client):
            self.database = database_client
    
        @handle_cosmos_exceptions
        def create_container(self, container_id, partition_key_path, 
                            throughput=400, indexing_policy=None):
            """Create a new container with specified partition key"""
    
            try:
                container = self.database.create_container(
                    id=container_id,
                    partition_key=PartitionKey(path=partition_key_path),
                    offer_throughput=throughput,
                    indexing_policy=indexing_policy
                )
                logger.info(f"Created container: {container_id}")
                return container
            except exceptions.CosmosResourceExistsError:
                logger.warning(f"Container {container_id} already exists")
                return self.database.get_container_client(container_id)
    
        @handle_cosmos_exceptions
        def create_container_with_custom_indexing(self, container_id, partition_key_path):
            """Create container with custom indexing policy"""
    
            # Custom indexing policy example
            indexing_policy = {
                "indexingMode": "consistent",
                "automatic": True,
                "includedPaths": [
                    {
                        "path": "/*"
                    }
                ],
                "excludedPaths": [
                    {
                        "path": "/description/*"
                    },
                    {
                        "path": "/\"_etag\"/?"
                    }
                ],
                "compositeIndexes": [
                    [
                        {
                            "path": "/category",
                            "order": "ascending"
                        },
                        {
                            "path": "/price",
                            "order": "descending"
                        }
                    ]
                ]
            }
    
            return self.create_container(
                container_id=container_id,
                partition_key_path=partition_key_path,
                throughput=400,
                indexing_policy=indexing_policy
            )
    
        @handle_cosmos_exceptions
        def list_containers(self):
            """List all containers in the database"""
            containers = list(self.database.list_containers())
            logger.info(f"Found {len(containers)} containers")
            return containers
    
        @handle_cosmos_exceptions
        def delete_container(self, container_id):
            """Delete a container"""
            self.database.delete_container(container_id)
            logger.info(f"Deleted container: {container_id}")
    
        @handle_cosmos_exceptions
        def scale_container_throughput(self, container_id, new_throughput):
            """Scale container throughput"""
            container = self.database.get_container_client(container_id)
    
            try:
                # get_throughput() raises if the container has no dedicated throughput
                container.get_throughput()
                container.replace_throughput(new_throughput)
                logger.info(f"Scaled container {container_id} throughput to {new_throughput} RU/s")
            except exceptions.CosmosResourceNotFoundError:
                logger.error(f"Container {container_id} does not have provisioned throughput")
                raise
    Python

    Container Throughput Management

    graph TD
        A[Container Throughput] --> B[Provisioned]
        A --> C[Serverless]
        A --> D[Autoscale]
    
        B --> E[Fixed RU/s]
        B --> F[Manual Scaling]
        B --> G[Predictable Costs]
    
        C --> H[Pay per Request]
        C --> I[No Throughput Management]
        C --> J[Variable Costs]
    
        D --> K[Auto-scaling Range]
        D --> L[Scales Down to 10% of Max RU/s]
        D --> M[Handles Traffic Spikes]
    
        N[Choosing Throughput Model] --> O[Workload Pattern]
        N --> P[Cost Considerations]
        N --> Q[Performance Requirements]
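
    From the SDK, the difference between manual and autoscale provisioning shows up in the offer_throughput argument. The sketch below assumes a recent azure-cosmos release that exposes ThroughputProperties with an auto_scale_max_throughput parameter, plus placeholder credentials; serverless is chosen when the account is created, not in code.

    # throughput_models.py
    from azure.cosmos import CosmosClient, PartitionKey, ThroughputProperties

    client = CosmosClient(
        url="https://your-account.documents.azure.com:443/",
        credential="your-primary-key-here"
    )
    database = client.create_database_if_not_exists(id="SampleDB")

    # Manual (provisioned) throughput: a fixed number of RU/s
    manual_container = database.create_container_if_not_exists(
        id="ManualContainer",
        partition_key=PartitionKey(path="/category"),
        offer_throughput=400
    )

    # Autoscale throughput: scales between 10% of the maximum and the maximum RU/s
    autoscale_container = database.create_container_if_not_exists(
        id="AutoscaleContainer",
        partition_key=PartitionKey(path="/category"),
        offer_throughput=ThroughputProperties(auto_scale_max_throughput=4000)
    )
    Python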

    6. CRUD Operations in Detail

    Create Operations

    # crud_operations.py
    import logging
    import uuid
    from datetime import datetime
    from typing import Dict, Any, Optional

    from azure.cosmos import exceptions
    from error_handling import handle_cosmos_exceptions

    logger = logging.getLogger(__name__)
    
    class DocumentOperations:
        def __init__(self, container_client):
            self.container = container_client
    
        @handle_cosmos_exceptions
        def create_item(self, item_data: Dict[str, Any], partition_key_value: str = None) -> Dict[str, Any]:
            """Create a new document in the container"""
    
            # Ensure the item has an id
            if 'id' not in item_data:
                item_data['id'] = str(uuid.uuid4())
    
            # Add an application-level timestamp (the _ts system property is set by the service)
            item_data['createdAt'] = datetime.utcnow().isoformat()
    
            # Set partition key if provided
            if partition_key_value:
                item_data['partitionKey'] = partition_key_value
    
            created_item = self.container.create_item(body=item_data)
            logger.info(f"Created item with id: {created_item['id']}")
            return created_item
    
        @handle_cosmos_exceptions
        def create_multiple_items(self, items: list) -> list:
            """Create multiple items efficiently"""
            created_items = []
    
            for item in items:
                try:
                    created_item = self.create_item(item)
                    created_items.append(created_item)
                except Exception as e:
                    logger.error(f"Failed to create item {item.get('id', 'unknown')}: {e}")
                    # Continue with other items
    
            logger.info(f"Successfully created {len(created_items)} out of {len(items)} items")
            return created_items
    Python

    Read Operations

        @handle_cosmos_exceptions
        def read_item(self, item_id: str, partition_key: str) -> Optional[Dict[str, Any]]:
            """Read a specific item by id and partition key"""
            try:
                item = self.container.read_item(
                    item=item_id,
                    partition_key=partition_key
                )
                logger.info(f"Retrieved item: {item_id}")
                return item
            except exceptions.CosmosResourceNotFoundError:
                logger.warning(f"Item not found: {item_id}")
                return None
    
        @handle_cosmos_exceptions
        def read_all_items(self, max_item_count: int = None) -> list:
            """Read all items in the container"""
            items = list(self.container.read_all_items(max_item_count=max_item_count))
            logger.info(f"Retrieved {len(items)} items")
            return items
    
        @handle_cosmos_exceptions
        def read_items_by_partition(self, partition_key: str) -> list:
            """Read all items in a specific partition"""
            query = "SELECT * FROM c WHERE c.partitionKey = @partition_key"
            parameters = [{"name": "@partition_key", "value": partition_key}]
    
            items = list(self.container.query_items(
                query=query,
                parameters=parameters,
                partition_key=partition_key
            ))
    
            logger.info(f"Retrieved {len(items)} items from partition: {partition_key}")
            return items
    Python

    Update Operations

        @handle_cosmos_exceptions
        def update_item(self, item_id: str, partition_key: str, 
                       updates: Dict[str, Any]) -> Dict[str, Any]:
            """Update an existing item"""
            try:
                # First, read the current item
                current_item = self.read_item(item_id, partition_key)
                if not current_item:
                    raise ValueError(f"Item {item_id} not found")
    
                # Apply updates
                current_item.update(updates)
                current_item['_lastModified'] = datetime.utcnow().isoformat()
    
                # Replace the item
                updated_item = self.container.replace_item(
                    item=item_id,
                    body=current_item
                )
    
                logger.info(f"Updated item: {item_id}")
                return updated_item
    
            except exceptions.CosmosResourceNotFoundError:
                logger.error(f"Item not found for update: {item_id}")
                raise
    
        @handle_cosmos_exceptions
        def upsert_item(self, item_data: Dict[str, Any]) -> Dict[str, Any]:
            """Insert or update an item"""
            item_data['_lastModified'] = datetime.utcnow().isoformat()
    
            upserted_item = self.container.upsert_item(body=item_data)
            logger.info(f"Upserted item: {upserted_item['id']}")
            return upserted_item
    
        @handle_cosmos_exceptions
        def patch_item(self, item_id: str, partition_key: str, 
                       patch_operations: list) -> Dict[str, Any]:
            """Perform patch operations on an item"""
    
            # Example patch operations:
            # [
            #     {"op": "replace", "path": "/price", "value": 29.99},
            #     {"op": "add", "path": "/tags/-", "value": "new-tag"},
            #     {"op": "remove", "path": "/discontinued"}
            # ]
    
            patched_item = self.container.patch_item(
                item=item_id,
                partition_key=partition_key,
                patch_operations=patch_operations
            )
    
            logger.info(f"Patched item: {item_id}")
            return patched_item
    Python

    Delete Operations

        @handle_cosmos_exceptions
        def delete_item(self, item_id: str, partition_key: str) -> bool:
            """Delete a specific item"""
            try:
                self.container.delete_item(
                    item=item_id,
                    partition_key=partition_key
                )
                logger.info(f"Deleted item: {item_id}")
                return True
            except exceptions.CosmosResourceNotFoundError:
                logger.warning(f"Item not found for deletion: {item_id}")
                return False
    
        @handle_cosmos_exceptions
        def delete_items_by_condition(self, condition: str, parameters: list = None) -> int:
            """Delete items that match a condition"""
            # First, query to get items to delete
            query = f"SELECT c.id, c.partitionKey FROM c WHERE {condition}"
    
            items_to_delete = list(self.container.query_items(
                query=query,
                parameters=parameters or [],
                enable_cross_partition_query=True
            ))
    
            deleted_count = 0
            for item in items_to_delete:
                try:
                    self.delete_item(item['id'], item['partitionKey'])
                    deleted_count += 1
                except Exception as e:
                    logger.error(f"Failed to delete item {item['id']}: {e}")
    
            logger.info(f"Deleted {deleted_count} items")
            return deleted_count
    Python

    Bulk Operations

    # bulk_operations.py
    import logging
    import threading
    from concurrent.futures import ThreadPoolExecutor, as_completed

    from crud_operations import DocumentOperations
    from error_handling import handle_cosmos_exceptions

    logger = logging.getLogger(__name__)
    
    class BulkOperations:
        def __init__(self, container_client):
            self.container = container_client
            self._lock = threading.Lock()
    
        def bulk_create_threaded(self, items: list, max_workers: int = 5) -> list:
            """Create items using multiple threads"""
            created_items = []
            failed_items = []
    
            def create_item_worker(item):
                try:
                    doc_ops = DocumentOperations(self.container)
                    return doc_ops.create_item(item)
                except Exception as e:
                    logger.error(f"Failed to create item: {e}")
                    return None
    
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_item = {
                    executor.submit(create_item_worker, item): item 
                    for item in items
                }
    
                for future in as_completed(future_to_item):
                    result = future.result()
                    if result:
                        with self._lock:
                            created_items.append(result)
                    else:
                        with self._lock:
                            failed_items.append(future_to_item[future])
    
            logger.info(f"Bulk create completed: {len(created_items)} succeeded, {len(failed_items)} failed")
            return created_items, failed_items
    
        @handle_cosmos_exceptions
        def transactional_batch_operations(self, partition_key: str, operations: list):
            """Perform multiple operations in a transaction"""
    
            # Batch operations are expressed as (operation_type, args) tuples and are
            # executed atomically within a single logical partition. This requires
            # azure-cosmos >= 4.4.0, which added ContainerProxy.execute_item_batch.
            batch_operations = []

            for operation in operations:
                op_type = operation['type']
                item_data = operation['data']

                if op_type == 'create':
                    batch_operations.append(("create", (item_data,)))
                elif op_type == 'upsert':
                    batch_operations.append(("upsert", (item_data,)))
                elif op_type == 'replace':
                    batch_operations.append(("replace", (item_data['id'], item_data)))
                elif op_type == 'delete':
                    batch_operations.append(("delete", (item_data['id'],)))

            # Execute the batch; all operations succeed or fail together
            batch_response = self.container.execute_item_batch(
                batch_operations=batch_operations,
                partition_key=partition_key
            )
    
            logger.info(f"Executed batch with {len(operations)} operations")
            return batch_response
    Python

    7. Querying Data

    SQL Query Fundamentals

    # query_operations.py
    import logging
    from typing import List, Dict, Any, Optional

    logger = logging.getLogger(__name__)
    
    class QueryOperations:
        def __init__(self, container_client):
            self.container = container_client
    
        @handle_cosmos_exceptions
        def simple_query(self, query: str, parameters: List[Dict] = None, 
                        cross_partition: bool = False) -> List[Dict[str, Any]]:
            """Execute a simple SQL query"""
    
            enable_cross_partition_query = cross_partition
    
            items = list(self.container.query_items(
                query=query,
                parameters=parameters or [],
                enable_cross_partition_query=enable_cross_partition_query
            ))
    
            logger.info(f"Query returned {len(items)} items")
            return items
    
        def query_with_pagination(self, query: str, page_size: int = 100, 
                                 parameters: List[Dict] = None) -> List[Dict[str, Any]]:
            """Execute query with pagination support"""

            all_items = []

            query_iterable = self.container.query_items(
                query=query,
                parameters=parameters or [],
                max_item_count=page_size,
                enable_cross_partition_query=True
            )

            # by_page() yields results one page at a time (up to page_size items each)
            for page in query_iterable.by_page():
                all_items.extend(list(page))

            logger.info(f"Paginated query returned {len(all_items)} total items")
            return all_items
    Python

    Query Examples

    class QueryExamples:
        def __init__(self, query_ops: QueryOperations):
            self.query_ops = query_ops
    
        def find_by_category(self, category: str) -> List[Dict]:
            """Find all items in a specific category"""
            query = "SELECT * FROM c WHERE c.category = @category"
            parameters = [{"name": "@category", "value": category}]
    
            return self.query_ops.simple_query(query, parameters)
    
        def find_by_price_range(self, min_price: float, max_price: float) -> List[Dict]:
            """Find items within a price range"""
            query = """
            SELECT c.id, c.name, c.price, c.category 
            FROM c 
            WHERE c.price >= @min_price AND c.price <= @max_price
            ORDER BY c.price
            """
            parameters = [
                {"name": "@min_price", "value": min_price},
                {"name": "@max_price", "value": max_price}
            ]
    
            return self.query_ops.simple_query(query, parameters, cross_partition=True)
    
        def search_by_text(self, search_term: str) -> List[Dict]:
            """Search items by text in name or description"""
            query = """
            SELECT * FROM c 
            WHERE CONTAINS(UPPER(c.name), UPPER(@search_term)) 
               OR CONTAINS(UPPER(c.description), UPPER(@search_term))
            """
            parameters = [{"name": "@search_term", "value": search_term}]
    
            return self.query_ops.simple_query(query, parameters, cross_partition=True)
    
        def aggregate_by_category(self) -> List[Dict]:
            """Get count and average price by category"""
            query = """
            SELECT 
                c.category,
                COUNT(1) as item_count,
                AVG(c.price) as avg_price,
                MIN(c.price) as min_price,
                MAX(c.price) as max_price
            FROM c 
            GROUP BY c.category
            """
    
            return self.query_ops.simple_query(query, cross_partition=True)
    
        def find_recent_items(self, days: int = 7) -> List[Dict]:
            """Find items created in the last N days"""
            query = """
            SELECT * FROM c 
            WHERE c._ts > @timestamp
            ORDER BY c._ts DESC
            """
    
            import time
            timestamp = int(time.time()) - (days * 24 * 60 * 60)
            parameters = [{"name": "@timestamp", "value": timestamp}]
    
            return self.query_ops.simple_query(query, parameters, cross_partition=True)
    
        def complex_join_query(self) -> List[Dict]:
            """Example of a complex query with subqueries"""
            query = """
            SELECT 
                c.category,
                c.name,
                c.price,
                ARRAY(
                    SELECT VALUE t 
                    FROM t IN c.tags 
                    WHERE t != "legacy"
                ) as filtered_tags
            FROM c 
            WHERE EXISTS(
                SELECT VALUE t 
                FROM t IN c.tags 
                WHERE t != "legacy"
            )
            """
    
            return self.query_ops.simple_query(query, cross_partition=True)
    Python

    Query Performance Optimization

    graph TD
        A[Query Performance] --> B[Index Usage]
        A --> C[Partition Key]
        A --> D[Query Structure]
        A --> E[RU Consumption]
    
        B --> F[Included Paths]
        B --> G[Excluded Paths]
        B --> H[Composite Indexes]
    
        C --> I[Single Partition Queries]
        C --> J[Cross-Partition Queries]
    
        D --> K[SELECT Clause Optimization]
        D --> L[WHERE Clause Efficiency]
        D --> M[ORDER BY Considerations]
    
        E --> N[Query Metrics]
        E --> O[Request Charge]
        E --> P[Response Time]

    # query_optimization.py
    import logging

    logger = logging.getLogger(__name__)

    class QueryOptimization:
        def __init__(self, container_client):
            self.container = container_client
    
        def analyze_query_metrics(self, query: str, parameters: list = None):
            """Analyze query performance metrics"""
    
            query_iterable = self.container.query_items(
                query=query,
                parameters=parameters or [],
                enable_cross_partition_query=True,
                populate_query_metrics=True
            )

            # Execute the query by draining the iterator
            items = list(query_iterable)

            # The request charge and query metrics are surfaced in the response headers
            # of the most recent request (for multi-page results, only the last page)
            headers = self.container.client_connection.last_response_headers
            request_charge = float(headers.get('x-ms-request-charge', 0))
            query_metrics = headers.get('x-ms-documentdb-query-metrics')

            analysis = {
                'item_count': len(items),
                'request_charge': request_charge,
                'query_metrics': query_metrics
            }
    
            logger.info(f"Query Analysis: {analysis}")
            return analysis
    
        def optimized_pagination(self, query: str, page_size: int = 100):
            """Optimized pagination with continuation tokens"""
    
            pages = []
            total_ru_charge = 0

            query_iterable = self.container.query_items(
                query=query,
                max_item_count=page_size,
                enable_cross_partition_query=True
            )

            # by_page() exposes one page at a time along with its continuation token
            pager = query_iterable.by_page()

            for page in pager:
                page_items = list(page)

                if not page_items:
                    continue

                # The RU charge for the page comes back in the response headers
                headers = self.container.client_connection.last_response_headers
                page_ru = float(headers.get('x-ms-request-charge', 0))

                pages.append({
                    'items': page_items,
                    'request_charge': page_ru,
                    'continuation_token': pager.continuation_token
                })

                total_ru_charge += page_ru
    
            logger.info(f"Retrieved {len(pages)} pages, Total RU: {total_ru_charge}")
            return pages, total_ru_charge
    Python

    8. Partitioning Strategies

    Understanding Partition Keys

    graph TB
        A[Partition Key Selection] --> B[High Cardinality]
        A --> C[Even Distribution]
        A --> D[Query Pattern Alignment]
    
        B --> E[Many Unique Values]
        B --> F[Avoid Hot Partitions]
    
        C --> G[Balanced Load]
        C --> H[Efficient Scaling]
    
        D --> I[Single Partition Queries]
        D --> J[Minimal Cross-Partition]
    
        K[Common Patterns] --> L[Entity ID]
        K --> M[Category/Type]
        K --> N[Time-based]
        K --> O[User/Tenant ID]
        K --> P[Geographic Region]
    
        Q[Anti-patterns] --> R[Sequential Values]
        Q --> S[Low Cardinality]
        Q --> T[Uneven Distribution]

    Partition Key Design Patterns

    # partitioning_strategies.py
    import hashlib
    import logging
    from datetime import datetime
    from typing import Any, Dict

    logger = logging.getLogger(__name__)
    
    class PartitionKeyStrategies:
    
        @staticmethod
        def user_based_partition(user_id: str) -> str:
            """Use user ID as partition key - good for user-centric applications"""
            return user_id
    
        @staticmethod
        def category_based_partition(category: str) -> str:
            """Use category as partition key - good for catalog applications"""
            return category.lower()
    
        @staticmethod
        def hash_based_partition(value: str, num_partitions: int = 100) -> str:
            """Create hash-based partition for even distribution"""
            hash_object = hashlib.md5(value.encode())
            hash_hex = hash_object.hexdigest()
            partition_num = int(hash_hex, 16) % num_partitions
            return f"partition_{partition_num:03d}"
    
        @staticmethod
        def time_based_partition(date_value: datetime = None, granularity: str = "month") -> str:
            """Create time-based partitions"""
            if not date_value:
                date_value = datetime.utcnow()
    
            if granularity == "year":
                return f"{date_value.year}"
            elif granularity == "month":
                return f"{date_value.year}_{date_value.month:02d}"
            elif granularity == "day":
                return f"{date_value.year}_{date_value.month:02d}_{date_value.day:02d}"
            else:
                raise ValueError("Granularity must be 'year', 'month', or 'day'")
    
        @staticmethod
        def composite_partition(primary: str, secondary: str, separator: str = "_") -> str:
            """Create composite partition key"""
            return f"{primary}{separator}{secondary}"
    
        @staticmethod
        def tenant_based_partition(tenant_id: str, entity_type: str = None) -> str:
            """Multi-tenant partition strategy"""
            if entity_type:
                return f"{tenant_id}_{entity_type}"
            return tenant_id
    
        @staticmethod
        def geo_based_partition(country: str, region: str = None) -> str:
            """Geographic-based partitioning"""
            if region:
                return f"{country}_{region}"
            return country
    
    class PartitionDesignHelper:
        def __init__(self):
            self.strategies = PartitionKeyStrategies()
    
        def analyze_partition_distribution(self, items: list, partition_key_func) -> Dict[str, Any]:
            """Analyze how items would be distributed across partitions"""
            partition_counts = {}
    
            for item in items:
                partition_key = partition_key_func(item)
                partition_counts[partition_key] = partition_counts.get(partition_key, 0) + 1
    
            total_items = len(items)
            num_partitions = len(partition_counts)
    
            # Calculate distribution metrics
            avg_items_per_partition = total_items / num_partitions if num_partitions > 0 else 0
            max_items = max(partition_counts.values()) if partition_counts else 0
            min_items = min(partition_counts.values()) if partition_counts else 0
    
            # Calculate skew
            skew = (max_items - min_items) / avg_items_per_partition if avg_items_per_partition > 0 else 0
    
            analysis = {
                'total_items': total_items,
                'num_partitions': num_partitions,
                'avg_items_per_partition': avg_items_per_partition,
                'max_items_in_partition': max_items,
                'min_items_in_partition': min_items,
                'distribution_skew': skew,
                'partition_distribution': partition_counts
            }
    
            return analysis
    
        def recommend_partition_strategy(self, sample_data: list, query_patterns: list) -> Dict[str, Any]:
            """Recommend partition strategy based on data and query patterns"""
    
            recommendations = []
    
            # Analyze different strategies
            strategies_to_test = [
                ("user_id", lambda item: self.strategies.user_based_partition(item.get('userId', 'unknown'))),
                ("category", lambda item: self.strategies.category_based_partition(item.get('category', 'unknown'))),
                ("hash_user", lambda item: self.strategies.hash_based_partition(item.get('userId', 'unknown'))),
                ("time_month", lambda item: self.strategies.time_based_partition(
                    datetime.fromisoformat(item.get('createdDate', datetime.utcnow().isoformat())), "month"
                )),
            ]
    
            for strategy_name, strategy_func in strategies_to_test:
                try:
                    analysis = self.analyze_partition_distribution(sample_data, strategy_func)
                    analysis['strategy_name'] = strategy_name
                    recommendations.append(analysis)
                except Exception as e:
                    logger.warning(f"Could not analyze strategy {strategy_name}: {e}")
    
            # Sort by distribution quality (lower skew is better)
            recommendations.sort(key=lambda x: x['distribution_skew'])
    
            return {
                'recommended_strategy': recommendations[0] if recommendations else None,
                'all_analyses': recommendations,
                'query_pattern_considerations': self._analyze_query_patterns(query_patterns)
            }
    
        def _analyze_query_patterns(self, query_patterns: list) -> Dict[str, Any]:
            """Analyze query patterns to understand partition key requirements"""
    
            cross_partition_queries = []
            single_partition_queries = []
    
            for pattern in query_patterns:
                if pattern.get('requires_cross_partition', False):
                    cross_partition_queries.append(pattern)
                else:
                    single_partition_queries.append(pattern)
    
            return {
                'total_patterns': len(query_patterns),
                'cross_partition_ratio': len(cross_partition_queries) / len(query_patterns) if query_patterns else 0,
                'single_partition_ratio': len(single_partition_queries) / len(query_patterns) if query_patterns else 0,
                'recommendations': [
                    "Consider time-based partitioning for time-series data",
                    "Use user/tenant ID for user-centric applications",
                    "Use hash-based partitioning for even distribution",
                    "Avoid low-cardinality partition keys"
                ]
            }
    Python

    Hot Partition Detection and Mitigation

    # hot_partition_detection.py
    from collections import defaultdict, deque
    from datetime import datetime, timedelta
    from typing import Any, Dict
    
    class HotPartitionMonitor:
        def __init__(self, container_client, monitoring_window_minutes: int = 15):
            self.container = container_client
            self.monitoring_window = timedelta(minutes=monitoring_window_minutes)
            self.partition_metrics = defaultdict(lambda: deque())
    
        def record_operation(self, partition_key: str, operation_type: str, ru_consumed: float):
            """Record an operation for hot partition analysis"""
            timestamp = datetime.utcnow()
    
            self.partition_metrics[partition_key].append({
                'timestamp': timestamp,
                'operation_type': operation_type,
                'ru_consumed': ru_consumed
            })
    
            # Clean old records
            self._cleanup_old_records(partition_key)
    
        def _cleanup_old_records(self, partition_key: str):
            """Remove records outside the monitoring window"""
            cutoff_time = datetime.utcnow() - self.monitoring_window
    
            while (self.partition_metrics[partition_key] and 
                   self.partition_metrics[partition_key][0]['timestamp'] < cutoff_time):
                self.partition_metrics[partition_key].popleft()
    
        def detect_hot_partitions(self, ru_threshold: float = 1000) -> list:
            """Detect partitions that exceed RU threshold in the monitoring window"""
            hot_partitions = []
    
            for partition_key, operations in self.partition_metrics.items():
                total_ru = sum(op['ru_consumed'] for op in operations)
    
                if total_ru > ru_threshold:
                    hot_partitions.append({
                        'partition_key': partition_key,
                        'total_ru_consumed': total_ru,
                        'operation_count': len(operations),
                        'avg_ru_per_operation': total_ru / len(operations) if operations else 0,
                        'time_window_minutes': self.monitoring_window.total_seconds() / 60
                    })
    
            return sorted(hot_partitions, key=lambda x: x['total_ru_consumed'], reverse=True)
    
        def suggest_mitigation_strategies(self, hot_partitions: list) -> Dict[str, Any]:
            """Suggest strategies to mitigate hot partitions"""
    
            strategies = []
    
            for hot_partition in hot_partitions:
                partition_strategies = []
    
                # Strategy 1: Partition key redesign
                partition_strategies.append({
                    'type': 'partition_key_redesign',
                    'description': 'Consider using a more granular or hash-based partition key',
                    'implementation': 'Add a suffix or prefix to distribute load'
                })
    
                # Strategy 2: Caching
                partition_strategies.append({
                    'type': 'caching',
                    'description': 'Implement caching for frequently accessed data',
                    'implementation': 'Use Azure Cache for Redis or in-memory caching'
                })
    
                # Strategy 3: Read replicas
                partition_strategies.append({
                    'type': 'read_replicas',
                    'description': 'Use read regions for read-heavy workloads',
                    'implementation': 'Configure multi-region reads'
                })
    
                # Strategy 4: Data archiving
                partition_strategies.append({
                    'type': 'data_archiving',
                    'description': 'Archive old data to reduce partition size',
                    'implementation': 'Move historical data to cheaper storage'
                })
    
                strategies.append({
                    'partition_key': hot_partition['partition_key'],
                    'severity': 'high' if hot_partition['total_ru_consumed'] > 5000 else 'medium',
                    'strategies': partition_strategies
                })
    
            return {
                'hot_partitions_detected': len(hot_partitions),
                'mitigation_strategies': strategies,
                'general_recommendations': [
                    'Monitor partition metrics regularly',
                    'Test partition key design with realistic data',
                    'Consider using synthetic partition keys for even distribution',
                    'Implement circuit breaker patterns for hot partitions'
                ]
            }
    Python

    9. Performance Optimization

    Request Units (RU) Optimization

    graph TD
        A[RU Optimization] --> B[Query Optimization]
        A --> C[Indexing Strategy]
        A --> D[Document Design]
        A --> E[Operation Patterns]
    
        B --> F[SELECT Clause]
        B --> G[WHERE Filters]
        B --> H[ORDER BY Usage]
        B --> I[JOINs and Subqueries]
    
        C --> J[Included Paths]
        C --> K[Excluded Paths]
        C --> L[Composite Indexes]
        C --> M[Spatial Indexes]
    
        D --> N[Document Size]
        D --> O[Property Structure]
        D --> P[Array Optimization]
    
        E --> Q[Batch Operations]
        E --> R[Bulk Imports]
        E --> S[Connection Pooling]

    # performance_optimization.py
    import logging
    import statistics
    import time
    from concurrent.futures import ThreadPoolExecutor
    from dataclasses import dataclass
    from typing import List, Dict, Any

    from crud_operations import DocumentOperations

    logger = logging.getLogger(__name__)
    
    @dataclass
    class PerformanceMetrics:
        operation_type: str
        duration_ms: float
        ru_consumed: float
        item_count: int
        success: bool
        error_message: str = None
    
    class PerformanceOptimizer:
        def __init__(self, container_client):
            self.container = container_client
            self.metrics_history = []
    
        def benchmark_operation(self, operation_func, *args, **kwargs) -> PerformanceMetrics:
            """Benchmark a Cosmos DB operation"""
            start_time = time.time()
            ru_consumed = 0
            success = False
            error_message = None
            item_count = 0
    
            try:
                # Execute the operation
                if hasattr(operation_func, '__self__'):
                    # Method call
                    result = operation_func(*args, **kwargs)
                else:
                    # Function call
                    result = operation_func(self.container, *args, **kwargs)
    
                success = True
    
                # Try to extract RU consumption and item count
                if hasattr(result, 'request_charge'):
                    ru_consumed = result.request_charge
                elif isinstance(result, list):
                    item_count = len(result)
                elif isinstance(result, dict) and '_request_charge' in result:
                    ru_consumed = result['_request_charge']
    
            except Exception as e:
                error_message = str(e)
    
            duration_ms = (time.time() - start_time) * 1000
    
            metrics = PerformanceMetrics(
                operation_type=operation_func.__name__,
                duration_ms=duration_ms,
                ru_consumed=ru_consumed,
                item_count=item_count,
                success=success,
                error_message=error_message
            )
    
            self.metrics_history.append(metrics)
            return metrics
    
        def optimize_query_performance(self, query: str, parameters: list = None) -> Dict[str, Any]:
            """Analyze and optimize query performance"""
    
            # Test different query variations
            optimizations = []
    
            # Original query
            original_metrics = self._test_query_performance(query, parameters)
            optimizations.append({
                'type': 'original',
                'query': query,
                'metrics': original_metrics
            })
    
            # Test with TOP clause if not present
            if 'TOP' not in query.upper():
                top_query = query.replace('SELECT', 'SELECT TOP 1000', 1)
                top_metrics = self._test_query_performance(top_query, parameters)
                optimizations.append({
                    'type': 'with_top_limit',
                    'query': top_query,
                    'metrics': top_metrics
                })
    
            # Test with specific field selection instead of *
            if 'SELECT *' in query:
                optimized_query = query.replace('SELECT *', 'SELECT c.id, c.name, c.category')
                field_metrics = self._test_query_performance(optimized_query, parameters)
                optimizations.append({
                    'type': 'specific_fields',
                    'query': optimized_query,
                    'metrics': field_metrics
                })
    
            # Sort by RU consumption
            optimizations.sort(key=lambda x: x['metrics']['avg_ru_consumed'])
    
            return {
                'optimizations': optimizations,
                'best_option': optimizations[0],
                'improvement_percentage': self._calculate_improvement(
                    optimizations[0]['metrics'], 
                    original_metrics
                )
            }
    
        def _test_query_performance(self, query: str, parameters: list = None, runs: int = 3) -> Dict[str, float]:
            """Test query performance multiple times"""
            metrics = []
    
            for _ in range(runs):
                try:
                    start_time = time.time()
    
                    query_iterable = self.container.query_items(
                        query=query,
                        parameters=parameters or [],
                        enable_cross_partition_query=True
                    )
    
                    items = list(query_iterable)
                    duration = (time.time() - start_time) * 1000

                    # The RU charge of the last request is exposed via response headers
                    headers = self.container.client_connection.last_response_headers
                    ru_consumed = float(headers.get('x-ms-request-charge', 0))

                    metrics.append({
                        'duration_ms': duration,
                        'ru_consumed': ru_consumed,
                        'item_count': len(items)
                    })
    
                except Exception as e:
                    logger.error(f"Query performance test failed: {e}")
    
            if not metrics:
                return {'avg_duration_ms': 0, 'avg_ru_consumed': 0, 'avg_item_count': 0}
    
            return {
                'avg_duration_ms': statistics.mean([m['duration_ms'] for m in metrics]),
                'avg_ru_consumed': statistics.mean([m['ru_consumed'] for m in metrics]),
                'avg_item_count': statistics.mean([m['item_count'] for m in metrics]),
                'min_duration_ms': min([m['duration_ms'] for m in metrics]),
                'max_duration_ms': max([m['duration_ms'] for m in metrics])
            }
    
        def optimize_bulk_operations(self, items: list, operation_type: str = 'create') -> Dict[str, Any]:
            """Optimize bulk operations performance"""
    
            strategies = []
    
            # Strategy 1: Sequential processing
            sequential_metrics = self._test_bulk_sequential(items, operation_type)
            strategies.append({
                'type': 'sequential',
                'metrics': sequential_metrics
            })
    
            # Strategy 2: Parallel processing
            parallel_metrics = self._test_bulk_parallel(items, operation_type, max_workers=5)
            strategies.append({
                'type': 'parallel_5_workers',
                'metrics': parallel_metrics
            })
    
            # Strategy 3: Batch processing
            batch_metrics = self._test_bulk_batched(items, operation_type, batch_size=25)
            strategies.append({
                'type': 'batched_25_items',
                'metrics': batch_metrics
            })
    
            # Find best strategy
            best_strategy = min(strategies, key=lambda x: x['metrics']['total_duration_ms'])
    
            return {
                'strategies_tested': strategies,
                'recommended_strategy': best_strategy,
                'performance_comparison': self._compare_strategies(strategies)
            }
    
        def _test_bulk_sequential(self, items: list, operation_type: str) -> Dict[str, float]:
            """Test sequential bulk operations"""
            start_time = time.time()
            successful_operations = 0
            total_ru = 0
    
            doc_ops = DocumentOperations(self.container)
    
            for item in items:
                try:
                    if operation_type == 'create':
                        result = doc_ops.create_item(item)
                        # Extract RU if available
                        # Note: Individual operation RU tracking would need modification to DocumentOperations
                        successful_operations += 1
                except Exception as e:
                    logger.error(f"Sequential operation failed: {e}")
    
            total_duration = (time.time() - start_time) * 1000
    
            return {
                'total_duration_ms': total_duration,
                'successful_operations': successful_operations,
                'operations_per_second': successful_operations / (total_duration / 1000) if total_duration > 0 else 0,
                'total_ru_consumed': total_ru
            }
    
        def _test_bulk_parallel(self, items: list, operation_type: str, max_workers: int) -> Dict[str, float]:
            """Test parallel bulk operations"""
            start_time = time.time()
            successful_operations = 0
    
            def process_item(item):
                try:
                    doc_ops = DocumentOperations(self.container)
                    if operation_type == 'create':
                        return doc_ops.create_item(item)
                    return None
                except Exception as e:
                    logger.error(f"Parallel operation failed: {e}")
                    return None
    
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                results = list(executor.map(process_item, items))
                successful_operations = sum(1 for r in results if r is not None)
    
            total_duration = (time.time() - start_time) * 1000
    
            return {
                'total_duration_ms': total_duration,
                'successful_operations': successful_operations,
                'operations_per_second': successful_operations / (total_duration / 1000) if total_duration > 0 else 0,
                'total_ru_consumed': 0  # Would need to aggregate from individual operations
            }
    Python
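
    The RU numbers above tell you how expensive a query is, but not why. When you also want to see how well a query uses the index, the SDK can return per-query execution metrics. Below is a minimal sketch, assuming the synchronous client and an existing container variable:

    # query_metrics_sketch.py -- illustrative only; container is assumed to exist.
    def print_query_metrics(container, query: str):
        """Run a query and print the raw query-metrics header returned by Cosmos DB."""
        items = list(container.query_items(
            query=query,
            enable_cross_partition_query=True,
            populate_query_metrics=True  # ask the service to return execution metrics
        ))

        headers = container.client_connection.last_response_headers
        print(f"Items returned: {len(items)}")
        print(f"Request charge: {headers.get('x-ms-request-charge')} RU")
        # Semicolon-delimited metrics (document counts, index utilization, execution times, ...)
        print(f"Query metrics:  {headers.get('x-ms-documentdb-query-metrics')}")
    Python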

    Indexing Optimization

    # indexing_optimization.py
    from typing import Any, Dict, List

    class IndexingOptimizer:
        def __init__(self, container_client):
            self.container = container_client
    
        def analyze_query_index_usage(self, queries: List[str]) -> Dict[str, Any]:
            """Analyze which indexes would benefit the given queries"""
    
            index_recommendations = []
    
            for query in queries:
                # Parse query to identify used fields
                query_analysis = self._analyze_query_fields(query)
    
                # Recommend indexes
                recommendations = self._recommend_indexes_for_query(query_analysis)
    
                index_recommendations.append({
                    'query': query,
                    'analysis': query_analysis,
                    'recommended_indexes': recommendations
                })
    
            # Consolidate recommendations
            consolidated = self._consolidate_index_recommendations(index_recommendations)
    
            return {
                'individual_recommendations': index_recommendations,
                'consolidated_recommendations': consolidated,
                'suggested_indexing_policy': self._generate_indexing_policy(consolidated)
            }
    
        def _analyze_query_fields(self, query: str) -> Dict[str, Any]:
            """Analyze fields used in a query"""
            import re
    
            # Simple regex patterns to identify field usage
            where_pattern = r'WHERE\s+.*?(?:ORDER\s+BY|GROUP\s+BY|$)'
            order_pattern = r'ORDER\s+BY\s+(.*?)(?:GROUP\s+BY|$)'
    
            where_match = re.search(where_pattern, query, re.IGNORECASE | re.DOTALL)
            order_match = re.search(order_pattern, query, re.IGNORECASE)
    
            filter_fields = []
            sort_fields = []
    
            if where_match:
                # Extract field names from WHERE clause
                where_clause = where_match.group(0)
                field_pattern = r'c\.(\w+)'
                filter_fields = re.findall(field_pattern, where_clause)
    
            if order_match:
                # Extract field names from ORDER BY clause
                order_clause = order_match.group(1)
                field_pattern = r'c\.(\w+)'
                sort_fields = re.findall(field_pattern, order_clause)
    
            return {
                'filter_fields': list(set(filter_fields)),
                'sort_fields': list(set(sort_fields)),
                'requires_composite_index': len(sort_fields) > 1 or (filter_fields and sort_fields)
            }
    
        def _recommend_indexes_for_query(self, query_analysis: Dict) -> List[Dict]:
            """Recommend specific indexes for a query"""
            recommendations = []
    
            # Single field indexes for filters
            for field in query_analysis['filter_fields']:
                recommendations.append({
                    'type': 'single_field',
                    'path': f'/{field}',
                    'purpose': 'Optimize WHERE clause filtering'
                })
    
            # Composite indexes for sorting with filtering
            if query_analysis['requires_composite_index']:
                all_fields = query_analysis['filter_fields'] + query_analysis['sort_fields']
                if len(all_fields) > 1:
                    recommendations.append({
                        'type': 'composite',
                        'paths': [{'path': f'/{field}', 'order': 'ascending'} for field in all_fields],
                        'purpose': 'Optimize combined filtering and sorting'
                    })
    
            return recommendations
    
        def create_optimized_indexing_policy(self, field_usage_stats: Dict[str, int]) -> Dict:
            """Create an optimized indexing policy based on field usage statistics"""
    
            # Sort fields by usage frequency
            frequently_used = [field for field, count in field_usage_stats.items() if count > 10]
            occasionally_used = [field for field, count in field_usage_stats.items() if 5 <= count <= 10]
            rarely_used = [field for field, count in field_usage_stats.items() if count < 5]
    
            indexing_policy = {
                "indexingMode": "consistent",
                "automatic": True,
                "includedPaths": [],
                "excludedPaths": [
                    {"path": "/\"_etag\"/?"}  # Always exclude _etag
                ],
                "compositeIndexes": []
            }
    
            # Include frequently used fields
            for field in frequently_used:
                indexing_policy["includedPaths"].append({"path": f"/{field}/*"})
    
            # Exclude rarely used fields to reduce index size
            for field in rarely_used:
                indexing_policy["excludedPaths"].append({"path": f"/{field}/*"})
    
            # Add composite indexes for common query patterns
            if len(frequently_used) >= 2:
                composite_index = []
                for field in frequently_used[:3]:  # Limit to 3 fields for performance
                    composite_index.append({
                        "path": f"/{field}",
                        "order": "ascending"
                    })
                indexing_policy["compositeIndexes"].append(composite_index)
    
            return indexing_policy
    
        def _consolidate_index_recommendations(self, recommendations: list) -> Dict[str, Any]:
            """Consolidate index recommendations from multiple queries"""
            all_single_fields = set()
            all_composite_combinations = []
    
            for rec in recommendations:
                for index in rec['recommended_indexes']:
                    if index['type'] == 'single_field':
                        all_single_fields.add(index['path'])
                    elif index['type'] == 'composite':
                        all_composite_combinations.append(index['paths'])
    
            return {
                'single_field_indexes': list(all_single_fields),
                'composite_indexes': all_composite_combinations,
                'total_indexes_recommended': len(all_single_fields) + len(all_composite_combinations)
            }
    Python
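
    A recommended policy only takes effect once it is applied to the container, which is a management operation on the database client. Here is a minimal sketch, assuming the synchronous SDK, a container partitioned on /category as in earlier examples, and a field_usage_stats dictionary collected from your workload:

    # apply_indexing_policy.py -- illustrative sketch; database, container and
    # field_usage_stats are assumed to exist from earlier sections.
    from azure.cosmos import PartitionKey

    optimizer = IndexingOptimizer(container)
    new_policy = optimizer.create_optimized_indexing_policy(field_usage_stats)

    # Replacing the container definition triggers a background index transformation;
    # the partition key must match the existing container definition.
    database.replace_container(
        container,
        partition_key=PartitionKey(path="/category"),
        indexing_policy=new_policy
    )
    Python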

    10. Consistency Levels

    Understanding Consistency Models

    graph TD
        A[Consistency Levels] --> B[Strong]
        A --> C[Bounded Staleness]
        A --> D[Session]
        A --> E[Consistent Prefix]
        A --> F[Eventual]
    
        B --> G[Linearizability]
        B --> H[Highest Latency]
        B --> I[Highest RU Cost]
    
        C --> J[Time-bounded]
        C --> K[Operation-bounded]
        C --> L[Medium Latency]
    
        D --> M[Read Your Writes]
        D --> N[Monotonic Reads]
        D --> O[Default Choice]
    
        E --> P[Prefix Consistency]
        E --> Q[Lower Latency]
        E --> R[Lower Cost]
    
        F --> S[Lowest Latency]
        F --> T[Lowest Cost]
        F --> U[No Guarantees]
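
    The level an application actually runs with is configured on the account and can only be relaxed, never strengthened, per client. In the Python SDK this is a keyword argument when the client is created; a minimal sketch (endpoint and key assumed):

    # consistency_client.py -- illustrative sketch; endpoint and key are assumed.
    from azure.cosmos import CosmosClient

    # Request session consistency for this client. The account default acts as an
    # upper bound: a client can weaken consistency, but never strengthen it.
    client = CosmosClient(url=endpoint, credential=key, consistency_level="Session")
    Python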

    Consistency Level Implementation

    # consistency_levels.py
    from azure.cosmos import ConsistencyLevel
    from typing import Dict, Any, Optional
    
    class ConsistencyManager:
        def __init__(self, cosmos_client):
            self.client = cosmos_client
    
        def set_session_consistency(self):
            """Configure session consistency (default and recommended for most scenarios)"""
            # Session consistency is the default and doesn't require special configuration
            # It provides read-your-writes and monotonic read guarantees
            pass
    
        def demonstrate_consistency_levels(self, container):
            """Demonstrate different consistency levels with examples"""
    
            # Example item to work with
            test_item = {
                "id": "consistency-test-001",
                "name": "Test Item",
                "value": 100,
                "category": "test"
            }
    
            # Create the item
            created_item = container.create_item(body=test_item)
    
            # Note: the effective consistency level is configured on the account or
            # when the CosmosClient is created, so these helpers only illustrate the
            # observable read-after-write behaviour. Bounded staleness and consistent
            # prefix checks would follow the same pattern as the three shown below.
            consistency_examples = {
                ConsistencyLevel.Strong: self._test_strong_consistency,
                ConsistencyLevel.Session: self._test_session_consistency,
                ConsistencyLevel.Eventual: self._test_eventual_consistency
            }

            results = {}
            for level, test_func in consistency_examples.items():
                try:
                    result = test_func(container, created_item)
                    results[level] = result  # ConsistencyLevel members are plain strings
                except Exception as e:
                    results[level] = {"error": str(e)}
    
            return results
    
        def _test_strong_consistency(self, container, item) -> Dict[str, Any]:
            """Test strong consistency behavior"""
            # Strong consistency guarantees linearizability
            # Every read receives the most recent write
    
            import time
            start_time = time.time()
    
            # Update the item
            item['value'] = 200
            updated_item = container.replace_item(
                item=item['id'],
                body=item
            )
    
            # Immediately read the item - should always return updated value
            read_item = container.read_item(
                item=item['id'],
                partition_key=item['category']
            )
    
            end_time = time.time()
    
            return {
                "consistency_level": "Strong",
                "updated_value": updated_item['value'],
                "read_value": read_item['value'],
                "consistent": updated_item['value'] == read_item['value'],
                "latency_ms": (end_time - start_time) * 1000,
                "guarantees": [
                    "Linearizability",
                    "Read-your-writes",
                    "Monotonic reads",
                    "Monotonic writes",
                    "Consistent prefix"
                ]
            }
    
        def _test_session_consistency(self, container, item) -> Dict[str, Any]:
            """Test session consistency behavior"""
            # Session consistency provides read-your-writes guarantee
            # within the same session (connection)
    
            import time
            start_time = time.time()
    
            # Update the item
            item['value'] = 300
            updated_item = container.replace_item(
                item=item['id'],
                body=item
            )
    
            # Read from the same session - should see the update
            read_item = container.read_item(
                item=item['id'],
                partition_key=item['category']
            )
    
            end_time = time.time()
    
            return {
                "consistency_level": "Session",
                "updated_value": updated_item['value'],
                "read_value": read_item['value'],
                "consistent": updated_item['value'] == read_item['value'],
                "latency_ms": (end_time - start_time) * 1000,
                "guarantees": [
                    "Read-your-writes (same session)",
                    "Monotonic reads",
                    "Consistent prefix"
                ]
            }
    
        def _test_eventual_consistency(self, container, item) -> Dict[str, Any]:
            """Test eventual consistency behavior"""
            # Eventual consistency provides no ordering guarantees
            # but offers the lowest latency and cost
    
            import time
            start_time = time.time()
    
            # Update the item
            item['value'] = 400
            updated_item = container.replace_item(
                item=item['id'],
                body=item
            )
    
            # Read might not immediately reflect the update
            read_item = container.read_item(
                item=item['id'],
                partition_key=item['category']
            )
    
            end_time = time.time()
    
            return {
                "consistency_level": "Eventual",
                "updated_value": updated_item['value'],
                "read_value": read_item['value'],
                "consistent": updated_item['value'] == read_item['value'],
                "latency_ms": (end_time - start_time) * 1000,
                "guarantees": [
                    "No ordering guarantees",
                    "Lowest latency",
                    "Lowest cost"
                ]
            }
    
    class ConsistencyLevelSelector:
        """Helper class to choose appropriate consistency level"""
    
        @staticmethod
        def recommend_consistency_level(requirements: Dict[str, Any]) -> Dict[str, Any]:
            """Recommend consistency level based on application requirements"""
    
            # Analyze requirements
            needs_strong_consistency = requirements.get('financial_transactions', False)
            needs_read_your_writes = requirements.get('user_generated_content', False)
            latency_sensitive = requirements.get('real_time_applications', False)
            cost_sensitive = requirements.get('cost_optimization', False)
            global_distribution = requirements.get('multi_region', False)
    
            recommendations = []
    
            if needs_strong_consistency:
                recommendations.append({
                    'level': 'Strong',
                    'reason': 'Financial transactions require linearizability',
                    'trade_offs': 'Higher latency and cost, lower availability'
                })
            elif needs_read_your_writes and not latency_sensitive:
                recommendations.append({
                    'level': 'Session',
                    'reason': 'User content needs read-your-writes guarantee',
                    'trade_offs': 'Balanced latency, cost, and consistency'
                })
            elif latency_sensitive and cost_sensitive:
                recommendations.append({
                    'level': 'Eventual',
                    'reason': 'Optimized for latency and cost',
                    'trade_offs': 'No consistency guarantees'
                })
            elif global_distribution:
                recommendations.append({
                    'level': 'Bounded Staleness',
                    'reason': 'Good for global applications with some staleness tolerance',
                    'trade_offs': 'Configurable staleness bounds'
                })
            else:
                recommendations.append({
                    'level': 'Session',
                    'reason': 'Default recommendation for most applications',
                    'trade_offs': 'Good balance of consistency, performance, and cost'
                })
    
            return {
                'recommendations': recommendations,
                'factors_considered': requirements,
                'default_choice': 'Session'
            }
    Python
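
    For example, feeding the selector a simple requirements dictionary yields a recommendation you can log or surface during design reviews:

    # Illustrative call to the selector defined above.
    requirements = {
        'financial_transactions': False,
        'user_generated_content': True,
        'real_time_applications': False,
        'multi_region': True
    }

    recommendation = ConsistencyLevelSelector.recommend_consistency_level(requirements)
    print(recommendation['recommendations'][0]['level'])   # -> Session
    Python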

    11. Change Feed Processing

    Understanding Change Feed

    graph LR
        A[Cosmos DB Container] --> B[Change Feed]
        B --> C[Change Feed Processor]
        C --> D[Application Logic]
    
        B --> E[Insert Events]
        B --> F[Update Events]
        B --> G[Replace Events]
    
        C --> H[Lease Container]
        C --> I[Checkpoint Management]
        C --> J[Load Balancing]
    
        D --> K[Real-time Analytics]
        D --> L[Event Sourcing]
        D --> M[Data Synchronization]
        D --> N[Audit Logging]
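
    Before wiring up a full processor, it helps to see the underlying primitive: the container exposes its change feed as an ordinary iterator. A minimal sketch with the synchronous client (container assumed to exist):

    # change_feed_minimal.py -- minimal synchronous sketch; container is assumed.
    def read_all_changes(container):
        """Read the change feed from the beginning and print each changed document."""
        feed = container.query_items_change_feed(is_start_from_beginning=True)
        for doc in feed:
            # Each element is the latest version of a document that changed.
            print(doc['id'], doc.get('_ts'))
    Python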

    Change Feed Implementation

    # change_feed_processor.py
    import asyncio
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos import PartitionKey
    from typing import Dict, Any, List, Callable
    import logging
    import json
    from datetime import datetime
    
    logger = logging.getLogger(__name__)
    
    class ChangeFeedProcessor:
        def __init__(self, config: CosmosConfig):
            self.config = config
            self.client = None
            self.monitored_container = None
            self.lease_container = None
            self.change_handlers = []
    
        async def initialize(self):
            """Initialize the change feed processor"""
            self.client = CosmosClient(
                url=self.config.endpoint,
                credential=self.config.key
            )
    
            # Get the monitored container
            database = self.client.get_database_client(self.config.database_name)
            self.monitored_container = database.get_container_client(self.config.container_name)
    
            # Create or get lease container
            self.lease_container = await self._setup_lease_container(database)
    
            logger.info("Change feed processor initialized")
    
        async def _setup_lease_container(self, database):
            """Setup lease container for change feed processor"""
            lease_container_name = f"{self.config.container_name}-leases"
    
            try:
                lease_container = await database.create_container_if_not_exists(
                    id=lease_container_name,
                    partition_key=PartitionKey(path="/id"),
                    offer_throughput=400
                )
                logger.info(f"Lease container '{lease_container_name}' ready")
                return lease_container
            except Exception as e:
                logger.error(f"Failed to create lease container: {e}")
                raise
    
        def add_change_handler(self, handler: Callable[[List[Dict[str, Any]]], None]):
            """Add a handler function for processing changes"""
            self.change_handlers.append(handler)
    
        async def start_processing(self, processor_name: str = "default-processor"):
            """Start processing the change feed"""
            if not self.client:
                await self.initialize()
    
            logger.info(f"Starting change feed processor: {processor_name}")
    
            try:
                # Use the change feed processor
                async with self.client:
                    change_feed_iterator = self.monitored_container.query_items_change_feed(
                        is_start_from_beginning=True,
                        partition_key_range_id="0"  # single range for the demo; omit to read all ranges
                    )

                    # Iterating the feed directly yields one document at a time; read it
                    # page by page so handlers receive batches of changes.
                    async for page in change_feed_iterator.by_page():
                        changes = [item async for item in page]
                        if changes:
                            await self._process_changes(changes)
    
            except Exception as e:
                logger.error(f"Error in change feed processing: {e}")
                raise
    
        async def _process_changes(self, changes: List[Dict[str, Any]]):
            """Process a batch of changes"""
            try:
                logger.info(f"Processing {len(changes)} changes")
    
                # Call all registered handlers
                for handler in self.change_handlers:
                    try:
                        if asyncio.iscoroutinefunction(handler):
                            await handler(changes)
                        else:
                            handler(changes)
                    except Exception as e:
                        logger.error(f"Error in change handler: {e}")
    
            except Exception as e:
                logger.error(f"Error processing changes: {e}")
                raise
    
    # Example change handlers
    class ChangeHandlers:
    
        @staticmethod
        def audit_logger(changes: List[Dict[str, Any]]):
            """Log all changes for audit purposes"""
            for change in changes:
                audit_entry = {
                    'timestamp': datetime.utcnow().isoformat(),
                    'operation': 'change_detected',
                    'document_id': change.get('id'),
                    'partition_key': change.get('partitionKey'),
                    # The change feed does not distinguish inserts from updates;
                    # every change surfaces as the latest version of the document.
                    'change_type': 'upsert'
                }
                logger.info(f"AUDIT: {json.dumps(audit_entry)}")
    
        @staticmethod
        async def real_time_analytics(changes: List[Dict[str, Any]]):
            """Process changes for real-time analytics"""
            for change in changes:
                # Example: Update analytics counters
                if change.get('category') == 'product':
                    await ChangeHandlers._update_product_analytics(change)
                elif change.get('category') == 'order':
                    await ChangeHandlers._update_order_analytics(change)
    
        @staticmethod
        async def _update_product_analytics(product_change: Dict[str, Any]):
            """Update product-related analytics"""
            # Example analytics update
            logger.info(f"Updating product analytics for: {product_change.get('id')}")
    
        @staticmethod
        async def _update_order_analytics(order_change: Dict[str, Any]):
            """Update order-related analytics"""
            # Example analytics update
            logger.info(f"Updating order analytics for: {order_change.get('id')}")
    
        @staticmethod
        def data_synchronization(changes: List[Dict[str, Any]]):
            """Synchronize data to external systems"""
            for change in changes:
                # Example: Sync to search index, cache, or other databases
                sync_data = {
                    'id': change.get('id'),
                    'operation': 'upsert',
                    'data': change,
                    'timestamp': datetime.utcnow().isoformat()
                }
                logger.info(f"Syncing data: {sync_data['id']}")
    
        @staticmethod
        def event_sourcing(changes: List[Dict[str, Any]]):
            """Process changes for event sourcing pattern"""
            for change in changes:
                event = {
                    'event_id': f"event_{change.get('id')}_{change.get('_ts')}",
                    'aggregate_id': change.get('id'),
                    'event_type': 'DocumentChanged',
                    'event_data': change,
                    'timestamp': datetime.utcnow().isoformat(),
                    'version': change.get('_ts')
                }
                logger.info(f"Event sourcing: {event['event_id']}")
    
    # Change feed monitoring and management
    class ChangeFeedMonitor:
        def __init__(self, change_feed_processor: ChangeFeedProcessor):
            self.processor = change_feed_processor
            self.is_running = False
    
        async def start_monitoring(self):
            """Start monitoring the change feed"""
            self.is_running = True
    
            # Add built-in handlers
            self.processor.add_change_handler(ChangeHandlers.audit_logger)
            self.processor.add_change_handler(ChangeHandlers.real_time_analytics)
    
            try:
                await self.processor.start_processing("production-processor")
            except Exception as e:
                logger.error(f"Change feed monitoring failed: {e}")
                self.is_running = False
                raise
    
        async def stop_monitoring(self):
            """Stop monitoring the change feed"""
            self.is_running = False
            logger.info("Change feed monitoring stopped")
    
        def get_monitoring_status(self) -> Dict[str, Any]:
            """Get the current monitoring status"""
            return {
                'is_running': self.is_running,
                'handlers_count': len(self.processor.change_handlers),
                'processor_status': 'active' if self.is_running else 'stopped'
            }
    
    # Usage example
    async def example_change_feed_usage():
        """Example of how to use the change feed processor"""
        config = CosmosConfig()
    
        # Initialize change feed processor
        change_processor = ChangeFeedProcessor(config)
        monitor = ChangeFeedMonitor(change_processor)
    
        # Add custom handler
        def custom_handler(changes):
            print(f"Custom handler received {len(changes)} changes")
    
        change_processor.add_change_handler(custom_handler)
    
        # Start monitoring
        try:
            await monitor.start_monitoring()
        except KeyboardInterrupt:
            await monitor.stop_monitoring()
            print("Change feed processing stopped")
    
    # Run the example
    if __name__ == "__main__":
        asyncio.run(example_change_feed_usage())
    Python

    12. Stored Procedures and Triggers

    Understanding Server-Side Programming

    graph TD
        A[Server-Side Programming] --> B[Stored Procedures]
        A --> C[User Defined Functions]
        A --> D[Triggers]
    
        B --> E[Transactional Operations]
        B --> F[Complex Business Logic]
        B --> G[Batch Processing]
    
        C --> H[Query Helper Functions]
        C --> I[Data Transformation]
        C --> J[Reusable Logic]
    
        D --> K[Pre-triggers]
        D --> L[Post-triggers]
        D --> M[Data Validation]
        D --> N[Audit Logging]

    Stored Procedures Implementation

    # stored_procedures.py
    from azure.cosmos import scripts
    import json
    import logging
    from typing import Dict, Any, List

    logger = logging.getLogger(__name__)
    
    class StoredProcedureManager:
        def __init__(self, container_client):
            self.container = container_client
    
        def create_bulk_import_procedure(self):
            """Create a stored procedure for efficient bulk imports"""
    
            # JavaScript code for the stored procedure
            bulk_import_sproc = """
            function bulkImport(docs) {
                var collection = getContext().getCollection();
                var collectionLink = collection.getSelfLink();
                var response = getContext().getResponse();
                var count = 0;
    
                if (!docs) throw new Error("The array is undefined or null.");
    
                var docsLength = docs.length;
                if (docsLength == 0) {
                    response.setBody(0);
                    return;
                }
    
                // Try to create each document
                tryCreate(docs[count], callback);
    
                function tryCreate(doc, callback) {
                    var isAccepted = collection.createDocument(collectionLink, doc, callback);
    
                    if (!isAccepted) {
                        response.setBody(count);
                    }
                }
    
                function callback(err, doc, options) {
                    if (err) throw err;
    
                    count++;
                    if (count >= docsLength) {
                        response.setBody(count);
                    } else {
                        tryCreate(docs[count], callback);
                    }
                }
            }
            """
    
            try:
                sproc_definition = {
                    'id': 'bulkImport',
                    'body': bulk_import_sproc
                }
    
                # Create the stored procedure
                created_sproc = self.container.scripts.create_stored_procedure(
                    body=sproc_definition
                )
    
                logger.info("Bulk import stored procedure created successfully")
                return created_sproc
    
            except Exception as e:
                logger.error(f"Failed to create stored procedure: {e}")
                raise
    
        def create_conditional_update_procedure(self):
            """Create a stored procedure for conditional updates"""
    
            conditional_update_sproc = """
            function conditionalUpdate(id, partitionKey, updates, condition) {
                var collection = getContext().getCollection();
                var collectionLink = collection.getSelfLink();
                var response = getContext().getResponse();
    
                // Read the document
                var isAccepted = collection.readDocument(
                    `${collectionLink}/docs/${id}`,
                    { partitionKey: partitionKey },
                    function(err, doc) {
                        if (err) {
                            if (err.number == 404) {
                                response.setBody({ error: "Document not found" });
                                return;
                            }
                            throw err;
                        }
    
                        // Check condition
                        if (evaluateCondition(doc, condition)) {
                            // Apply updates
                            for (var key in updates) {
                                doc[key] = updates[key];
                            }
    
                            // Update timestamp
                            doc._lastModified = new Date().toISOString();
    
                            // Replace the document
                            var replaceAccepted = collection.replaceDocument(
                                doc._self,
                                doc,
                                function(replaceErr, updatedDoc) {
                                    if (replaceErr) throw replaceErr;
                                    response.setBody({
                                        success: true,
                                        document: updatedDoc
                                    });
                                }
                            );
    
                            if (!replaceAccepted) {
                                response.setBody({ error: "Replace not accepted" });
                            }
                        } else {
                            response.setBody({
                                success: false,
                                reason: "Condition not met"
                            });
                        }
                    }
                );
    
                if (!isAccepted) {
                    response.setBody({ error: "Read not accepted" });
                }
    
                function evaluateCondition(doc, condition) {
                    // Simple condition evaluation
                    if (condition.field && condition.operator && condition.value !== undefined) {
                        var fieldValue = doc[condition.field];
    
                        switch (condition.operator) {
                            case "==":
                                return fieldValue == condition.value;
                            case "!=":
                                return fieldValue != condition.value;
                            case ">":
                                return fieldValue > condition.value;
                            case "<":
                                return fieldValue < condition.value;
                            case ">=":
                                return fieldValue >= condition.value;
                            case "<=":
                                return fieldValue <= condition.value;
                            default:
                                return true;
                        }
                    }
                    return true;
                }
            }
            """
    
            try:
                sproc_definition = {
                    'id': 'conditionalUpdate',
                    'body': conditional_update_sproc
                }
    
                created_sproc = self.container.scripts.create_stored_procedure(
                    body=sproc_definition
                )
    
                logger.info("Conditional update stored procedure created successfully")
                return created_sproc
    
            except Exception as e:
                logger.error(f"Failed to create conditional update procedure: {e}")
                raise
    
        def execute_bulk_import(self, documents: List[Dict[str, Any]], partition_key: str):
            """Execute the bulk import stored procedure"""
            try:
                result = self.container.scripts.execute_stored_procedure(
                    sproc='bulkImport',
                    params=[documents],
                    partition_key=partition_key
                )
    
                logger.info(f"Bulk import completed. Documents imported: {result}")
                return result
    
            except Exception as e:
                logger.error(f"Bulk import failed: {e}")
                raise
    
        def execute_conditional_update(self, item_id: str, partition_key: str, 
                                     updates: Dict[str, Any], condition: Dict[str, Any]):
            """Execute conditional update stored procedure"""
            try:
                result = self.container.scripts.execute_stored_procedure(
                    sproc='conditionalUpdate',
                    params=[item_id, partition_key, updates, condition],
                    partition_key=partition_key
                )
    
                logger.info(f"Conditional update result: {result}")
                return result
    
            except Exception as e:
                logger.error(f"Conditional update failed: {e}")
                raise
    
    class TriggerManager:
        def __init__(self, container_client):
            self.container = container_client
    
        def create_audit_trigger(self):
            """Create a pre-trigger for auditing"""
    
            audit_trigger = """
            function auditTrigger() {
                var context = getContext();
                var request = context.getRequest();
                var item = request.getBody();
    
                // Add audit fields
                item._createdBy = "system";
                item._createdAt = new Date().toISOString();
                item._version = 1;
    
                // Set the updated body
                request.setBody(item);
            }
            """
    
            try:
                trigger_definition = {
                    'id': 'auditTrigger',
                    'body': audit_trigger,
                    'triggerType': scripts.TriggerType.Pre,
                    'triggerOperation': scripts.TriggerOperation.Create
                }
    
                created_trigger = self.container.scripts.create_trigger(
                    body=trigger_definition
                )
    
                logger.info("Audit trigger created successfully")
                return created_trigger
    
            except Exception as e:
                logger.error(f"Failed to create audit trigger: {e}")
                raise
    
        def create_validation_trigger(self):
            """Create a pre-trigger for data validation"""
    
            validation_trigger = """
            function validationTrigger() {
                var context = getContext();
                var request = context.getRequest();
                var item = request.getBody();
    
                // Validation rules
                if (!item.id) {
                    throw new Error("Item must have an id");
                }
    
                if (!item.category) {
                    throw new Error("Item must have a category");
                }
    
                if (item.price && item.price < 0) {
                    throw new Error("Price cannot be negative");
                }
    
                // Add validation timestamp
                item._validatedAt = new Date().toISOString();
    
                request.setBody(item);
            }
            """
    
            try:
                trigger_definition = {
                    'id': 'validationTrigger',
                    'body': validation_trigger,
                    'triggerType': scripts.TriggerType.Pre,
                    'triggerOperation': scripts.TriggerOperation.All
                }
    
                created_trigger = self.container.scripts.create_trigger(
                    body=trigger_definition
                )
    
                logger.info("Validation trigger created successfully")
                return created_trigger
    
            except Exception as e:
                logger.error(f"Failed to create validation trigger: {e}")
                raise
    
    class UserDefinedFunctionManager:
        def __init__(self, container_client):
            self.container = container_client
    
        def create_tax_calculation_udf(self):
            """Create a UDF for tax calculations"""
    
            tax_calculation_udf = """
            function calculateTax(price, taxRate, region) {
                if (!price || price <= 0) return 0;
                if (!taxRate || taxRate <= 0) return 0;
    
                var baseTax = price * taxRate;
    
                // Regional adjustments
                switch (region) {
                    case "EU":
                        return baseTax * 1.2; // VAT adjustment
                    case "US":
                        return baseTax; // Standard rate
                    case "CA":
                        return baseTax * 1.1; // GST adjustment
                    default:
                        return baseTax;
                }
            }
            """
    
            try:
                udf_definition = {
                    'id': 'calculateTax',
                    'body': tax_calculation_udf
                }
    
                created_udf = self.container.scripts.create_user_defined_function(
                    body=udf_definition
                )
    
                logger.info("Tax calculation UDF created successfully")
                return created_udf
    
            except Exception as e:
                logger.error(f"Failed to create UDF: {e}")
                raise
    
        def create_distance_calculation_udf(self):
            """Create a UDF for distance calculations"""
    
            distance_udf = """
            function calculateDistance(lat1, lon1, lat2, lon2) {
                var R = 6371; // Earth's radius in kilometers
                var dLat = (lat2 - lat1) * Math.PI / 180;
                var dLon = (lon2 - lon1) * Math.PI / 180;
    
                var a = Math.sin(dLat/2) * Math.sin(dLat/2) +
                        Math.cos(lat1 * Math.PI / 180) * Math.cos(lat2 * Math.PI / 180) *
                        Math.sin(dLon/2) * Math.sin(dLon/2);
    
                var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
                var distance = R * c;
    
                return distance;
            }
            """
    
            try:
                udf_definition = {
                    'id': 'calculateDistance',
                    'body': distance_udf
                }
    
                created_udf = self.container.scripts.create_user_defined_function(
                    body=udf_definition
                )
    
                logger.info("Distance calculation UDF created successfully")
                return created_udf
    
            except Exception as e:
                logger.error(f"Failed to create distance UDF: {e}")
                raise
    
    # Usage example
    def example_server_side_programming():
        """Example of using stored procedures, triggers, and UDFs"""
    
        # Assume we have a container client
        container = get_container_client()  # Your container client
    
        # Initialize managers
        sproc_manager = StoredProcedureManager(container)
        trigger_manager = TriggerManager(container)
        udf_manager = UserDefinedFunctionManager(container)
    
        try:
            # Create server-side objects
            sproc_manager.create_bulk_import_procedure()
            sproc_manager.create_conditional_update_procedure()
            trigger_manager.create_audit_trigger()
            trigger_manager.create_validation_trigger()
            udf_manager.create_tax_calculation_udf()
    
            # Example usage of bulk import
            sample_docs = [
                {"id": "1", "name": "Product 1", "price": 10.99, "category": "electronics"},
                {"id": "2", "name": "Product 2", "price": 25.50, "category": "electronics"},
                {"id": "3", "name": "Product 3", "price": 15.75, "category": "electronics"}
            ]
    
            result = sproc_manager.execute_bulk_import(sample_docs, "electronics")
            print(f"Imported {result} documents")
    
            # Example usage of conditional update
            condition = {"field": "price", "operator": "<", "value": 20.0}
            updates = {"discounted": True, "discount_rate": 0.1}
    
            update_result = sproc_manager.execute_conditional_update(
                "1", "electronics", updates, condition
            )
            print(f"Update result: {update_result}")
    
            # Example query using UDF
            query_with_udf = """
            SELECT c.id, c.name, c.price, 
                   udf.calculateTax(c.price, 0.08, "US") as tax_amount
            FROM c 
            WHERE c.category = "electronics"
            """
    
            items = list(container.query_items(
                query=query_with_udf,
                enable_cross_partition_query=True
            ))
    
            for item in items:
                print(f"Product: {item['name']}, Tax: ${item['tax_amount']:.2f}")
    
        except Exception as e:
            logger.error(f"Server-side programming example failed: {e}")
    Python
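
    One detail that is easy to miss: Cosmos DB triggers never fire automatically. Every request that should run a trigger has to name it explicitly, as in this sketch (container assumed to exist, triggers registered by the TriggerManager above):

    # invoking_triggers.py -- illustrative sketch; the container and the
    # 'validationTrigger' registered above are assumed to exist.
    new_item = {
        "id": "4",
        "name": "Product 4",
        "price": 9.99,
        "category": "electronics"
    }

    # The named pre-trigger runs server-side before the write and can
    # validate or enrich the document body.
    container.create_item(
        body=new_item,
        pre_trigger_include="validationTrigger"
    )
    Python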

    13. Security and Authentication

    Authentication Methods

    graph TD
        A[Authentication Methods] --> B[Primary/Secondary Keys]
        A --> C[Azure Active Directory]
        A --> D[Resource Tokens]
        A --> E[Managed Identity]
    
        B --> F[Full Access]
        B --> G[Account Level]
        B --> H[Rotation Required]
    
        C --> I[Role-Based Access]
        C --> J[User Authentication]
        C --> K[Service Principal]
    
        D --> L[Fine-Grained Access]
        D --> M[Time-Limited]
        D --> N[Resource-Specific]
    
        E --> O[Azure Resources]
        E --> P[No Key Management]
        E --> Q[Automatic Rotation]
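
    Key-based and Azure AD authentication are not interchangeable out of the box: a principal (user, service principal, or managed identity) first needs a Cosmos DB data-plane role assignment before the AD-based clients shown below can read or write data. With the Azure CLI that looks roughly like this (names and ids are placeholders):

    # Grant the built-in "Cosmos DB Built-in Data Contributor" role to a principal
    # so it can perform data-plane operations with Azure AD / managed identity.
    az cosmosdb sql role assignment create \
      --account-name <account-name> \
      --resource-group <resource-group> \
      --role-definition-id 00000000-0000-0000-0000-000000000002 \
      --principal-id <principal-object-id> \
      --scope "/"
    Bash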

    Security Implementation

    # security_manager.py
    from azure.identity import DefaultAzureCredential, ClientSecretCredential
    from azure.keyvault.secrets import SecretClient
    from azure.cosmos import CosmosClient, exceptions
    from typing import Dict, Any, List, Optional
    import hashlib
    import hmac
    import base64
    import logging
    from datetime import datetime, timedelta

    logger = logging.getLogger(__name__)
    
    class SecurityManager:
        def __init__(self, config: CosmosConfig):
            self.config = config
            self.key_vault_client = None
    
        def setup_azure_ad_authentication(self, tenant_id: str, client_id: str, client_secret: str):
            """Setup Azure AD authentication"""
            try:
                credential = ClientSecretCredential(
                    tenant_id=tenant_id,
                    client_id=client_id,
                    client_secret=client_secret
                )
    
                # Create Cosmos client with Azure AD
                cosmos_client = CosmosClient(
                    url=self.config.endpoint,
                    credential=credential
                )
    
                logger.info("Azure AD authentication configured successfully")
                return cosmos_client
    
            except Exception as e:
                logger.error(f"Failed to setup Azure AD authentication: {e}")
                raise
    
        def setup_managed_identity_authentication(self):
            """Setup Managed Identity authentication"""
            try:
                # Use DefaultAzureCredential which automatically handles managed identity
                credential = DefaultAzureCredential()
    
                cosmos_client = CosmosClient(
                    url=self.config.endpoint,
                    credential=credential
                )
    
                logger.info("Managed Identity authentication configured successfully")
                return cosmos_client
    
            except Exception as e:
                logger.error(f"Failed to setup Managed Identity authentication: {e}")
                raise
    
        def setup_key_vault_integration(self, key_vault_url: str):
            """Setup Azure Key Vault for secret management"""
            try:
                credential = DefaultAzureCredential()
                self.key_vault_client = SecretClient(
                    vault_url=key_vault_url,
                    credential=credential
                )
    
                logger.info("Key Vault integration configured successfully")
    
            except Exception as e:
                logger.error(f"Failed to setup Key Vault integration: {e}")
                raise
    
        def get_secret_from_key_vault(self, secret_name: str) -> Optional[str]:
            """Retrieve secret from Azure Key Vault"""
            try:
                if not self.key_vault_client:
                    raise ValueError("Key Vault client not initialized")
    
                secret = self.key_vault_client.get_secret(secret_name)
                logger.info(f"Successfully retrieved secret: {secret_name}")
                return secret.value
    
            except Exception as e:
                logger.error(f"Failed to retrieve secret {secret_name}: {e}")
                return None
    
        def create_resource_token(self, user_id: str, resource_path: str, 
                                permission_mode: str = "read", duration_hours: int = 1):
            """Create a resource token for fine-grained access control"""
            try:
                # Note: This is a simplified example. In practice, you'd use Cosmos DB's
                # permission system to create actual resource tokens
    
                expiry_time = datetime.utcnow() + timedelta(hours=duration_hours)
    
                token_data = {
                    'user_id': user_id,
                    'resource_path': resource_path,
                    'permission_mode': permission_mode,
                    'expires_at': expiry_time.isoformat()
                }
    
                # Create a signed token (simplified version)
                token_string = base64.b64encode(str(token_data).encode()).decode()
    
                logger.info(f"Created resource token for user: {user_id}")
                return {
                    'token': token_string,
                    'expires_at': expiry_time,
                    'permissions': permission_mode
                }
    
            except Exception as e:
                logger.error(f"Failed to create resource token: {e}")
                raise
    
    class DataEncryption:
        """Handle client-side encryption for sensitive data"""
    
        def __init__(self, encryption_key: str):
            # Derive a fixed 32-byte key so Fernet accepts passphrases of any length
            self.encryption_key = hashlib.sha256(encryption_key.encode()).digest()
    
        def encrypt_field(self, value: str) -> str:
            """Encrypt a field value"""
            try:
                import cryptography.fernet as fernet
    
                f = fernet.Fernet(base64.urlsafe_b64encode(self.encryption_key[:32]))
                encrypted_value = f.encrypt(value.encode())
    
                return base64.b64encode(encrypted_value).decode()
    
            except Exception as e:
                logger.error(f"Encryption failed: {e}")
                raise
    
        def decrypt_field(self, encrypted_value: str) -> str:
            """Decrypt a field value"""
            try:
                import cryptography.fernet as fernet
    
                f = fernet.Fernet(base64.urlsafe_b64encode(self.encryption_key[:32]))
                encrypted_bytes = base64.b64decode(encrypted_value.encode())
                decrypted_value = f.decrypt(encrypted_bytes)
    
                return decrypted_value.decode()
    
            except Exception as e:
                logger.error(f"Decryption failed: {e}")
                raise
    
        def encrypt_document(self, document: Dict[str, Any], 
                            fields_to_encrypt: List[str]) -> Dict[str, Any]:
            """Encrypt specified fields in a document"""
            encrypted_doc = document.copy()
    
            for field in fields_to_encrypt:
                if field in encrypted_doc:
                    encrypted_doc[field] = self.encrypt_field(str(encrypted_doc[field]))
                    encrypted_doc[f"{field}_encrypted"] = True
    
            return encrypted_doc
    
        def decrypt_document(self, document: Dict[str, Any], 
                            fields_to_decrypt: List[str]) -> Dict[str, Any]:
            """Decrypt specified fields in a document"""
            decrypted_doc = document.copy()
    
            for field in fields_to_decrypt:
                if field in decrypted_doc and document.get(f"{field}_encrypted"):
                    decrypted_doc[field] = self.decrypt_field(decrypted_doc[field])
                    decrypted_doc.pop(f"{field}_encrypted", None)
    
            return decrypted_doc
    
    class AccessControlManager:
        """Manage role-based access control"""
    
        def __init__(self):
            self.user_roles = {}
            self.role_permissions = {
                'admin': ['read', 'write', 'delete', 'manage'],
                'editor': ['read', 'write'],
                'viewer': ['read'],
                'guest': []
            }
    
        def assign_role(self, user_id: str, role: str):
            """Assign a role to a user"""
            if role not in self.role_permissions:
                raise ValueError(f"Invalid role: {role}")
    
            self.user_roles[user_id] = role
            logger.info(f"Assigned role '{role}' to user '{user_id}'")
    
        def check_permission(self, user_id: str, required_permission: str) -> bool:
            """Check if user has required permission"""
            user_role = self.user_roles.get(user_id)
            if not user_role:
                return False
    
            role_permissions = self.role_permissions.get(user_role, [])
            return required_permission in role_permissions
    
        def get_user_permissions(self, user_id: str) -> List[str]:
            """Get all permissions for a user"""
            user_role = self.user_roles.get(user_id)
            if not user_role:
                return []
    
            return self.role_permissions.get(user_role, [])
    
    class AuditLogger:
        """Audit logging for security compliance"""
    
        def __init__(self, container_client):
            self.container = container_client
    
        def log_access_attempt(self, user_id: str, resource: str, 
                              action: str, success: bool, details: Dict[str, Any] = None):
            """Log access attempts for auditing"""
            audit_entry = {
                'id': f"audit_{datetime.utcnow().isoformat()}_{user_id}",
                'timestamp': datetime.utcnow().isoformat(),
                'user_id': user_id,
                'resource': resource,
                'action': action,
                'success': success,
                'details': details or {},
                'category': 'audit',
                'ip_address': details.get('ip_address') if details else None,
                'user_agent': details.get('user_agent') if details else None
            }
    
            try:
                self.container.create_item(body=audit_entry)
                logger.info(f"Audit log created for user {user_id}")
    
            except Exception as e:
                logger.error(f"Failed to create audit log: {e}")
    
        def query_audit_logs(self, user_id: str = None, start_date: datetime = None, 
                            end_date: datetime = None) -> List[Dict[str, Any]]:
            """Query audit logs with filters"""
            query_parts = ["SELECT * FROM c WHERE c.category = 'audit'"]
            parameters = []
    
            if user_id:
                query_parts.append("AND c.user_id = @user_id")
                parameters.append({"name": "@user_id", "value": user_id})
    
            if start_date:
                query_parts.append("AND c.timestamp >= @start_date")
                parameters.append({"name": "@start_date", "value": start_date.isoformat()})
    
            if end_date:
                query_parts.append("AND c.timestamp <= @end_date")
                parameters.append({"name": "@end_date", "value": end_date.isoformat()})
    
            query = " ".join(query_parts) + " ORDER BY c.timestamp DESC"
    
            try:
                items = list(self.container.query_items(
                    query=query,
                    parameters=parameters,
                    enable_cross_partition_query=True
                ))
    
                logger.info(f"Retrieved {len(items)} audit log entries")
                return items
    
            except Exception as e:
                logger.error(f"Failed to query audit logs: {e}")
                return []
    
    # Security best practices implementation
    class SecurityBestPractices:
    
        @staticmethod
        def validate_input(data: Dict[str, Any]) -> Dict[str, Any]:
            """Validate and sanitize input data"""
            # Remove potentially dangerous fields
            dangerous_fields = ['_self', '_etag', '_attachments', '_ts']
            sanitized_data = {k: v for k, v in data.items() if k not in dangerous_fields}
    
            # Validate required fields
            if 'id' not in sanitized_data or not sanitized_data['id']:
                raise ValueError("Document must have a valid id")
    
            # Sanitize string fields
            for key, value in sanitized_data.items():
                if isinstance(value, str):
                    # Basic XSS prevention: escape angle brackets as HTML entities
                    sanitized_data[key] = value.replace('<', '&lt;').replace('>', '&gt;')
    
            return sanitized_data
    
        @staticmethod
        def generate_secure_id() -> str:
            """Generate a secure, random document ID"""
            import uuid
            return str(uuid.uuid4())
    
        @staticmethod
        def hash_sensitive_data(data: str, salt: str = None) -> str:
            """Hash sensitive data like passwords"""
            if not salt:
                import os
                salt = base64.b64encode(os.urandom(32)).decode()
    
            hashed = hashlib.pbkdf2_hmac('sha256', data.encode(), salt.encode(), 100000)
            return base64.b64encode(hashed).decode()
    
        @staticmethod
        def verify_hash(data: str, hashed_data: str, salt: str) -> bool:
            """Verify hashed data"""
            return SecurityBestPractices.hash_sensitive_data(data, salt) == hashed_data
    
    # Usage example
    def security_implementation_example():
        """Example of implementing security features"""
        config = CosmosConfig()
    
        # Setup security manager
        security_manager = SecurityManager(config)
    
        # Setup encryption
        encryption_key = "your-encryption-key-here"  # Should be from Key Vault
        encryptor = DataEncryption(encryption_key)
    
        # Setup access control
        access_control = AccessControlManager()
        access_control.assign_role("user123", "editor")
    
        # Example document with sensitive data
        sensitive_doc = {
            "id": SecurityBestPractices.generate_secure_id(),
            "name": "John Doe",
            "email": "john.doe@example.com",
            "ssn": "123-45-6789",  # Sensitive field
            "credit_card": "1234-5678-9012-3456"  # Sensitive field
        }
    
        # Encrypt sensitive fields
        encrypted_doc = encryptor.encrypt_document(
            sensitive_doc, 
            ['ssn', 'credit_card']
        )
    
        print("Security implementation example completed")
        return encrypted_doc
    Python
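
    The encryption key in the example above is hard-coded with a note that it should come from Key Vault. A minimal sketch of retrieving it, assuming the azure-identity and azure-keyvault-secrets packages are installed; the vault URL and secret name are placeholders:

    # Fetch the data-encryption key from Azure Key Vault instead of hard-coding it
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient

    def get_encryption_key(vault_url: str = "https://your-key-vault.vault.azure.net",
                           secret_name: str = "cosmos-encryption-key") -> str:
        """Read the encryption key secret using the ambient Azure identity"""
        credential = DefaultAzureCredential()  # works with managed identity, Azure CLI login, etc.
        client = SecretClient(vault_url=vault_url, credential=credential)
        return client.get_secret(secret_name).value

    # encryptor = DataEncryption(get_encryption_key())
    Python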

    14. Monitoring and Troubleshooting

    Azure Monitor Integration

    graph TD
        A[Azure Cosmos DB] --> B[Metrics & Logs]
        B --> C[Azure Monitor]
        C --> D[Application Insights]
        C --> E[Log Analytics]
        C --> F[Alerts & Notifications]
    
        G[Key Metrics] --> H[Request Units]
        G --> I[Latency]
        G --> J[Availability]
        G --> K[Storage Usage]
        G --> L[Throttling Rate]
    
        M[Diagnostic Logs] --> N[DataPlaneRequests]
        M --> O[QueryRuntimeStatistics]
        M --> P[PartitionKeyStatistics]
        M --> Q[ControlPlaneRequests]
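
    Most of the metrics in the diagram can also be observed directly from the SDK: every response reports its request charge in the x-ms-request-charge header. A minimal sketch of reading it before building a full monitor, assuming an existing container client; the item id and partition key are placeholders:

    # Inspect the RU charge of a single request via the last response headers
    item = container.read_item(item="product-001", partition_key="electronics")
    headers = container.client_connection.last_response_headers
    request_charge = float(headers.get("x-ms-request-charge", 0))
    print(f"Point read consumed {request_charge} RUs")
    Python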

    Monitoring Implementation

    # monitoring.py
    import time
    import logging
    from typing import Dict, Any, List, Optional
    from datetime import datetime, timedelta
    from dataclasses import dataclass, asdict
    import json
    
    @dataclass
    class PerformanceMetric:
        timestamp: datetime
        operation_type: str
        request_charge: float
        duration_ms: float
        status_code: int
        partition_key: str
        item_count: int = 0
        error_message: Optional[str] = None
    
    class CosmosDBMonitor:
        def __init__(self, container_client, enable_detailed_logging: bool = True):
            self.container = container_client
            self.enable_detailed_logging = enable_detailed_logging
            self.metrics_buffer = []
            self.alert_thresholds = {
                'high_latency_ms': 1000,
                'high_ru_consumption': 100,
                'error_rate_threshold': 0.05
            }
    
            # Setup structured logging
            self.logger = self._setup_monitoring_logger()
    
        def _setup_monitoring_logger(self):
            """Setup structured logging for monitoring"""
            logger = logging.getLogger('cosmos_monitor')
            logger.setLevel(logging.INFO)
    
            # Create console handler with structured format
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
    
            return logger
    
        def track_operation(self, operation_func, operation_type: str, *args, **kwargs):
            """Track performance metrics for any Cosmos DB operation"""
            start_time = time.time()
            start_timestamp = datetime.utcnow()
    
            try:
                # Execute the operation
                result = operation_func(*args, **kwargs)
    
                # Calculate metrics
                duration_ms = (time.time() - start_time) * 1000
    
                # Extract request charge if available
                request_charge = 0
                if hasattr(result, 'request_charge'):
                    request_charge = result.request_charge
                elif isinstance(result, dict) and '_request_charge' in result:
                    request_charge = result['_request_charge']
    
                # Extract partition key from args/kwargs
                partition_key = self._extract_partition_key(args, kwargs)
    
                # Create metric record
                metric = PerformanceMetric(
                    timestamp=start_timestamp,
                    operation_type=operation_type,
                    request_charge=request_charge,
                    duration_ms=duration_ms,
                    status_code=200,  # Assume success
                    partition_key=partition_key,
                    item_count=len(result) if isinstance(result, list) else 1
                )
    
                # Store metric
                self._store_metric(metric)
    
                # Check for alerts
                self._check_performance_alerts(metric)
    
                return result
    
            except Exception as e:
                duration_ms = (time.time() - start_time) * 1000
                partition_key = self._extract_partition_key(args, kwargs)
    
                # Create error metric
                error_metric = PerformanceMetric(
                    timestamp=start_timestamp,
                    operation_type=operation_type,
                    request_charge=0,
                    duration_ms=duration_ms,
                    status_code=500,  # Error status
                    partition_key=partition_key,
                    error_message=str(e)
                )
    
                self._store_metric(error_metric)
                self.logger.error(f"Operation failed: {operation_type}, Error: {e}")
                raise
    
        def _extract_partition_key(self, args, kwargs) -> str:
            """Extract partition key from operation arguments"""
            # Look for partition_key in kwargs
            if 'partition_key' in kwargs:
                return str(kwargs['partition_key'])
    
            # Look for common patterns in args
            for arg in args:
                if isinstance(arg, dict) and 'category' in arg:
                    return arg['category']
                elif isinstance(arg, dict) and 'partitionKey' in arg:
                    return arg['partitionKey']
    
            return 'unknown'
    
        def _store_metric(self, metric: PerformanceMetric):
            """Store metric in buffer and optionally send to external system"""
            self.metrics_buffer.append(metric)
    
            # Log metric if detailed logging is enabled
            if self.enable_detailed_logging:
                metric_dict = asdict(metric)
                metric_dict['timestamp'] = metric.timestamp.isoformat()
                self.logger.info(f"METRIC: {json.dumps(metric_dict)}")
    
            # Keep buffer size manageable
            if len(self.metrics_buffer) > 1000:
                self.metrics_buffer = self.metrics_buffer[-500:]
    
        def _check_performance_alerts(self, metric: PerformanceMetric):
            """Check metric against alert thresholds"""
            alerts = []
    
            # High latency alert
            if metric.duration_ms > self.alert_thresholds['high_latency_ms']:
                alerts.append({
                    'type': 'high_latency',
                    'message': f"High latency detected: {metric.duration_ms:.2f}ms",
                    'metric': metric
                })
    
            # High RU consumption alert
            if metric.request_charge > self.alert_thresholds['high_ru_consumption']:
                alerts.append({
                    'type': 'high_ru_consumption',
                    'message': f"High RU consumption: {metric.request_charge} RUs",
                    'metric': metric
                })
    
            # Process alerts
            for alert in alerts:
                self._handle_alert(alert)
    
        def _handle_alert(self, alert: Dict[str, Any]):
            """Handle performance alerts"""
            self.logger.warning(f"ALERT: {alert['type']} - {alert['message']}")
    
            # Here you could integrate with external alerting systems
            # like Azure Monitor, PagerDuty, Slack, etc.
    
        def get_performance_summary(self, hours: int = 24) -> Dict[str, Any]:
            """Get performance summary for the specified time period"""
            cutoff_time = datetime.utcnow() - timedelta(hours=hours)
    
            # Filter metrics within time window
            recent_metrics = [
                m for m in self.metrics_buffer 
                if m.timestamp >= cutoff_time
            ]
    
            if not recent_metrics:
                return {'message': 'No metrics available for the specified period'}
    
            # Calculate summary statistics
            total_operations = len(recent_metrics)
            successful_operations = len([m for m in recent_metrics if m.status_code == 200])
            failed_operations = total_operations - successful_operations
    
            latencies = [m.duration_ms for m in recent_metrics]
            ru_consumptions = [m.request_charge for m in recent_metrics]
    
            return {
                'time_period_hours': hours,
                'total_operations': total_operations,
                'successful_operations': successful_operations,
                'failed_operations': failed_operations,
                'success_rate': successful_operations / total_operations if total_operations > 0 else 0,
                'latency_stats': {
                    'avg_ms': sum(latencies) / len(latencies) if latencies else 0,
                    'min_ms': min(latencies) if latencies else 0,
                    'max_ms': max(latencies) if latencies else 0,
                    'p95_ms': self._calculate_percentile(latencies, 95) if latencies else 0
                },
                'ru_stats': {
                    'total_consumed': sum(ru_consumptions),
                    'avg_per_operation': sum(ru_consumptions) / len(ru_consumptions) if ru_consumptions else 0,
                    'max_consumption': max(ru_consumptions) if ru_consumptions else 0
                },
                'operation_breakdown': self._get_operation_breakdown(recent_metrics)
            }
    
        def _calculate_percentile(self, values: List[float], percentile: int) -> float:
            """Calculate percentile value"""
            if not values:
                return 0
    
            sorted_values = sorted(values)
            index = int((percentile / 100) * len(sorted_values))
            if index >= len(sorted_values):
                index = len(sorted_values) - 1
    
            return sorted_values[index]
    
        def _get_operation_breakdown(self, metrics: List[PerformanceMetric]) -> Dict[str, Any]:
            """Get breakdown of operations by type"""
            breakdown = {}
    
            for metric in metrics:
                op_type = metric.operation_type
                if op_type not in breakdown:
                    breakdown[op_type] = {
                        'count': 0,
                        'total_ru': 0,
                        'total_duration_ms': 0,
                        'errors': 0
                    }
    
                breakdown[op_type]['count'] += 1
                breakdown[op_type]['total_ru'] += metric.request_charge
                breakdown[op_type]['total_duration_ms'] += metric.duration_ms
    
                if metric.status_code != 200:
                    breakdown[op_type]['errors'] += 1
    
            # Calculate averages
            for op_type in breakdown:
                stats = breakdown[op_type]
                count = stats['count']
                if count > 0:
                    stats['avg_ru'] = stats['total_ru'] / count
                    stats['avg_duration_ms'] = stats['total_duration_ms'] / count
                    stats['error_rate'] = stats['errors'] / count
    
            return breakdown
    
    class TroubleshootingHelper:
        def __init__(self, container_client):
            self.container = container_client
    
        def diagnose_slow_queries(self, queries: List[str]) -> Dict[str, Any]:
            """Diagnose slow-performing queries"""
            diagnostics = []
    
            for query in queries:
                diagnosis = self._analyze_single_query(query)
                diagnostics.append(diagnosis)
    
            return {
                'total_queries_analyzed': len(queries),
                'diagnostics': diagnostics,
                'recommendations': self._generate_query_recommendations(diagnostics)
            }
    
        def _analyze_single_query(self, query: str) -> Dict[str, Any]:
            """Analyze a single query for performance issues"""
            try:
                start_time = time.time()
    
                # Execute query with metrics
                query_iterable = self.container.query_items(
                    query=query,
                    enable_cross_partition_query=True,
                    populate_query_metrics=True
                )
    
                # Collect results and metrics
                results = list(query_iterable)
                execution_time = (time.time() - start_time) * 1000
    
                # Query metrics and request charge come back in the response headers
                # (these reflect the last page of results)
                headers = self.container.client_connection.last_response_headers
                query_metrics = headers.get('x-ms-documentdb-query-metrics', '')
                request_charge = float(headers.get('x-ms-request-charge', 0))
    
                return {
                    'query': query,
                    'execution_time_ms': execution_time,
                    'request_charge': request_charge,
                    'result_count': len(results),
                    'query_metrics': query_metrics,
                    'status': 'success',
                    'performance_issues': self._identify_performance_issues(
                        execution_time, request_charge, query_metrics, query
                    )
                }
    
            except Exception as e:
                return {
                    'query': query,
                    'status': 'error',
                    'error': str(e),
                    'performance_issues': ['Query execution failed']
                }
    
        def _identify_performance_issues(self, execution_time: float, 
                                       request_charge: float, 
                                       query_metrics: Dict,
                                       query: str) -> List[str]:
            """Identify potential performance issues with a query"""
            issues = []
    
            # Check execution time
            if execution_time > 1000:  # 1 second
                issues.append(f"High execution time: {execution_time:.2f}ms")
    
            # Check RU consumption
            if request_charge > 100:
                issues.append(f"High RU consumption: {request_charge} RUs")
    
            # Check for SELECT * projections
            if 'SELECT *' in query.upper():
                issues.append("Using SELECT * - consider selecting specific fields")
    
            # Check for missing WHERE clauses
            if 'WHERE' not in query.upper():
                issues.append("No WHERE clause - full collection scan")
    
            # Check for ORDER BY without proper indexing
            if 'ORDER BY' in query.upper() and request_charge > 50:
                issues.append("ORDER BY clause may require composite indexing")
    
            return issues
    
        def _generate_query_recommendations(self, diagnostics: List[Dict]) -> List[str]:
            """Generate recommendations based on query analysis"""
            recommendations = []
    
            # Analyze common issues
            high_ru_queries = [d for d in diagnostics if d.get('request_charge', 0) > 50]
            slow_queries = [d for d in diagnostics if d.get('execution_time_ms', 0) > 1000]
    
            if high_ru_queries:
                recommendations.append("Consider optimizing indexing for high RU consumption queries")
    
            if slow_queries:
                recommendations.append("Review slow queries for proper WHERE clauses and indexing")
    
            # Check for common patterns
            select_all_queries = [
                d for d in diagnostics 
                if 'SELECT *' in d.get('query', '').upper()
            ]
    
            if select_all_queries:
                recommendations.append("Replace SELECT * with specific field selection")
    
            return recommendations
    
        def check_partition_health(self) -> Dict[str, Any]:
            """Check partition key distribution and health"""
            # This would require specific queries to analyze partition distribution
            # For demonstration, we'll show the structure
    
            return {
                'analysis_timestamp': datetime.utcnow().isoformat(),
                'partition_distribution': 'Analysis would require access to partition statistics',
                'recommendations': [
                    'Monitor partition key distribution regularly',
                    'Watch for hot partitions in Azure Portal metrics',
                    'Consider partition key redesign if distribution is uneven'
                ]
            }
    
    class AlertingSystem:
        def __init__(self, monitor: CosmosDBMonitor):
            self.monitor = monitor
            self.alert_rules = []
    
        def add_alert_rule(self, name: str, condition_func, action_func, cooldown_minutes: int = 5):
            """Add a custom alert rule"""
            self.alert_rules.append({
                'name': name,
                'condition': condition_func,
                'action': action_func,
                'cooldown_minutes': cooldown_minutes,
                'last_triggered': None
            })
    
        def check_alerts(self):
            """Check all alert rules against current metrics"""
            current_time = datetime.utcnow()
            recent_metrics = self.monitor.metrics_buffer[-100:]  # Check last 100 metrics
    
            for rule in self.alert_rules:
                # Check cooldown
                if (rule['last_triggered'] and 
                    current_time - rule['last_triggered'] < timedelta(minutes=rule['cooldown_minutes'])):
                    continue
    
                # Check condition against the most recent metrics
                if rule['condition'](recent_metrics):
                    try:
                        rule['action'](rule['name'], recent_metrics)
                        rule['last_triggered'] = current_time
                    except Exception as e:
                        logging.error(f"Alert action failed for {rule['name']}: {e}")
    
    # Usage examples
    def setup_monitoring_example():
        """Example of setting up comprehensive monitoring"""
    
        # Assume we have container client
        container = get_container_client()  # Your container client
    
        # Setup monitoring
        monitor = CosmosDBMonitor(container, enable_detailed_logging=True)
    
        # Setup alerting
        alerting = AlertingSystem(monitor)
    
        # Add custom alert rules
        def high_error_rate_condition(metrics):
            if len(metrics) < 10:
                return False
            error_count = len([m for m in metrics if m.status_code != 200])
            return error_count / len(metrics) > 0.1  # 10% error rate
    
        def error_rate_action(rule_name, metrics):
            print(f"ALERT: {rule_name} triggered - High error rate detected!")
            # Could send to Slack, email, etc.
    
        alerting.add_alert_rule(
            "high_error_rate",
            high_error_rate_condition,
            error_rate_action,
            cooldown_minutes=10
        )
    
        # Example usage with tracking
        def tracked_create_item(item_data):
            return monitor.track_operation(
                container.create_item,
                'create_item',
                body=item_data
            )
    
        # Use tracked operations
        try:
            result = tracked_create_item({
                'id': 'test-item',
                'name': 'Test Product',
                'category': 'test'
            })
            print("Item created successfully")
        except Exception as e:
            print(f"Item creation failed: {e}")
    
        # Get performance summary
        summary = monitor.get_performance_summary(hours=1)
        print(f"Performance Summary: {json.dumps(summary, indent=2)}")
    
        # Check alerts
        alerting.check_alerts()
    
        return monitor, alerting
    Python
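
    The monitor above keeps metrics in an in-memory buffer and emits structured log lines. To surface the same data in Application Insights, as shown in the Azure Monitor diagram, the log stream can be exported with the Azure Monitor OpenTelemetry distro. A minimal sketch, assuming the azure-monitor-opentelemetry package is installed and a connection string is available:

    # Export the structured monitoring logs to Application Insights
    import logging
    import os
    from azure.monitor.opentelemetry import configure_azure_monitor

    configure_azure_monitor(
        connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")
    )

    # Records emitted by the 'cosmos_monitor' logger propagate to the root logger
    # and are shipped to Application Insights by the exporter configured above
    logging.getLogger("cosmos_monitor").info("Cosmos DB monitoring connected to Application Insights")
    Python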

    15. Integration Patterns

    Event-Driven Architecture

    graph LR
        A[Application] --> B[Cosmos DB]
        B --> C[Change Feed]
        C --> D[Azure Functions]
        C --> E[Event Hubs]
        C --> F[Service Bus]
    
        D --> G[Process Events]
        E --> H[Stream Analytics]
        F --> I[Message Processing]
    
        G --> J[Update Search Index]
        G --> K[Send Notifications]
        G --> L[Update Cache]
    
        H --> M[Real-time Analytics]
        I --> N[Business Logic]
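
    The glue between Cosmos DB and the downstream systems in this diagram is the change feed. A minimal sketch of polling it with the async SDK and handing each changed document to the IntegrationOrchestrator defined below; continuation-token handling is omitted here, so every pass re-reads the feed from the beginning:

    # change_feed_bridge.py - illustrative wiring between the change feed and the orchestrator
    import asyncio

    async def pump_change_feed(container, orchestrator, poll_interval_seconds: int = 5):
        """Read changed documents and fan them out to the integration orchestrator"""
        while True:
            # A production reader would persist a continuation token between passes
            feed = container.query_items_change_feed(is_start_from_beginning=True)
            async for changed_doc in feed:
                await orchestrator.process_change_feed_event(changed_doc)
            await asyncio.sleep(poll_interval_seconds)
    Python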

    Integration Implementation

    # integration_patterns.py
    import asyncio
    import json
    import logging
    from typing import Dict, Any, List, Callable
    from abc import ABC, abstractmethod
    from datetime import datetime
    import aiohttp
    
    class IntegrationPattern(ABC):
        """Base class for integration patterns"""
    
        @abstractmethod
        async def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
            pass
    
    class SearchIndexIntegration(IntegrationPattern):
        """Integration with Azure Cognitive Search or Elasticsearch"""
    
        def __init__(self, search_endpoint: str, search_key: str):
            self.search_endpoint = search_endpoint
            self.search_key = search_key
            self.session = None
    
        async def initialize(self):
            """Initialize HTTP session for search operations"""
            self.session = aiohttp.ClientSession(
                headers={
                    'Content-Type': 'application/json',
                    'api-key': self.search_key
                }
            )
    
        async def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
            """Process Cosmos DB change event and update search index"""
            try:
                if not self.session:
                    await self.initialize()
    
                # Transform Cosmos DB document for search index
                search_document = self._transform_for_search(event)
    
                # Determine operation type
                operation_type = self._get_operation_type(event)
    
                if operation_type == 'upsert':
                    result = await self._upsert_document(search_document)
                elif operation_type == 'delete':
                    result = await self._delete_document(event.get('id'))
                else:
                    result = {'status': 'skipped', 'reason': f'Unknown operation: {operation_type}'}
    
                return {
                    'integration': 'search_index',
                    'operation': operation_type,
                    'document_id': event.get('id'),
                    'result': result,
                    'timestamp': datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    'integration': 'search_index',
                    'error': str(e),
                    'document_id': event.get('id'),
                    'timestamp': datetime.utcnow().isoformat()
                }
    
        def _transform_for_search(self, document: Dict[str, Any]) -> Dict[str, Any]:
            """Transform Cosmos DB document for search index"""
            # Remove Cosmos DB specific fields
            search_doc = {k: v for k, v in document.items() 
                         if not k.startswith('_')}
    
            # Add searchable text field
            searchable_fields = ['name', 'description', 'category']
            search_text = ' '.join([
                str(search_doc.get(field, '')) 
                for field in searchable_fields
            ])
            search_doc['searchText'] = search_text
    
            # Add last updated timestamp
            search_doc['lastUpdated'] = datetime.utcnow().isoformat()
    
            return search_doc
    
        def _get_operation_type(self, event: Dict[str, Any]) -> str:
            """Determine operation type from event"""
            # In a real implementation, you'd check change feed metadata
            # For now, assume upsert for all events
            return 'upsert'
    
        async def _upsert_document(self, document: Dict[str, Any]) -> Dict[str, Any]:
            """Upsert document in search index"""
            index_name = 'products'  # Your search index name
            url = f"{self.search_endpoint}/indexes/{index_name}/docs/index"
    
            payload = {
                "value": [
                    {
                        "@search.action": "mergeOrUpload",
                        **document
                    }
                ]
            }
    
            async with self.session.post(url, json=payload) as response:
                result = await response.json()
                return {
                    'status_code': response.status,
                    'response': result
                }
    
        async def _delete_document(self, document_id: str) -> Dict[str, Any]:
            """Delete document from search index"""
            index_name = 'products'
            url = f"{self.search_endpoint}/indexes/{index_name}/docs/index"
    
            payload = {
                "value": [
                    {
                        "@search.action": "delete",
                        "id": document_id
                    }
                ]
            }
    
            async with self.session.post(url, json=payload) as response:
                result = await response.json()
                return {
                    'status_code': response.status,
                    'response': result
                }
    
    class CacheIntegration(IntegrationPattern):
        """Integration with Azure Cache for Redis"""
    
        def __init__(self, redis_connection_string: str):
            self.redis_connection_string = redis_connection_string
            self.redis_client = None
    
        async def initialize(self):
            """Initialize the asynchronous Redis client"""
            import redis.asyncio as aioredis  # requires redis >= 4.2
            # from_url returns the client synchronously; individual commands are awaited
            self.redis_client = aioredis.from_url(self.redis_connection_string)
    
        async def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
            """Process Cosmos DB change event and update cache"""
            try:
                if not self.redis_client:
                    await self.initialize()
    
                document_id = event.get('id')
                category = event.get('category', 'unknown')
    
                # Update individual document cache
                cache_key = f"doc:{category}:{document_id}"
                await self.redis_client.setex(
                    cache_key, 
                    3600,  # 1 hour TTL
                    json.dumps(event)
                )
    
                # Update category list cache
                category_key = f"category:{category}"
                await self.redis_client.sadd(category_key, document_id)
                await self.redis_client.expire(category_key, 3600)
    
                # Invalidate related caches
                await self._invalidate_related_caches(event)
    
                return {
                    'integration': 'cache',
                    'operation': 'update',
                    'document_id': document_id,
                    'cache_keys_updated': [cache_key, category_key],
                    'timestamp': datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    'integration': 'cache',
                    'error': str(e),
                    'document_id': event.get('id'),
                    'timestamp': datetime.utcnow().isoformat()
                }
    
        async def _invalidate_related_caches(self, event: Dict[str, Any]):
            """Invalidate caches related to the changed document"""
            category = event.get('category')
            if category:
                # Invalidate category summary cache
                summary_key = f"summary:{category}"
                await self.redis_client.delete(summary_key)
    
    class NotificationIntegration(IntegrationPattern):
        """Integration with notification systems (Email, SMS, Push)"""
    
        def __init__(self, notification_config: Dict[str, Any]):
            self.config = notification_config
    
        async def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
            """Process event and send notifications"""
            try:
                notifications_sent = []
    
                # Check if event triggers notifications
                if self._should_notify(event):
                    notification_message = self._create_notification_message(event)
    
                    # Send email notification
                    if self.config.get('email_enabled'):
                        email_result = await self._send_email_notification(notification_message)
                        notifications_sent.append({'type': 'email', 'result': email_result})
    
                    # Send webhook notification
                    if self.config.get('webhook_enabled'):
                        webhook_result = await self._send_webhook_notification(event)
                        notifications_sent.append({'type': 'webhook', 'result': webhook_result})
    
                return {
                    'integration': 'notifications',
                    'document_id': event.get('id'),
                    'notifications_sent': notifications_sent,
                    'timestamp': datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    'integration': 'notifications',
                    'error': str(e),
                    'document_id': event.get('id'),
                    'timestamp': datetime.utcnow().isoformat()
                }
    
        def _should_notify(self, event: Dict[str, Any]) -> bool:
            """Determine if event should trigger notifications"""
            # Example: notify for high-value orders
            if event.get('category') == 'order' and event.get('total', 0) > 1000:
                return True
    
            # Example: notify for low inventory
            if event.get('category') == 'product' and event.get('inventory', 0) < 10:
                return True
    
            return False
    
        def _create_notification_message(self, event: Dict[str, Any]) -> str:
            """Create notification message from event"""
            if event.get('category') == 'order':
                return f"High-value order received: ${event.get('total')} (Order ID: {event.get('id')})"
            elif event.get('category') == 'product':
                return f"Low inventory alert: {event.get('name')} has {event.get('inventory')} items left"
            else:
                return f"Document updated: {event.get('id')}"
    
        async def _send_email_notification(self, message: str) -> Dict[str, Any]:
            """Send email notification (placeholder implementation)"""
            # In real implementation, integrate with SendGrid, AWS SES, etc.
            return {
                'status': 'sent',
                'message': message,
                'method': 'email'
            }
    
        async def _send_webhook_notification(self, event: Dict[str, Any]) -> Dict[str, Any]:
            """Send webhook notification"""
            webhook_url = self.config.get('webhook_url')
            if not webhook_url:
                return {'status': 'skipped', 'reason': 'No webhook URL configured'}
    
            try:
                async with aiohttp.ClientSession() as session:
                    payload = {
                        'event_type': 'cosmos_db_change',
                        'data': event,
                        'timestamp': datetime.utcnow().isoformat()
                    }
    
                    async with session.post(webhook_url, json=payload) as response:
                        return {
                            'status': 'sent',
                            'status_code': response.status,
                            'webhook_url': webhook_url
                        }
    
            except Exception as e:
                return {
                    'status': 'failed',
                    'error': str(e),
                    'webhook_url': webhook_url
                }
    
    class IntegrationOrchestrator:
        """Orchestrates multiple integration patterns"""
    
        def __init__(self):
            self.integrations: List[IntegrationPattern] = []
            self.event_queue = asyncio.Queue()
            self.processing = False
    
        def add_integration(self, integration: IntegrationPattern):
            """Add an integration pattern"""
            self.integrations.append(integration)
    
        async def process_change_feed_event(self, event: Dict[str, Any]):
            """Process a single change feed event through all integrations"""
            await self.event_queue.put(event)
    
        async def start_processing(self):
            """Start processing events from the queue"""
            self.processing = True
    
            while self.processing:
                try:
                    # Wait for event with timeout
                    event = await asyncio.wait_for(self.event_queue.get(), timeout=1.0)
    
                    # Process event through all integrations
                    await self._process_event_through_integrations(event)
    
                except asyncio.TimeoutError:
                    # No events to process, continue
                    continue
                except Exception as e:
                    logging.error(f"Error processing event: {e}")
    
        async def stop_processing(self):
            """Stop processing events"""
            self.processing = False
    
        async def _process_event_through_integrations(self, event: Dict[str, Any]):
            """Process event through all registered integrations"""
            tasks = []
    
            for integration in self.integrations:
                task = asyncio.create_task(integration.process_event(event))
                tasks.append(task)
    
            # Wait for all integrations to complete
            results = await asyncio.gather(*tasks, return_exceptions=True)
    
            # Log results
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    logging.error(f"Integration {i} failed: {result}")
                else:
                    logging.info(f"Integration {i} result: {result}")
    
    # Microservices integration pattern
    class MicroserviceEventPublisher:
        """Publish events to microservices via message broker"""
    
        def __init__(self, message_broker_config: Dict[str, Any]):
            self.config = message_broker_config
            self.publisher = None
    
        async def initialize(self):
            """Initialize message broker connection"""
            # Example for Azure Service Bus
            from azure.servicebus.aio import ServiceBusClient
    
            connection_string = self.config.get('connection_string')
            self.publisher = ServiceBusClient.from_connection_string(connection_string)
    
        async def publish_event(self, event: Dict[str, Any], routing_key: str = None):
            """Publish event to message broker"""
            # Resolve the topic up front so it is also available in the error path below
            topic_name = routing_key or self._get_topic_for_event(event)

            try:
                if not self.publisher:
                    await self.initialize()
    
                # Create message
                message_body = json.dumps({
                    'event_type': 'cosmos_db_change',
                    'data': event,
                    'timestamp': datetime.utcnow().isoformat(),
                    'source': 'cosmos_db_change_feed'
                })
    
                # Send message
                sender = self.publisher.get_topic_sender(topic_name)
                async with sender:
                    from azure.servicebus import ServiceBusMessage
                    message = ServiceBusMessage(message_body)
                    await sender.send_messages(message)
    
                return {
                    'status': 'published',
                    'topic': topic_name,
                    'message_size': len(message_body)
                }
    
            except Exception as e:
                return {
                    'status': 'failed',
                    'error': str(e),
                    'topic': topic_name
                }
    
        def _get_topic_for_event(self, event: Dict[str, Any]) -> str:
            """Determine message topic based on event content"""
            category = event.get('category', 'unknown')
            return f"cosmos-changes-{category}"
    
    # Usage example
    async def setup_integration_example():
        """Example of setting up comprehensive integrations"""
    
        # Initialize integrations
        search_integration = SearchIndexIntegration(
            search_endpoint="https://your-search-service.search.windows.net",
            search_key="your-search-key"
        )
    
        cache_integration = CacheIntegration(
            redis_connection_string="redis://your-redis-instance:6379"
        )
    
        notification_integration = NotificationIntegration({
            'email_enabled': True,
            'webhook_enabled': True,
            'webhook_url': 'https://your-webhook-endpoint.com/cosmos-events'
        })
    
        # Setup orchestrator
        orchestrator = IntegrationOrchestrator()
        orchestrator.add_integration(search_integration)
        orchestrator.add_integration(cache_integration)
        orchestrator.add_integration(notification_integration)
    
        # Simulate processing change feed events
        sample_events = [
            {
                'id': 'product-001',
                'name': 'Laptop',
                'category': 'electronics',
                'price': 999.99,
                'inventory': 5
            },
            {
                'id': 'order-001',
                'category': 'order',
                'total': 1500.00,
                'customer_id': 'customer-123'
            }
        ]
    
        # Process events
        for event in sample_events:
            await orchestrator.process_change_feed_event(event)
    
        # Start processing (in real scenario, this would run continuously)
        # await orchestrator.start_processing()
    
        print("Integration example setup completed")
    Python

    16. Best Practices and Production Deployment

    Production Readiness Checklist

    graph TD
        A[Production Readiness] --> B[Security]
        A --> C[Performance]
        A --> D[Monitoring]
        A --> E[Disaster Recovery]
        A --> F[Cost Optimization]
    
        B --> G[Authentication/Authorization]
        B --> H[Network Security]
        B --> I[Data Encryption]
    
        C --> J[Throughput Planning]
        C --> K[Indexing Strategy]
        C --> L[Partition Design]
    
        D --> M[Metrics & Alerts]
        D --> N[Logging Strategy]
        D --> O[Health Checks]
    
        E --> P[Backup Strategy]
        E --> Q[Multi-region Setup]
        E --> R[Failover Testing]
    
        F --> S[Reserved Capacity]
        F --> T[Autoscaling]
        F --> U[Resource Optimization]
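
    One checklist item worth showing in code is autoscaling: rather than provisioning a fixed RU/s value, a container can be created with autoscale throughput so capacity follows load. A minimal sketch with the synchronous SDK, assuming a recent azure-cosmos version that exposes ThroughputProperties; the container name, partition key path, and maximum throughput are placeholders:

    # Provision a container with autoscale throughput (scales between 10% and the configured max)
    import os
    from azure.cosmos import CosmosClient, PartitionKey, ThroughputProperties

    client = CosmosClient(os.getenv("COSMOS_ENDPOINT"), credential=os.getenv("COSMOS_KEY"))
    database = client.get_database_client(os.getenv("COSMOS_DATABASE"))

    container = database.create_container_if_not_exists(
        id="orders",
        partition_key=PartitionKey(path="/customerId"),
        offer_throughput=ThroughputProperties(auto_scale_max_throughput=4000)  # 400-4000 RU/s
    )
    Python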

    Production Best Practices Implementation

    # production_best_practices.py
    import asyncio
    import logging
    from typing import Dict, Any, List, Optional
    from datetime import datetime, timedelta
    from dataclasses import dataclass
    import json
    import os
    
    @dataclass
    class ProductionConfig:
        # Connection settings
        endpoint: str
        key: str
        database_name: str
    
        # Performance settings
        max_connections: int = 100
        connection_timeout: int = 30
        retry_attempts: int = 3
    
        # Monitoring settings
        enable_detailed_logging: bool = True
        log_level: str = "INFO"
        metrics_collection_interval: int = 60
    
        # Security settings
        enable_ssl: bool = True
        certificate_verification: bool = True
    
        # Backup settings
        backup_enabled: bool = True
        backup_retention_days: int = 30
    
        # Multi-region settings
        read_regions: List[str] = None
        write_regions: List[str] = None
    
        @classmethod
        def from_environment(cls):
            """Create configuration from environment variables"""
            return cls(
                endpoint=os.getenv('COSMOS_ENDPOINT'),
                key=os.getenv('COSMOS_KEY'),
                database_name=os.getenv('COSMOS_DATABASE'),
                max_connections=int(os.getenv('COSMOS_MAX_CONNECTIONS', '100')),
                connection_timeout=int(os.getenv('COSMOS_CONNECTION_TIMEOUT', '30')),
                enable_detailed_logging=os.getenv('COSMOS_DETAILED_LOGGING', 'true').lower() == 'true'
            )
    
    class ProductionCosmosClient:
        """Production-ready Cosmos DB client with best practices"""
    
        def __init__(self, config: ProductionConfig):
            self.config = config
            self.client = None
            self.database = None
            self.containers = {}
            self.connection_pool = None
            self._setup_logging()
    
        def _setup_logging(self):
            """Setup structured logging for production"""
            self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
            self.logger.setLevel(getattr(logging, self.config.log_level))
    
            # Create structured formatter
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - '
                '%(funcName)s:%(lineno)d - %(message)s'
            )
    
            # Add console handler if not exists
            if not self.logger.handlers:
                handler = logging.StreamHandler()
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)
    
        async def initialize(self):
            """Initialize client with production settings"""
            try:
                from azure.cosmos.aio import CosmosClient
                from azure.cosmos import ConnectionPolicy
    
                # Configure connection policy for production
                connection_policy = ConnectionPolicy()
                connection_policy.RequestTimeout = self.config.connection_timeout * 1000
                connection_policy.ConnectionMode = 'Gateway'  # More stable for production
                connection_policy.MaxPoolSize = self.config.max_connections
                connection_policy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = self.config.retry_attempts
    
                # Create client with production settings
                self.client = CosmosClient(
                    url=self.config.endpoint,
                    credential=self.config.key,
                    connection_policy=connection_policy
                )
    
                # Initialize database
                self.database = self.client.get_database_client(self.config.database_name)
    
                # Verify connection
                await self._verify_connection()
    
                self.logger.info("Production Cosmos DB client initialized successfully")
    
            except Exception as e:
                self.logger.error(f"Failed to initialize Cosmos DB client: {e}")
                raise
    
        async def _verify_connection(self):
            """Verify database connection and accessibility"""
            try:
                # Test connection by reading database properties
                properties = await self.database.read()
                self.logger.info(f"Connected to database: {properties['id']}")
    
            except Exception as e:
                self.logger.error(f"Connection verification failed: {e}")
                raise
    
        async def get_container(self, container_name: str, 
                              partition_key_path: str = "/id",
                              create_if_not_exists: bool = False):
            """Get container with caching and error handling"""
            if container_name in self.containers:
                return self.containers[container_name]
    
            try:
                if create_if_not_exists:
                    from azure.cosmos import PartitionKey
                    container = await self.database.create_container_if_not_exists(
                        id=container_name,
                        partition_key=PartitionKey(path=partition_key_path),
                        offer_throughput=400
                    )
                else:
                    container = self.database.get_container_client(container_name)
    
                # Cache container reference
                self.containers[container_name] = container
    
                self.logger.info(f"Container '{container_name}' ready")
                return container
    
            except Exception as e:
                self.logger.error(f"Failed to get container '{container_name}': {e}")
                raise
    
        async def health_check(self) -> Dict[str, Any]:
            """Perform health check on Cosmos DB connection"""
            health_status = {
                'timestamp': datetime.utcnow().isoformat(),
                'status': 'unknown',
                'checks': {}
            }
    
            try:
                # Check database connectivity
                start_time = datetime.utcnow()
                await self.database.read()
                connectivity_time = (datetime.utcnow() - start_time).total_seconds() * 1000
    
                health_status['checks']['database_connectivity'] = {
                    'status': 'healthy',
                    'response_time_ms': connectivity_time
                }
    
                # Check container accessibility (if any containers are cached)
                for container_name, container in self.containers.items():
                    try:
                        start_time = datetime.utcnow()
                        # Simple query to test container
                        query_iterable = container.query_items(
                            query="SELECT TOP 1 * FROM c",
                            enable_cross_partition_query=True
                        )
                        await query_iterable.__anext__()  # Get first result
                        container_time = (datetime.utcnow() - start_time).total_seconds() * 1000
    
                        health_status['checks'][f'container_{container_name}'] = {
                            'status': 'healthy',
                            'response_time_ms': container_time
                        }
                    except Exception as e:
                        health_status['checks'][f'container_{container_name}'] = {
                            'status': 'unhealthy',
                            'error': str(e)
                        }
    
                # Determine overall status
                unhealthy_checks = [
                    check for check in health_status['checks'].values()
                    if check['status'] != 'healthy'
                ]
    
                health_status['status'] = 'healthy' if not unhealthy_checks else 'unhealthy'
    
            except Exception as e:
                health_status['status'] = 'unhealthy'
                health_status['error'] = str(e)
    
            return health_status
    
        async def close(self):
            """Properly close the client and cleanup resources"""
            try:
                if self.client:
                    await self.client.close()
                    self.logger.info("Cosmos DB client closed successfully")
            except Exception as e:
                self.logger.error(f"Error closing Cosmos DB client: {e}")
    
    class ProductionOperationWrapper:
        """Wrapper for production-ready database operations"""
    
        def __init__(self, client: ProductionCosmosClient):
            self.client = client
            self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
    
        async def execute_with_retry(self, operation_func, *args, **kwargs):
            """Execute operation with retry logic and proper error handling"""
            max_retries = self.client.config.retry_attempts
    
            for attempt in range(max_retries + 1):
                try:
                    start_time = datetime.utcnow()
                    result = await operation_func(*args, **kwargs)
    
                    # Log successful operation
                    duration = (datetime.utcnow() - start_time).total_seconds() * 1000
                    self.logger.info(
                        f"Operation {operation_func.__name__} completed successfully "
                        f"in {duration:.2f}ms (attempt {attempt + 1})"
                    )
    
                    return result
    
                except Exception as e:
                    self.logger.warning(
                        f"Operation {operation_func.__name__} failed "
                        f"(attempt {attempt + 1}/{max_retries + 1}): {e}"
                    )
    
                    if attempt == max_retries:
                        self.logger.error(
                            f"Operation {operation_func.__name__} failed "
                            f"after {max_retries + 1} attempts"
                        )
                        raise
    
                    # Wait before retry (exponential backoff)
                    wait_time = (2 ** attempt) + (0.1 * attempt)
                    await asyncio.sleep(wait_time)
    
        async def batch_operation(self, operations: List[Dict[str, Any]], 
                                container, batch_size: int = 25):
            """Execute batch operations efficiently"""
            results = []
    
            for i in range(0, len(operations), batch_size):
                batch = operations[i:i + batch_size]
    
                try:
                    # Group operations by partition key for transaction batches
                    partition_groups = {}
                    for op in batch:
                        partition_key = op.get('partition_key', 'unknown')
                        if partition_key not in partition_groups:
                            partition_groups[partition_key] = []
                        partition_groups[partition_key].append(op)
    
                    # Execute each partition group
                    for partition_key, group_ops in partition_groups.items():
                        batch_result = await self._execute_partition_batch(
                            group_ops, container, partition_key
                        )
                        results.extend(batch_result)
    
                except Exception as e:
                    self.logger.error(f"Batch operation failed for batch {i//batch_size + 1}: {e}")
                    # Add error results for this batch
                    for op in batch:
                        results.append({
                            'operation': op,
                            'status': 'failed',
                            'error': str(e)
                        })
    
            return results
    
        async def _execute_partition_batch(self, operations: List[Dict[str, Any]], 
                                         container, partition_key: str):
            """Execute operations for a single partition"""
            # The Python SDK expresses a transactional batch as a list of
            # (operation_name, args) tuples passed to execute_item_batch
            try:
                batch_operations = []

                for op in operations:
                    op_type = op['type']
                    data = op['data']

                    if op_type == 'create':
                        batch_operations.append(("create", (data,)))
                    elif op_type == 'upsert':
                        batch_operations.append(("upsert", (data,)))
                    elif op_type == 'replace':
                        batch_operations.append(("replace", (data['id'], data)))
                    elif op_type == 'delete':
                        batch_operations.append(("delete", (data['id'],)))

                # Execute all operations atomically within the single partition
                batch_response = await container.execute_item_batch(
                    batch_operations=batch_operations,
                    partition_key=partition_key
                )
    
                # Process results
                results = []
                for i, op in enumerate(operations):
                    results.append({
                        'operation': op,
                        'status': 'success',
                        'result': batch_response[i] if i < len(batch_response) else None
                    })
    
                return results
    
            except Exception as e:
                # Return error for all operations in this batch
                return [
                    {
                        'operation': op,
                        'status': 'failed',
                        'error': str(e)
                    }
                    for op in operations
                ]
    
    class ProductionMonitoring:
        """Production monitoring and alerting"""
    
        def __init__(self, client: ProductionCosmosClient):
            self.client = client
            self.metrics = {
                'operations_total': 0,
                'operations_failed': 0,
                'total_ru_consumed': 0,
                'avg_latency_ms': 0,
                'last_health_check': None
            }
            self.alerts_enabled = True
    
        async def start_monitoring(self):
            """Start continuous monitoring"""
            while True:
                try:
                    await self._collect_metrics()
                    await self._check_alerts()
                    await asyncio.sleep(self.client.config.metrics_collection_interval)
                except Exception as e:
                    logging.error(f"Monitoring error: {e}")
                    await asyncio.sleep(60)  # Wait longer on error
    
        async def _collect_metrics(self):
            """Collect performance metrics"""
            try:
                # Perform health check
                health_result = await self.client.health_check()
                self.metrics['last_health_check'] = health_result
    
                # Log metrics
                logging.info(f"Metrics collected: {json.dumps(self.metrics, indent=2)}")
    
            except Exception as e:
                logging.error(f"Failed to collect metrics: {e}")
    
        async def _check_alerts(self):
            """Check for alert conditions"""
            if not self.alerts_enabled:
                return
    
            # Check error rate
            if self.metrics['operations_total'] > 0:
                error_rate = self.metrics['operations_failed'] / self.metrics['operations_total']
                if error_rate > 0.05:  # 5% error rate threshold
                    await self._send_alert('high_error_rate', f"Error rate: {error_rate:.2%}")
    
            # Check health status
            health_check = self.metrics.get('last_health_check')
            if health_check and health_check['status'] != 'healthy':
                await self._send_alert('health_check_failed', "Health check failed")
    
        async def _send_alert(self, alert_type: str, message: str):
            """Send alert (placeholder implementation)"""
            alert_message = {
                'timestamp': datetime.utcnow().isoformat(),
                'alert_type': alert_type,
                'message': message,
                'metrics': self.metrics
            }
    
            logging.warning(f"ALERT: {json.dumps(alert_message)}")
    
            # In production, integrate with your alerting system:
            # - Send to Azure Monitor
            # - Send to PagerDuty
            # - Send to Slack
            # - Send email notifications
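
        # Optional helper sketch: forward the alert payload to an HTTP webhook
        # (for example, a Slack incoming webhook). Wire it into _send_alert once
        # you have a real endpoint; webhook_url is a placeholder.
        async def _post_alert_to_webhook(self, alert_message: Dict[str, Any], webhook_url: str):
            """Send the alert payload to a webhook without blocking the event loop"""
            import functools
            import urllib.request

            request = urllib.request.Request(
                webhook_url,
                data=json.dumps(alert_message).encode('utf-8'),
                headers={'Content-Type': 'application/json'},
                method='POST'
            )

            # urllib is blocking, so run the call in a worker thread
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None,
                functools.partial(urllib.request.urlopen, request, timeout=10)
            )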
    
    # Deployment utilities
    class DeploymentHelper:
        """Helper utilities for production deployment"""
    
        @staticmethod
        def validate_environment():
            """Validate environment configuration"""
            required_env_vars = [
                'COSMOS_ENDPOINT',
                'COSMOS_KEY',
                'COSMOS_DATABASE'
            ]
    
            missing_vars = []
            for var in required_env_vars:
                if not os.getenv(var):
                    missing_vars.append(var)
    
            if missing_vars:
                raise ValueError(f"Missing required environment variables: {missing_vars}")
    
            return True
    
        @staticmethod
        def create_production_containers(database_client, container_configs: List[Dict]):
            """Create containers with production settings"""
            created_containers = []
    
            for config in container_configs:
                try:
                    from azure.cosmos import PartitionKey
    
                    container = database_client.create_container_if_not_exists(
                        id=config['name'],
                        partition_key=PartitionKey(path=config['partition_key_path']),
                        offer_throughput=config.get('throughput', 400),
                        indexing_policy=config.get('indexing_policy')
                    )
    
                    created_containers.append(container)
                    logging.info(f"Container '{config['name']}' created/verified")
    
                except Exception as e:
                    logging.error(f"Failed to create container '{config['name']}': {e}")
                    raise
    
            return created_containers
    
        @staticmethod
        def setup_production_logging():
            """Setup production logging configuration"""
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                handlers=[
                    logging.StreamHandler(),
                    logging.FileHandler('cosmos_app.log')
                ]
            )
    
    # Usage example for production deployment
    async def production_deployment_example():
        """Example of production deployment setup"""
    
        # Validate environment
        DeploymentHelper.validate_environment()
    
        # Setup logging
        DeploymentHelper.setup_production_logging()
    
        # Create production configuration
        config = ProductionConfig.from_environment()
    
        # Initialize production client
        cosmos_client = ProductionCosmosClient(config)
        await cosmos_client.initialize()
    
        # Create production containers
        container_configs = [
            {
                'name': 'products',
                'partition_key_path': '/category',
                'throughput': 1000,
                'indexing_policy': {
                    'indexingMode': 'consistent',
                    'automatic': True,
                    'includedPaths': [{'path': '/*'}],
                    'excludedPaths': [{'path': '/description/*'}]
                }
            },
            {
                'name': 'orders',
                'partition_key_path': '/customerId',
                'throughput': 800
            }
        ]
    
        created_containers = DeploymentHelper.create_production_containers(
            cosmos_client.database, 
            container_configs
        )
    
        # Setup operation wrapper
        operation_wrapper = ProductionOperationWrapper(cosmos_client)
    
        # Setup monitoring
        monitoring = ProductionMonitoring(cosmos_client)
    
        # Start monitoring in background
        monitoring_task = asyncio.create_task(monitoring.start_monitoring())
    
        try:
            # Your application logic here
            logging.info("Production deployment completed successfully")
    
            # Keep monitoring running
            await monitoring_task
    
        except KeyboardInterrupt:
            logging.info("Shutting down...")
        finally:
            await cosmos_client.close()
    
    if __name__ == "__main__":
        asyncio.run(production_deployment_example())
    Python

    17. Advanced Scenarios

    Multi-Master Global Distribution

    graph TB
        A[Global Application] --> B[Region 1: West US]
        A --> C[Region 2: East Europe]
        A --> D[Region 3: Southeast Asia]
    
        B --> E[Read/Write Replicas]
        C --> F[Read/Write Replicas]
        D --> G[Read/Write Replicas]
    
        E <--> F
        F <--> G
        G <--> E
    
        H[Conflict Resolution] --> I[Last Writer Wins]
        H --> J[Custom Merge Procedures]
        H --> K[Application-Defined]
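
    In code, the multi-master topology above usually translates to one client per region, each preferring its local replica. Below is a minimal sketch, assuming the same COSMOS_ENDPOINT and COSMOS_KEY environment variables used in the deployment example and an account that already has multi-region writes enabled; the region names are illustrative.

    # region_clients.py - sketch of per-region clients for the GlobalDistributionManager
    import os
    from azure.cosmos.aio import CosmosClient

    ENDPOINT = os.environ["COSMOS_ENDPOINT"]
    KEY = os.environ["COSMOS_KEY"]

    def build_region_clients():
        """Create one async client per region.

        preferred_locations tells the SDK which regional replica to route
        requests to first; multi-region writes themselves are enabled on the
        Cosmos DB account (portal, CLI, or ARM), not on the client.
        """
        regions = ["West US", "North Europe", "Southeast Asia"]
        return {
            region: CosmosClient(ENDPOINT, credential=KEY, preferred_locations=[region])
            for region in regions
        }
    Python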

    Advanced Scenarios Implementation

    # advanced_scenarios.py
    import asyncio
    import json
    from typing import Dict, Any, List, Optional, Union
    from datetime import datetime, timedelta
    from enum import Enum
    import hashlib
    
    class ConflictResolutionMode(Enum):
        LAST_WRITER_WINS = "lastWriterWins"
        CUSTOM_PROCEDURE = "customProcedure"
        APPLICATION_DEFINED = "applicationDefined"
    
    class GlobalDistributionManager:
        """Manage global distribution and multi-master scenarios"""
    
        def __init__(self, primary_client, region_clients: Dict[str, Any]):
            self.primary_client = primary_client
            self.region_clients = region_clients
            self.conflict_resolver = ConflictResolver()
    
        async def setup_global_distribution(self, containers: List[str], 
                                          conflict_resolution_mode: ConflictResolutionMode):
            """Setup global distribution for containers"""
    
            setup_results = {}
    
            for container_name in containers:
                try:
                    # Configure conflict resolution for each container
                    await self._configure_conflict_resolution(
                        container_name, 
                        conflict_resolution_mode
                    )
    
                    # Verify replication across regions
                    replication_status = await self._verify_global_replication(container_name)
    
                    setup_results[container_name] = {
                        'status': 'success',
                        'conflict_resolution': conflict_resolution_mode.value,
                        'replication_status': replication_status
                    }
    
                except Exception as e:
                    setup_results[container_name] = {
                        'status': 'failed',
                        'error': str(e)
                    }
    
            return setup_results
    
        async def _configure_conflict_resolution(self, container_name: str, 
                                               mode: ConflictResolutionMode):
            """Configure conflict resolution for a container"""
    
            conflict_resolution_policy = {
                'mode': mode.value
            }
    
            if mode == ConflictResolutionMode.CUSTOM_PROCEDURE:
                # Set up custom conflict resolution stored procedure
                conflict_resolution_policy['conflictResolutionProcedure'] = 'customConflictResolver'
    
                # Create the conflict resolution stored procedure
                await self._create_conflict_resolution_procedure(container_name)
    
            elif mode == ConflictResolutionMode.LAST_WRITER_WINS:
                conflict_resolution_policy['conflictResolutionPath'] = '/_ts'
    
            # The policy takes effect when the container is created (via the SDK,
            # Azure Portal, or ARM templates - see the sketch after this code block).
            # For demonstration, we just log the configuration.
            print(f"Configured conflict resolution for {container_name}: {conflict_resolution_policy}")
    
        async def _create_conflict_resolution_procedure(self, container_name: str):
            """Create custom conflict resolution stored procedure"""
    
            conflict_resolution_sproc = """
            function customConflictResolver(incomingItem, existingItem, isTombstone, conflictingItems) {
                var context = getContext();
                var response = context.getResponse();
    
                // Custom logic for conflict resolution
                if (isTombstone) {
                    // Handle delete conflicts
                    response.setBody(null); // Accept delete
                    return;
                }
    
                if (!existingItem) {
                    // No conflict, accept incoming item
                    response.setBody(incomingItem);
                    return;
                }
    
                // Merge strategy: prefer higher priority or newer timestamp
                var resolved = mergeItems(incomingItem, existingItem);
                response.setBody(resolved);
    
                function mergeItems(incoming, existing) {
                    // Example merge logic
                    var merged = Object.assign({}, existing);
    
                    // Prefer incoming item if it has higher priority
                    if (incoming.priority > existing.priority) {
                        merged = Object.assign(merged, incoming);
                    } else if (incoming.priority === existing.priority) {
                        // Same priority, merge specific fields
                        if (incoming._ts > existing._ts) {
                            merged.lastUpdated = incoming.lastUpdated;
                            merged.data = incoming.data;
                        }
                    }
    
                    // Always update the modification timestamp
                    merged._lastResolved = new Date().toISOString();
    
                    return merged;
                }
            }
            """
    
            # In practice, you would create this stored procedure on the container
            print(f"Created conflict resolution procedure for {container_name}")
    
        async def _verify_global_replication(self, container_name: str) -> Dict[str, Any]:
            """Verify replication status across regions"""
            replication_status = {}
    
            for region, client in self.region_clients.items():
                try:
                    # Test read operation from each region
                    container = await client.get_container(container_name)
    
                    # Perform a simple query to verify accessibility
                    query_result = container.query_items(
                        query="SELECT TOP 1 * FROM c",
                        enable_cross_partition_query=True
                    )
    
                    # Try to get first item (may be empty)
                    try:
                        first_item = await query_result.__anext__()
                        item_count = 1
                    except StopAsyncIteration:
                        item_count = 0
    
                    replication_status[region] = {
                        'status': 'healthy',
                        'accessible': True,
                        'sample_item_count': item_count,
                        'test_timestamp': datetime.utcnow().isoformat()
                    }
    
                except Exception as e:
                    replication_status[region] = {
                        'status': 'unhealthy',
                        'accessible': False,
                        'error': str(e),
                        'test_timestamp': datetime.utcnow().isoformat()
                    }
    
            return replication_status
    
        async def write_with_preference(self, data: Dict[str, Any], 
                                      preferred_region: str = None,
                                      consistency_level: str = "Session"):
            """Write data with region preference"""
    
            target_client = self.primary_client
            if preferred_region and preferred_region in self.region_clients:
                target_client = self.region_clients[preferred_region]
    
            try:
                # Add metadata for conflict resolution
                data['_writtenAt'] = datetime.utcnow().isoformat()
                data['_writtenFrom'] = preferred_region or 'primary'
                data['_version'] = data.get('_version', 0) + 1
    
                # Perform write operation
                container = await target_client.get_container('global_data')
                result = await container.upsert_item(body=data)
    
                return {
                    'success': True,
                    'region': preferred_region or 'primary',
                    'result': result,
                    'timestamp': datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                # Fallback to primary region if preferred region fails
                if preferred_region and target_client != self.primary_client:
                    return await self.write_with_preference(data, None, consistency_level)
                else:
                    return {
                        'success': False,
                        'error': str(e),
                        'region': preferred_region or 'primary'
                    }
    
    class ConflictResolver:
        """Handle conflict resolution scenarios"""
    
        def __init__(self):
            self.resolution_strategies = {
                'timestamp': self._resolve_by_timestamp,
                'priority': self._resolve_by_priority,
                'merge': self._resolve_by_merge,
                'custom': self._resolve_custom
            }
    
        async def resolve_conflict(self, conflict_data: Dict[str, Any], 
                                 strategy: str = 'timestamp') -> Dict[str, Any]:
            """Resolve conflicts using specified strategy"""
    
            if strategy not in self.resolution_strategies:
                raise ValueError(f"Unknown resolution strategy: {strategy}")
    
            resolver = self.resolution_strategies[strategy]
            return await resolver(conflict_data)
    
        async def _resolve_by_timestamp(self, conflict_data: Dict[str, Any]) -> Dict[str, Any]:
            """Resolve by last write timestamp"""
            items = conflict_data.get('conflicting_items', [])
    
            if not items:
                return {'resolved_item': None, 'strategy': 'timestamp'}
    
            # Find item with latest timestamp
            latest_item = max(items, key=lambda x: x.get('_ts', 0))
    
            return {
                'resolved_item': latest_item,
                'strategy': 'timestamp',
                'resolution_reason': f"Selected item with latest timestamp: {latest_item.get('_ts')}"
            }
    
        async def _resolve_by_priority(self, conflict_data: Dict[str, Any]) -> Dict[str, Any]:
            """Resolve by priority field"""
            items = conflict_data.get('conflicting_items', [])
    
            if not items:
                return {'resolved_item': None, 'strategy': 'priority'}
    
            # Find item with highest priority
            highest_priority_item = max(items, key=lambda x: x.get('priority', 0))
    
            return {
                'resolved_item': highest_priority_item,
                'strategy': 'priority',
                'resolution_reason': f"Selected item with highest priority: {highest_priority_item.get('priority')}"
            }
    
        async def _resolve_by_merge(self, conflict_data: Dict[str, Any]) -> Dict[str, Any]:
            """Resolve by merging conflicting items"""
            items = conflict_data.get('conflicting_items', [])
    
            if not items:
                return {'resolved_item': None, 'strategy': 'merge'}
    
            if len(items) == 1:
                return {'resolved_item': items[0], 'strategy': 'merge'}
    
            # Merge items
            merged_item = {}
    
            # Start with the first item
            merged_item.update(items[0])
    
            # Merge subsequent items
            for item in items[1:]:
                for key, value in item.items():
                    if key.startswith('_'):
                        continue  # Skip system fields
    
                    if key not in merged_item:
                        merged_item[key] = value
                    else:
                        # Merge strategy based on field type
                        if isinstance(value, (int, float)) and isinstance(merged_item[key], (int, float)):
                            merged_item[key] = max(value, merged_item[key])  # Take maximum
                        elif isinstance(value, list):
                            # Merge arrays
                            if isinstance(merged_item[key], list):
                                merged_item[key] = list(set(merged_item[key] + value))
                            else:
                                merged_item[key] = value
                        elif isinstance(value, str) and len(value) > len(str(merged_item[key])):
                            merged_item[key] = value  # Take longer string
    
            # Add merge metadata
            merged_item['_merged'] = True
            merged_item['_mergedAt'] = datetime.utcnow().isoformat()
            merged_item['_mergedFrom'] = [item.get('id') for item in items]
    
            return {
                'resolved_item': merged_item,
                'strategy': 'merge',
                'resolution_reason': f"Merged {len(items)} conflicting items"
            }
    
        async def _resolve_custom(self, conflict_data: Dict[str, Any]) -> Dict[str, Any]:
            """Custom resolution logic"""
            # Implement your custom business logic here
            items = conflict_data.get('conflicting_items', [])
    
            # Example: Business-specific resolution
            business_rules_item = None
            for item in items:
                if item.get('source') == 'authoritative_system':
                    business_rules_item = item
                    break
    
            if business_rules_item:
                return {
                    'resolved_item': business_rules_item,
                    'strategy': 'custom',
                    'resolution_reason': 'Selected item from authoritative system'
                }
    
            # Fallback to timestamp resolution
            return await self._resolve_by_timestamp(conflict_data)
    
    class EventSourcingPattern:
        """Implement event sourcing pattern with Cosmos DB"""
    
        def __init__(self, container_client):
            self.container = container_client
            self.event_types = {}
    
        def register_event_type(self, event_type: str, handler_func):
            """Register event type with its handler"""
            self.event_types[event_type] = handler_func
    
        async def append_event(self, aggregate_id: str, event_type: str, 
                              event_data: Dict[str, Any], expected_version: int = None):
            """Append event to event stream"""
    
            event = {
                'id': f"{aggregate_id}_{event_type}_{datetime.utcnow().isoformat()}",
                'aggregateId': aggregate_id,
                'eventType': event_type,
                'eventData': event_data,
                'version': expected_version or await self._get_next_version(aggregate_id),
                'timestamp': datetime.utcnow().isoformat(),
                'category': 'event'
            }
    
            try:
                # Optimistic concurrency check
                if expected_version:
                    current_version = await self._get_current_version(aggregate_id)
                    if current_version >= expected_version:
                        raise Exception(f"Concurrency conflict: expected version {expected_version}, current version {current_version}")
    
                # Store event
                result = await self.container.create_item(body=event)
    
                return {
                    'success': True,
                    'event': result,
                    'version': event['version']
                }
    
            except Exception as e:
                return {
                    'success': False,
                    'error': str(e),
                    'aggregate_id': aggregate_id
                }
    
        async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Dict[str, Any]]:
            """Get events for an aggregate"""
    
            query = """
            SELECT * FROM c 
            WHERE c.aggregateId = @aggregate_id 
            AND c.version >= @from_version 
            AND c.category = 'event'
            ORDER BY c.version
            """
    
            parameters = [
                {"name": "@aggregate_id", "value": aggregate_id},
                {"name": "@from_version", "value": from_version}
            ]
    
            events = []
            query_iterable = self.container.query_items(
                query=query,
                parameters=parameters,
                partition_key=aggregate_id
            )
    
            async for event in query_iterable:
                events.append(event)
    
            return events
    
        async def rebuild_aggregate(self, aggregate_id: str, target_version: int = None) -> Dict[str, Any]:
            """Rebuild aggregate from events"""
    
            events = await self.get_events(aggregate_id)
    
            if target_version:
                events = [e for e in events if e['version'] <= target_version]
    
            aggregate_state = {}
    
            for event in events:
                event_type = event['eventType']
    
                if event_type in self.event_types:
                    handler = self.event_types[event_type]
                    aggregate_state = handler(aggregate_state, event['eventData'])
                else:
                    # Default handling - just merge event data
                    aggregate_state.update(event['eventData'])
    
            # Add metadata
            aggregate_state['_aggregateId'] = aggregate_id
            aggregate_state['_version'] = events[-1]['version'] if events else 0
            aggregate_state['_rebuiltAt'] = datetime.utcnow().isoformat()
    
            return aggregate_state
    
        async def _get_next_version(self, aggregate_id: str) -> int:
            """Get next version number for aggregate"""
            current_version = await self._get_current_version(aggregate_id)
            return current_version + 1
    
        async def _get_current_version(self, aggregate_id: str) -> int:
            """Get current version number for aggregate"""
    
            query = """
            SELECT TOP 1 c.version FROM c 
            WHERE c.aggregateId = @aggregate_id 
            AND c.category = 'event'
            ORDER BY c.version DESC
            """
    
            parameters = [{"name": "@aggregate_id", "value": aggregate_id}]
    
            query_iterable = self.container.query_items(
                query=query,
                parameters=parameters,
                partition_key=aggregate_id
            )
    
            try:
                latest_event = await query_iterable.__anext__()
                return latest_event['version']
            except StopAsyncIteration:
                return 0
    
    class CQRSPattern:
        """Command Query Responsibility Segregation pattern"""
    
        def __init__(self, write_container, read_container):
            self.write_container = write_container  # Command side
            self.read_container = read_container    # Query side
            self.projections = {}
    
        def register_projection(self, projection_name: str, projection_func):
            """Register a read projection"""
            self.projections[projection_name] = projection_func
    
        async def execute_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
            """Execute a command (write operation)"""
    
            try:
                # Validate command
                validation_result = await self._validate_command(command)
                if not validation_result['valid']:
                    return {
                        'success': False,
                        'error': validation_result['error']
                    }
    
                # Execute command
                command_result = await self._process_command(command)
    
                # Update read projections asynchronously
                asyncio.create_task(self._update_projections(command))
    
                return {
                    'success': True,
                    'result': command_result,
                    'command_id': command.get('id')
                }
    
            except Exception as e:
                return {
                    'success': False,
                    'error': str(e),
                    'command_id': command.get('id')
                }
    
        async def execute_query(self, query: Dict[str, Any]) -> Dict[str, Any]:
            """Execute a query (read operation)"""
    
            try:
                query_type = query.get('type')
    
                if query_type == 'get_by_id':
                    result = await self._query_by_id(query)
                elif query_type == 'search':
                    result = await self._query_search(query)
                elif query_type == 'aggregate':
                    result = await self._query_aggregate(query)
                else:
                    result = await self._query_custom(query)
    
                return {
                    'success': True,
                    'data': result,
                    'query_id': query.get('id')
                }
    
            except Exception as e:
                return {
                    'success': False,
                    'error': str(e),
                    'query_id': query.get('id')
                }
    
        async def _validate_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
            """Validate command before execution"""
    
            # Basic validation
            if not command.get('type'):
                return {'valid': False, 'error': 'Command type is required'}
    
            if not command.get('data'):
                return {'valid': False, 'error': 'Command data is required'}
    
            # Business rule validation
            command_type = command['type']
    
            if command_type == 'create_order':
                return await self._validate_create_order(command['data'])
            elif command_type == 'update_inventory':
                return await self._validate_update_inventory(command['data'])
            else:
                return {'valid': True}
    
        async def _validate_create_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
            """Validate create order command"""
    
            if not order_data.get('customer_id'):
                return {'valid': False, 'error': 'Customer ID is required'}
    
            if not order_data.get('items') or len(order_data['items']) == 0:
                return {'valid': False, 'error': 'Order must have at least one item'}
    
            # Check inventory availability
            for item in order_data['items']:
                product_id = item.get('product_id')
                quantity = item.get('quantity', 0)
    
                if quantity <= 0:
                    return {'valid': False, 'error': f'Invalid quantity for product {product_id}'}
    
                # Check inventory (this would query current inventory)
                # For demo, assume inventory check passes
    
            return {'valid': True}
    
        async def _validate_update_inventory(self, inventory_data: Dict[str, Any]) -> Dict[str, Any]:
            """Validate update inventory command"""
    
            if not inventory_data.get('product_id'):
                return {'valid': False, 'error': 'Product ID is required'}
    
            if 'quantity' not in inventory_data:
                return {'valid': False, 'error': 'Quantity is required'}
    
            if inventory_data['quantity'] < 0:
                return {'valid': False, 'error': 'Quantity cannot be negative'}
    
            return {'valid': True}
    
        async def _process_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
            """Process the validated command"""
    
            command_record = {
                'id': command.get('id', f"cmd_{datetime.utcnow().isoformat()}"),
                'type': command['type'],
                'data': command['data'],
                'timestamp': datetime.utcnow().isoformat(),
                'status': 'executed',
                'category': 'command'
            }
    
            # Store command in write side
            result = await self.write_container.create_item(body=command_record)
    
            return result
    
        async def _update_projections(self, command: Dict[str, Any]):
            """Update read projections based on command"""
    
            for projection_name, projection_func in self.projections.items():
                try:
                    projection_data = await projection_func(command)
    
                    if projection_data:
                        # Update read side
                        await self.read_container.upsert_item(body=projection_data)
    
                except Exception as e:
                    print(f"Failed to update projection {projection_name}: {e}")
    
        async def _query_by_id(self, query: Dict[str, Any]) -> Dict[str, Any]:
            """Query by ID from read side"""
    
            item_id = query.get('id')
            partition_key = query.get('partition_key', item_id)
    
            try:
                result = await self.read_container.read_item(
                    item=item_id,
                    partition_key=partition_key
                )
                return result
            except Exception:
                return None
    
        async def _query_search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
            """Search query from read side"""
    
            search_query = query.get('query', "SELECT * FROM c")
            parameters = query.get('parameters', [])
    
            results = []
            query_iterable = self.read_container.query_items(
                query=search_query,
                parameters=parameters,
                enable_cross_partition_query=True
            )
    
            async for item in query_iterable:
                results.append(item)
    
            return results
    
        async def _query_aggregate(self, query: Dict[str, Any]) -> Dict[str, Any]:
            """Aggregate query from read side"""
    
            # Example aggregate query
            aggregate_query = """
            SELECT 
                COUNT(1) as total_count,
                AVG(c.price) as avg_price,
                SUM(c.quantity) as total_quantity
            FROM c 
            WHERE c.category = @category
            """
    
            parameters = [
                {"name": "@category", "value": query.get('category', 'unknown')}
            ]
    
            query_iterable = self.read_container.query_items(
                query=aggregate_query,
                parameters=parameters,
                enable_cross_partition_query=True
            )
    
            try:
                result = await query_iterable.__anext__()
                return result
            except StopAsyncIteration:
                return {}
    
        async def _query_custom(self, query: Dict[str, Any]) -> Any:
            """Custom query logic"""
            # Implement custom query logic based on your needs
            return {"message": "Custom query not implemented"}
    
    # Usage examples for advanced scenarios
    async def advanced_scenarios_example():
        """Example demonstrating advanced scenarios"""
    
        # Setup clients (mock for demonstration)
        primary_client = None  # Your primary cosmos client
        region_clients = {
            'west-us': None,      # Your west US client
            'east-europe': None,  # Your east Europe client
            'southeast-asia': None # Your southeast Asia client
        }
    
        # Global distribution example
        global_dist = GlobalDistributionManager(primary_client, region_clients)
    
        # Setup global distribution
        containers = ['products', 'orders', 'customers']
        distribution_result = await global_dist.setup_global_distribution(
            containers, 
            ConflictResolutionMode.LAST_WRITER_WINS
        )
    
        print(f"Global distribution setup: {distribution_result}")
    
        # Write with region preference
        product_data = {
            'id': 'product-001',
            'name': 'Global Product',
            'price': 99.99,
            'category': 'electronics'
        }
    
        write_result = await global_dist.write_with_preference(
            product_data, 
            preferred_region='west-us'
        )
    
        print(f"Write result: {write_result}")
    
        # Event sourcing example
        container = None  # Your container client
        event_sourcing = EventSourcingPattern(container)
    
        # Register event handlers
        def handle_product_created(state, event_data):
            state.update(event_data)
            state['status'] = 'active'
            return state
    
        def handle_price_updated(state, event_data):
            state['price'] = event_data['new_price']
            state['price_history'] = state.get('price_history', [])
            state['price_history'].append({
                'old_price': event_data['old_price'],
                'new_price': event_data['new_price'],
                'changed_at': datetime.utcnow().isoformat()
            })
            return state
    
        event_sourcing.register_event_type('ProductCreated', handle_product_created)
        event_sourcing.register_event_type('PriceUpdated', handle_price_updated)
    
        # Append events
        aggregate_id = 'product-001'
    
        create_event = await event_sourcing.append_event(
            aggregate_id,
            'ProductCreated',
            {
                'name': 'Sample Product',
                'price': 100.0,
                'category': 'electronics'
            }
        )
    
        price_update_event = await event_sourcing.append_event(
            aggregate_id,
            'PriceUpdated',
            {
                'old_price': 100.0,
                'new_price': 89.99
            }
        )
    
        # Rebuild aggregate
        current_state = await event_sourcing.rebuild_aggregate(aggregate_id)
        print(f"Current product state: {current_state}")
    
        # CQRS example
        write_container = None  # Your write container
        read_container = None   # Your read container
    
        cqrs = CQRSPattern(write_container, read_container)
    
        # Register projections
        async def order_summary_projection(command):
            if command['type'] == 'create_order':
                order_data = command['data']
                return {
                    'id': f"summary_{order_data['customer_id']}",
                    'customer_id': order_data['customer_id'],
                    'total_orders': 1,
                    'total_amount': order_data.get('total', 0),
                    'last_order_date': datetime.utcnow().isoformat(),
                    'category': 'order_summary'
                }
            return None
    
        cqrs.register_projection('order_summary', order_summary_projection)
    
        # Execute command
        create_order_command = {
            'id': 'cmd-001',
            'type': 'create_order',
            'data': {
                'customer_id': 'customer-123',
                'items': [
                    {'product_id': 'product-001', 'quantity': 2, 'price': 89.99}
                ],
                'total': 179.98
            }
        }
    
        command_result = await cqrs.execute_command(create_order_command)
        print(f"Command result: {command_result}")
    
        # Execute query
        # Note: a single 'id' key - _query_by_id reads it as the item id,
        # and execute_query echoes it back as the query_id
        query = {
            'type': 'get_by_id',
            'id': 'summary_customer-123',
            'partition_key': 'customer-123'
        }
    
        query_result = await cqrs.execute_query(query)
        print(f"Query result: {query_result}")
    
    if __name__ == "__main__":
        asyncio.run(advanced_scenarios_example())
    Python
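
    The _configure_conflict_resolution method above only logs the policy. When you create containers yourself, the Python SDK can also set the policy at creation time (it cannot be changed afterwards). Below is a minimal sketch, assuming a database proxy named database from the synchronous SDK and last-writer-wins resolution on the system timestamp:

    from azure.cosmos import PartitionKey

    # Sketch: resolve write conflicts by last-writer-wins on /_ts.
    # The conflict resolution policy can only be supplied when the container is created.
    container = database.create_container_if_not_exists(
        id="global_data",
        partition_key=PartitionKey(path="/id"),
        conflict_resolution_policy={
            "mode": "LastWriterWins",
            "conflictResolutionPath": "/_ts"
        }
    )
    Python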

    Conclusion

    This guide has walked through Azure Cosmos DB development with Python, from the fundamentals to production-ready, expert-level patterns.

    Key Topics Covered:

    1. Foundation: Core concepts, setup, and basic operations
    2. Intermediate: Advanced querying, partitioning, and performance optimization
    3. Advanced: Global distribution, event sourcing, CQRS patterns
    4. Production: Security, monitoring, best practices, and deployment strategies

    Key Takeaways:

    • Partition Key Design is crucial for performance and scalability (see the short recap sketch after this list)
    • Consistency Levels should be chosen based on application requirements
    • Change Feed enables real-time processing and event-driven architectures
    • Monitoring and Alerting are essential for production systems
    • Security should be implemented at multiple layers
    • Global Distribution provides low-latency access worldwide
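
    As a quick recap of the first two takeaways, here is a minimal sketch (the endpoint, key, and names are placeholders) that pins the client's consistency level and the container's partition key:

    from azure.cosmos import CosmosClient, PartitionKey

    # Placeholders - substitute your own endpoint and key (or read them from the
    # COSMOS_ENDPOINT / COSMOS_KEY environment variables used earlier)
    client = CosmosClient(
        "https://<account>.documents.azure.com:443/",
        credential="<key>",
        consistency_level="Session"  # choose the level your application actually needs
    )

    database = client.create_database_if_not_exists(id="appdb")
    container = database.create_container_if_not_exists(
        id="orders",
        # a high-cardinality, evenly accessed property makes a good partition key
        partition_key=PartitionKey(path="/customerId")
    )
    Python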

    Next Steps:

    1. Practice with the provided examples
    2. Implement monitoring and alerting in your applications
    3. Experiment with different consistency levels and partition strategies
    4. Build production-ready applications using the patterns shown
    5. Stay Updated with Azure Cosmos DB feature updates and best practices

    This guide provides a solid foundation for building scalable, high-performance applications with Azure Cosmos DB and Python. The patterns and practices shown here are used in production systems handling millions of operations daily.

    Remember to always test your implementations thoroughly and monitor performance in production environments. Azure Cosmos DB’s flexibility allows you to adapt your approach as your application requirements evolve.

