A Complete Guide to Building Scalable Applications with Azure Cosmos DB and Python
Table of Contents
- Introduction to Azure Cosmos DB
- Setting Up Your Development Environment
- Understanding Cosmos DB Core Concepts
- Getting Started with Python SDK
- Database and Container Operations
- CRUD Operations in Detail
- Querying Data
- Partitioning Strategies
- Performance Optimization
- Consistency Levels
- Change Feed Processing
- Stored Procedures and Triggers
- Security and Authentication
- Monitoring and Troubleshooting
- Integration Patterns
- Best Practices and Production Deployment
- Advanced Scenarios
1. Introduction to Azure Cosmos DB
What is Azure Cosmos DB?
Azure Cosmos DB is Microsoft’s globally distributed, multi-model database service designed for modern applications that require low latency, elastic scale, and high availability. It provides comprehensive SLAs for throughput, latency, availability, and consistency.
graph TB
A[Azure Cosmos DB] --> B[Multi-Model Support]
A --> C[Global Distribution]
A --> D[Elastic Scale]
A --> E[Multiple Consistency Levels]
B --> F[SQL API]
B --> G[MongoDB API]
B --> H[Cassandra API]
B --> I[Gremlin API]
B --> J[Table API]
C --> K[Multi-Region Writes]
C --> L[Automatic Failover]
C --> M[Data Replication]
D --> N[Horizontal Scaling]
D --> O[Auto-scaling]
D --> P[Reserved Capacity]
Key Features
- Turnkey Global Distribution: Replicate your data across any number of Azure regions
- Multi-Model Support: Work with document, key-value, graph, and column-family data
- Elastic Scale: Scale throughput and storage independently and elastically
- Low Latency: Single-digit millisecond latencies at the 99th percentile
- Five Consistency Levels: Choose from strong, bounded staleness, session, consistent prefix, and eventual consistency
Use Cases
mindmap
  root((Azure Cosmos DB Use Cases))
    Web Applications
      E-commerce platforms
      Content management
      User profiles
    IoT Applications
      Telemetry data
      Real-time analytics
      Device management
    Gaming
      Leaderboards
      Player profiles
      Game state
    Mobile Apps
      Offline sync
      Real-time features
      User data
    Financial Services
      Transaction processing
      Risk management
      Compliance reporting
2. Setting Up Your Development Environment
Prerequisites
Before we begin, ensure you have the following:
- Python 3.7 or higher
- An Azure subscription
- Visual Studio Code (recommended) or your preferred IDE
- Git for version control
Installing Required Packages
# Install the Azure Cosmos DB Python SDK
pip install azure-cosmos
# Additional useful packages
pip install python-dotenv # For environment variables
pip install azure-identity # For authentication
pip install azure-keyvault-secrets # For secret management
pip install pytest # For testing
pip install jupyter # For interactive development
Creating an Azure Cosmos DB Account
sequenceDiagram
participant User
participant AzurePortal
participant CosmosDB
participant Python
User->>AzurePortal: Create Cosmos DB Account
AzurePortal->>CosmosDB: Provision Resources
CosmosDB->>AzurePortal: Return Connection Details
AzurePortal->>User: Provide Endpoint & Keys
User->>Python: Configure SDK with credentials
Python->>CosmosDB: Establish Connection
Environment Configuration
Create a .env file to store your configuration:
# .env file
COSMOS_ENDPOINT=https://your-account.documents.azure.com:443/
COSMOS_KEY=your-primary-key-here
COSMOS_DATABASE_NAME=SampleDB
COSMOS_CONTAINER_NAME=SampleContainer
Basic Configuration Module
# config.py
import os
from dotenv import load_dotenv
load_dotenv()
class CosmosConfig:
def __init__(self):
self.endpoint = os.getenv('COSMOS_ENDPOINT')
self.key = os.getenv('COSMOS_KEY')
self.database_name = os.getenv('COSMOS_DATABASE_NAME')
self.container_name = os.getenv('COSMOS_CONTAINER_NAME')
def validate(self):
"""Validate that all required configuration is present"""
required_vars = [
self.endpoint,
self.key,
self.database_name,
self.container_name
]
if not all(required_vars):
raise ValueError("Missing required configuration variables")
return True
# Usage
config = CosmosConfig()
config.validate()
3. Understanding Cosmos DB Core Concepts
Hierarchy of Resources
graph TD
A[Cosmos DB Account] --> B[Database]
B --> C[Container]
C --> D[Items/Documents]
C --> E[Stored Procedures]
C --> F[User Defined Functions]
C --> G[Triggers]
A --> H[Account Settings]
H --> I[Consistency Level]
H --> J[Geo-Replication]
H --> K[Failover Policy]
C --> L[Container Settings]
L --> M[Partition Key]
L --> N[Throughput]
L --> O[Indexing Policy]
Request Units (RUs)
Request Units represent the cost of database operations in Cosmos DB. Understanding RUs is crucial for performance and cost optimization.
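A quick way to see RU charges in practice: after any operation, the synchronous SDK exposes the charge of the most recent response through the x-ms-request-charge response header. The following is a hedged sketch; it assumes a container client like the one built in the next section and a hypothetical item id and partition key value.
# Hedged sketch: read the RU charge of the last operation from the response headers
item = container.read_item(item="item-001", partition_key="Electronics")  # hypothetical id/key
headers = container.client_connection.last_response_headers  # semi-internal but commonly used
print(f"Point read consumed {headers.get('x-ms-request-charge')} RUs")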
graph LR
A[Database Operation] --> B[Request Units Consumed]
B --> C[Point Read: 1 RU per 1KB]
B --> D[Point Write: ~5 RUs per 1KB]
B --> E[Query: Variable RUs]
B --> F[Stored Procedure: Variable RUs]
G[Factors Affecting RU Consumption] --> H[Item Size]
G --> I[Operation Type]
G --> J[Indexing]
G --> K[Consistency Level]
G --> L[Query Complexity]
Partitioning
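Each distinct partition key value defines a logical partition. Supplying the partition key lets a point read or query stay within one partition, while omitting it forces a fan-out across partitions, as the diagram below illustrates. A minimal sketch, assuming a container partitioned on /category and hypothetical item values:
# Point read: id + partition key value, served from a single logical partition
item = container.read_item(item="item-001", partition_key="Electronics")  # hypothetical values

# Cross-partition query: the filter does not include the partition key, so the query fans out
expensive_items = list(container.query_items(
    query="SELECT * FROM c WHERE c.price > 100",
    enable_cross_partition_query=True
))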
graph TB
A[Logical Partition] --> B[Partition Key Value: 'Electronics']
A --> C[Partition Key Value: 'Clothing']
A --> D[Partition Key Value: 'Books']
B --> E[Physical Partition 1]
C --> F[Physical Partition 2]
D --> G[Physical Partition 3]
E --> H[Items with Electronics category]
F --> I[Items with Clothing category]
G --> J[Items with Books category]
K[Partition Key Selection] --> L[High Cardinality]
K --> M[Even Distribution]
K --> N[Query Efficiency]
4. Getting Started with Python SDK
Basic Connection
# cosmos_client.py
from azure.cosmos import CosmosClient, PartitionKey, exceptions
from config import CosmosConfig
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CosmosDBClient:
def __init__(self, config: CosmosConfig):
"""Initialize Cosmos DB client with configuration"""
self.config = config
self.client = None
self.database = None
self.container = None
def connect(self):
"""Establish connection to Cosmos DB"""
try:
self.client = CosmosClient(
url=self.config.endpoint,
credential=self.config.key
)
logger.info("Successfully connected to Cosmos DB")
return self
except Exception as e:
logger.error(f"Failed to connect to Cosmos DB: {e}")
raise
def get_database(self, create_if_not_exists=True):
"""Get or create database"""
try:
if create_if_not_exists:
self.database = self.client.create_database_if_not_exists(
id=self.config.database_name
)
else:
self.database = self.client.get_database_client(
database=self.config.database_name
)
logger.info(f"Database '{self.config.database_name}' ready")
return self.database
except exceptions.CosmosResourceNotFoundError:
logger.error(f"Database '{self.config.database_name}' not found")
raise
except Exception as e:
logger.error(f"Error accessing database: {e}")
raise
def get_container(self, partition_key_path="/id", create_if_not_exists=True):
"""Get or create container"""
if not self.database:
self.get_database()
try:
if create_if_not_exists:
self.container = self.database.create_container_if_not_exists(
id=self.config.container_name,
partition_key=PartitionKey(path=partition_key_path),
offer_throughput=400 # Minimum throughput
)
else:
self.container = self.database.get_container_client(
container=self.config.container_name
)
logger.info(f"Container '{self.config.container_name}' ready")
return self.container
except Exception as e:
logger.error(f"Error accessing container: {e}")
raise
# Usage example
if __name__ == "__main__":
config = CosmosConfig()
# Initialize and connect
cosmos_client = CosmosDBClient(config)
cosmos_client.connect()
# Get database and container
database = cosmos_client.get_database()
container = cosmos_client.get_container(partition_key_path="/category")
print("Cosmos DB setup complete!")PythonError Handling Best Practices
# error_handling.py
import logging
import random
import time

from azure.cosmos import exceptions

logger = logging.getLogger(__name__)
def handle_cosmos_exceptions(func):
"""Decorator for handling common Cosmos DB exceptions"""
def wrapper(*args, **kwargs):
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except exceptions.CosmosResourceNotFoundError as e:
logger.error(f"Resource not found: {e}")
raise
except exceptions.CosmosResourceExistsError as e:
logger.warning(f"Resource already exists: {e}")
raise
except exceptions.CosmosHttpResponseError as e:
if e.status_code == 429: # Rate limited
if attempt < max_retries - 1:
wait_time = retry_delay * (2 ** attempt) + random.uniform(0, 1)
logger.warning(f"Rate limited. Retrying in {wait_time:.2f} seconds...")
time.sleep(wait_time)
continue
logger.error(f"HTTP error: {e.status_code} - {e.message}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
raise Exception(f"Max retries ({max_retries}) exceeded")
return wrapper
5. Database and Container Operations
Database Management
# database_operations.py
import logging

from azure.cosmos import exceptions

from error_handling import handle_cosmos_exceptions

logger = logging.getLogger(__name__)
class DatabaseManager:
def __init__(self, cosmos_client):
self.client = cosmos_client
@handle_cosmos_exceptions
def create_database(self, database_id, throughput=None):
"""Create a new database"""
try:
database = self.client.create_database(
id=database_id,
offer_throughput=throughput
)
logger.info(f"Created database: {database_id}")
return database
except exceptions.CosmosResourceExistsError:
logger.warning(f"Database {database_id} already exists")
return self.client.get_database_client(database_id)
@handle_cosmos_exceptions
def list_databases(self):
"""List all databases in the account"""
databases = list(self.client.list_databases())
logger.info(f"Found {len(databases)} databases")
return databases
@handle_cosmos_exceptions
def delete_database(self, database_id):
"""Delete a database"""
self.client.delete_database(database_id)
logger.info(f"Deleted database: {database_id}")
@handle_cosmos_exceptions
def get_database_properties(self, database_id):
"""Get database properties and throughput information"""
database = self.client.get_database_client(database_id)
properties = database.read()
try:
throughput = database.get_throughput()
properties['throughput'] = throughput.offer_throughput
except exceptions.CosmosResourceNotFoundError:
properties['throughput'] = None # Serverless or container-level throughput
return properties
Container Management
# container_operations.py
import logging

from azure.cosmos import PartitionKey, exceptions

from error_handling import handle_cosmos_exceptions

logger = logging.getLogger(__name__)
class ContainerManager:
def __init__(self, database_client):
self.database = database_client
@handle_cosmos_exceptions
def create_container(self, container_id, partition_key_path,
throughput=400, indexing_policy=None):
"""Create a new container with specified partition key"""
try:
container = self.database.create_container(
id=container_id,
partition_key=PartitionKey(path=partition_key_path),
offer_throughput=throughput,
indexing_policy=indexing_policy
)
logger.info(f"Created container: {container_id}")
return container
except exceptions.CosmosResourceExistsError:
logger.warning(f"Container {container_id} already exists")
return self.database.get_container_client(container_id)
@handle_cosmos_exceptions
def create_container_with_custom_indexing(self, container_id, partition_key_path):
"""Create container with custom indexing policy"""
# Custom indexing policy example
indexing_policy = {
"indexingMode": "consistent",
"automatic": True,
"includedPaths": [
{
"path": "/*"
}
],
"excludedPaths": [
{
"path": "/description/*"
},
{
"path": "/\"_etag\"/?"
}
],
"compositeIndexes": [
[
{
"path": "/category",
"order": "ascending"
},
{
"path": "/price",
"order": "descending"
}
]
]
}
return self.create_container(
container_id=container_id,
partition_key_path=partition_key_path,
throughput=400,
indexing_policy=indexing_policy
)
@handle_cosmos_exceptions
def list_containers(self):
"""List all containers in the database"""
containers = list(self.database.list_containers())
logger.info(f"Found {len(containers)} containers")
return containers
@handle_cosmos_exceptions
def delete_container(self, container_id):
"""Delete a container"""
container = self.database.get_container_client(container_id)
container.delete_container()
logger.info(f"Deleted container: {container_id}")
@handle_cosmos_exceptions
def scale_container_throughput(self, container_id, new_throughput):
"""Scale container throughput"""
container = self.database.get_container_client(container_id)
try:
container.get_throughput()  # raises CosmosResourceNotFoundError if throughput is not provisioned on the container
container.replace_throughput(new_throughput)
logger.info(f"Scaled container {container_id} throughput to {new_throughput} RU/s")
except exceptions.CosmosResourceNotFoundError:
logger.error(f"Container {container_id} does not have provisioned throughput")
raise
Container Throughput Management
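The diagram below contrasts the three throughput models. As a hedged sketch (it assumes azure-cosmos 4.3 or later, where ThroughputProperties is available, and reuses the database client from earlier), an autoscale container can be provisioned by passing ThroughputProperties instead of a fixed RU/s value:
from azure.cosmos import PartitionKey, ThroughputProperties

# Hedged sketch: autoscale container that scales between 10% of the max and the max RU/s
autoscale_container = database.create_container_if_not_exists(
    id="AutoscaleContainer",  # hypothetical container name
    partition_key=PartitionKey(path="/category"),
    offer_throughput=ThroughputProperties(auto_scale_max_throughput=4000)  # roughly 400-4000 RU/s
)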
graph TD
A[Container Throughput] --> B[Provisioned]
A --> C[Serverless]
A --> D[Autoscale]
B --> E[Fixed RU/s]
B --> F[Manual Scaling]
B --> G[Predictable Costs]
C --> H[Pay per Request]
C --> I[No Throughput Management]
C --> J[Variable Costs]
D --> K[Auto-scaling Range]
D --> L[Scales Between 10% and Max RU/s]
D --> M[Handles Traffic Spikes]
N[Choosing Throughput Model] --> O[Workload Pattern]
N --> P[Cost Considerations]
N --> Q[Performance Requirements]
6. CRUD Operations in Detail
Create Operations
# crud_operations.py
import logging
import uuid
from datetime import datetime
from typing import Dict, Any, Optional

from azure.cosmos import exceptions

from error_handling import handle_cosmos_exceptions

logger = logging.getLogger(__name__)
class DocumentOperations:
def __init__(self, container_client):
self.container = container_client
@handle_cosmos_exceptions
def create_item(self, item_data: Dict[str, Any], partition_key_value: str = None) -> Dict[str, Any]:
"""Create a new document in the container"""
# Ensure the item has an id
if 'id' not in item_data:
item_data['id'] = str(uuid.uuid4())
# Add an application-level timestamp (_ts is set by the service and cannot be written)
item_data['createdAt'] = datetime.utcnow().isoformat()
# Set partition key if provided
if partition_key_value:
item_data['partitionKey'] = partition_key_value
created_item = self.container.create_item(body=item_data)
logger.info(f"Created item with id: {created_item['id']}")
return created_item
@handle_cosmos_exceptions
def create_multiple_items(self, items: list) -> list:
"""Create multiple items efficiently"""
created_items = []
for item in items:
try:
created_item = self.create_item(item)
created_items.append(created_item)
except Exception as e:
logger.error(f"Failed to create item {item.get('id', 'unknown')}: {e}")
# Continue with other items
logger.info(f"Successfully created {len(created_items)} out of {len(items)} items")
return created_items
Read Operations
@handle_cosmos_exceptions
def read_item(self, item_id: str, partition_key: str) -> Optional[Dict[str, Any]]:
"""Read a specific item by id and partition key"""
try:
item = self.container.read_item(
item=item_id,
partition_key=partition_key
)
logger.info(f"Retrieved item: {item_id}")
return item
except exceptions.CosmosResourceNotFoundError:
logger.warning(f"Item not found: {item_id}")
return None
@handle_cosmos_exceptions
def read_all_items(self, max_item_count: int = None) -> list:
"""Read all items in the container"""
items = list(self.container.read_all_items(max_item_count=max_item_count))
logger.info(f"Retrieved {len(items)} items")
return items
@handle_cosmos_exceptions
def read_items_by_partition(self, partition_key: str) -> list:
"""Read all items in a specific partition"""
query = "SELECT * FROM c WHERE c.partitionKey = @partition_key"
parameters = [{"name": "@partition_key", "value": partition_key}]
items = list(self.container.query_items(
query=query,
parameters=parameters,
partition_key=partition_key
))
logger.info(f"Retrieved {len(items)} items from partition: {partition_key}")
return items
Update Operations
@handle_cosmos_exceptions
def update_item(self, item_id: str, partition_key: str,
updates: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing item"""
try:
# First, read the current item
current_item = self.read_item(item_id, partition_key)
if not current_item:
raise ValueError(f"Item {item_id} not found")
# Apply updates
current_item.update(updates)
current_item['_lastModified'] = datetime.utcnow().isoformat()
# Replace the item
updated_item = self.container.replace_item(
item=item_id,
body=current_item
)
logger.info(f"Updated item: {item_id}")
return updated_item
except exceptions.CosmosResourceNotFoundError:
logger.error(f"Item not found for update: {item_id}")
raise
@handle_cosmos_exceptions
def upsert_item(self, item_data: Dict[str, Any]) -> Dict[str, Any]:
"""Insert or update an item"""
item_data['_lastModified'] = datetime.utcnow().isoformat()
upserted_item = self.container.upsert_item(body=item_data)
logger.info(f"Upserted item: {upserted_item['id']}")
return upserted_item
@handle_cosmos_exceptions
def patch_item(self, item_id: str, partition_key: str,
patch_operations: list) -> Dict[str, Any]:
"""Perform patch operations on an item"""
# Example patch operations:
# [
# {"op": "replace", "path": "/price", "value": 29.99},
# {"op": "add", "path": "/tags/-", "value": "new-tag"},
# {"op": "remove", "path": "/discontinued"}
# ]
patched_item = self.container.patch_item(
item=item_id,
partition_key=partition_key,
patch_operations=patch_operations
)
logger.info(f"Patched item: {item_id}")
return patched_item
Delete Operations
@handle_cosmos_exceptions
def delete_item(self, item_id: str, partition_key: str) -> bool:
"""Delete a specific item"""
try:
self.container.delete_item(
item=item_id,
partition_key=partition_key
)
logger.info(f"Deleted item: {item_id}")
return True
except exceptions.CosmosResourceNotFoundError:
logger.warning(f"Item not found for deletion: {item_id}")
return False
@handle_cosmos_exceptions
def delete_items_by_condition(self, condition: str, parameters: list = None) -> int:
"""Delete items that match a condition"""
# First, query to get items to delete
query = f"SELECT c.id, c.partitionKey FROM c WHERE {condition}"
items_to_delete = list(self.container.query_items(
query=query,
parameters=parameters or [],
enable_cross_partition_query=True
))
deleted_count = 0
for item in items_to_delete:
try:
self.delete_item(item['id'], item['partitionKey'])
deleted_count += 1
except Exception as e:
logger.error(f"Failed to delete item {item['id']}: {e}")
logger.info(f"Deleted {deleted_count} items")
return deleted_count
Bulk Operations
# bulk_operations.py
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

from crud_operations import DocumentOperations
from error_handling import handle_cosmos_exceptions

logger = logging.getLogger(__name__)
class BulkOperations:
def __init__(self, container_client):
self.container = container_client
self._lock = threading.Lock()
def bulk_create_threaded(self, items: list, max_workers: int = 5) -> list:
"""Create items using multiple threads"""
created_items = []
failed_items = []
def create_item_worker(item):
try:
doc_ops = DocumentOperations(self.container)
return doc_ops.create_item(item)
except Exception as e:
logger.error(f"Failed to create item: {e}")
return None
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_item = {
executor.submit(create_item_worker, item): item
for item in items
}
for future in as_completed(future_to_item):
result = future.result()
if result:
with self._lock:
created_items.append(result)
else:
with self._lock:
failed_items.append(future_to_item[future])
logger.info(f"Bulk create completed: {len(created_items)} succeeded, {len(failed_items)} failed")
return created_items, failed_items
@handle_cosmos_exceptions
def transactional_batch_operations(self, partition_key: str, operations: list):
"""Perform multiple operations atomically within a single logical partition"""
# The Python SDK (azure-cosmos >= 4.7) expects batch operations as
# (operation_name, args) tuples that all target the same partition key value.
batch_operations = []
for operation in operations:
op_type = operation['type']
item_data = operation['data']
if op_type == 'create':
batch_operations.append(("create", (item_data,)))
elif op_type == 'upsert':
batch_operations.append(("upsert", (item_data,)))
elif op_type == 'replace':
batch_operations.append(("replace", (item_data['id'], item_data)))
elif op_type == 'delete':
batch_operations.append(("delete", (item_data['id'],)))
# Execute the batch; all operations succeed or fail together
batch_response = self.container.execute_item_batch(
batch_operations=batch_operations,
partition_key=partition_key
)
logger.info(f"Executed batch with {len(batch_operations)} operations")
return batch_response
7. Querying Data
SQL Query Fundamentals
# query_operations.py
import logging
from typing import List, Dict, Any, Optional

from error_handling import handle_cosmos_exceptions

logger = logging.getLogger(__name__)
class QueryOperations:
def __init__(self, container_client):
self.container = container_client
@handle_cosmos_exceptions
def simple_query(self, query: str, parameters: List[Dict] = None,
cross_partition: bool = False) -> List[Dict[str, Any]]:
"""Execute a simple SQL query"""
enable_cross_partition_query = cross_partition
items = list(self.container.query_items(
query=query,
parameters=parameters or [],
enable_cross_partition_query=enable_cross_partition_query
))
logger.info(f"Query returned {len(items)} items")
return items
def query_with_pagination(self, query: str, page_size: int = 100,
parameters: List[Dict] = None) -> List[Dict[str, Any]]:
"""Execute query with pagination support via continuation tokens"""
all_items = []
# by_page() exposes the server-side continuation token between pages
pager = self.container.query_items(
query=query,
parameters=parameters or [],
max_item_count=page_size,
enable_cross_partition_query=True
).by_page()
for page in pager:
all_items.extend(list(page))
# pager.continuation_token can be persisted here to resume the query later
logger.info(f"Paginated query returned {len(all_items)} total items")
return all_items
Query Examples
class QueryExamples:
def __init__(self, query_ops: QueryOperations):
self.query_ops = query_ops
def find_by_category(self, category: str) -> List[Dict]:
"""Find all items in a specific category"""
query = "SELECT * FROM c WHERE c.category = @category"
parameters = [{"name": "@category", "value": category}]
return self.query_ops.simple_query(query, parameters)
def find_by_price_range(self, min_price: float, max_price: float) -> List[Dict]:
"""Find items within a price range"""
query = """
SELECT c.id, c.name, c.price, c.category
FROM c
WHERE c.price >= @min_price AND c.price <= @max_price
ORDER BY c.price
"""
parameters = [
{"name": "@min_price", "value": min_price},
{"name": "@max_price", "value": max_price}
]
return self.query_ops.simple_query(query, parameters, cross_partition=True)
def search_by_text(self, search_term: str) -> List[Dict]:
"""Search items by text in name or description"""
query = """
SELECT * FROM c
WHERE CONTAINS(UPPER(c.name), UPPER(@search_term))
OR CONTAINS(UPPER(c.description), UPPER(@search_term))
"""
parameters = [{"name": "@search_term", "value": search_term}]
return self.query_ops.simple_query(query, parameters, cross_partition=True)
def aggregate_by_category(self) -> List[Dict]:
"""Get count and average price by category"""
query = """
SELECT
c.category,
COUNT(1) as item_count,
AVG(c.price) as avg_price,
MIN(c.price) as min_price,
MAX(c.price) as max_price
FROM c
GROUP BY c.category
"""
return self.query_ops.simple_query(query, cross_partition=True)
def find_recent_items(self, days: int = 7) -> List[Dict]:
"""Find items created in the last N days"""
query = """
SELECT * FROM c
WHERE c._ts > @timestamp
ORDER BY c._ts DESC
"""
import time
timestamp = int(time.time()) - (days * 24 * 60 * 60)
parameters = [{"name": "@timestamp", "value": timestamp}]
return self.query_ops.simple_query(query, parameters, cross_partition=True)
def complex_join_query(self) -> List[Dict]:
"""Example of a complex query with subqueries"""
query = """
SELECT
c.category,
c.name,
c.price,
ARRAY(
SELECT VALUE t
FROM t IN c.tags
WHERE t != "legacy"
) as filtered_tags
FROM c
WHERE IS_DEFINED(c.tags) AND ARRAY_LENGTH(c.tags) > 0
"""
return self.query_ops.simple_query(query, cross_partition=True)
Query Performance Optimization
graph TD
A[Query Performance] --> B[Index Usage]
A --> C[Partition Key]
A --> D[Query Structure]
A --> E[RU Consumption]
B --> F[Included Paths]
B --> G[Excluded Paths]
B --> H[Composite Indexes]
C --> I[Single Partition Queries]
C --> J[Cross-Partition Queries]
D --> K[SELECT Clause Optimization]
D --> L[WHERE Clause Efficiency]
D --> M[ORDER BY Considerations]
E --> N[Query Metrics]
E --> O[Request Charge]
E --> P[Response Time]
# query_optimization.py
import logging

logger = logging.getLogger(__name__)
class QueryOptimization:
def __init__(self, container_client):
self.container = container_client
def analyze_query_metrics(self, query: str, parameters: list = None):
"""Analyze query performance metrics"""
query_iterable = self.container.query_items(
query=query,
parameters=parameters or [],
enable_cross_partition_query=True,
populate_query_metrics=True
)
# Execute the query; the charge and metrics are surfaced in the response headers
items = list(query_iterable)
headers = self.container.client_connection.last_response_headers
analysis = {
'item_count': len(items),
'request_charge': float(headers.get('x-ms-request-charge', 0)),
'query_metrics': headers.get('x-ms-documentdb-query-metrics')
}
logger.info(f"Query Analysis: {analysis}")
return analysis
def optimized_pagination(self, query: str, page_size: int = 100):
"""Optimized pagination with continuation tokens and per-page RU tracking"""
pages = []
total_ru_charge = 0
pager = self.container.query_items(
query=query,
max_item_count=page_size,
enable_cross_partition_query=True
).by_page()
for page in pager:
page_items = list(page)
if not page_items:
break
# The RU charge of the page just fetched is in the response headers
request_charge = float(
self.container.client_connection.last_response_headers.get('x-ms-request-charge', 0)
)
pages.append({
'items': page_items,
'request_charge': request_charge,
'continuation_token': pager.continuation_token
})
total_ru_charge += request_charge
logger.info(f"Retrieved {len(pages)} pages, Total RU: {total_ru_charge}")
return pages, total_ru_charge
8. Partitioning Strategies
Understanding Partition Keys
graph TB
A[Partition Key Selection] --> B[High Cardinality]
A --> C[Even Distribution]
A --> D[Query Pattern Alignment]
B --> E[Many Unique Values]
B --> F[Avoid Hot Partitions]
C --> G[Balanced Load]
C --> H[Efficient Scaling]
D --> I[Single Partition Queries]
D --> J[Minimal Cross-Partition]
K[Common Patterns] --> L[Entity ID]
K --> M[Category/Type]
K --> N[Time-based]
K --> O[User/Tenant ID]
K --> P[Geographic Region]
Q[Anti-patterns] --> R[Sequential Values]
Q --> S[Low Cardinality]
Q --> T[Uneven Distribution]
Partition Key Design Patterns
# partitioning_strategies.py
import hashlib
import logging
from datetime import datetime
from typing import Any, Dict

logger = logging.getLogger(__name__)
class PartitionKeyStrategies:
@staticmethod
def user_based_partition(user_id: str) -> str:
"""Use user ID as partition key - good for user-centric applications"""
return user_id
@staticmethod
def category_based_partition(category: str) -> str:
"""Use category as partition key - good for catalog applications"""
return category.lower()
@staticmethod
def hash_based_partition(value: str, num_partitions: int = 100) -> str:
"""Create hash-based partition for even distribution"""
hash_object = hashlib.md5(value.encode())
hash_hex = hash_object.hexdigest()
partition_num = int(hash_hex, 16) % num_partitions
return f"partition_{partition_num:03d}"
@staticmethod
def time_based_partition(date_value: datetime = None, granularity: str = "month") -> str:
"""Create time-based partitions"""
if not date_value:
date_value = datetime.utcnow()
if granularity == "year":
return f"{date_value.year}"
elif granularity == "month":
return f"{date_value.year}_{date_value.month:02d}"
elif granularity == "day":
return f"{date_value.year}_{date_value.month:02d}_{date_value.day:02d}"
else:
raise ValueError("Granularity must be 'year', 'month', or 'day'")
@staticmethod
def composite_partition(primary: str, secondary: str, separator: str = "_") -> str:
"""Create composite partition key"""
return f"{primary}{separator}{secondary}"
@staticmethod
def tenant_based_partition(tenant_id: str, entity_type: str = None) -> str:
"""Multi-tenant partition strategy"""
if entity_type:
return f"{tenant_id}_{entity_type}"
return tenant_id
@staticmethod
def geo_based_partition(country: str, region: str = None) -> str:
"""Geographic-based partitioning"""
if region:
return f"{country}_{region}"
return country
class PartitionDesignHelper:
def __init__(self):
self.strategies = PartitionKeyStrategies()
def analyze_partition_distribution(self, items: list, partition_key_func) -> Dict[str, Any]:
"""Analyze how items would be distributed across partitions"""
partition_counts = {}
for item in items:
partition_key = partition_key_func(item)
partition_counts[partition_key] = partition_counts.get(partition_key, 0) + 1
total_items = len(items)
num_partitions = len(partition_counts)
# Calculate distribution metrics
avg_items_per_partition = total_items / num_partitions if num_partitions > 0 else 0
max_items = max(partition_counts.values()) if partition_counts else 0
min_items = min(partition_counts.values()) if partition_counts else 0
# Calculate skew
skew = (max_items - min_items) / avg_items_per_partition if avg_items_per_partition > 0 else 0
analysis = {
'total_items': total_items,
'num_partitions': num_partitions,
'avg_items_per_partition': avg_items_per_partition,
'max_items_in_partition': max_items,
'min_items_in_partition': min_items,
'distribution_skew': skew,
'partition_distribution': partition_counts
}
return analysis
def recommend_partition_strategy(self, sample_data: list, query_patterns: list) -> Dict[str, Any]:
"""Recommend partition strategy based on data and query patterns"""
recommendations = []
# Analyze different strategies
strategies_to_test = [
("user_id", lambda item: self.strategies.user_based_partition(item.get('userId', 'unknown'))),
("category", lambda item: self.strategies.category_based_partition(item.get('category', 'unknown'))),
("hash_user", lambda item: self.strategies.hash_based_partition(item.get('userId', 'unknown'))),
("time_month", lambda item: self.strategies.time_based_partition(
datetime.fromisoformat(item.get('createdDate', datetime.utcnow().isoformat())), "month"
)),
]
for strategy_name, strategy_func in strategies_to_test:
try:
analysis = self.analyze_partition_distribution(sample_data, strategy_func)
analysis['strategy_name'] = strategy_name
recommendations.append(analysis)
except Exception as e:
logger.warning(f"Could not analyze strategy {strategy_name}: {e}")
# Sort by distribution quality (lower skew is better)
recommendations.sort(key=lambda x: x['distribution_skew'])
return {
'recommended_strategy': recommendations[0] if recommendations else None,
'all_analyses': recommendations,
'query_pattern_considerations': self._analyze_query_patterns(query_patterns)
}
def _analyze_query_patterns(self, query_patterns: list) -> Dict[str, Any]:
"""Analyze query patterns to understand partition key requirements"""
cross_partition_queries = []
single_partition_queries = []
for pattern in query_patterns:
if pattern.get('requires_cross_partition', False):
cross_partition_queries.append(pattern)
else:
single_partition_queries.append(pattern)
return {
'total_patterns': len(query_patterns),
'cross_partition_ratio': len(cross_partition_queries) / len(query_patterns) if query_patterns else 0,
'single_partition_ratio': len(single_partition_queries) / len(query_patterns) if query_patterns else 0,
'recommendations': [
"Consider time-based partitioning for time-series data",
"Use user/tenant ID for user-centric applications",
"Use hash-based partitioning for even distribution",
"Avoid low-cardinality partition keys"
]
}
Hot Partition Detection and Mitigation
# hot_partition_detection.py
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Any, Dict
class HotPartitionMonitor:
def __init__(self, container_client, monitoring_window_minutes: int = 15):
self.container = container_client
self.monitoring_window = timedelta(minutes=monitoring_window_minutes)
self.partition_metrics = defaultdict(lambda: deque())
def record_operation(self, partition_key: str, operation_type: str, ru_consumed: float):
"""Record an operation for hot partition analysis"""
timestamp = datetime.utcnow()
self.partition_metrics[partition_key].append({
'timestamp': timestamp,
'operation_type': operation_type,
'ru_consumed': ru_consumed
})
# Clean old records
self._cleanup_old_records(partition_key)
def _cleanup_old_records(self, partition_key: str):
"""Remove records outside the monitoring window"""
cutoff_time = datetime.utcnow() - self.monitoring_window
while (self.partition_metrics[partition_key] and
self.partition_metrics[partition_key][0]['timestamp'] < cutoff_time):
self.partition_metrics[partition_key].popleft()
def detect_hot_partitions(self, ru_threshold: float = 1000) -> list:
"""Detect partitions that exceed RU threshold in the monitoring window"""
hot_partitions = []
for partition_key, operations in self.partition_metrics.items():
total_ru = sum(op['ru_consumed'] for op in operations)
if total_ru > ru_threshold:
hot_partitions.append({
'partition_key': partition_key,
'total_ru_consumed': total_ru,
'operation_count': len(operations),
'avg_ru_per_operation': total_ru / len(operations) if operations else 0,
'time_window_minutes': self.monitoring_window.total_seconds() / 60
})
return sorted(hot_partitions, key=lambda x: x['total_ru_consumed'], reverse=True)
def suggest_mitigation_strategies(self, hot_partitions: list) -> Dict[str, Any]:
"""Suggest strategies to mitigate hot partitions"""
strategies = []
for hot_partition in hot_partitions:
partition_strategies = []
# Strategy 1: Partition key redesign
partition_strategies.append({
'type': 'partition_key_redesign',
'description': 'Consider using a more granular or hash-based partition key',
'implementation': 'Add a suffix or prefix to distribute load'
})
# Strategy 2: Caching
partition_strategies.append({
'type': 'caching',
'description': 'Implement caching for frequently accessed data',
'implementation': 'Use Azure Cache for Redis or in-memory caching'
})
# Strategy 3: Read replicas
partition_strategies.append({
'type': 'read_replicas',
'description': 'Use read regions for read-heavy workloads',
'implementation': 'Configure multi-region reads'
})
# Strategy 4: Data archiving
partition_strategies.append({
'type': 'data_archiving',
'description': 'Archive old data to reduce partition size',
'implementation': 'Move historical data to cheaper storage'
})
strategies.append({
'partition_key': hot_partition['partition_key'],
'severity': 'high' if hot_partition['total_ru_consumed'] > 5000 else 'medium',
'strategies': partition_strategies
})
return {
'hot_partitions_detected': len(hot_partitions),
'mitigation_strategies': strategies,
'general_recommendations': [
'Monitor partition metrics regularly',
'Test partition key design with realistic data',
'Consider using synthetic partition keys for even distribution',
'Implement circuit breaker patterns for hot partitions'
]
}
9. Performance Optimization
Request Units (RU) Optimization
graph TD
A[RU Optimization] --> B[Query Optimization]
A --> C[Indexing Strategy]
A --> D[Document Design]
A --> E[Operation Patterns]
B --> F[SELECT Clause]
B --> G[WHERE Filters]
B --> H[ORDER BY Usage]
B --> I[JOINs and Subqueries]
C --> J[Included Paths]
C --> K[Excluded Paths]
C --> L[Composite Indexes]
C --> M[Spatial Indexes]
D --> N[Document Size]
D --> O[Property Structure]
D --> P[Array Optimization]
E --> Q[Batch Operations]
E --> R[Bulk Imports]
E --> S[Connection Pooling]
# performance_optimization.py
import logging
import time
import statistics
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

from crud_operations import DocumentOperations

logger = logging.getLogger(__name__)
@dataclass
class PerformanceMetrics:
operation_type: str
duration_ms: float
ru_consumed: float
item_count: int
success: bool
error_message: str = None
class PerformanceOptimizer:
def __init__(self, container_client):
self.container = container_client
self.metrics_history = []
def benchmark_operation(self, operation_func, *args, **kwargs) -> PerformanceMetrics:
"""Benchmark a Cosmos DB operation"""
start_time = time.time()
ru_consumed = 0
success = False
error_message = None
item_count = 0
try:
# Execute the operation
if hasattr(operation_func, '__self__'):
# Method call
result = operation_func(*args, **kwargs)
else:
# Function call
result = operation_func(self.container, *args, **kwargs)
success = True
# Try to extract RU consumption and item count
if hasattr(result, 'request_charge'):
ru_consumed = result.request_charge
elif isinstance(result, list):
item_count = len(result)
elif isinstance(result, dict) and '_request_charge' in result:
ru_consumed = result['_request_charge']
except Exception as e:
error_message = str(e)
duration_ms = (time.time() - start_time) * 1000
metrics = PerformanceMetrics(
operation_type=operation_func.__name__,
duration_ms=duration_ms,
ru_consumed=ru_consumed,
item_count=item_count,
success=success,
error_message=error_message
)
self.metrics_history.append(metrics)
return metrics
def optimize_query_performance(self, query: str, parameters: list = None) -> Dict[str, Any]:
"""Analyze and optimize query performance"""
# Test different query variations
optimizations = []
# Original query
original_metrics = self._test_query_performance(query, parameters)
optimizations.append({
'type': 'original',
'query': query,
'metrics': original_metrics
})
# Test with TOP clause if not present
if 'TOP' not in query.upper():
top_query = query.replace('SELECT', 'SELECT TOP 1000', 1)
top_metrics = self._test_query_performance(top_query, parameters)
optimizations.append({
'type': 'with_top_limit',
'query': top_query,
'metrics': top_metrics
})
# Test with specific field selection instead of *
if 'SELECT *' in query:
optimized_query = query.replace('SELECT *', 'SELECT c.id, c.name, c.category')
field_metrics = self._test_query_performance(optimized_query, parameters)
optimizations.append({
'type': 'specific_fields',
'query': optimized_query,
'metrics': field_metrics
})
# Sort by RU consumption
optimizations.sort(key=lambda x: x['metrics']['avg_ru_consumed'])
return {
'optimizations': optimizations,
'best_option': optimizations[0],
'improvement_percentage': self._calculate_improvement(
optimizations[0]['metrics'],
original_metrics
)
}
def _test_query_performance(self, query: str, parameters: list = None, runs: int = 3) -> Dict[str, float]:
"""Test query performance multiple times"""
metrics = []
for _ in range(runs):
try:
start_time = time.time()
query_iterable = self.container.query_items(
query=query,
parameters=parameters or [],
enable_cross_partition_query=True
)
items = list(query_iterable)
duration = (time.time() - start_time) * 1000
# The RU charge of the last response is surfaced through the response headers
headers = self.container.client_connection.last_response_headers
metrics.append({
'duration_ms': duration,
'ru_consumed': float(headers.get('x-ms-request-charge', 0)),
'item_count': len(items)
})
except Exception as e:
logger.error(f"Query performance test failed: {e}")
if not metrics:
return {'avg_duration_ms': 0, 'avg_ru_consumed': 0, 'avg_item_count': 0}
return {
'avg_duration_ms': statistics.mean([m['duration_ms'] for m in metrics]),
'avg_ru_consumed': statistics.mean([m['ru_consumed'] for m in metrics]),
'avg_item_count': statistics.mean([m['item_count'] for m in metrics]),
'min_duration_ms': min([m['duration_ms'] for m in metrics]),
'max_duration_ms': max([m['duration_ms'] for m in metrics])
}
def optimize_bulk_operations(self, items: list, operation_type: str = 'create') -> Dict[str, Any]:
"""Optimize bulk operations performance"""
strategies = []
# Strategy 1: Sequential processing
sequential_metrics = self._test_bulk_sequential(items, operation_type)
strategies.append({
'type': 'sequential',
'metrics': sequential_metrics
})
# Strategy 2: Parallel processing
parallel_metrics = self._test_bulk_parallel(items, operation_type, max_workers=5)
strategies.append({
'type': 'parallel_5_workers',
'metrics': parallel_metrics
})
# Strategy 3: Batch processing
batch_metrics = self._test_bulk_batched(items, operation_type, batch_size=25)
strategies.append({
'type': 'batched_25_items',
'metrics': batch_metrics
})
# Find best strategy
best_strategy = min(strategies, key=lambda x: x['metrics']['total_duration_ms'])
return {
'strategies_tested': strategies,
'recommended_strategy': best_strategy,
'performance_comparison': self._compare_strategies(strategies)
}
def _test_bulk_sequential(self, items: list, operation_type: str) -> Dict[str, float]:
"""Test sequential bulk operations"""
start_time = time.time()
successful_operations = 0
total_ru = 0
doc_ops = DocumentOperations(self.container)
for item in items:
try:
if operation_type == 'create':
result = doc_ops.create_item(item)
# Extract RU if available
# Note: Individual operation RU tracking would need modification to DocumentOperations
successful_operations += 1
except Exception as e:
logger.error(f"Sequential operation failed: {e}")
total_duration = (time.time() - start_time) * 1000
return {
'total_duration_ms': total_duration,
'successful_operations': successful_operations,
'operations_per_second': successful_operations / (total_duration / 1000) if total_duration > 0 else 0,
'total_ru_consumed': total_ru
}
def _test_bulk_parallel(self, items: list, operation_type: str, max_workers: int) -> Dict[str, float]:
"""Test parallel bulk operations"""
start_time = time.time()
successful_operations = 0
def process_item(item):
try:
doc_ops = DocumentOperations(self.container)
if operation_type == 'create':
return doc_ops.create_item(item)
return None
except Exception as e:
logger.error(f"Parallel operation failed: {e}")
return None
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(process_item, items))
successful_operations = sum(1 for r in results if r is not None)
total_duration = (time.time() - start_time) * 1000
return {
'total_duration_ms': total_duration,
'successful_operations': successful_operations,
'operations_per_second': successful_operations / (total_duration / 1000) if total_duration > 0 else 0,
'total_ru_consumed': 0 # Would need to aggregate from individual operations
}
Indexing Optimization
# indexing_optimization.py
from typing import Any, Dict, List
class IndexingOptimizer:
def __init__(self, container_client):
self.container = container_client
def analyze_query_index_usage(self, queries: List[str]) -> Dict[str, Any]:
"""Analyze which indexes would benefit the given queries"""
index_recommendations = []
for query in queries:
# Parse query to identify used fields
query_analysis = self._analyze_query_fields(query)
# Recommend indexes
recommendations = self._recommend_indexes_for_query(query_analysis)
index_recommendations.append({
'query': query,
'analysis': query_analysis,
'recommended_indexes': recommendations
})
# Consolidate recommendations
consolidated = self._consolidate_index_recommendations(index_recommendations)
return {
'individual_recommendations': index_recommendations,
'consolidated_recommendations': consolidated,
'suggested_indexing_policy': self._generate_indexing_policy(consolidated)
}
def _analyze_query_fields(self, query: str) -> Dict[str, Any]:
"""Analyze fields used in a query"""
import re
# Simple regex patterns to identify field usage
where_pattern = r'WHERE\s+.*?(?:ORDER\s+BY|GROUP\s+BY|$)'
order_pattern = r'ORDER\s+BY\s+(.*?)(?:GROUP\s+BY|$)'
where_match = re.search(where_pattern, query, re.IGNORECASE | re.DOTALL)
order_match = re.search(order_pattern, query, re.IGNORECASE)
filter_fields = []
sort_fields = []
if where_match:
# Extract field names from WHERE clause
where_clause = where_match.group(0)
field_pattern = r'c\.(\w+)'
filter_fields = re.findall(field_pattern, where_clause)
if order_match:
# Extract field names from ORDER BY clause
order_clause = order_match.group(1)
field_pattern = r'c\.(\w+)'
sort_fields = re.findall(field_pattern, order_clause)
return {
'filter_fields': list(set(filter_fields)),
'sort_fields': list(set(sort_fields)),
'requires_composite_index': len(sort_fields) > 1 or (filter_fields and sort_fields)
}
def _recommend_indexes_for_query(self, query_analysis: Dict) -> List[Dict]:
"""Recommend specific indexes for a query"""
recommendations = []
# Single field indexes for filters
for field in query_analysis['filter_fields']:
recommendations.append({
'type': 'single_field',
'path': f'/{field}',
'purpose': 'Optimize WHERE clause filtering'
})
# Composite indexes for sorting with filtering
if query_analysis['requires_composite_index']:
all_fields = query_analysis['filter_fields'] + query_analysis['sort_fields']
if len(all_fields) > 1:
recommendations.append({
'type': 'composite',
'paths': [{'path': f'/{field}', 'order': 'ascending'} for field in all_fields],
'purpose': 'Optimize combined filtering and sorting'
})
return recommendations
def create_optimized_indexing_policy(self, field_usage_stats: Dict[str, int]) -> Dict:
"""Create an optimized indexing policy based on field usage statistics"""
# Sort fields by usage frequency
frequently_used = [field for field, count in field_usage_stats.items() if count > 10]
occasionally_used = [field for field, count in field_usage_stats.items() if 5 <= count <= 10]
rarely_used = [field for field, count in field_usage_stats.items() if count < 5]
indexing_policy = {
"indexingMode": "consistent",
"automatic": True,
"includedPaths": [],
"excludedPaths": [
{"path": "/\"_etag\"/?"} # Always exclude _etag
],
"compositeIndexes": []
}
# Include frequently used fields
for field in frequently_used:
indexing_policy["includedPaths"].append({"path": f"/{field}/*"})
# Exclude rarely used fields to reduce index size
for field in rarely_used:
indexing_policy["excludedPaths"].append({"path": f"/{field}/*"})
# Add composite indexes for common query patterns
if len(frequently_used) >= 2:
composite_index = []
for field in frequently_used[:3]: # Limit to 3 fields for performance
composite_index.append({
"path": f"/{field}",
"order": "ascending"
})
indexing_policy["compositeIndexes"].append(composite_index)
return indexing_policy
def _consolidate_index_recommendations(self, recommendations: list) -> Dict[str, Any]:
"""Consolidate index recommendations from multiple queries"""
all_single_fields = set()
all_composite_combinations = []
for rec in recommendations:
for index in rec['recommended_indexes']:
if index['type'] == 'single_field':
all_single_fields.add(index['path'])
elif index['type'] == 'composite':
all_composite_combinations.append(index['paths'])
return {
'single_field_indexes': list(all_single_fields),
'composite_indexes': all_composite_combinations,
'total_indexes_recommended': len(all_single_fields) + len(all_composite_combinations)
}
10. Consistency Levels
Understanding Consistency Models
graph TD
A[Consistency Levels] --> B[Strong]
A --> C[Bounded Staleness]
A --> D[Session]
A --> E[Consistent Prefix]
A --> F[Eventual]
B --> G[Linearizability]
B --> H[Highest Latency]
B --> I[Highest RU Cost]
C --> J[Time-bounded]
C --> K[Operation-bounded]
C --> L[Medium Latency]
D --> M[Read Your Writes]
D --> N[Monotonic Reads]
D --> O[Default Choice]
E --> P[Prefix Consistency]
E --> Q[Lower Latency]
E --> R[Lower Cost]
F --> S[Lowest Latency]
F --> T[Lowest Cost]
F --> U[No Guarantees]
Consistency Level Implementation
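The account's default consistency level is configured at the account level (portal, CLI, or ARM templates); the SDK can also request an equal or weaker level for a specific client. A minimal sketch with placeholder credentials:
from azure.cosmos import CosmosClient

# Hedged sketch: request a weaker-than-default consistency level for this client only
client = CosmosClient(
    url="https://your-account.documents.azure.com:443/",  # placeholder endpoint
    credential="your-primary-key-here",                   # placeholder key
    consistency_level="Eventual"  # must be equal to or weaker than the account default
)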
# consistency_levels.py
from typing import Dict, Any
class ConsistencyManager:
def __init__(self, cosmos_client):
self.client = cosmos_client
def set_session_consistency(self):
"""Configure session consistency (default and recommended for most scenarios)"""
# Session consistency is the default and doesn't require special configuration
# It provides read-your-writes and monotonic read guarantees
pass
def demonstrate_consistency_levels(self, container):
"""Demonstrate different consistency levels with examples"""
# Example item to work with
test_item = {
"id": "consistency-test-001",
"name": "Test Item",
"value": 100,
"category": "test"
}
# Create the item
created_item = container.create_item(body=test_item)
# Map level names to the demonstration methods implemented below
# (_test_bounded_staleness and _test_consistent_prefix would follow the same pattern)
consistency_examples = {
"Strong": self._test_strong_consistency,
"Session": self._test_session_consistency,
"Eventual": self._test_eventual_consistency
}
results = {}
for level_name, test_func in consistency_examples.items():
try:
result = test_func(container, created_item)
results[level_name] = result
except Exception as e:
results[level_name] = {"error": str(e)}
return results
def _test_strong_consistency(self, container, item) -> Dict[str, Any]:
"""Test strong consistency behavior"""
# Strong consistency guarantees linearizability
# Every read receives the most recent write
import time
start_time = time.time()
# Update the item
item['value'] = 200
updated_item = container.replace_item(
item=item['id'],
body=item
)
# Immediately read the item - should always return updated value
read_item = container.read_item(
item=item['id'],
partition_key=item['category']
)
end_time = time.time()
return {
"consistency_level": "Strong",
"updated_value": updated_item['value'],
"read_value": read_item['value'],
"consistent": updated_item['value'] == read_item['value'],
"latency_ms": (end_time - start_time) * 1000,
"guarantees": [
"Linearizability",
"Read-your-writes",
"Monotonic reads",
"Monotonic writes",
"Consistent prefix"
]
}
def _test_session_consistency(self, container, item) -> Dict[str, Any]:
"""Test session consistency behavior"""
# Session consistency provides read-your-writes guarantee
# within the same session (connection)
import time
start_time = time.time()
# Update the item
item['value'] = 300
updated_item = container.replace_item(
item=item['id'],
body=item
)
# Read from the same session - should see the update
read_item = container.read_item(
item=item['id'],
partition_key=item['category']
)
end_time = time.time()
return {
"consistency_level": "Session",
"updated_value": updated_item['value'],
"read_value": read_item['value'],
"consistent": updated_item['value'] == read_item['value'],
"latency_ms": (end_time - start_time) * 1000,
"guarantees": [
"Read-your-writes (same session)",
"Monotonic reads",
"Consistent prefix"
]
}
def _test_eventual_consistency(self, container, item) -> Dict[str, Any]:
"""Test eventual consistency behavior"""
# Eventual consistency provides no ordering guarantees
# but offers the lowest latency and cost
import time
start_time = time.time()
# Update the item
item['value'] = 400
updated_item = container.replace_item(
item=item['id'],
body=item
)
# Read might not immediately reflect the update
read_item = container.read_item(
item=item['id'],
partition_key=item['category']
)
end_time = time.time()
return {
"consistency_level": "Eventual",
"updated_value": updated_item['value'],
"read_value": read_item['value'],
"consistent": updated_item['value'] == read_item['value'],
"latency_ms": (end_time - start_time) * 1000,
"guarantees": [
"No ordering guarantees",
"Lowest latency",
"Lowest cost"
]
}
class ConsistencyLevelSelector:
"""Helper class to choose appropriate consistency level"""
@staticmethod
def recommend_consistency_level(requirements: Dict[str, Any]) -> Dict[str, Any]:
"""Recommend consistency level based on application requirements"""
# Analyze requirements
needs_strong_consistency = requirements.get('financial_transactions', False)
needs_read_your_writes = requirements.get('user_generated_content', False)
latency_sensitive = requirements.get('real_time_applications', False)
cost_sensitive = requirements.get('cost_optimization', False)
global_distribution = requirements.get('multi_region', False)
recommendations = []
if needs_strong_consistency:
recommendations.append({
'level': 'Strong',
'reason': 'Financial transactions require linearizability',
'trade_offs': 'Higher latency and cost, lower availability'
})
elif needs_read_your_writes and not latency_sensitive:
recommendations.append({
'level': 'Session',
'reason': 'User content needs read-your-writes guarantee',
'trade_offs': 'Balanced latency, cost, and consistency'
})
elif latency_sensitive and cost_sensitive:
recommendations.append({
'level': 'Eventual',
'reason': 'Optimized for latency and cost',
'trade_offs': 'No consistency guarantees'
})
elif global_distribution:
recommendations.append({
'level': 'Bounded Staleness',
'reason': 'Good for global applications with some staleness tolerance',
'trade_offs': 'Configurable staleness bounds'
})
else:
recommendations.append({
'level': 'Session',
'reason': 'Default recommendation for most applications',
'trade_offs': 'Good balance of consistency, performance, and cost'
})
return {
'recommendations': recommendations,
'factors_considered': requirements,
'default_choice': 'Session'
}
11. Change Feed Processing
Understanding Change Feed
graph LR
A[Cosmos DB Container] --> B[Change Feed]
B --> C[Change Feed Processor]
C --> D[Application Logic]
B --> E[Insert Events]
B --> F[Update Events]
B --> G[Replace Events]
C --> H[Lease Container]
C --> I[Checkpoint Management]
C --> J[Load Balancing]
D --> K[Real-time Analytics]
D --> L[Event Sourcing]
D --> M[Data Synchronization]
D --> N[Audit Logging]
Change Feed Implementation
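Before the full asynchronous processor below, the change feed can also be consumed with the simpler pull model in the synchronous SDK. A minimal sketch, assuming the container client from earlier sections:
# Hedged sketch: pull the change feed from the beginning and print changed document ids
for changed_item in container.query_items_change_feed(is_start_from_beginning=True):
    print(f"Changed document: {changed_item['id']}")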
# change_feed_processor.py
import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, Any, List, Callable

from azure.cosmos import PartitionKey
from azure.cosmos.aio import CosmosClient

from config import CosmosConfig

logger = logging.getLogger(__name__)
class ChangeFeedProcessor:
def __init__(self, config: CosmosConfig):
self.config = config
self.client = None
self.monitored_container = None
self.lease_container = None
self.change_handlers = []
async def initialize(self):
"""Initialize the change feed processor"""
self.client = CosmosClient(
url=self.config.endpoint,
credential=self.config.key
)
# Get the monitored container
database = self.client.get_database_client(self.config.database_name)
self.monitored_container = database.get_container_client(self.config.container_name)
# Create or get lease container
self.lease_container = await self._setup_lease_container(database)
logger.info("Change feed processor initialized")
async def _setup_lease_container(self, database):
"""Setup lease container for change feed processor"""
lease_container_name = f"{self.config.container_name}-leases"
try:
lease_container = await database.create_container_if_not_exists(
id=lease_container_name,
partition_key=PartitionKey(path="/id"),
offer_throughput=400
)
logger.info(f"Lease container '{lease_container_name}' ready")
return lease_container
except Exception as e:
logger.error(f"Failed to create lease container: {e}")
raise
def add_change_handler(self, handler: Callable[[List[Dict[str, Any]]], None]):
"""Add a handler function for processing changes"""
self.change_handlers.append(handler)
async def start_processing(self, processor_name: str = "default-processor"):
"""Start processing the change feed"""
if not self.client:
await self.initialize()
logger.info(f"Starting change feed processor: {processor_name}")
try:
# Use the change feed processor
async with self.client:
change_feed_iterator = self.monitored_container.query_items_change_feed(
is_start_from_beginning=True
)
# The iterator yields individual changed documents; group them page by page
async for page in change_feed_iterator.by_page():
changes = [item async for item in page]
if changes:
await self._process_changes(changes)
except Exception as e:
logger.error(f"Error in change feed processing: {e}")
raise
async def _process_changes(self, changes: List[Dict[str, Any]]):
"""Process a batch of changes"""
try:
logger.info(f"Processing {len(changes)} changes")
# Call all registered handlers
for handler in self.change_handlers:
try:
if asyncio.iscoroutinefunction(handler):
await handler(changes)
else:
handler(changes)
except Exception as e:
logger.error(f"Error in change handler: {e}")
except Exception as e:
logger.error(f"Error processing changes: {e}")
raise
# Example change handlers
class ChangeHandlers:
@staticmethod
def audit_logger(changes: List[Dict[str, Any]]):
"""Log all changes for audit purposes"""
for change in changes:
audit_entry = {
'timestamp': datetime.utcnow().isoformat(),
'operation': 'change_detected',
'document_id': change.get('id'),
'partition_key': change.get('partitionKey'),
'change_type': 'insert_or_update'  # the change feed does not distinguish inserts from updates
}
logger.info(f"AUDIT: {json.dumps(audit_entry)}")
@staticmethod
async def real_time_analytics(changes: List[Dict[str, Any]]):
"""Process changes for real-time analytics"""
for change in changes:
# Example: Update analytics counters
if change.get('category') == 'product':
await ChangeHandlers._update_product_analytics(change)
elif change.get('category') == 'order':
await ChangeHandlers._update_order_analytics(change)
@staticmethod
async def _update_product_analytics(product_change: Dict[str, Any]):
"""Update product-related analytics"""
# Example analytics update
logger.info(f"Updating product analytics for: {product_change.get('id')}")
@staticmethod
async def _update_order_analytics(order_change: Dict[str, Any]):
"""Update order-related analytics"""
# Example analytics update
logger.info(f"Updating order analytics for: {order_change.get('id')}")
@staticmethod
def data_synchronization(changes: List[Dict[str, Any]]):
"""Synchronize data to external systems"""
for change in changes:
# Example: Sync to search index, cache, or other databases
sync_data = {
'id': change.get('id'),
'operation': 'upsert',
'data': change,
'timestamp': datetime.utcnow().isoformat()
}
logger.info(f"Syncing data: {sync_data['id']}")
@staticmethod
def event_sourcing(changes: List[Dict[str, Any]]):
"""Process changes for event sourcing pattern"""
for change in changes:
event = {
'event_id': f"event_{change.get('id')}_{change.get('_ts')}",
'aggregate_id': change.get('id'),
'event_type': 'DocumentChanged',
'event_data': change,
'timestamp': datetime.utcnow().isoformat(),
'version': change.get('_ts')
}
logger.info(f"Event sourcing: {event['event_id']}")
# Change feed monitoring and management
class ChangeFeedMonitor:
def __init__(self, change_feed_processor: ChangeFeedProcessor):
self.processor = change_feed_processor
self.is_running = False
async def start_monitoring(self):
"""Start monitoring the change feed"""
self.is_running = True
# Add built-in handlers
self.processor.add_change_handler(ChangeHandlers.audit_logger)
self.processor.add_change_handler(ChangeHandlers.real_time_analytics)
try:
await self.processor.start_processing("production-processor")
except Exception as e:
logger.error(f"Change feed monitoring failed: {e}")
self.is_running = False
raise
async def stop_monitoring(self):
"""Stop monitoring the change feed"""
self.is_running = False
logger.info("Change feed monitoring stopped")
def get_monitoring_status(self) -> Dict[str, Any]:
"""Get the current monitoring status"""
return {
'is_running': self.is_running,
'handlers_count': len(self.processor.change_handlers),
'processor_status': 'active' if self.is_running else 'stopped'
}
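# A sketch (not part of the SDK) of how the lease container created earlier could be
# used to persist change feed progress between runs. The continuation token would
# typically come from the pager returned by query_items_change_feed().by_page();
# check your SDK version's documentation for the exact keyword used to resume from it.
async def checkpoint_continuation_token(lease_container, processor_name: str, token: str):
    """Persist the latest continuation token so processing can resume after a restart"""
    await lease_container.upsert_item({
        'id': f"lease-{processor_name}",
        'continuationToken': token,
        'updatedAt': datetime.utcnow().isoformat()
    })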
# Usage example
async def example_change_feed_usage():
"""Example of how to use the change feed processor"""
config = CosmosConfig()
# Initialize change feed processor
change_processor = ChangeFeedProcessor(config)
monitor = ChangeFeedMonitor(change_processor)
# Add custom handler
def custom_handler(changes):
print(f"Custom handler received {len(changes)} changes")
change_processor.add_change_handler(custom_handler)
# Start monitoring
try:
await monitor.start_monitoring()
except KeyboardInterrupt:
await monitor.stop_monitoring()
print("Change feed processing stopped")
# Run the example
if __name__ == "__main__":
    asyncio.run(example_change_feed_usage())
12. Stored Procedures and Triggers
Understanding Server-Side Programming
graph TD
A[Server-Side Programming] --> B[Stored Procedures]
A --> C[User Defined Functions]
A --> D[Triggers]
B --> E[Transactional Operations]
B --> F[Complex Business Logic]
B --> G[Batch Processing]
C --> H[Query Helper Functions]
C --> I[Data Transformation]
C --> J[Reusable Logic]
D --> K[Pre-triggers]
D --> L[Post-triggers]
D --> M[Data Validation]
D --> N[Audit Logging]
Stored Procedures Implementation
# stored_procedures.py
import json
import logging
from typing import Dict, Any, List

from azure.cosmos import scripts

logger = logging.getLogger(__name__)
class StoredProcedureManager:
def __init__(self, container_client):
self.container = container_client
def create_bulk_import_procedure(self):
"""Create a stored procedure for efficient bulk imports"""
# JavaScript code for the stored procedure
bulk_import_sproc = """
function bulkImport(docs) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
var response = getContext().getResponse();
var count = 0;
if (!docs) throw new Error("The array is undefined or null.");
var docsLength = docs.length;
if (docsLength == 0) {
response.setBody(0);
return;
}
// Try to create each document
tryCreate(docs[count], callback);
function tryCreate(doc, callback) {
var isAccepted = collection.createDocument(collectionLink, doc, callback);
if (!isAccepted) {
response.setBody(count);
}
}
function callback(err, doc, options) {
if (err) throw err;
count++;
if (count >= docsLength) {
response.setBody(count);
} else {
tryCreate(docs[count], callback);
}
}
}
"""
try:
sproc_definition = {
'id': 'bulkImport',
'body': bulk_import_sproc
}
# Create the stored procedure
created_sproc = self.container.scripts.create_stored_procedure(
body=sproc_definition
)
logger.info("Bulk import stored procedure created successfully")
return created_sproc
except Exception as e:
logger.error(f"Failed to create stored procedure: {e}")
raise
def create_conditional_update_procedure(self):
"""Create a stored procedure for conditional updates"""
conditional_update_sproc = """
function conditionalUpdate(id, partitionKey, updates, condition) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();
var response = getContext().getResponse();
// Read the document
var isAccepted = collection.readDocument(
`${collectionLink}/docs/${id}`,
{ partitionKey: partitionKey },
function(err, doc) {
if (err) {
if (err.number == 404) {
response.setBody({ error: "Document not found" });
return;
}
throw err;
}
// Check condition
if (evaluateCondition(doc, condition)) {
// Apply updates
for (var key in updates) {
doc[key] = updates[key];
}
// Update timestamp
doc._lastModified = new Date().toISOString();
// Replace the document
var replaceAccepted = collection.replaceDocument(
doc._self,
doc,
function(replaceErr, updatedDoc) {
if (replaceErr) throw replaceErr;
response.setBody({
success: true,
document: updatedDoc
});
}
);
if (!replaceAccepted) {
response.setBody({ error: "Replace not accepted" });
}
} else {
response.setBody({
success: false,
reason: "Condition not met"
});
}
}
);
if (!isAccepted) {
response.setBody({ error: "Read not accepted" });
}
function evaluateCondition(doc, condition) {
// Simple condition evaluation
if (condition.field && condition.operator && condition.value !== undefined) {
var fieldValue = doc[condition.field];
switch (condition.operator) {
case "==":
return fieldValue == condition.value;
case "!=":
return fieldValue != condition.value;
case ">":
return fieldValue > condition.value;
case "<":
return fieldValue < condition.value;
case ">=":
return fieldValue >= condition.value;
case "<=":
return fieldValue <= condition.value;
default:
return true;
}
}
return true;
}
}
"""
try:
sproc_definition = {
'id': 'conditionalUpdate',
'body': conditional_update_sproc
}
created_sproc = self.container.scripts.create_stored_procedure(
body=sproc_definition
)
logger.info("Conditional update stored procedure created successfully")
return created_sproc
except Exception as e:
logger.error(f"Failed to create conditional update procedure: {e}")
raise
def execute_bulk_import(self, documents: List[Dict[str, Any]], partition_key: str):
"""Execute the bulk import stored procedure"""
try:
result = self.container.scripts.execute_stored_procedure(
sproc='bulkImport',
params=[documents],
partition_key=partition_key
)
logger.info(f"Bulk import completed. Documents imported: {result}")
return result
except Exception as e:
logger.error(f"Bulk import failed: {e}")
raise
def execute_conditional_update(self, item_id: str, partition_key: str,
updates: Dict[str, Any], condition: Dict[str, Any]):
"""Execute conditional update stored procedure"""
try:
result = self.container.scripts.execute_stored_procedure(
sproc='conditionalUpdate',
params=[item_id, partition_key, updates, condition],
partition_key=partition_key
)
logger.info(f"Conditional update result: {result}")
return result
except Exception as e:
logger.error(f"Conditional update failed: {e}")
raise
class TriggerManager:
def __init__(self, container_client):
self.container = container_client
def create_audit_trigger(self):
"""Create a pre-trigger for auditing"""
audit_trigger = """
function auditTrigger() {
var context = getContext();
var request = context.getRequest();
var item = request.getBody();
// Add audit fields
item._createdBy = "system";
item._createdAt = new Date().toISOString();
item._version = 1;
// Set the updated body
request.setBody(item);
}
"""
try:
trigger_definition = {
'id': 'auditTrigger',
'body': audit_trigger,
'triggerType': scripts.TriggerType.Pre,
'triggerOperation': scripts.TriggerOperation.Create
}
created_trigger = self.container.scripts.create_trigger(
body=trigger_definition
)
logger.info("Audit trigger created successfully")
return created_trigger
except Exception as e:
logger.error(f"Failed to create audit trigger: {e}")
raise
def create_validation_trigger(self):
"""Create a pre-trigger for data validation"""
validation_trigger = """
function validationTrigger() {
var context = getContext();
var request = context.getRequest();
var item = request.getBody();
// Validation rules
if (!item.id) {
throw new Error("Item must have an id");
}
if (!item.category) {
throw new Error("Item must have a category");
}
if (item.price && item.price < 0) {
throw new Error("Price cannot be negative");
}
// Add validation timestamp
item._validatedAt = new Date().toISOString();
request.setBody(item);
}
"""
try:
trigger_definition = {
'id': 'validationTrigger',
'body': validation_trigger,
'triggerType': scripts.TriggerType.Pre,
'triggerOperation': scripts.TriggerOperation.All
}
created_trigger = self.container.scripts.create_trigger(
body=trigger_definition
)
logger.info("Validation trigger created successfully")
return created_trigger
except Exception as e:
logger.error(f"Failed to create validation trigger: {e}")
raise
class UserDefinedFunctionManager:
def __init__(self, container_client):
self.container = container_client
def create_tax_calculation_udf(self):
"""Create a UDF for tax calculations"""
tax_calculation_udf = """
function calculateTax(price, taxRate, region) {
if (!price || price <= 0) return 0;
if (!taxRate || taxRate <= 0) return 0;
var baseTax = price * taxRate;
// Regional adjustments
switch (region) {
case "EU":
return baseTax * 1.2; // VAT adjustment
case "US":
return baseTax; // Standard rate
case "CA":
return baseTax * 1.1; // GST adjustment
default:
return baseTax;
}
}
"""
try:
udf_definition = {
'id': 'calculateTax',
'body': tax_calculation_udf
}
created_udf = self.container.scripts.create_user_defined_function(
body=udf_definition
)
logger.info("Tax calculation UDF created successfully")
return created_udf
except Exception as e:
logger.error(f"Failed to create UDF: {e}")
raise
def create_distance_calculation_udf(self):
"""Create a UDF for distance calculations"""
distance_udf = """
function calculateDistance(lat1, lon1, lat2, lon2) {
var R = 6371; // Earth's radius in kilometers
var dLat = (lat2 - lat1) * Math.PI / 180;
var dLon = (lon2 - lon1) * Math.PI / 180;
var a = Math.sin(dLat/2) * Math.sin(dLat/2) +
Math.cos(lat1 * Math.PI / 180) * Math.cos(lat2 * Math.PI / 180) *
Math.sin(dLon/2) * Math.sin(dLon/2);
var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
var distance = R * c;
return distance;
}
"""
try:
udf_definition = {
'id': 'calculateDistance',
'body': distance_udf
}
created_udf = self.container.scripts.create_user_defined_function(
body=udf_definition
)
logger.info("Distance calculation UDF created successfully")
return created_udf
except Exception as e:
logger.error(f"Failed to create distance UDF: {e}")
raise
# Usage example
def example_server_side_programming():
"""Example of using stored procedures, triggers, and UDFs"""
# Assume we have a container client
container = get_container_client() # Your container client
# Initialize managers
sproc_manager = StoredProcedureManager(container)
trigger_manager = TriggerManager(container)
udf_manager = UserDefinedFunctionManager(container)
try:
# Create server-side objects
sproc_manager.create_bulk_import_procedure()
sproc_manager.create_conditional_update_procedure()
trigger_manager.create_audit_trigger()
trigger_manager.create_validation_trigger()
udf_manager.create_tax_calculation_udf()
# Example usage of bulk import
sample_docs = [
{"id": "1", "name": "Product 1", "price": 10.99, "category": "electronics"},
{"id": "2", "name": "Product 2", "price": 25.50, "category": "electronics"},
{"id": "3", "name": "Product 3", "price": 15.75, "category": "electronics"}
]
result = sproc_manager.execute_bulk_import(sample_docs, "electronics")
print(f"Imported {result} documents")
# Example usage of conditional update
condition = {"field": "price", "operator": "<", "value": 20.0}
updates = {"discounted": True, "discount_rate": 0.1}
update_result = sproc_manager.execute_conditional_update(
"1", "electronics", updates, condition
)
print(f"Update result: {update_result}")
# Example query using UDF
query_with_udf = """
SELECT c.id, c.name, c.price,
udf.calculateTax(c.price, 0.08, "US") as tax_amount
FROM c
WHERE c.category = "electronics"
"""
items = list(container.query_items(
query=query_with_udf,
enable_cross_partition_query=True
))
for item in items:
print(f"Product: {item['name']}, Tax: ${item['tax_amount']:.2f}")
except Exception as e:
logger.error(f"Server-side programming example failed: {e}")Python13. Security and Authentication
13. Security and Authentication
Authentication Methods
graph TD
A[Authentication Methods] --> B[Primary/Secondary Keys]
A --> C[Azure Active Directory]
A --> D[Resource Tokens]
A --> E[Managed Identity]
B --> F[Full Access]
B --> G[Account Level]
B --> H[Rotation Required]
C --> I[Role-Based Access]
C --> J[User Authentication]
C --> K[Service Principal]
D --> L[Fine-Grained Access]
D --> M[Time-Limited]
D --> N[Resource-Specific]
E --> O[Azure Resources]
E --> P[No Key Management]
E --> Q[Automatic Rotation]
Security Implementation
# security_manager.py
import base64
import hashlib
import hmac
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional

from azure.cosmos import CosmosClient, exceptions
from azure.identity import DefaultAzureCredential, ClientSecretCredential
from azure.keyvault.secrets import SecretClient

logger = logging.getLogger(__name__)
class SecurityManager:
def __init__(self, config: CosmosConfig):
self.config = config
self.key_vault_client = None
def setup_azure_ad_authentication(self, tenant_id: str, client_id: str, client_secret: str):
"""Setup Azure AD authentication"""
try:
credential = ClientSecretCredential(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret
)
# Create Cosmos client with Azure AD
cosmos_client = CosmosClient(
url=self.config.endpoint,
credential=credential
)
logger.info("Azure AD authentication configured successfully")
return cosmos_client
except Exception as e:
logger.error(f"Failed to setup Azure AD authentication: {e}")
raise
def setup_managed_identity_authentication(self):
"""Setup Managed Identity authentication"""
try:
# Use DefaultAzureCredential which automatically handles managed identity
credential = DefaultAzureCredential()
cosmos_client = CosmosClient(
url=self.config.endpoint,
credential=credential
)
logger.info("Managed Identity authentication configured successfully")
return cosmos_client
except Exception as e:
logger.error(f"Failed to setup Managed Identity authentication: {e}")
raise
def setup_key_vault_integration(self, key_vault_url: str):
"""Setup Azure Key Vault for secret management"""
try:
credential = DefaultAzureCredential()
self.key_vault_client = SecretClient(
vault_url=key_vault_url,
credential=credential
)
logger.info("Key Vault integration configured successfully")
except Exception as e:
logger.error(f"Failed to setup Key Vault integration: {e}")
raise
def get_secret_from_key_vault(self, secret_name: str) -> Optional[str]:
"""Retrieve secret from Azure Key Vault"""
try:
if not self.key_vault_client:
raise ValueError("Key Vault client not initialized")
secret = self.key_vault_client.get_secret(secret_name)
logger.info(f"Successfully retrieved secret: {secret_name}")
return secret.value
except Exception as e:
logger.error(f"Failed to retrieve secret {secret_name}: {e}")
return None
def create_resource_token(self, user_id: str, resource_path: str,
permission_mode: str = "read", duration_hours: int = 1):
"""Create a resource token for fine-grained access control"""
try:
# Note: This is a simplified example. In practice, you'd use Cosmos DB's
# permission system to create actual resource tokens
expiry_time = datetime.utcnow() + timedelta(hours=duration_hours)
token_data = {
'user_id': user_id,
'resource_path': resource_path,
'permission_mode': permission_mode,
'expires_at': expiry_time.isoformat()
}
# Create a signed token (simplified version)
token_string = base64.b64encode(str(token_data).encode()).decode()
logger.info(f"Created resource token for user: {user_id}")
return {
'token': token_string,
'expires_at': expiry_time,
'permissions': permission_mode
}
except Exception as e:
logger.error(f"Failed to create resource token: {e}")
raise
class DataEncryption:
"""Handle client-side encryption for sensitive data"""
    def __init__(self, encryption_key: str):
        # Derive a 32-byte key so any passphrase length works with Fernet below
        self.encryption_key = hashlib.sha256(encryption_key.encode()).digest()
def encrypt_field(self, value: str) -> str:
"""Encrypt a field value"""
try:
import cryptography.fernet as fernet
f = fernet.Fernet(base64.urlsafe_b64encode(self.encryption_key[:32]))
encrypted_value = f.encrypt(value.encode())
return base64.b64encode(encrypted_value).decode()
except Exception as e:
logger.error(f"Encryption failed: {e}")
raise
def decrypt_field(self, encrypted_value: str) -> str:
"""Decrypt a field value"""
try:
import cryptography.fernet as fernet
f = fernet.Fernet(base64.urlsafe_b64encode(self.encryption_key[:32]))
encrypted_bytes = base64.b64decode(encrypted_value.encode())
decrypted_value = f.decrypt(encrypted_bytes)
return decrypted_value.decode()
except Exception as e:
logger.error(f"Decryption failed: {e}")
raise
def encrypt_document(self, document: Dict[str, Any],
fields_to_encrypt: List[str]) -> Dict[str, Any]:
"""Encrypt specified fields in a document"""
encrypted_doc = document.copy()
for field in fields_to_encrypt:
if field in encrypted_doc:
encrypted_doc[field] = self.encrypt_field(str(encrypted_doc[field]))
encrypted_doc[f"{field}_encrypted"] = True
return encrypted_doc
def decrypt_document(self, document: Dict[str, Any],
fields_to_decrypt: List[str]) -> Dict[str, Any]:
"""Decrypt specified fields in a document"""
decrypted_doc = document.copy()
for field in fields_to_decrypt:
if field in decrypted_doc and document.get(f"{field}_encrypted"):
decrypted_doc[field] = self.decrypt_field(decrypted_doc[field])
decrypted_doc.pop(f"{field}_encrypted", None)
return decrypted_doc
class AccessControlManager:
"""Manage role-based access control"""
def __init__(self):
self.user_roles = {}
self.role_permissions = {
'admin': ['read', 'write', 'delete', 'manage'],
'editor': ['read', 'write'],
'viewer': ['read'],
'guest': []
}
def assign_role(self, user_id: str, role: str):
"""Assign a role to a user"""
if role not in self.role_permissions:
raise ValueError(f"Invalid role: {role}")
self.user_roles[user_id] = role
logger.info(f"Assigned role '{role}' to user '{user_id}'")
def check_permission(self, user_id: str, required_permission: str) -> bool:
"""Check if user has required permission"""
user_role = self.user_roles.get(user_id)
if not user_role:
return False
role_permissions = self.role_permissions.get(user_role, [])
return required_permission in role_permissions
def get_user_permissions(self, user_id: str) -> List[str]:
"""Get all permissions for a user"""
user_role = self.user_roles.get(user_id)
if not user_role:
return []
return self.role_permissions.get(user_role, [])
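# A quick illustration of the role model above (values are illustrative only):
def access_control_demo() -> List[str]:
    acl = AccessControlManager()
    acl.assign_role("user123", "editor")
    assert acl.check_permission("user123", "write")        # editors can write
    assert not acl.check_permission("user123", "delete")   # but cannot delete
    return acl.get_user_permissions("user123")             # ['read', 'write']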
class AuditLogger:
"""Audit logging for security compliance"""
def __init__(self, container_client):
self.container = container_client
def log_access_attempt(self, user_id: str, resource: str,
action: str, success: bool, details: Dict[str, Any] = None):
"""Log access attempts for auditing"""
audit_entry = {
'id': f"audit_{datetime.utcnow().isoformat()}_{user_id}",
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'resource': resource,
'action': action,
'success': success,
'details': details or {},
'category': 'audit',
'ip_address': details.get('ip_address') if details else None,
'user_agent': details.get('user_agent') if details else None
}
try:
self.container.create_item(body=audit_entry)
logger.info(f"Audit log created for user {user_id}")
except Exception as e:
logger.error(f"Failed to create audit log: {e}")
def query_audit_logs(self, user_id: str = None, start_date: datetime = None,
end_date: datetime = None) -> List[Dict[str, Any]]:
"""Query audit logs with filters"""
query_parts = ["SELECT * FROM c WHERE c.category = 'audit'"]
parameters = []
if user_id:
query_parts.append("AND c.user_id = @user_id")
parameters.append({"name": "@user_id", "value": user_id})
if start_date:
query_parts.append("AND c.timestamp >= @start_date")
parameters.append({"name": "@start_date", "value": start_date.isoformat()})
if end_date:
query_parts.append("AND c.timestamp <= @end_date")
parameters.append({"name": "@end_date", "value": end_date.isoformat()})
query = " ".join(query_parts) + " ORDER BY c.timestamp DESC"
try:
items = list(self.container.query_items(
query=query,
parameters=parameters,
enable_cross_partition_query=True
))
logger.info(f"Retrieved {len(items)} audit log entries")
return items
except Exception as e:
logger.error(f"Failed to query audit logs: {e}")
return []
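# Example (illustrative) of wiring the audit logger into request handling: record an
# access attempt, then pull the user's activity for the last 24 hours.
def audit_logging_demo(container):
    audit = AuditLogger(container)
    audit.log_access_attempt(
        user_id="user123",
        resource="orders/order-001",
        action="read",
        success=True,
        details={"ip_address": "203.0.113.10", "user_agent": "demo-client/1.0"}
    )
    return audit.query_audit_logs(
        user_id="user123",
        start_date=datetime.utcnow() - timedelta(days=1)
    )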
# Security best practices implementation
class SecurityBestPractices:
@staticmethod
def validate_input(data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and sanitize input data"""
# Remove potentially dangerous fields
dangerous_fields = ['_self', '_etag', '_attachments', '_ts']
sanitized_data = {k: v for k, v in data.items() if k not in dangerous_fields}
# Validate required fields
if 'id' not in sanitized_data or not sanitized_data['id']:
raise ValueError("Document must have a valid id")
# Sanitize string fields
for key, value in sanitized_data.items():
if isinstance(value, str):
# Basic XSS prevention
                sanitized_data[key] = value.replace('<', '&lt;').replace('>', '&gt;')
return sanitized_data
@staticmethod
def generate_secure_id() -> str:
"""Generate a secure, random document ID"""
import uuid
return str(uuid.uuid4())
@staticmethod
def hash_sensitive_data(data: str, salt: str = None) -> str:
"""Hash sensitive data like passwords"""
if not salt:
import os
salt = base64.b64encode(os.urandom(32)).decode()
hashed = hashlib.pbkdf2_hmac('sha256', data.encode(), salt.encode(), 100000)
return base64.b64encode(hashed).decode()
@staticmethod
def verify_hash(data: str, hashed_data: str, salt: str) -> bool:
"""Verify hashed data"""
return SecurityBestPractices.hash_sensitive_data(data, salt) == hashed_data
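# Note: verify_hash needs the same salt that produced the hash, so persist the salt
# alongside the hashed value. A minimal sketch:
def hash_password_demo(password: str) -> Dict[str, str]:
    import os
    salt = base64.b64encode(os.urandom(32)).decode()
    hashed = SecurityBestPractices.hash_sensitive_data(password, salt)
    assert SecurityBestPractices.verify_hash(password, hashed, salt)
    return {"hash": hashed, "salt": salt}  # store both; verification recomputes with the stored salt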
# Usage example
def security_implementation_example():
"""Example of implementing security features"""
config = CosmosConfig()
# Setup security manager
security_manager = SecurityManager(config)
# Setup encryption
encryption_key = "your-encryption-key-here" # Should be from Key Vault
encryptor = DataEncryption(encryption_key)
# Setup access control
access_control = AccessControlManager()
access_control.assign_role("user123", "editor")
# Example document with sensitive data
sensitive_doc = {
"id": SecurityBestPractices.generate_secure_id(),
"name": "John Doe",
"email": "john.doe@example.com",
"ssn": "123-45-6789", # Sensitive field
"credit_card": "1234-5678-9012-3456" # Sensitive field
}
# Encrypt sensitive fields
encrypted_doc = encryptor.encrypt_document(
sensitive_doc,
['ssn', 'credit_card']
)
print("Security implementation example completed")
    return encrypted_doc
14. Monitoring and Troubleshooting
Azure Monitor Integration
graph TD
A[Azure Cosmos DB] --> B[Metrics & Logs]
B --> C[Azure Monitor]
C --> D[Application Insights]
C --> E[Log Analytics]
C --> F[Alerts & Notifications]
G[Key Metrics] --> H[Request Units]
G --> I[Latency]
G --> J[Availability]
G --> K[Storage Usage]
G --> L[Throttling Rate]
M[Diagnostic Logs] --> N[DataPlaneRequests]
M --> O[QueryRuntimeStatistics]
M --> P[PartitionKeyStatistics]
M --> Q[ControlPlaneRequests]
Monitoring Implementation
# monitoring.py
import time
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
import json
@dataclass
class PerformanceMetric:
timestamp: datetime
operation_type: str
request_charge: float
duration_ms: float
status_code: int
partition_key: str
item_count: int = 0
error_message: Optional[str] = None
class CosmosDBMonitor:
def __init__(self, container_client, enable_detailed_logging: bool = True):
self.container = container_client
self.enable_detailed_logging = enable_detailed_logging
self.metrics_buffer = []
self.alert_thresholds = {
'high_latency_ms': 1000,
'high_ru_consumption': 100,
'error_rate_threshold': 0.05
}
# Setup structured logging
self.logger = self._setup_monitoring_logger()
def _setup_monitoring_logger(self):
"""Setup structured logging for monitoring"""
logger = logging.getLogger('cosmos_monitor')
logger.setLevel(logging.INFO)
# Create console handler with structured format
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def track_operation(self, operation_func, operation_type: str, *args, **kwargs):
"""Track performance metrics for any Cosmos DB operation"""
start_time = time.time()
start_timestamp = datetime.utcnow()
try:
# Execute the operation
result = operation_func(*args, **kwargs)
# Calculate metrics
duration_ms = (time.time() - start_time) * 1000
# Extract request charge if available
request_charge = 0
if hasattr(result, 'request_charge'):
request_charge = result.request_charge
elif isinstance(result, dict) and '_request_charge' in result:
request_charge = result['_request_charge']
# Extract partition key from args/kwargs
partition_key = self._extract_partition_key(args, kwargs)
# Create metric record
metric = PerformanceMetric(
timestamp=start_timestamp,
operation_type=operation_type,
request_charge=request_charge,
duration_ms=duration_ms,
status_code=200, # Assume success
partition_key=partition_key,
item_count=len(result) if isinstance(result, list) else 1
)
# Store metric
self._store_metric(metric)
# Check for alerts
self._check_performance_alerts(metric)
return result
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
partition_key = self._extract_partition_key(args, kwargs)
# Create error metric
error_metric = PerformanceMetric(
timestamp=start_timestamp,
operation_type=operation_type,
request_charge=0,
duration_ms=duration_ms,
status_code=500, # Error status
partition_key=partition_key,
error_message=str(e)
)
self._store_metric(error_metric)
self.logger.error(f"Operation failed: {operation_type}, Error: {e}")
raise
def _extract_partition_key(self, args, kwargs) -> str:
"""Extract partition key from operation arguments"""
# Look for partition_key in kwargs
if 'partition_key' in kwargs:
return str(kwargs['partition_key'])
# Look for common patterns in args
for arg in args:
if isinstance(arg, dict) and 'category' in arg:
return arg['category']
elif isinstance(arg, dict) and 'partitionKey' in arg:
return arg['partitionKey']
return 'unknown'
def _store_metric(self, metric: PerformanceMetric):
"""Store metric in buffer and optionally send to external system"""
self.metrics_buffer.append(metric)
# Log metric if detailed logging is enabled
if self.enable_detailed_logging:
metric_dict = asdict(metric)
metric_dict['timestamp'] = metric.timestamp.isoformat()
self.logger.info(f"METRIC: {json.dumps(metric_dict)}")
# Keep buffer size manageable
if len(self.metrics_buffer) > 1000:
self.metrics_buffer = self.metrics_buffer[-500:]
def _check_performance_alerts(self, metric: PerformanceMetric):
"""Check metric against alert thresholds"""
alerts = []
# High latency alert
if metric.duration_ms > self.alert_thresholds['high_latency_ms']:
alerts.append({
'type': 'high_latency',
'message': f"High latency detected: {metric.duration_ms:.2f}ms",
'metric': metric
})
# High RU consumption alert
if metric.request_charge > self.alert_thresholds['high_ru_consumption']:
alerts.append({
'type': 'high_ru_consumption',
'message': f"High RU consumption: {metric.request_charge} RUs",
'metric': metric
})
# Process alerts
for alert in alerts:
self._handle_alert(alert)
def _handle_alert(self, alert: Dict[str, Any]):
"""Handle performance alerts"""
self.logger.warning(f"ALERT: {alert['type']} - {alert['message']}")
# Here you could integrate with external alerting systems
# like Azure Monitor, PagerDuty, Slack, etc.
def get_performance_summary(self, hours: int = 24) -> Dict[str, Any]:
"""Get performance summary for the specified time period"""
cutoff_time = datetime.utcnow() - timedelta(hours=hours)
# Filter metrics within time window
recent_metrics = [
m for m in self.metrics_buffer
if m.timestamp >= cutoff_time
]
if not recent_metrics:
return {'message': 'No metrics available for the specified period'}
# Calculate summary statistics
total_operations = len(recent_metrics)
successful_operations = len([m for m in recent_metrics if m.status_code == 200])
failed_operations = total_operations - successful_operations
latencies = [m.duration_ms for m in recent_metrics]
ru_consumptions = [m.request_charge for m in recent_metrics]
return {
'time_period_hours': hours,
'total_operations': total_operations,
'successful_operations': successful_operations,
'failed_operations': failed_operations,
'success_rate': successful_operations / total_operations if total_operations > 0 else 0,
'latency_stats': {
'avg_ms': sum(latencies) / len(latencies) if latencies else 0,
'min_ms': min(latencies) if latencies else 0,
'max_ms': max(latencies) if latencies else 0,
'p95_ms': self._calculate_percentile(latencies, 95) if latencies else 0
},
'ru_stats': {
'total_consumed': sum(ru_consumptions),
'avg_per_operation': sum(ru_consumptions) / len(ru_consumptions) if ru_consumptions else 0,
'max_consumption': max(ru_consumptions) if ru_consumptions else 0
},
'operation_breakdown': self._get_operation_breakdown(recent_metrics)
}
def _calculate_percentile(self, values: List[float], percentile: int) -> float:
"""Calculate percentile value"""
if not values:
return 0
sorted_values = sorted(values)
index = int((percentile / 100) * len(sorted_values))
if index >= len(sorted_values):
index = len(sorted_values) - 1
return sorted_values[index]
def _get_operation_breakdown(self, metrics: List[PerformanceMetric]) -> Dict[str, Any]:
"""Get breakdown of operations by type"""
breakdown = {}
for metric in metrics:
op_type = metric.operation_type
if op_type not in breakdown:
breakdown[op_type] = {
'count': 0,
'total_ru': 0,
'total_duration_ms': 0,
'errors': 0
}
breakdown[op_type]['count'] += 1
breakdown[op_type]['total_ru'] += metric.request_charge
breakdown[op_type]['total_duration_ms'] += metric.duration_ms
if metric.status_code != 200:
breakdown[op_type]['errors'] += 1
# Calculate averages
for op_type in breakdown:
stats = breakdown[op_type]
count = stats['count']
if count > 0:
stats['avg_ru'] = stats['total_ru'] / count
stats['avg_duration_ms'] = stats['total_duration_ms'] / count
stats['error_rate'] = stats['errors'] / count
return breakdown
class TroubleshootingHelper:
def __init__(self, container_client):
self.container = container_client
def diagnose_slow_queries(self, queries: List[str]) -> Dict[str, Any]:
"""Diagnose slow-performing queries"""
diagnostics = []
for query in queries:
diagnosis = self._analyze_single_query(query)
diagnostics.append(diagnosis)
return {
'total_queries_analyzed': len(queries),
'diagnostics': diagnostics,
'recommendations': self._generate_query_recommendations(diagnostics)
}
def _analyze_single_query(self, query: str) -> Dict[str, Any]:
"""Analyze a single query for performance issues"""
try:
start_time = time.time()
# Execute query with metrics
query_iterable = self.container.query_items(
query=query,
enable_cross_partition_query=True,
populate_query_metrics=True
)
# Collect results and metrics
results = list(query_iterable)
execution_time = (time.time() - start_time) * 1000
            # Request charge and query metrics are returned as response headers
            headers = self.container.client_connection.last_response_headers
            request_charge = float(headers.get('x-ms-request-charge', 0))
            query_metrics = headers.get('x-ms-documentdb-query-metrics', '')
return {
'query': query,
'execution_time_ms': execution_time,
'request_charge': request_charge,
'result_count': len(results),
'query_metrics': query_metrics,
'status': 'success',
'performance_issues': self._identify_performance_issues(
execution_time, request_charge, query_metrics, query
)
}
except Exception as e:
return {
'query': query,
'status': 'error',
'error': str(e),
'performance_issues': ['Query execution failed']
}
def _identify_performance_issues(self, execution_time: float,
request_charge: float,
query_metrics: Dict,
query: str) -> List[str]:
"""Identify potential performance issues with a query"""
issues = []
# Check execution time
if execution_time > 1000: # 1 second
issues.append(f"High execution time: {execution_time:.2f}ms")
# Check RU consumption
if request_charge > 100:
issues.append(f"High RU consumption: {request_charge} RUs")
# Check for cross-partition queries
if 'SELECT *' in query.upper():
issues.append("Using SELECT * - consider selecting specific fields")
# Check for missing WHERE clauses
if 'WHERE' not in query.upper():
issues.append("No WHERE clause - full collection scan")
# Check for ORDER BY without proper indexing
if 'ORDER BY' in query.upper() and request_charge > 50:
issues.append("ORDER BY clause may require composite indexing")
return issues
def _generate_query_recommendations(self, diagnostics: List[Dict]) -> List[str]:
"""Generate recommendations based on query analysis"""
recommendations = []
# Analyze common issues
high_ru_queries = [d for d in diagnostics if d.get('request_charge', 0) > 50]
slow_queries = [d for d in diagnostics if d.get('execution_time_ms', 0) > 1000]
if high_ru_queries:
recommendations.append("Consider optimizing indexing for high RU consumption queries")
if slow_queries:
recommendations.append("Review slow queries for proper WHERE clauses and indexing")
# Check for common patterns
select_all_queries = [
d for d in diagnostics
if 'SELECT *' in d.get('query', '').upper()
]
if select_all_queries:
recommendations.append("Replace SELECT * with specific field selection")
return recommendations
def check_partition_health(self) -> Dict[str, Any]:
"""Check partition key distribution and health"""
# This would require specific queries to analyze partition distribution
# For demonstration, we'll show the structure
return {
'analysis_timestamp': datetime.utcnow().isoformat(),
'partition_distribution': 'Analysis would require access to partition statistics',
'recommendations': [
'Monitor partition key distribution regularly',
'Watch for hot partitions in Azure Portal metrics',
'Consider partition key redesign if distribution is uneven'
]
}
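# Example (illustrative): running the diagnostics helper against a couple of candidate
# queries and printing the generated recommendations.
def run_query_diagnostics(container):
    helper = TroubleshootingHelper(container)
    report = helper.diagnose_slow_queries([
        "SELECT * FROM c",
        "SELECT c.id, c.name FROM c WHERE c.category = 'electronics'"
    ])
    for recommendation in report['recommendations']:
        print(f"RECOMMENDATION: {recommendation}")
    return report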
class AlertingSystem:
def __init__(self, monitor: CosmosDBMonitor):
self.monitor = monitor
self.alert_rules = []
def add_alert_rule(self, name: str, condition_func, action_func, cooldown_minutes: int = 5):
"""Add a custom alert rule"""
self.alert_rules.append({
'name': name,
'condition': condition_func,
'action': action_func,
'cooldown_minutes': cooldown_minutes,
'last_triggered': None
})
def check_alerts(self):
"""Check all alert rules against current metrics"""
current_time = datetime.utcnow()
recent_metrics = self.monitor.metrics_buffer[-100:] # Check last 100 metrics
for rule in self.alert_rules:
# Check cooldown
if (rule['last_triggered'] and
current_time - rule['last_triggered'] < timedelta(minutes=rule['cooldown_minutes'])):
continue
# Check condition
            if rule['condition'](recent_metrics):
try:
rule['action'](rule['name'], recent_metrics)
rule['last_triggered'] = current_time
except Exception as e:
logging.error(f"Alert action failed for {rule['name']}: {e}")
# Usage examples
def setup_monitoring_example():
"""Example of setting up comprehensive monitoring"""
# Assume we have container client
container = get_container_client() # Your container client
# Setup monitoring
monitor = CosmosDBMonitor(container, enable_detailed_logging=True)
# Setup alerting
alerting = AlertingSystem(monitor)
# Add custom alert rules
def high_error_rate_condition(metrics):
if len(metrics) < 10:
return False
error_count = len([m for m in metrics if m.status_code != 200])
return error_count / len(metrics) > 0.1 # 10% error rate
def error_rate_action(rule_name, metrics):
print(f"ALERT: {rule_name} triggered - High error rate detected!")
# Could send to Slack, email, etc.
alerting.add_alert_rule(
"high_error_rate",
high_error_rate_condition,
error_rate_action,
cooldown_minutes=10
)
# Example usage with tracking
def tracked_create_item(item_data):
return monitor.track_operation(
container.create_item,
'create_item',
body=item_data
)
# Use tracked operations
try:
result = tracked_create_item({
'id': 'test-item',
'name': 'Test Product',
'category': 'test'
})
print("Item created successfully")
except Exception as e:
print(f"Item creation failed: {e}")
# Get performance summary
summary = monitor.get_performance_summary(hours=1)
print(f"Performance Summary: {json.dumps(summary, indent=2)}")
# Check alerts
alerting.check_alerts()
    return monitor, alerting
15. Integration Patterns
Event-Driven Architecture
graph LR
A[Application] --> B[Cosmos DB]
B --> C[Change Feed]
C --> D[Azure Functions]
C --> E[Event Hubs]
C --> F[Service Bus]
D --> G[Process Events]
E --> H[Stream Analytics]
F --> I[Message Processing]
G --> J[Update Search Index]
G --> K[Send Notifications]
G --> L[Update Cache]
H --> M[Real-time Analytics]
I --> N[Business Logic]
Integration Implementation
# integration_patterns.py
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, Any, List, Callable

import aiohttp
class IntegrationPattern(ABC):
"""Base class for integration patterns"""
@abstractmethod
async def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
pass
class SearchIndexIntegration(IntegrationPattern):
"""Integration with Azure Cognitive Search or Elasticsearch"""
def __init__(self, search_endpoint: str, search_key: str):
self.search_endpoint = search_endpoint
self.search_key = search_key
self.session = None
async def initialize(self):
"""Initialize HTTP session for search operations"""
self.session = aiohttp.ClientSession(
headers={
'Content-Type': 'application/json',
'api-key': self.search_key
}
)
async def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Process Cosmos DB change event and update search index"""
try:
if not self.session:
await self.initialize()
# Transform Cosmos DB document for search index
search_document = self._transform_for_search(event)
# Determine operation type
operation_type = self._get_operation_type(event)
if operation_type == 'upsert':
result = await self._upsert_document(search_document)
elif operation_type == 'delete':
result = await self._delete_document(event.get('id'))
else:
result = {'status': 'skipped', 'reason': f'Unknown operation: {operation_type}'}
return {
'integration': 'search_index',
'operation': operation_type,
'document_id': event.get('id'),
'result': result,
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
return {
'integration': 'search_index',
'error': str(e),
'document_id': event.get('id'),
'timestamp': datetime.utcnow().isoformat()
}
def _transform_for_search(self, document: Dict[str, Any]) -> Dict[str, Any]:
"""Transform Cosmos DB document for search index"""
# Remove Cosmos DB specific fields
search_doc = {k: v for k, v in document.items()
if not k.startswith('_')}
# Add searchable text field
searchable_fields = ['name', 'description', 'category']
search_text = ' '.join([
str(search_doc.get(field, ''))
for field in searchable_fields
])
search_doc['searchText'] = search_text
# Add last updated timestamp
search_doc['lastUpdated'] = datetime.utcnow().isoformat()
return search_doc
def _get_operation_type(self, event: Dict[str, Any]) -> str:
"""Determine operation type from event"""
# In a real implementation, you'd check change feed metadata
# For now, assume upsert for all events
return 'upsert'
async def _upsert_document(self, document: Dict[str, Any]) -> Dict[str, Any]:
"""Upsert document in search index"""
index_name = 'products' # Your search index name
url = f"{self.search_endpoint}/indexes/{index_name}/docs/index"
payload = {
"value": [
{
"@search.action": "mergeOrUpload",
**document
}
]
}
async with self.session.post(url, json=payload) as response:
result = await response.json()
return {
'status_code': response.status,
'response': result
}
async def _delete_document(self, document_id: str) -> Dict[str, Any]:
"""Delete document from search index"""
index_name = 'products'
url = f"{self.search_endpoint}/indexes/{index_name}/docs/index"
payload = {
"value": [
{
"@search.action": "delete",
"id": document_id
}
]
}
async with self.session.post(url, json=payload) as response:
result = await response.json()
return {
'status_code': response.status,
'response': result
}
class CacheIntegration(IntegrationPattern):
"""Integration with Azure Cache for Redis"""
def __init__(self, redis_connection_string: str):
self.redis_connection_string = redis_connection_string
self.redis_client = None
    async def initialize(self):
        """Initialize the Redis client (redis-py's asyncio support, formerly aioredis)"""
        import redis.asyncio as aioredis
        self.redis_client = aioredis.from_url(self.redis_connection_string)
async def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Process Cosmos DB change event and update cache"""
try:
if not self.redis_client:
await self.initialize()
document_id = event.get('id')
category = event.get('category', 'unknown')
# Update individual document cache
cache_key = f"doc:{category}:{document_id}"
await self.redis_client.setex(
cache_key,
3600, # 1 hour TTL
json.dumps(event)
)
# Update category list cache
category_key = f"category:{category}"
await self.redis_client.sadd(category_key, document_id)
await self.redis_client.expire(category_key, 3600)
# Invalidate related caches
await self._invalidate_related_caches(event)
return {
'integration': 'cache',
'operation': 'update',
'document_id': document_id,
'cache_keys_updated': [cache_key, category_key],
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
return {
'integration': 'cache',
'error': str(e),
'document_id': event.get('id'),
'timestamp': datetime.utcnow().isoformat()
}
async def _invalidate_related_caches(self, event: Dict[str, Any]):
"""Invalidate caches related to the changed document"""
category = event.get('category')
if category:
# Invalidate category summary cache
summary_key = f"summary:{category}"
await self.redis_client.delete(summary_key)
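# Companion read path (a sketch): check Redis first and fall back to Cosmos DB,
# mirroring the cache keys written by CacheIntegration above. Assumes an async
# Cosmos container client and redis-py >= 4.2.
async def get_document_cached(cache: CacheIntegration, container,
                              category: str, document_id: str) -> Dict[str, Any]:
    if not cache.redis_client:
        await cache.initialize()
    cache_key = f"doc:{category}:{document_id}"
    cached = await cache.redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss: read from Cosmos DB and repopulate with a 1 hour TTL
    item = await container.read_item(item=document_id, partition_key=category)
    await cache.redis_client.setex(cache_key, 3600, json.dumps(item))
    return item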
class NotificationIntegration(IntegrationPattern):
"""Integration with notification systems (Email, SMS, Push)"""
def __init__(self, notification_config: Dict[str, Any]):
self.config = notification_config
async def process_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Process event and send notifications"""
try:
notifications_sent = []
# Check if event triggers notifications
if self._should_notify(event):
notification_message = self._create_notification_message(event)
# Send email notification
if self.config.get('email_enabled'):
email_result = await self._send_email_notification(notification_message)
notifications_sent.append({'type': 'email', 'result': email_result})
# Send webhook notification
if self.config.get('webhook_enabled'):
webhook_result = await self._send_webhook_notification(event)
notifications_sent.append({'type': 'webhook', 'result': webhook_result})
return {
'integration': 'notifications',
'document_id': event.get('id'),
'notifications_sent': notifications_sent,
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
return {
'integration': 'notifications',
'error': str(e),
'document_id': event.get('id'),
'timestamp': datetime.utcnow().isoformat()
}
def _should_notify(self, event: Dict[str, Any]) -> bool:
"""Determine if event should trigger notifications"""
# Example: notify for high-value orders
if event.get('category') == 'order' and event.get('total', 0) > 1000:
return True
# Example: notify for low inventory
if event.get('category') == 'product' and event.get('inventory', 0) < 10:
return True
return False
def _create_notification_message(self, event: Dict[str, Any]) -> str:
"""Create notification message from event"""
if event.get('category') == 'order':
return f"High-value order received: ${event.get('total')} (Order ID: {event.get('id')})"
elif event.get('category') == 'product':
return f"Low inventory alert: {event.get('name')} has {event.get('inventory')} items left"
else:
return f"Document updated: {event.get('id')}"
async def _send_email_notification(self, message: str) -> Dict[str, Any]:
"""Send email notification (placeholder implementation)"""
# In real implementation, integrate with SendGrid, AWS SES, etc.
return {
'status': 'sent',
'message': message,
'method': 'email'
}
async def _send_webhook_notification(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Send webhook notification"""
webhook_url = self.config.get('webhook_url')
if not webhook_url:
return {'status': 'skipped', 'reason': 'No webhook URL configured'}
try:
async with aiohttp.ClientSession() as session:
payload = {
'event_type': 'cosmos_db_change',
'data': event,
'timestamp': datetime.utcnow().isoformat()
}
async with session.post(webhook_url, json=payload) as response:
return {
'status': 'sent',
'status_code': response.status,
'webhook_url': webhook_url
}
except Exception as e:
return {
'status': 'failed',
'error': str(e),
'webhook_url': webhook_url
}
class IntegrationOrchestrator:
"""Orchestrates multiple integration patterns"""
def __init__(self):
self.integrations: List[IntegrationPattern] = []
self.event_queue = asyncio.Queue()
self.processing = False
def add_integration(self, integration: IntegrationPattern):
"""Add an integration pattern"""
self.integrations.append(integration)
async def process_change_feed_event(self, event: Dict[str, Any]):
"""Process a single change feed event through all integrations"""
await self.event_queue.put(event)
async def start_processing(self):
"""Start processing events from the queue"""
self.processing = True
while self.processing:
try:
# Wait for event with timeout
event = await asyncio.wait_for(self.event_queue.get(), timeout=1.0)
# Process event through all integrations
await self._process_event_through_integrations(event)
except asyncio.TimeoutError:
# No events to process, continue
continue
except Exception as e:
logging.error(f"Error processing event: {e}")
async def stop_processing(self):
"""Stop processing events"""
self.processing = False
async def _process_event_through_integrations(self, event: Dict[str, Any]):
"""Process event through all registered integrations"""
tasks = []
for integration in self.integrations:
task = asyncio.create_task(integration.process_event(event))
tasks.append(task)
# Wait for all integrations to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
# Log results
for i, result in enumerate(results):
if isinstance(result, Exception):
logging.error(f"Integration {i} failed: {result}")
else:
logging.info(f"Integration {i} result: {result}")
# Microservices integration pattern
class MicroserviceEventPublisher:
"""Publish events to microservices via message broker"""
def __init__(self, message_broker_config: Dict[str, Any]):
self.config = message_broker_config
self.publisher = None
async def initialize(self):
"""Initialize message broker connection"""
# Example for Azure Service Bus
from azure.servicebus.aio import ServiceBusClient
connection_string = self.config.get('connection_string')
self.publisher = ServiceBusClient.from_connection_string(connection_string)
async def publish_event(self, event: Dict[str, Any], routing_key: str = None):
"""Publish event to message broker"""
try:
if not self.publisher:
await self.initialize()
# Determine topic/queue based on event type
topic_name = routing_key or self._get_topic_for_event(event)
# Create message
message_body = json.dumps({
'event_type': 'cosmos_db_change',
'data': event,
'timestamp': datetime.utcnow().isoformat(),
'source': 'cosmos_db_change_feed'
})
# Send message
sender = self.publisher.get_topic_sender(topic_name)
async with sender:
from azure.servicebus import ServiceBusMessage
message = ServiceBusMessage(message_body)
await sender.send_messages(message)
return {
'status': 'published',
'topic': topic_name,
'message_size': len(message_body)
}
except Exception as e:
return {
'status': 'failed',
'error': str(e),
                'topic': routing_key or self._get_topic_for_event(event)
}
def _get_topic_for_event(self, event: Dict[str, Any]) -> str:
"""Determine message topic based on event content"""
category = event.get('category', 'unknown')
return f"cosmos-changes-{category}"
# Usage example
async def setup_integration_example():
"""Example of setting up comprehensive integrations"""
# Initialize integrations
search_integration = SearchIndexIntegration(
search_endpoint="https://your-search-service.search.windows.net",
search_key="your-search-key"
)
cache_integration = CacheIntegration(
redis_connection_string="redis://your-redis-instance:6379"
)
notification_integration = NotificationIntegration({
'email_enabled': True,
'webhook_enabled': True,
'webhook_url': 'https://your-webhook-endpoint.com/cosmos-events'
})
# Setup orchestrator
orchestrator = IntegrationOrchestrator()
orchestrator.add_integration(search_integration)
orchestrator.add_integration(cache_integration)
orchestrator.add_integration(notification_integration)
# Simulate processing change feed events
sample_events = [
{
'id': 'product-001',
'name': 'Laptop',
'category': 'electronics',
'price': 999.99,
'inventory': 5
},
{
'id': 'order-001',
'category': 'order',
'total': 1500.00,
'customer_id': 'customer-123'
}
]
# Process events
for event in sample_events:
await orchestrator.process_change_feed_event(event)
# Start processing (in real scenario, this would run continuously)
# await orchestrator.start_processing()
print("Integration example setup completed")Python16. Best Practices and Production Deployment
Production Readiness Checklist
graph TD
A[Production Readiness] --> B[Security]
A --> C[Performance]
A --> D[Monitoring]
A --> E[Disaster Recovery]
A --> F[Cost Optimization]
B --> G[Authentication/Authorization]
B --> H[Network Security]
B --> I[Data Encryption]
C --> J[Throughput Planning]
C --> K[Indexing Strategy]
C --> L[Partition Design]
D --> M[Metrics & Alerts]
D --> N[Logging Strategy]
D --> O[Health Checks]
E --> P[Backup Strategy]
E --> Q[Multi-region Setup]
E --> R[Failover Testing]
F --> S[Reserved Capacity]
F --> T[Autoscaling]
F --> U[Resource Optimization]
Production Best Practices Implementation
# production_best_practices.py
import asyncio
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
import json
import os
@dataclass
class ProductionConfig:
# Connection settings
endpoint: str
key: str
database_name: str
# Performance settings
max_connections: int = 100
connection_timeout: int = 30
retry_attempts: int = 3
# Monitoring settings
enable_detailed_logging: bool = True
log_level: str = "INFO"
metrics_collection_interval: int = 60
# Security settings
enable_ssl: bool = True
certificate_verification: bool = True
# Backup settings
backup_enabled: bool = True
backup_retention_days: int = 30
# Multi-region settings
read_regions: List[str] = None
write_regions: List[str] = None
@classmethod
def from_environment(cls):
"""Create configuration from environment variables"""
return cls(
endpoint=os.getenv('COSMOS_ENDPOINT'),
key=os.getenv('COSMOS_KEY'),
database_name=os.getenv('COSMOS_DATABASE'),
max_connections=int(os.getenv('COSMOS_MAX_CONNECTIONS', '100')),
connection_timeout=int(os.getenv('COSMOS_CONNECTION_TIMEOUT', '30')),
enable_detailed_logging=os.getenv('COSMOS_DETAILED_LOGGING', 'true').lower() == 'true'
)
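# Example (illustrative): load settings once at startup and fail fast when the
# required environment variables are missing.
def load_production_config() -> ProductionConfig:
    config = ProductionConfig.from_environment()
    if not config.endpoint or not config.key:
        raise ValueError("COSMOS_ENDPOINT and COSMOS_KEY must be set")
    return config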
class ProductionCosmosClient:
"""Production-ready Cosmos DB client with best practices"""
def __init__(self, config: ProductionConfig):
self.config = config
self.client = None
self.database = None
self.containers = {}
self.connection_pool = None
self._setup_logging()
def _setup_logging(self):
"""Setup structured logging for production"""
self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
self.logger.setLevel(getattr(logging, self.config.log_level))
# Create structured formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - '
'%(funcName)s:%(lineno)d - %(message)s'
)
# Add console handler if not exists
if not self.logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(formatter)
self.logger.addHandler(handler)
async def initialize(self):
"""Initialize client with production settings"""
try:
from azure.cosmos.aio import CosmosClient
from azure.cosmos import ConnectionPolicy
# Configure connection policy for production
connection_policy = ConnectionPolicy()
connection_policy.RequestTimeout = self.config.connection_timeout * 1000
connection_policy.ConnectionMode = 'Gateway' # More stable for production
connection_policy.MaxPoolSize = self.config.max_connections
connection_policy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = self.config.retry_attempts
# Create client with production settings
self.client = CosmosClient(
url=self.config.endpoint,
credential=self.config.key,
connection_policy=connection_policy
)
# Initialize database
self.database = self.client.get_database_client(self.config.database_name)
# Verify connection
await self._verify_connection()
self.logger.info("Production Cosmos DB client initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize Cosmos DB client: {e}")
raise
async def _verify_connection(self):
"""Verify database connection and accessibility"""
try:
# Test connection by reading database properties
properties = await self.database.read()
self.logger.info(f"Connected to database: {properties['id']}")
except Exception as e:
self.logger.error(f"Connection verification failed: {e}")
raise
async def get_container(self, container_name: str,
partition_key_path: str = "/id",
create_if_not_exists: bool = False):
"""Get container with caching and error handling"""
if container_name in self.containers:
return self.containers[container_name]
try:
if create_if_not_exists:
from azure.cosmos import PartitionKey
container = await self.database.create_container_if_not_exists(
id=container_name,
partition_key=PartitionKey(path=partition_key_path),
offer_throughput=400
)
else:
container = self.database.get_container_client(container_name)
# Cache container reference
self.containers[container_name] = container
self.logger.info(f"Container '{container_name}' ready")
return container
except Exception as e:
self.logger.error(f"Failed to get container '{container_name}': {e}")
raise
async def health_check(self) -> Dict[str, Any]:
"""Perform health check on Cosmos DB connection"""
health_status = {
'timestamp': datetime.utcnow().isoformat(),
'status': 'unknown',
'checks': {}
}
try:
# Check database connectivity
start_time = datetime.utcnow()
await self.database.read()
connectivity_time = (datetime.utcnow() - start_time).total_seconds() * 1000
health_status['checks']['database_connectivity'] = {
'status': 'healthy',
'response_time_ms': connectivity_time
}
# Check container accessibility (if any containers are cached)
for container_name, container in self.containers.items():
try:
start_time = datetime.utcnow()
# Simple query to test container
query_iterable = container.query_items(
query="SELECT TOP 1 * FROM c",
enable_cross_partition_query=True
)
await query_iterable.__anext__() # Get first result
container_time = (datetime.utcnow() - start_time).total_seconds() * 1000
health_status['checks'][f'container_{container_name}'] = {
'status': 'healthy',
'response_time_ms': container_time
}
except Exception as e:
health_status['checks'][f'container_{container_name}'] = {
'status': 'unhealthy',
'error': str(e)
}
# Determine overall status
unhealthy_checks = [
check for check in health_status['checks'].values()
if check['status'] != 'healthy'
]
health_status['status'] = 'healthy' if not unhealthy_checks else 'unhealthy'
except Exception as e:
health_status['status'] = 'unhealthy'
health_status['error'] = str(e)
return health_status
async def close(self):
"""Properly close the client and cleanup resources"""
try:
if self.client:
await self.client.close()
self.logger.info("Cosmos DB client closed successfully")
except Exception as e:
self.logger.error(f"Error closing Cosmos DB client: {e}")
class ProductionOperationWrapper:
"""Wrapper for production-ready database operations"""
def __init__(self, client: ProductionCosmosClient):
self.client = client
self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
async def execute_with_retry(self, operation_func, *args, **kwargs):
"""Execute operation with retry logic and proper error handling"""
max_retries = self.client.config.retry_attempts
for attempt in range(max_retries + 1):
try:
start_time = datetime.utcnow()
result = await operation_func(*args, **kwargs)
# Log successful operation
duration = (datetime.utcnow() - start_time).total_seconds() * 1000
self.logger.info(
f"Operation {operation_func.__name__} completed successfully "
f"in {duration:.2f}ms (attempt {attempt + 1})"
)
return result
except Exception as e:
self.logger.warning(
f"Operation {operation_func.__name__} failed "
f"(attempt {attempt + 1}/{max_retries + 1}): {e}"
)
if attempt == max_retries:
self.logger.error(
f"Operation {operation_func.__name__} failed "
f"after {max_retries + 1} attempts"
)
raise
# Wait before retry (exponential backoff)
wait_time = (2 ** attempt) + (0.1 * attempt)
await asyncio.sleep(wait_time)
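# Example (hypothetical usage, not part of the class): wrapping a single
# container call in the retry helper. Assumes `wrapper` is a
# ProductionOperationWrapper and `container` came from
# ProductionCosmosClient.get_container():
#
#   item = {"id": "product-001", "category": "electronics", "price": 99.99}
#   result = await wrapper.execute_with_retry(container.upsert_item, body=item)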
async def batch_operation(self, operations: List[Dict[str, Any]],
container, batch_size: int = 25):
"""Execute batch operations efficiently"""
results = []
for i in range(0, len(operations), batch_size):
batch = operations[i:i + batch_size]
try:
# Group operations by partition key for transaction batches
partition_groups = {}
for op in batch:
partition_key = op.get('partition_key', 'unknown')
if partition_key not in partition_groups:
partition_groups[partition_key] = []
partition_groups[partition_key].append(op)
# Execute each partition group
for partition_key, group_ops in partition_groups.items():
batch_result = await self._execute_partition_batch(
group_ops, container, partition_key
)
results.extend(batch_result)
except Exception as e:
self.logger.error(f"Batch operation failed for batch {i//batch_size + 1}: {e}")
# Add error results for this batch
for op in batch:
results.append({
'operation': op,
'status': 'failed',
'error': str(e)
})
return results
async def _execute_partition_batch(self, operations: List[Dict[str, Any]],
container, partition_key: str):
"""Execute operations for a single partition"""
try:
# The Python SDK expresses a transactional batch as a list of
# (operation_type, args_tuple) entries passed to execute_item_batch
batch_operations = []
for op in operations:
op_type = op['type']
data = op['data']
if op_type == 'create':
batch_operations.append(("create", (data,)))
elif op_type == 'upsert':
batch_operations.append(("upsert", (data,)))
elif op_type == 'replace':
batch_operations.append(("replace", (data['id'], data)))
elif op_type == 'delete':
batch_operations.append(("delete", (data['id'],)))
# Execute the batch as a single transaction within the partition
batch_response = await container.execute_item_batch(
batch_operations=batch_operations,
partition_key=partition_key
)
# Process results
results = []
for i, op in enumerate(operations):
results.append({
'operation': op,
'status': 'success',
'result': batch_response[i] if i < len(batch_response) else None
})
return results
except Exception as e:
# Return error for all operations in this batch
return [
{
'operation': op,
'status': 'failed',
'error': str(e)
}
for op in operations
]
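# Example (hypothetical usage): the shape of the operations list expected by
# batch_operation(). Each entry names the operation type, the item body, and
# the partition key value used to group items into transactional batches.
# `op_wrapper` is assumed to be a ProductionOperationWrapper instance:
#
#   operations = [
#       {'type': 'upsert', 'partition_key': 'electronics',
#        'data': {'id': 'product-001', 'category': 'electronics', 'price': 99.99}},
#       {'type': 'delete', 'partition_key': 'electronics',
#        'data': {'id': 'product-002'}},
#   ]
#   results = await op_wrapper.batch_operation(operations, container)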
class ProductionMonitoring:
"""Production monitoring and alerting"""
def __init__(self, client: ProductionCosmosClient):
self.client = client
self.metrics = {
'operations_total': 0,
'operations_failed': 0,
'total_ru_consumed': 0,
'avg_latency_ms': 0,
'last_health_check': None
}
self.alerts_enabled = True
async def start_monitoring(self):
"""Start continuous monitoring"""
while True:
try:
await self._collect_metrics()
await self._check_alerts()
await asyncio.sleep(self.client.config.metrics_collection_interval)
except Exception as e:
logging.error(f"Monitoring error: {e}")
await asyncio.sleep(60) # Wait longer on error
async def _collect_metrics(self):
"""Collect performance metrics"""
try:
# Perform health check
health_result = await self.client.health_check()
self.metrics['last_health_check'] = health_result
# Log metrics
logging.info(f"Metrics collected: {json.dumps(self.metrics, indent=2)}")
except Exception as e:
logging.error(f"Failed to collect metrics: {e}")
async def _check_alerts(self):
"""Check for alert conditions"""
if not self.alerts_enabled:
return
# Check error rate
if self.metrics['operations_total'] > 0:
error_rate = self.metrics['operations_failed'] / self.metrics['operations_total']
if error_rate > 0.05: # 5% error rate threshold
await self._send_alert('high_error_rate', f"Error rate: {error_rate:.2%}")
# Check health status
health_check = self.metrics.get('last_health_check')
if health_check and health_check['status'] != 'healthy':
await self._send_alert('health_check_failed', "Health check failed")
async def _send_alert(self, alert_type: str, message: str):
"""Send alert (placeholder implementation)"""
alert_message = {
'timestamp': datetime.utcnow().isoformat(),
'alert_type': alert_type,
'message': message,
'metrics': self.metrics
}
logging.warning(f"ALERT: {json.dumps(alert_message)}")
# In production, integrate with your alerting system:
# - Send to Azure Monitor
# - Send to PagerDuty
# - Send to Slack
# - Send email notifications
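# Hypothetical sketch of one such integration, assuming aiohttp is installed
# and ALERT_WEBHOOK_URL points at your alerting endpoint:
#
#   import aiohttp
#   async with aiohttp.ClientSession() as session:
#       await session.post(os.environ["ALERT_WEBHOOK_URL"], json=alert_message)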
# Deployment utilities
class DeploymentHelper:
"""Helper utilities for production deployment"""
@staticmethod
def validate_environment():
"""Validate environment configuration"""
required_env_vars = [
'COSMOS_ENDPOINT',
'COSMOS_KEY',
'COSMOS_DATABASE'
]
missing_vars = []
for var in required_env_vars:
if not os.getenv(var):
missing_vars.append(var)
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
return True
@staticmethod
async def create_production_containers(database_client, container_configs: List[Dict]):
"""Create containers with production settings"""
created_containers = []
for config in container_configs:
try:
from azure.cosmos import PartitionKey
container = await database_client.create_container_if_not_exists(
id=config['name'],
partition_key=PartitionKey(path=config['partition_key_path']),
offer_throughput=config.get('throughput', 400),
indexing_policy=config.get('indexing_policy')
)
created_containers.append(container)
logging.info(f"Container '{config['name']}' created/verified")
except Exception as e:
logging.error(f"Failed to create container '{config['name']}': {e}")
raise
return created_containers
@staticmethod
def setup_production_logging():
"""Setup production logging configuration"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('cosmos_app.log')
]
)
# Usage example for production deployment
async def production_deployment_example():
"""Example of production deployment setup"""
# Validate environment
DeploymentHelper.validate_environment()
# Setup logging
DeploymentHelper.setup_production_logging()
# Create production configuration
config = ProductionConfig.from_environment()
# Initialize production client
cosmos_client = ProductionCosmosClient(config)
await cosmos_client.initialize()
# Create production containers
container_configs = [
{
'name': 'products',
'partition_key_path': '/category',
'throughput': 1000,
'indexing_policy': {
'indexingMode': 'consistent',
'automatic': True,
'includedPaths': [{'path': '/*'}],
'excludedPaths': [{'path': '/description/*'}]
}
},
{
'name': 'orders',
'partition_key_path': '/customerId',
'throughput': 800
}
]
created_containers = await DeploymentHelper.create_production_containers(
cosmos_client.database,
container_configs
)
# Setup operation wrapper
operation_wrapper = ProductionOperationWrapper(cosmos_client)
# Setup monitoring
monitoring = ProductionMonitoring(cosmos_client)
# Start monitoring in background
monitoring_task = asyncio.create_task(monitoring.start_monitoring())
try:
# Your application logic here
logging.info("Production deployment completed successfully")
# Keep monitoring running
await monitoring_task
except KeyboardInterrupt:
logging.info("Shutting down...")
finally:
monitoring_task.cancel()  # stop the background monitoring loop before closing
await cosmos_client.close()
if __name__ == "__main__":
asyncio.run(production_deployment_example())
17. Advanced Scenarios
Multi-Master Global Distribution
graph TB
A[Global Application] --> B[Region 1: West US]
A --> C[Region 2: East Europe]
A --> D[Region 3: Southeast Asia]
B --> E[Read/Write Replicas]
C --> F[Read/Write Replicas]
D --> G[Read/Write Replicas]
E <--> F
F <--> G
G <--> E
H[Conflict Resolution] --> I[Last Writer Wins]
H --> J[Custom Merge Procedures]
H --> K[Application-Defined]
Advanced Scenarios Implementation
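Before walking through the full implementation below, here is a minimal sketch of declaring a Last-Writer-Wins conflict resolution policy at container-creation time with the Python SDK. It assumes a multi-region account with multi-region writes already enabled, placeholder endpoint, key, and database names, and the conflict_resolution_policy keyword of create_container_if_not_exists; verify the exact mode casing against your SDK version.
# conflict_policy_sketch.py (illustrative only)
from azure.cosmos import CosmosClient, PartitionKey

client = CosmosClient("https://your-account.documents.azure.com:443/", credential="your-key")
database = client.get_database_client("GlobalDB")  # hypothetical database name

# Resolve write conflicts automatically in favor of the item with the latest _ts
lww_policy = {
    "mode": "LastWriterWins",
    "conflictResolutionPath": "/_ts"
}

container = database.create_container_if_not_exists(
    id="global_data",
    partition_key=PartitionKey(path="/id"),
    conflict_resolution_policy=lww_policy
)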
# advanced_scenarios.py
import asyncio
import json
from typing import Dict, Any, List, Optional, Union
from datetime import datetime, timedelta
from enum import Enum
import hashlib
class ConflictResolutionMode(Enum):
LAST_WRITER_WINS = "lastWriterWins"
CUSTOM_PROCEDURE = "customProcedure"
APPLICATION_DEFINED = "applicationDefined"
class GlobalDistributionManager:
"""Manage global distribution and multi-master scenarios"""
def __init__(self, primary_client, region_clients: Dict[str, Any]):
self.primary_client = primary_client
self.region_clients = region_clients
self.conflict_resolver = ConflictResolver()
async def setup_global_distribution(self, containers: List[str],
conflict_resolution_mode: ConflictResolutionMode):
"""Setup global distribution for containers"""
setup_results = {}
for container_name in containers:
try:
# Configure conflict resolution for each container
await self._configure_conflict_resolution(
container_name,
conflict_resolution_mode
)
# Verify replication across regions
replication_status = await self._verify_global_replication(container_name)
setup_results[container_name] = {
'status': 'success',
'conflict_resolution': conflict_resolution_mode.value,
'replication_status': replication_status
}
except Exception as e:
setup_results[container_name] = {
'status': 'failed',
'error': str(e)
}
return setup_results
async def _configure_conflict_resolution(self, container_name: str,
mode: ConflictResolutionMode):
"""Configure conflict resolution for a container"""
conflict_resolution_policy = {
'mode': mode.value
}
if mode == ConflictResolutionMode.CUSTOM_PROCEDURE:
# Set up custom conflict resolution stored procedure
conflict_resolution_policy['conflictResolutionProcedure'] = 'customConflictResolver'
# Create the conflict resolution stored procedure
await self._create_conflict_resolution_procedure(container_name)
elif mode == ConflictResolutionMode.LAST_WRITER_WINS:
conflict_resolution_policy['conflictResolutionPath'] = '/_ts'
# Apply the policy (this would be done through Azure Portal or ARM templates in practice)
# For demonstration, we log the configuration
print(f"Configured conflict resolution for {container_name}: {conflict_resolution_policy}")
async def _create_conflict_resolution_procedure(self, container_name: str):
"""Create custom conflict resolution stored procedure"""
conflict_resolution_sproc = """
function customConflictResolver(incomingItem, existingItem, isTombstone, conflictingItems) {
var context = getContext();
var response = context.getResponse();
// Custom logic for conflict resolution
if (isTombstone) {
// Handle delete conflicts
response.setBody(null); // Accept delete
return;
}
if (!existingItem) {
// No conflict, accept incoming item
response.setBody(incomingItem);
return;
}
// Merge strategy: prefer higher priority or newer timestamp
var resolved = mergeItems(incomingItem, existingItem);
response.setBody(resolved);
function mergeItems(incoming, existing) {
// Example merge logic
var merged = Object.assign({}, existing);
// Prefer incoming item if it has higher priority
if (incoming.priority > existing.priority) {
merged = Object.assign(merged, incoming);
} else if (incoming.priority === existing.priority) {
// Same priority, merge specific fields
if (incoming._ts > existing._ts) {
merged.lastUpdated = incoming.lastUpdated;
merged.data = incoming.data;
}
}
// Always update the modification timestamp
merged._lastResolved = new Date().toISOString();
return merged;
}
}
"""
# In practice, you would create this stored procedure on the container
print(f"Created conflict resolution procedure for {container_name}")
async def _verify_global_replication(self, container_name: str) -> Dict[str, Any]:
"""Verify replication status across regions"""
replication_status = {}
for region, client in self.region_clients.items():
try:
# Test read operation from each region
container = await client.get_container(container_name)
# Perform a simple query to verify accessibility
query_result = container.query_items(
query="SELECT TOP 1 * FROM c",
enable_cross_partition_query=True
)
# Try to get first item (may be empty)
try:
first_item = await query_result.__anext__()
item_count = 1
except StopAsyncIteration:
item_count = 0
replication_status[region] = {
'status': 'healthy',
'accessible': True,
'sample_item_count': item_count,
'test_timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
replication_status[region] = {
'status': 'unhealthy',
'accessible': False,
'error': str(e),
'test_timestamp': datetime.utcnow().isoformat()
}
return replication_status
async def write_with_preference(self, data: Dict[str, Any],
preferred_region: str = None,
consistency_level: str = "Session"):
"""Write data with region preference"""
target_client = self.primary_client
if preferred_region and preferred_region in self.region_clients:
target_client = self.region_clients[preferred_region]
try:
# Add metadata for conflict resolution
data['_writtenAt'] = datetime.utcnow().isoformat()
data['_writtenFrom'] = preferred_region or 'primary'
data['_version'] = data.get('_version', 0) + 1
# Perform write operation
container = await target_client.get_container('global_data')
result = await container.upsert_item(body=data)
return {
'success': True,
'region': preferred_region or 'primary',
'result': result,
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
# Fallback to primary region if preferred region fails
if preferred_region and target_client != self.primary_client:
return await self.write_with_preference(data, None, consistency_level)
else:
return {
'success': False,
'error': str(e),
'region': preferred_region or 'primary'
}
class ConflictResolver:
"""Handle conflict resolution scenarios"""
def __init__(self):
self.resolution_strategies = {
'timestamp': self._resolve_by_timestamp,
'priority': self._resolve_by_priority,
'merge': self._resolve_by_merge,
'custom': self._resolve_custom
}
async def resolve_conflict(self, conflict_data: Dict[str, Any],
strategy: str = 'timestamp') -> Dict[str, Any]:
"""Resolve conflicts using specified strategy"""
if strategy not in self.resolution_strategies:
raise ValueError(f"Unknown resolution strategy: {strategy}")
resolver = self.resolution_strategies[strategy]
return await resolver(conflict_data)
async def _resolve_by_timestamp(self, conflict_data: Dict[str, Any]) -> Dict[str, Any]:
"""Resolve by last write timestamp"""
items = conflict_data.get('conflicting_items', [])
if not items:
return {'resolved_item': None, 'strategy': 'timestamp'}
# Find item with latest timestamp
latest_item = max(items, key=lambda x: x.get('_ts', 0))
return {
'resolved_item': latest_item,
'strategy': 'timestamp',
'resolution_reason': f"Selected item with latest timestamp: {latest_item.get('_ts')}"
}
async def _resolve_by_priority(self, conflict_data: Dict[str, Any]) -> Dict[str, Any]:
"""Resolve by priority field"""
items = conflict_data.get('conflicting_items', [])
if not items:
return {'resolved_item': None, 'strategy': 'priority'}
# Find item with highest priority
highest_priority_item = max(items, key=lambda x: x.get('priority', 0))
return {
'resolved_item': highest_priority_item,
'strategy': 'priority',
'resolution_reason': f"Selected item with highest priority: {highest_priority_item.get('priority')}"
}
async def _resolve_by_merge(self, conflict_data: Dict[str, Any]) -> Dict[str, Any]:
"""Resolve by merging conflicting items"""
items = conflict_data.get('conflicting_items', [])
if not items:
return {'resolved_item': None, 'strategy': 'merge'}
if len(items) == 1:
return {'resolved_item': items[0], 'strategy': 'merge'}
# Merge items
merged_item = {}
# Start with the first item
merged_item.update(items[0])
# Merge subsequent items
for item in items[1:]:
for key, value in item.items():
if key.startswith('_'):
continue # Skip system fields
if key not in merged_item:
merged_item[key] = value
else:
# Merge strategy based on field type
if isinstance(value, (int, float)) and isinstance(merged_item[key], (int, float)):
merged_item[key] = max(value, merged_item[key]) # Take maximum
elif isinstance(value, list):
# Merge arrays
if isinstance(merged_item[key], list):
merged_item[key] = list(set(merged_item[key] + value))
else:
merged_item[key] = value
elif isinstance(value, str) and len(value) > len(str(merged_item[key])):
merged_item[key] = value # Take longer string
# Add merge metadata
merged_item['_merged'] = True
merged_item['_mergedAt'] = datetime.utcnow().isoformat()
merged_item['_mergedFrom'] = [item.get('id') for item in items]
return {
'resolved_item': merged_item,
'strategy': 'merge',
'resolution_reason': f"Merged {len(items)} conflicting items"
}
async def _resolve_custom(self, conflict_data: Dict[str, Any]) -> Dict[str, Any]:
"""Custom resolution logic"""
# Implement your custom business logic here
items = conflict_data.get('conflicting_items', [])
# Example: Business-specific resolution
business_rules_item = None
for item in items:
if item.get('source') == 'authoritative_system':
business_rules_item = item
break
if business_rules_item:
return {
'resolved_item': business_rules_item,
'strategy': 'custom',
'resolution_reason': 'Selected item from authoritative system'
}
# Fallback to timestamp resolution
return await self._resolve_by_timestamp(conflict_data)
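# Example (hypothetical usage): resolving two conflicting versions of the same
# item by timestamp with the ConflictResolver defined above:
#
#   resolver = ConflictResolver()
#   conflict_data = {
#       'conflicting_items': [
#           {'id': 'product-001', 'price': 100.0, '_ts': 1700000000},
#           {'id': 'product-001', 'price': 89.99, '_ts': 1700000500},
#       ]
#   }
#   resolution = await resolver.resolve_conflict(conflict_data, strategy='timestamp')
#   # resolution['resolved_item'] is the version with the larger _ts value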
class EventSourcingPattern:
"""Implement event sourcing pattern with Cosmos DB"""
def __init__(self, container_client):
self.container = container_client
self.event_types = {}
def register_event_type(self, event_type: str, handler_func):
"""Register event type with its handler"""
self.event_types[event_type] = handler_func
async def append_event(self, aggregate_id: str, event_type: str,
event_data: Dict[str, Any], expected_version: int = None):
"""Append event to event stream"""
event = {
'id': f"{aggregate_id}_{event_type}_{datetime.utcnow().isoformat()}",
'aggregateId': aggregate_id,
'eventType': event_type,
'eventData': event_data,
'version': expected_version or await self._get_next_version(aggregate_id),
'timestamp': datetime.utcnow().isoformat(),
'category': 'event'
}
try:
# Optimistic concurrency check
if expected_version:
current_version = await self._get_current_version(aggregate_id)
if current_version >= expected_version:
raise Exception(f"Concurrency conflict: expected version {expected_version}, current version {current_version}")
# Store event
result = await self.container.create_item(body=event)
return {
'success': True,
'event': result,
'version': event['version']
}
except Exception as e:
return {
'success': False,
'error': str(e),
'aggregate_id': aggregate_id
}
async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Dict[str, Any]]:
"""Get events for an aggregate"""
query = """
SELECT * FROM c
WHERE c.aggregateId = @aggregate_id
AND c.version >= @from_version
AND c.category = 'event'
ORDER BY c.version
"""
parameters = [
{"name": "@aggregate_id", "value": aggregate_id},
{"name": "@from_version", "value": from_version}
]
events = []
query_iterable = self.container.query_items(
query=query,
parameters=parameters,
partition_key=aggregate_id
)
async for event in query_iterable:
events.append(event)
return events
async def rebuild_aggregate(self, aggregate_id: str, target_version: int = None) -> Dict[str, Any]:
"""Rebuild aggregate from events"""
events = await self.get_events(aggregate_id)
if target_version:
events = [e for e in events if e['version'] <= target_version]
aggregate_state = {}
for event in events:
event_type = event['eventType']
if event_type in self.event_types:
handler = self.event_types[event_type]
aggregate_state = handler(aggregate_state, event['eventData'])
else:
# Default handling - just merge event data
aggregate_state.update(event['eventData'])
# Add metadata
aggregate_state['_aggregateId'] = aggregate_id
aggregate_state['_version'] = events[-1]['version'] if events else 0
aggregate_state['_rebuiltAt'] = datetime.utcnow().isoformat()
return aggregate_state
async def _get_next_version(self, aggregate_id: str) -> int:
"""Get next version number for aggregate"""
current_version = await self._get_current_version(aggregate_id)
return current_version + 1
async def _get_current_version(self, aggregate_id: str) -> int:
"""Get current version number for aggregate"""
query = """
SELECT TOP 1 c.version FROM c
WHERE c.aggregateId = @aggregate_id
AND c.category = 'event'
ORDER BY c.version DESC
"""
parameters = [{"name": "@aggregate_id", "value": aggregate_id}]
query_iterable = self.container.query_items(
query=query,
parameters=parameters,
partition_key=aggregate_id
)
try:
latest_event = await query_iterable.__anext__()
return latest_event['version']
except StopAsyncIteration:
return 0
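# Example (hypothetical usage): rebuilding an aggregate as of a specific version,
# assuming `event_sourcing` is an EventSourcingPattern instance as in the usage
# example further below:
#
#   state_at_v1 = await event_sourcing.rebuild_aggregate('product-001', target_version=1)
#   # Only events with version <= 1 are replayed, so later price updates are excluded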
class CQRSPattern:
"""Command Query Responsibility Segregation pattern"""
def __init__(self, write_container, read_container):
self.write_container = write_container # Command side
self.read_container = read_container # Query side
self.projections = {}
def register_projection(self, projection_name: str, projection_func):
"""Register a read projection"""
self.projections[projection_name] = projection_func
async def execute_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a command (write operation)"""
try:
# Validate command
validation_result = await self._validate_command(command)
if not validation_result['valid']:
return {
'success': False,
'error': validation_result['error']
}
# Execute command
command_result = await self._process_command(command)
# Update read projections asynchronously
asyncio.create_task(self._update_projections(command))
return {
'success': True,
'result': command_result,
'command_id': command.get('id')
}
except Exception as e:
return {
'success': False,
'error': str(e),
'command_id': command.get('id')
}
async def execute_query(self, query: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a query (read operation)"""
try:
query_type = query.get('type')
if query_type == 'get_by_id':
result = await self._query_by_id(query)
elif query_type == 'search':
result = await self._query_search(query)
elif query_type == 'aggregate':
result = await self._query_aggregate(query)
else:
result = await self._query_custom(query)
return {
'success': True,
'data': result,
'query_id': query.get('id')
}
except Exception as e:
return {
'success': False,
'error': str(e),
'query_id': query.get('id')
}
async def _validate_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
"""Validate command before execution"""
# Basic validation
if not command.get('type'):
return {'valid': False, 'error': 'Command type is required'}
if not command.get('data'):
return {'valid': False, 'error': 'Command data is required'}
# Business rule validation
command_type = command['type']
if command_type == 'create_order':
return await self._validate_create_order(command['data'])
elif command_type == 'update_inventory':
return await self._validate_update_inventory(command['data'])
else:
return {'valid': True}
async def _validate_create_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate create order command"""
if not order_data.get('customer_id'):
return {'valid': False, 'error': 'Customer ID is required'}
if not order_data.get('items') or len(order_data['items']) == 0:
return {'valid': False, 'error': 'Order must have at least one item'}
# Check inventory availability
for item in order_data['items']:
product_id = item.get('product_id')
quantity = item.get('quantity', 0)
if quantity <= 0:
return {'valid': False, 'error': f'Invalid quantity for product {product_id}'}
# Check inventory (this would query current inventory)
# For demo, assume inventory check passes
return {'valid': True}
async def _validate_update_inventory(self, inventory_data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate update inventory command"""
if not inventory_data.get('product_id'):
return {'valid': False, 'error': 'Product ID is required'}
if 'quantity' not in inventory_data:
return {'valid': False, 'error': 'Quantity is required'}
if inventory_data['quantity'] < 0:
return {'valid': False, 'error': 'Quantity cannot be negative'}
return {'valid': True}
async def _process_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
"""Process the validated command"""
command_record = {
'id': command.get('id', f"cmd_{datetime.utcnow().isoformat()}"),
'type': command['type'],
'data': command['data'],
'timestamp': datetime.utcnow().isoformat(),
'status': 'executed',
'category': 'command'
}
# Store command in write side
result = await self.write_container.create_item(body=command_record)
return result
async def _update_projections(self, command: Dict[str, Any]):
"""Update read projections based on command"""
for projection_name, projection_func in self.projections.items():
try:
projection_data = await projection_func(command)
if projection_data:
# Update read side
await self.read_container.upsert_item(body=projection_data)
except Exception as e:
print(f"Failed to update projection {projection_name}: {e}")
async def _query_by_id(self, query: Dict[str, Any]) -> Dict[str, Any]:
"""Query by ID from read side"""
item_id = query.get('id')
partition_key = query.get('partition_key', item_id)
try:
result = await self.read_container.read_item(
item=item_id,
partition_key=partition_key
)
return result
except Exception:
return None
async def _query_search(self, query: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Search query from read side"""
search_query = query.get('query', "SELECT * FROM c")
parameters = query.get('parameters', [])
results = []
query_iterable = self.read_container.query_items(
query=search_query,
parameters=parameters,
enable_cross_partition_query=True
)
async for item in query_iterable:
results.append(item)
return results
async def _query_aggregate(self, query: Dict[str, Any]) -> Dict[str, Any]:
"""Aggregate query from read side"""
# Example aggregate query
aggregate_query = """
SELECT
COUNT(1) as total_count,
AVG(c.price) as avg_price,
SUM(c.quantity) as total_quantity
FROM c
WHERE c.category = @category
"""
parameters = [
{"name": "@category", "value": query.get('category', 'unknown')}
]
query_iterable = self.read_container.query_items(
query=aggregate_query,
parameters=parameters,
enable_cross_partition_query=True
)
try:
result = await query_iterable.__anext__()
return result
except StopAsyncIteration:
return {}
async def _query_custom(self, query: Dict[str, Any]) -> Any:
"""Custom query logic"""
# Implement custom query logic based on your needs
return {"message": "Custom query not implemented"}
# Usage examples for advanced scenarios
async def advanced_scenarios_example():
"""Example demonstrating advanced scenarios"""
# Setup clients (mock for demonstration)
primary_client = None # Your primary cosmos client
region_clients = {
'west-us': None, # Your west US client
'east-europe': None, # Your east Europe client
'southeast-asia': None # Your southeast Asia client
}
# Global distribution example
global_dist = GlobalDistributionManager(primary_client, region_clients)
# Setup global distribution
containers = ['products', 'orders', 'customers']
distribution_result = await global_dist.setup_global_distribution(
containers,
ConflictResolutionMode.LAST_WRITER_WINS
)
print(f"Global distribution setup: {distribution_result}")
# Write with region preference
product_data = {
'id': 'product-001',
'name': 'Global Product',
'price': 99.99,
'category': 'electronics'
}
write_result = await global_dist.write_with_preference(
product_data,
preferred_region='west-us'
)
print(f"Write result: {write_result}")
# Event sourcing example
container = None # Your container client
event_sourcing = EventSourcingPattern(container)
# Register event handlers
def handle_product_created(state, event_data):
state.update(event_data)
state['status'] = 'active'
return state
def handle_price_updated(state, event_data):
state['price'] = event_data['new_price']
state['price_history'] = state.get('price_history', [])
state['price_history'].append({
'old_price': event_data['old_price'],
'new_price': event_data['new_price'],
'changed_at': datetime.utcnow().isoformat()
})
return state
event_sourcing.register_event_type('ProductCreated', handle_product_created)
event_sourcing.register_event_type('PriceUpdated', handle_price_updated)
# Append events
aggregate_id = 'product-001'
create_event = await event_sourcing.append_event(
aggregate_id,
'ProductCreated',
{
'name': 'Sample Product',
'price': 100.0,
'category': 'electronics'
}
)
price_update_event = await event_sourcing.append_event(
aggregate_id,
'PriceUpdated',
{
'old_price': 100.0,
'new_price': 89.99
}
)
# Rebuild aggregate
current_state = await event_sourcing.rebuild_aggregate(aggregate_id)
print(f"Current product state: {current_state}")
# CQRS example
write_container = None # Your write container
read_container = None # Your read container
cqrs = CQRSPattern(write_container, read_container)
# Register projections
async def order_summary_projection(command):
if command['type'] == 'create_order':
order_data = command['data']
return {
'id': f"summary_{order_data['customer_id']}",
'customer_id': order_data['customer_id'],
'total_orders': 1,
'total_amount': order_data.get('total', 0),
'last_order_date': datetime.utcnow().isoformat(),
'category': 'order_summary'
}
return None
cqrs.register_projection('order_summary', order_summary_projection)
# Execute command
create_order_command = {
'id': 'cmd-001',
'type': 'create_order',
'data': {
'customer_id': 'customer-123',
'items': [
{'product_id': 'product-001', 'quantity': 2, 'price': 89.99}
],
'total': 179.98
}
}
command_result = await cqrs.execute_command(create_order_command)
print(f"Command result: {command_result}")
# Execute query
query = {
'type': 'get_by_id',
'id': 'summary_customer-123',  # item id used by _query_by_id for the point read
'partition_key': 'customer-123'
}
query_result = await cqrs.execute_query(query)
print(f"Query result: {query_result}")
if __name__ == "__main__":
asyncio.run(advanced_scenarios_example())
Conclusion
This guide has covered Azure Cosmos DB development with Python from initial setup through production deployment and advanced architectural patterns.
Key Topics Covered:
- Foundation: Core concepts, setup, and basic operations
- Intermediate: Advanced querying, partitioning, and performance optimization
- Advanced: Global distribution, event sourcing, CQRS patterns
- Production: Security, monitoring, best practices, and deployment strategies
Key Takeaways:
- Partition Key Design is crucial for performance and scalability (see the recap sketch after this list)
- Consistency Levels should be chosen based on application requirements
- Change Feed enables real-time processing and event-driven architectures
- Monitoring and Alerting are essential for production systems
- Security should be implemented at multiple layers
- Global Distribution provides low-latency access worldwide
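As a quick recap of the first two takeaways, the sketch below (endpoint, key, and database/container names are placeholders) shows a partition key and a consistency level being chosen up front when resources are created:
# takeaways_recap.py (illustrative only)
from azure.cosmos import CosmosClient, PartitionKey

# Session consistency gives read-your-own-writes within a session at lower cost than Strong
client = CosmosClient(
    "https://your-account.documents.azure.com:443/",
    credential="your-key",
    consistency_level="Session"
)

database = client.create_database_if_not_exists(id="RetailDB")

# Partition on a high-cardinality property that appears in most queries
orders = database.create_container_if_not_exists(
    id="orders",
    partition_key=PartitionKey(path="/customerId"),
    offer_throughput=400
)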
Next Steps:
- Practice with the provided examples
- Implement monitoring and alerting in your applications
- Experiment with different consistency levels and partition strategies
- Build production-ready applications using the patterns shown
- Stay Updated with Azure Cosmos DB feature updates and best practices
This guide provides a solid foundation for building scalable, high-performance applications with Azure Cosmos DB and Python. The patterns and practices shown here reflect approaches commonly used in large-scale production systems.
Remember to always test your implementations thoroughly and monitor performance in production environments. Azure Cosmos DB’s flexibility allows you to adapt your approach as your application requirements evolve.