""" Bulkhead pattern implementation for resource isolation. This module provides bulkhead functionality to isolate different types of operations and prevent resource exhaustion. """ import asyncio from typing import Any, Callable, Optional, Dict, Set from datetime import datetime, timedelta from enum import Enum import time from dataclasses import dataclass import uuid from src.core import get_logger logger = get_logger(__name__) class BulkheadType(str, Enum): """Types of bulkhead isolation.""" THREAD_POOL = "thread_pool" # Thread pool isolation SEMAPHORE = "semaphore" # Semaphore-based isolation QUEUE = "queue" # Queue-based isolation @dataclass class BulkheadConfig: """Bulkhead configuration.""" max_concurrent: int = 10 # Maximum concurrent operations queue_size: Optional[int] = None # Queue size (None = unlimited) timeout: float = 30.0 # Operation timeout bulkhead_type: BulkheadType = BulkheadType.SEMAPHORE @dataclass class BulkheadStats: """Bulkhead statistics.""" total_requests: int = 0 successful_requests: int = 0 failed_requests: int = 0 rejected_requests: int = 0 timeout_requests: int = 0 current_active: int = 0 current_queued: int = 0 max_active_reached: int = 0 max_queued_reached: int = 0 total_wait_time_ms: float = 0.0 total_execution_time_ms: float = 0.0 class BulkheadRejectedException(Exception): """Exception raised when bulkhead rejects request.""" pass class BulkheadTimeoutException(Exception): """Exception raised when operation times out.""" pass class Bulkhead: """ Bulkhead implementation for resource isolation. Features: - Configurable concurrency limits - Queue management - Timeout handling - Performance monitoring - Different isolation strategies """ def __init__( self, name: str, config: Optional[BulkheadConfig] = None ): """ Initialize bulkhead. Args: name: Bulkhead name for identification config: Configuration parameters """ self.name = name self.config = config or BulkheadConfig() self.stats = BulkheadStats() # Initialize based on bulkhead type if self.config.bulkhead_type == BulkheadType.SEMAPHORE: self._semaphore = asyncio.Semaphore(self.config.max_concurrent) elif self.config.bulkhead_type == BulkheadType.QUEUE: self._queue: asyncio.Queue = asyncio.Queue( maxsize=self.config.queue_size or 0 ) self._workers: Set[asyncio.Task] = set() self._start_workers() self._active_operations: Set[str] = set() self._lock = asyncio.Lock() logger.info(f"Bulkhead '{name}' initialized with type {self.config.bulkhead_type}") async def execute(self, func: Callable, *args, **kwargs) -> Any: """ Execute function with bulkhead protection. Args: func: Function to execute *args: Function arguments **kwargs: Function keyword arguments Returns: Function result Raises: BulkheadRejectedException: When bulkhead rejects request BulkheadTimeoutException: When operation times out """ operation_id = str(uuid.uuid4()) start_time = time.time() async with self._lock: self.stats.total_requests += 1 try: if self.config.bulkhead_type == BulkheadType.SEMAPHORE: return await self._execute_with_semaphore( func, operation_id, start_time, *args, **kwargs ) elif self.config.bulkhead_type == BulkheadType.QUEUE: return await self._execute_with_queue( func, operation_id, start_time, *args, **kwargs ) else: # Direct execution (no protection) return await self._execute_function(func, *args, **kwargs) except Exception as e: async with self._lock: if isinstance(e, (BulkheadRejectedException, BulkheadTimeoutException)): if isinstance(e, BulkheadRejectedException): self.stats.rejected_requests += 1 else: self.stats.timeout_requests += 1 else: self.stats.failed_requests += 1 raise async def _execute_with_semaphore( self, func: Callable, operation_id: str, start_time: float, *args, **kwargs ) -> Any: """Execute function using semaphore isolation.""" wait_start = time.time() try: # Try to acquire semaphore with timeout await asyncio.wait_for( self._semaphore.acquire(), timeout=self.config.timeout ) except asyncio.TimeoutError: raise BulkheadTimeoutException( f"Failed to acquire semaphore for bulkhead '{self.name}' " f"within {self.config.timeout}s" ) wait_time = time.time() - wait_start try: async with self._lock: self.stats.current_active += 1 self.stats.max_active_reached = max( self.stats.max_active_reached, self.stats.current_active ) self.stats.total_wait_time_ms += wait_time * 1000 self._active_operations.add(operation_id) # Execute function exec_start = time.time() result = await self._execute_function(func, *args, **kwargs) exec_time = time.time() - exec_start async with self._lock: self.stats.successful_requests += 1 self.stats.total_execution_time_ms += exec_time * 1000 return result finally: async with self._lock: self.stats.current_active -= 1 self._active_operations.discard(operation_id) self._semaphore.release() async def _execute_with_queue( self, func: Callable, operation_id: str, start_time: float, *args, **kwargs ) -> Any: """Execute function using queue isolation.""" # Create operation item operation = { "id": operation_id, "func": func, "args": args, "kwargs": kwargs, "future": asyncio.Future(), "submitted_at": time.time() } try: # Try to add to queue if self.config.queue_size and self._queue.qsize() >= self.config.queue_size: raise BulkheadRejectedException( f"Queue full for bulkhead '{self.name}' " f"(size: {self._queue.qsize()})" ) await self._queue.put(operation) async with self._lock: self.stats.current_queued += 1 self.stats.max_queued_reached = max( self.stats.max_queued_reached, self.stats.current_queued ) # Wait for result with timeout try: result = await asyncio.wait_for( operation["future"], timeout=self.config.timeout ) async with self._lock: self.stats.successful_requests += 1 return result except asyncio.TimeoutError: # Cancel the operation operation["future"].cancel() raise BulkheadTimeoutException( f"Operation timed out in bulkhead '{self.name}' " f"after {self.config.timeout}s" ) finally: async with self._lock: if self.stats.current_queued > 0: self.stats.current_queued -= 1 def _start_workers(self): """Start worker tasks for queue processing.""" for i in range(self.config.max_concurrent): worker = asyncio.create_task( self._worker_loop(f"worker-{i}") ) self._workers.add(worker) async def _worker_loop(self, worker_name: str): """Worker loop for processing queued operations.""" logger.debug(f"Worker {worker_name} started for bulkhead '{self.name}'") while True: try: # Get operation from queue operation = await self._queue.get() if operation is None: # Shutdown signal break operation_id = operation["id"] wait_time = time.time() - operation["submitted_at"] try: async with self._lock: self.stats.current_active += 1 self.stats.max_active_reached = max( self.stats.max_active_reached, self.stats.current_active ) self.stats.total_wait_time_ms += wait_time * 1000 self._active_operations.add(operation_id) # Execute function if not operation["future"].cancelled(): exec_start = time.time() result = await self._execute_function( operation["func"], *operation["args"], **operation["kwargs"] ) exec_time = time.time() - exec_start operation["future"].set_result(result) async with self._lock: self.stats.total_execution_time_ms += exec_time * 1000 except Exception as e: if not operation["future"].cancelled(): operation["future"].set_exception(e) async with self._lock: self.stats.failed_requests += 1 finally: async with self._lock: self.stats.current_active -= 1 self._active_operations.discard(operation_id) self._queue.task_done() except Exception as e: logger.error(f"Worker {worker_name} error: {e}") async def _execute_function(self, func: Callable, *args, **kwargs) -> Any: """Execute function, handling both sync and async functions.""" if asyncio.iscoroutinefunction(func): return await func(*args, **kwargs) else: # Run sync function in thread pool loop = asyncio.get_event_loop() return await loop.run_in_executor(None, func, *args, **kwargs) def get_stats(self) -> Dict[str, Any]: """Get bulkhead statistics.""" success_rate = ( self.stats.successful_requests / self.stats.total_requests if self.stats.total_requests > 0 else 0 ) avg_wait_time = ( self.stats.total_wait_time_ms / self.stats.total_requests if self.stats.total_requests > 0 else 0 ) avg_exec_time = ( self.stats.total_execution_time_ms / self.stats.successful_requests if self.stats.successful_requests > 0 else 0 ) return { "name": self.name, "type": self.config.bulkhead_type.value, "config": { "max_concurrent": self.config.max_concurrent, "queue_size": self.config.queue_size, "timeout": self.config.timeout }, "stats": { "total_requests": self.stats.total_requests, "successful_requests": self.stats.successful_requests, "failed_requests": self.stats.failed_requests, "rejected_requests": self.stats.rejected_requests, "timeout_requests": self.stats.timeout_requests, "success_rate": success_rate, "current_active": self.stats.current_active, "current_queued": self.stats.current_queued, "max_active_reached": self.stats.max_active_reached, "max_queued_reached": self.stats.max_queued_reached, "avg_wait_time_ms": avg_wait_time, "avg_execution_time_ms": avg_exec_time } } async def shutdown(self): """Shutdown bulkhead and cleanup resources.""" if self.config.bulkhead_type == BulkheadType.QUEUE: # Signal workers to stop for _ in self._workers: await self._queue.put(None) # Wait for workers to finish await asyncio.gather(*self._workers, return_exceptions=True) self._workers.clear() logger.info(f"Bulkhead '{self.name}' shut down") class BulkheadManager: """ Manager for multiple bulkheads. Provides centralized management and monitoring of bulkheads. """ def __init__(self): """Initialize bulkhead manager.""" self._bulkheads: Dict[str, Bulkhead] = {} self._default_configs: Dict[str, BulkheadConfig] = {} def register_default_config( self, resource_type: str, config: BulkheadConfig ): """ Register default configuration for a resource type. Args: resource_type: Resource type name config: Default configuration """ self._default_configs[resource_type] = config logger.info(f"Registered default bulkhead config for '{resource_type}'") def get_bulkhead( self, resource_type: str, config: Optional[BulkheadConfig] = None ) -> Bulkhead: """ Get or create bulkhead for resource type. Args: resource_type: Resource type name config: Configuration (uses default if not provided) Returns: Bulkhead instance """ if resource_type not in self._bulkheads: # Use provided config or default bulkhead_config = ( config or self._default_configs.get(resource_type) or BulkheadConfig() ) self._bulkheads[resource_type] = Bulkhead( resource_type, bulkhead_config ) return self._bulkheads[resource_type] async def execute_with_bulkhead( self, resource_type: str, func: Callable, *args, config: Optional[BulkheadConfig] = None, **kwargs ) -> Any: """ Execute function with bulkhead protection. Args: resource_type: Resource type name func: Function to execute *args: Function arguments config: Optional configuration **kwargs: Function keyword arguments Returns: Function result """ bulkhead = self.get_bulkhead(resource_type, config) return await bulkhead.execute(func, *args, **kwargs) def get_all_stats(self) -> Dict[str, Any]: """Get statistics for all bulkheads.""" return { name: bulkhead.get_stats() for name, bulkhead in self._bulkheads.items() } async def shutdown_all(self): """Shutdown all bulkheads.""" for bulkhead in self._bulkheads.values(): await bulkhead.shutdown() logger.info("All bulkheads shut down") def get_resource_utilization(self) -> Dict[str, Any]: """Get resource utilization across all bulkheads.""" total_capacity = 0 total_active = 0 total_queued = 0 resource_stats = {} for name, bulkhead in self._bulkheads.items(): stats = bulkhead.get_stats() capacity = stats["config"]["max_concurrent"] active = stats["stats"]["current_active"] queued = stats["stats"]["current_queued"] total_capacity += capacity total_active += active total_queued += queued resource_stats[name] = { "utilization": active / capacity if capacity > 0 else 0, "active": active, "capacity": capacity, "queued": queued } overall_utilization = ( total_active / total_capacity if total_capacity > 0 else 0 ) return { "overall_utilization": overall_utilization, "total_capacity": total_capacity, "total_active": total_active, "total_queued": total_queued, "resources": resource_stats } # Global bulkhead manager bulkhead_manager = BulkheadManager() # Pre-configured bulkheads for common resource types def setup_default_bulkheads(): """Setup default bulkhead configurations.""" # Database operations bulkhead_manager.register_default_config( "database", BulkheadConfig( max_concurrent=20, queue_size=100, timeout=30.0, bulkhead_type=BulkheadType.SEMAPHORE ) ) # External API calls bulkhead_manager.register_default_config( "external_api", BulkheadConfig( max_concurrent=10, queue_size=50, timeout=15.0, bulkhead_type=BulkheadType.QUEUE ) ) # LLM operations bulkhead_manager.register_default_config( "llm_operations", BulkheadConfig( max_concurrent=5, queue_size=20, timeout=60.0, bulkhead_type=BulkheadType.QUEUE ) ) # File operations bulkhead_manager.register_default_config( "file_operations", BulkheadConfig( max_concurrent=15, timeout=30.0, bulkhead_type=BulkheadType.SEMAPHORE ) ) # Analytics operations bulkhead_manager.register_default_config( "analytics", BulkheadConfig( max_concurrent=8, queue_size=30, timeout=120.0, bulkhead_type=BulkheadType.QUEUE ) ) # Initialize default configurations setup_default_bulkheads() # Convenience decorator def bulkhead( resource_type: str, config: Optional[BulkheadConfig] = None ): """ Decorator to protect functions with bulkhead. Args: resource_type: Resource type for bulkhead config: Optional configuration """ def decorator(func): async def wrapper(*args, **kwargs): return await bulkhead_manager.execute_with_bulkhead( resource_type, func, *args, config=config, **kwargs ) return wrapper return decorator