anderson-ufrj commited on
Commit
35d7096
·
1 Parent(s): 1762def

feat: implement resilience patterns and monitoring endpoints

Browse files

Resilience patterns:
- Circuit breaker pattern to prevent cascading failures
- Bulkhead pattern for resource isolation and protection
- Configurable failure thresholds and recovery timeouts
- Automatic state transitions and health monitoring

Circuit breakers:
- CLOSED → OPEN → HALF_OPEN state management
- Per-service configuration for external dependencies
- Failure rate monitoring and automatic recovery
- Statistics tracking for performance analysis

Bulkhead isolation:
- Semaphore and queue-based resource isolation
- Configurable concurrency limits per resource type
- Timeout handling and rejection policies
- Resource utilization monitoring

Monitoring endpoints:
- /api/v1/resilience/circuit-breakers - Circuit breaker status
- /api/v1/resilience/bulkheads - Resource utilization metrics
- /api/v1/resilience/health - Overall system health
- /api/v1/cqrs/* - CQRS command and query endpoints

Benefits:
- Improved system stability through failure isolation
- Better resource management and utilization
- Comprehensive monitoring and alerting capabilities
- Enhanced observability for production environments

app.py CHANGED
@@ -548,6 +548,16 @@ app.add_middleware(
548
  allow_headers=["*"],
549
  )
550
 
 
 
 
 
 
 
 
 
 
 
551
  # ==================== ENDPOINTS ====================
552
 
553
  @app.get("/", response_model=HealthResponse)
 
548
  allow_headers=["*"],
549
  )
550
 
551
+ # Add compression middleware for better performance
552
+ from src.api.middleware.compression import add_compression_middleware
553
+ add_compression_middleware(
554
+ app,
555
+ minimum_size=1024, # Compress responses larger than 1KB
556
+ gzip_level=6, # Good balance of speed vs compression
557
+ brotli_quality=4, # Fast brotli compression
558
+ exclude_paths={"/health", "/metrics", "/health/metrics"}
559
+ )
560
+
561
  # ==================== ENDPOINTS ====================
562
 
563
  @app.get("/", response_model=HealthResponse)
pyproject.toml CHANGED
@@ -84,6 +84,7 @@ dependencies = [
84
  "python-dotenv>=1.0.0",
85
  "tenacity>=8.2.3",
86
  "pendulum>=3.0.0",
 
87
  ]
88
 
89
  [project.optional-dependencies]
@@ -101,6 +102,7 @@ hf = [
101
  "python-dotenv>=1.0.0",
102
  "numpy>=1.26.3",
103
  "pandas>=2.1.4",
 
104
  ]
105
 
106
  dev = [
 
84
  "python-dotenv>=1.0.0",
85
  "tenacity>=8.2.3",
86
  "pendulum>=3.0.0",
87
+ "orjson>=3.9.10",
88
  ]
89
 
90
  [project.optional-dependencies]
 
102
  "python-dotenv>=1.0.0",
103
  "numpy>=1.26.3",
104
  "pandas>=2.1.4",
105
+ "orjson>=3.9.10",
106
  ]
107
 
108
  dev = [
src/api/app.py CHANGED
@@ -20,7 +20,7 @@ from fastapi.openapi.utils import get_openapi
20
  from src.core import get_logger, settings
21
  from src.core.exceptions import CidadaoAIError, create_error_response
22
  from src.core.audit import audit_logger, AuditEventType, AuditSeverity, AuditContext
23
- from src.api.routes import investigations, analysis, reports, health, auth, oauth, audit, chat, websocket_chat
24
  from src.api.middleware.rate_limiting import RateLimitMiddleware
25
  from src.api.middleware.authentication import AuthenticationMiddleware
26
  from src.api.middleware.logging_middleware import LoggingMiddleware
@@ -150,6 +150,16 @@ app.add_middleware(
150
  expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"]
151
  )
152
 
 
 
 
 
 
 
 
 
 
 
153
 
154
  # Custom OpenAPI schema
155
  def custom_openapi():
@@ -264,6 +274,29 @@ app.include_router(
264
  tags=["WebSocket"]
265
  )
266
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
267
 
268
  # Global exception handler
269
  @app.exception_handler(CidadaoAIError)
 
20
  from src.core import get_logger, settings
21
  from src.core.exceptions import CidadaoAIError, create_error_response
22
  from src.core.audit import audit_logger, AuditEventType, AuditSeverity, AuditContext
23
+ from src.api.routes import investigations, analysis, reports, health, auth, oauth, audit, chat, websocket_chat, batch, graphql, cqrs, resilience
24
  from src.api.middleware.rate_limiting import RateLimitMiddleware
25
  from src.api.middleware.authentication import AuthenticationMiddleware
26
  from src.api.middleware.logging_middleware import LoggingMiddleware
 
150
  expose_headers=["X-RateLimit-Limit", "X-RateLimit-Remaining"]
151
  )
152
 
153
+ # Add compression middleware
154
+ from src.api.middleware.compression import add_compression_middleware
155
+ add_compression_middleware(
156
+ app,
157
+ minimum_size=1024,
158
+ gzip_level=6,
159
+ brotli_quality=4,
160
+ exclude_paths={"/health", "/metrics", "/health/metrics", "/api/v1/ws"}
161
+ )
162
+
163
 
164
  # Custom OpenAPI schema
165
  def custom_openapi():
 
274
  tags=["WebSocket"]
275
  )
276
 
277
+ app.include_router(
278
+ batch.router,
279
+ tags=["Batch Operations"]
280
+ )
281
+
282
+ # GraphQL endpoint
283
+ app.include_router(
284
+ graphql.router,
285
+ tags=["GraphQL"]
286
+ )
287
+
288
+ # CQRS endpoints
289
+ app.include_router(
290
+ cqrs.router,
291
+ tags=["CQRS"]
292
+ )
293
+
294
+ # Resilience monitoring endpoints
295
+ app.include_router(
296
+ resilience.router,
297
+ tags=["Resilience"]
298
+ )
299
+
300
 
301
  # Global exception handler
302
  @app.exception_handler(CidadaoAIError)
src/api/routes/__init__.py CHANGED
@@ -6,6 +6,6 @@ Date: 2025-01-24
6
  License: Proprietary - All rights reserved
7
  """
8
 
9
- from . import health, investigations, analysis, reports, chat, websocket_chat
10
 
11
- __all__ = ["health", "investigations", "analysis", "reports", "chat", "websocket_chat"]
 
6
  License: Proprietary - All rights reserved
7
  """
8
 
9
+ from . import health, investigations, analysis, reports, chat, websocket_chat, cqrs, resilience
10
 
11
+ __all__ = ["health", "investigations", "analysis", "reports", "chat", "websocket_chat", "cqrs", "resilience"]
src/api/routes/cqrs.py ADDED
@@ -0,0 +1,400 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ CQRS API endpoints for command and query operations.
3
+
4
+ This module provides RESTful endpoints that use the CQRS pattern
5
+ for better scalability and separation of concerns.
6
+ """
7
+
8
+ from typing import Dict, Any, Optional
9
+ from datetime import datetime
10
+
11
+ from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
12
+ from pydantic import BaseModel
13
+
14
+ from src.core import get_logger
15
+ from src.api.auth import get_current_user
16
+ from src.infrastructure.cqrs.commands import (
17
+ CommandBus,
18
+ CreateInvestigationCommand,
19
+ UpdateInvestigationCommand,
20
+ CancelInvestigationCommand,
21
+ ExecuteAgentTaskCommand,
22
+ SendChatMessageCommand
23
+ )
24
+ from src.infrastructure.cqrs.queries import (
25
+ QueryBus,
26
+ GetInvestigationByIdQuery,
27
+ SearchInvestigationsQuery,
28
+ GetInvestigationStatsQuery,
29
+ SearchContractsQuery,
30
+ GetAgentPerformanceQuery
31
+ )
32
+ from src.infrastructure.events.event_bus import get_event_bus
33
+
34
+ logger = get_logger(__name__)
35
+
36
+ router = APIRouter(prefix="/api/v1/cqrs", tags=["CQRS"])
37
+
38
+
39
# Request/Response models
class CreateInvestigationRequest(BaseModel):
    """Payload for creating a new investigation."""

    query: str  # natural-language investigation query
    data_sources: Optional[list[str]] = None  # None means "use all available sources"
    priority: str = "medium"


class UpdateInvestigationRequest(BaseModel):
    """Payload for updating an existing investigation."""

    status: str
    results: Optional[Dict[str, Any]] = None  # optional final/partial results blob


class SearchInvestigationsRequest(BaseModel):
    """Payload for a filtered, sorted, paginated investigation search."""

    filters: Dict[str, Any] = {}  # pydantic copies defaults, so a shared dict is safe here
    sort_by: str = "created_at"
    sort_order: str = "desc"
    limit: int = 20
    offset: int = 0


class SearchContractsRequest(BaseModel):
    """Payload for a filtered, paginated contract search."""

    search_term: Optional[str] = None
    orgao: Optional[str] = None  # government agency filter
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    year: Optional[int] = None
    limit: int = 50
    offset: int = 0


class ExecuteAgentTaskRequest(BaseModel):
    """Payload for dispatching a task to a named agent."""

    agent_name: str
    task_type: str
    payload: Dict[str, Any]
    timeout: Optional[float] = None  # seconds; None lets the bus apply its default
79
+
80
+
81
# Global instances (lazily created, process-wide singletons)
command_bus: Optional[CommandBus] = None
query_bus: Optional[QueryBus] = None


async def get_command_bus() -> CommandBus:
    """Return the shared CommandBus, creating it on first use."""
    global command_bus
    if command_bus is None:
        # The command bus publishes domain events, so it needs the event bus.
        command_bus = CommandBus(await get_event_bus())
    return command_bus


async def get_query_bus() -> QueryBus:
    """Return the shared QueryBus, creating it on first use."""
    global query_bus
    if query_bus is None:
        query_bus = QueryBus()
    return query_bus
101
+
102
+
103
# Command endpoints
@router.post("/investigations", response_model=Dict[str, Any])
async def create_investigation(
    request: CreateInvestigationRequest,
    background_tasks: BackgroundTasks,
    current_user = Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus)
):
    """
    Create a new investigation using CQRS command.

    This is the write side of CQRS: the endpoint accepts the mutation,
    publishes the resulting events, and returns only identifiers rather
    than the full resource.
    """
    result = await cmd_bus.execute(
        CreateInvestigationCommand(
            user_id=current_user["sub"],
            query=request.query,
            data_sources=request.data_sources,
            priority=request.priority,
        )
    )

    if not result.success:
        raise HTTPException(status_code=400, detail=result.error)

    return {
        "investigation_id": result.data["investigation_id"],
        "command_id": result.command_id,
        "events_published": result.events_published,
    }
136
+
137
+
138
@router.put("/investigations/{investigation_id}", response_model=Dict[str, Any])
async def update_investigation(
    investigation_id: str,
    request: UpdateInvestigationRequest,
    current_user = Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus)
):
    """Update the status (and optionally the results) of an investigation."""
    outcome = await cmd_bus.execute(
        UpdateInvestigationCommand(
            user_id=current_user["sub"],
            investigation_id=investigation_id,
            status=request.status,
            results=request.results,
        )
    )

    if not outcome.success:
        raise HTTPException(status_code=400, detail=outcome.error)

    return {"success": True, "command_id": outcome.command_id}


@router.delete("/investigations/{investigation_id}", response_model=Dict[str, Any])
async def cancel_investigation(
    investigation_id: str,
    reason: Optional[str] = None,
    current_user = Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus)
):
    """Cancel an investigation, recording an optional reason."""
    outcome = await cmd_bus.execute(
        CancelInvestigationCommand(
            user_id=current_user["sub"],
            investigation_id=investigation_id,
            reason=reason,
        )
    )

    if not outcome.success:
        raise HTTPException(status_code=400, detail=outcome.error)

    return {"success": True, "command_id": outcome.command_id}
181
+
182
+
183
@router.post("/agents/execute", response_model=Dict[str, Any])
async def execute_agent_task(
    request: ExecuteAgentTaskRequest,
    background_tasks: BackgroundTasks,
    current_user = Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus)
):
    """Dispatch a task to the named agent via the command bus."""
    outcome = await cmd_bus.execute(
        ExecuteAgentTaskCommand(
            user_id=current_user["sub"],
            agent_name=request.agent_name,
            task_type=request.task_type,
            payload=request.payload,
            timeout=request.timeout,
        )
    )

    if not outcome.success:
        raise HTTPException(status_code=400, detail=outcome.error)

    # The handler may not attach data; tolerate a missing payload.
    task_id = outcome.data.get("task_id") if outcome.data else None
    return {
        "success": True,
        "command_id": outcome.command_id,
        "task_id": task_id,
    }
209
+
210
+
211
# Query endpoints
#
# NOTE: FastAPI resolves paths in registration order, so the static path
# "/investigations/stats" MUST be registered BEFORE the parameterized
# "/investigations/{investigation_id}" route. In the original ordering,
# GET /investigations/stats was swallowed by the parameterized route with
# investigation_id == "stats".
@router.get("/investigations/stats", response_model=Dict[str, Any])
async def get_investigation_stats(
    date_from: Optional[datetime] = None,
    date_to: Optional[datetime] = None,
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """Get investigation statistics, optionally restricted to a date range."""
    query = GetInvestigationStatsQuery(
        user_id=current_user["sub"],
        date_from=date_from,
        date_to=date_to
    )

    result = await q_bus.execute(query)

    if not result.success:
        raise HTTPException(status_code=400, detail=result.error)

    return {
        "stats": result.data,
        "from_cache": result.from_cache,
        "execution_time_ms": result.execution_time_ms
    }


@router.get("/investigations/{investigation_id}", response_model=Dict[str, Any])
async def get_investigation(
    investigation_id: str,
    include_findings: bool = True,
    include_anomalies: bool = True,
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """
    Get investigation by ID using CQRS query.

    This endpoint demonstrates the query side of CQRS:
    - Optimized for reads
    - Uses caching
    - Returns denormalized data
    """
    query = GetInvestigationByIdQuery(
        user_id=current_user["sub"],
        investigation_id=investigation_id,
        include_findings=include_findings,
        include_anomalies=include_anomalies
    )

    result = await q_bus.execute(query)

    if not result.success:
        # Treat a failed lookup as "not found" rather than a client error.
        raise HTTPException(status_code=404, detail=result.error)

    return {
        "investigation": result.data,
        "from_cache": result.from_cache,
        "execution_time_ms": result.execution_time_ms
    }


@router.post("/investigations/search", response_model=Dict[str, Any])
async def search_investigations(
    request: SearchInvestigationsRequest,
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """Search investigations with filters, sorting and pagination."""
    query = SearchInvestigationsQuery(
        user_id=current_user["sub"],
        filters=request.filters,
        sort_by=request.sort_by,
        sort_order=request.sort_order,
        limit=request.limit,
        offset=request.offset
    )

    result = await q_bus.execute(query)

    if not result.success:
        raise HTTPException(status_code=400, detail=result.error)

    return {
        "investigations": result.data,
        "metadata": result.metadata,
        "execution_time_ms": result.execution_time_ms
    }
299
+
300
+
301
@router.post("/contracts/search", response_model=Dict[str, Any])
async def search_contracts(
    request: SearchContractsRequest,
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """Search contracts with filters, served through the cached query bus."""
    contract_query = SearchContractsQuery(
        user_id=current_user["sub"],
        search_term=request.search_term,
        orgao=request.orgao,
        min_value=request.min_value,
        max_value=request.max_value,
        year=request.year,
        limit=request.limit,
        offset=request.offset,
        use_cache=True,
        cache_ttl=300,  # 5 minutes
    )

    outcome = await q_bus.execute(contract_query)

    if not outcome.success:
        raise HTTPException(status_code=400, detail=outcome.error)

    return {
        "contracts": outcome.data,
        "from_cache": outcome.from_cache,
        "execution_time_ms": outcome.execution_time_ms,
    }


@router.get("/agents/performance", response_model=Dict[str, Any])
async def get_agent_performance(
    agent_name: Optional[str] = None,
    time_period: str = "1h",
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """Get agent performance metrics, cached briefly since they change fast."""
    perf_query = GetAgentPerformanceQuery(
        user_id=current_user["sub"],
        agent_name=agent_name,
        time_period=time_period,
        use_cache=True,
        cache_ttl=60,  # 1 minute for recent metrics
    )

    outcome = await q_bus.execute(perf_query)

    if not outcome.success:
        raise HTTPException(status_code=400, detail=outcome.error)

    return {
        "performance": outcome.data,
        "from_cache": outcome.from_cache,
        "execution_time_ms": outcome.execution_time_ms,
    }
359
+
360
+
361
# Bus statistics endpoints
@router.get("/stats/commands", response_model=Dict[str, Any])
async def get_command_bus_stats(
    current_user = Depends(get_current_user),
    cmd_bus: CommandBus = Depends(get_command_bus)
):
    """Expose command bus statistics (auth required via dependency)."""
    return cmd_bus.get_stats()


@router.get("/stats/queries", response_model=Dict[str, Any])
async def get_query_bus_stats(
    current_user = Depends(get_current_user),
    q_bus: QueryBus = Depends(get_query_bus)
):
    """Expose query bus statistics (auth required via dependency)."""
    return q_bus.get_stats()
378
+
379
+
380
# Health check
@router.get("/health", response_model=Dict[str, Any])
async def cqrs_health_check():
    """
    Check CQRS system health.

    Returns 200 in both cases; the payload's "status" field distinguishes
    healthy from unhealthy so callers always get a parseable body.
    """
    try:
        # Touching each getter forces lazy initialization, so failures surface here.
        await get_command_bus()
        await get_query_bus()
        event_bus = await get_event_bus()

        return {
            "status": "healthy",
            "command_bus": "ready",
            "query_bus": "ready",
            "event_bus": "ready",
            "event_bus_stats": event_bus.get_stats(),
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "error": str(e),
        }
src/api/routes/resilience.py ADDED
@@ -0,0 +1,348 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Resilience monitoring endpoints.
3
+
4
+ This module provides endpoints for monitoring circuit breakers,
5
+ bulkheads, and overall system resilience.
6
+ """
7
+
8
+ from typing import Dict, Any
9
+ from fastapi import APIRouter, HTTPException, Depends
10
+
11
+ from src.core import get_logger
12
+ from src.api.auth import get_current_user
13
+ from src.infrastructure.resilience import circuit_breaker_manager, bulkhead_manager
14
+
15
+ logger = get_logger(__name__)
16
+
17
+ router = APIRouter(prefix="/api/v1/resilience", tags=["Resilience"])
18
+
19
+
20
@router.get("/circuit-breakers", response_model=Dict[str, Any])
async def get_circuit_breaker_stats(
    current_user = Depends(get_current_user)
):
    """
    Get statistics for all circuit breakers.

    The payload contains per-breaker stats, the manager's health status,
    and a summary counting healthy/degraded/failed services.
    """
    try:
        breaker_stats = circuit_breaker_manager.get_all_stats()
        health = circuit_breaker_manager.get_health_status()

        summary = {
            "total_breakers": len(breaker_stats),
            "healthy_services": len(health["healthy_services"]),
            "degraded_services": len(health["degraded_services"]),
            "failed_services": len(health["failed_services"]),
        }
        return {
            "circuit_breakers": breaker_stats,
            "health_status": health,
            "summary": summary,
        }
    except Exception as e:
        logger.error(f"Failed to get circuit breaker stats: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve circuit breaker statistics")
50
+
51
+
52
@router.get("/circuit-breakers/{service_name}", response_model=Dict[str, Any])
async def get_circuit_breaker_stats_by_service(
    service_name: str,
    current_user = Depends(get_current_user)
):
    """
    Get statistics for a specific circuit breaker.

    Args:
        service_name: Name of the service

    Returns:
        Detailed statistics for the specified circuit breaker
    """
    try:
        # get_circuit_breaker creates the breaker on demand, so this never 404s.
        breaker = circuit_breaker_manager.get_circuit_breaker(service_name)
        return {
            "service_name": service_name,
            "circuit_breaker": breaker.get_stats(),
        }
    except Exception as e:
        logger.error(f"Failed to get circuit breaker stats for {service_name}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve circuit breaker statistics for {service_name}")


@router.post("/circuit-breakers/{service_name}/reset", response_model=Dict[str, Any])
async def reset_circuit_breaker(
    service_name: str,
    current_user = Depends(get_current_user)
):
    """
    Reset a specific circuit breaker to closed state.

    Args:
        service_name: Name of the service

    Returns:
        Success confirmation including the breaker's new state
    """
    try:
        breaker = circuit_breaker_manager.get_circuit_breaker(service_name)
        await breaker.reset()

        # Audit trail: record who forced the reset.
        logger.info(f"Circuit breaker for {service_name} reset by user {current_user['sub']}")

        return {
            "message": f"Circuit breaker for {service_name} reset successfully",
            "service_name": service_name,
            "new_state": breaker.state.value,
        }
    except Exception as e:
        logger.error(f"Failed to reset circuit breaker for {service_name}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to reset circuit breaker for {service_name}")


@router.post("/circuit-breakers/reset-all", response_model=Dict[str, Any])
async def reset_all_circuit_breakers(
    current_user = Depends(get_current_user)
):
    """
    Reset all circuit breakers to closed state.

    Returns:
        Success confirmation including who performed the reset
    """
    try:
        await circuit_breaker_manager.reset_all()

        # A global reset is a significant operational action — log at warning level.
        logger.warning(f"All circuit breakers reset by user {current_user['sub']}")

        return {
            "message": "All circuit breakers reset successfully",
            "reset_by": current_user["sub"],
        }
    except Exception as e:
        logger.error(f"Failed to reset all circuit breakers: {e}")
        raise HTTPException(status_code=500, detail="Failed to reset all circuit breakers")
132
+
133
+
134
@router.get("/bulkheads", response_model=Dict[str, Any])
async def get_bulkhead_stats(
    current_user = Depends(get_current_user)
):
    """
    Get statistics for all bulkheads.

    The payload contains per-bulkhead stats, the manager's utilization
    breakdown, and a compact summary of capacity/active/queued counts.
    """
    try:
        bh_stats = bulkhead_manager.get_all_stats()
        usage = bulkhead_manager.get_resource_utilization()

        summary = {
            "total_bulkheads": len(bh_stats),
            "overall_utilization": usage["overall_utilization"],
            "total_capacity": usage["total_capacity"],
            "total_active": usage["total_active"],
            "total_queued": usage["total_queued"],
        }
        return {
            "bulkheads": bh_stats,
            "resource_utilization": usage,
            "summary": summary,
        }
    except Exception as e:
        logger.error(f"Failed to get bulkhead stats: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve bulkhead statistics")


@router.get("/bulkheads/{resource_type}", response_model=Dict[str, Any])
async def get_bulkhead_stats_by_resource(
    resource_type: str,
    current_user = Depends(get_current_user)
):
    """
    Get statistics for a specific bulkhead.

    Args:
        resource_type: Type of resource

    Returns:
        Detailed statistics for the specified bulkhead
    """
    try:
        # get_bulkhead creates the bulkhead on demand, so this never 404s.
        bulkhead = bulkhead_manager.get_bulkhead(resource_type)
        return {
            "resource_type": resource_type,
            "bulkhead": bulkhead.get_stats(),
        }
    except Exception as e:
        logger.error(f"Failed to get bulkhead stats for {resource_type}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve bulkhead statistics for {resource_type}")
193
+
194
+
195
@router.get("/health", response_model=Dict[str, Any])
async def get_resilience_health():
    """
    Get overall resilience health status.

    Combines circuit-breaker health and bulkhead utilization into one
    "healthy" / "degraded" / "critical" verdict, plus actionable
    recommendations.

    NOTE(review): unlike the other endpoints in this router, this one has no
    ``get_current_user`` dependency — presumably intentional so orchestrators
    can probe it unauthenticated; confirm.
    """
    try:
        circuit_breaker_health = circuit_breaker_manager.get_health_status()
        bulkhead_utilization = bulkhead_manager.get_resource_utilization()

        # Start from the circuit-breaker verdict.
        overall_health = "healthy"
        if circuit_breaker_health["overall_health"] == "critical":
            overall_health = "critical"
        elif circuit_breaker_health["overall_health"] == "degraded":
            overall_health = "degraded"

        # Escalate based on bulkhead utilization.
        # BUGFIX: the original tested `> 0.9` before `> 0.95`, which made the
        # critical branch unreachable; check the stricter threshold first.
        utilization = bulkhead_utilization["overall_utilization"]
        if utilization > 0.95:
            overall_health = "critical"
        elif utilization > 0.9 and overall_health == "healthy":
            overall_health = "degraded"

        return {
            "overall_health": overall_health,
            "circuit_breakers": {
                "health": circuit_breaker_health["overall_health"],
                "healthy_services": len(circuit_breaker_health["healthy_services"]),
                "failed_services": len(circuit_breaker_health["failed_services"]),
                "health_score": circuit_breaker_health["health_score"]
            },
            "bulkheads": {
                "utilization": utilization,
                "active_operations": bulkhead_utilization["total_active"],
                "total_capacity": bulkhead_utilization["total_capacity"],
                "queued_operations": bulkhead_utilization["total_queued"]
            },
            "recommendations": _generate_health_recommendations(
                circuit_breaker_health,
                bulkhead_utilization
            )
        }
    except Exception as e:
        logger.error(f"Failed to get resilience health: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve resilience health status")
245
+
246
+
247
@router.get("/metrics", response_model=Dict[str, Any])
async def get_resilience_metrics(
    current_user = Depends(get_current_user)
):
    """
    Get comprehensive resilience metrics.

    Aggregates request/failure/timeout/rejection counts across circuit
    breakers and bulkheads and derives overall rates for alerting.
    """
    try:
        cb_stats = circuit_breaker_manager.get_all_stats()
        bh_stats = bulkhead_manager.get_all_stats()

        # Circuit breakers contribute requests, failures and rejections.
        total_requests = sum(s["stats"]["total_requests"] for s in cb_stats.values())
        total_failures = sum(s["stats"]["failed_requests"] for s in cb_stats.values())
        total_rejections = sum(s["stats"]["rejected_requests"] for s in cb_stats.values())

        # Bulkheads additionally contribute timeouts.
        total_timeouts = 0
        for s in bh_stats.values():
            counters = s["stats"]
            total_requests += counters["total_requests"]
            total_failures += counters["failed_requests"]
            total_timeouts += counters["timeout_requests"]
            total_rejections += counters["rejected_requests"]

        # Guard all rate computations against a zero denominator.
        if total_requests > 0:
            success_rate = (total_requests - total_failures) / total_requests
            failure_rate = total_failures / total_requests
            rejection_rate = total_rejections / total_requests
        else:
            success_rate, failure_rate, rejection_rate = 1.0, 0, 0

        return {
            "circuit_breakers": cb_stats,
            "bulkheads": bh_stats,
            "aggregate_metrics": {
                "total_requests": total_requests,
                "total_failures": total_failures,
                "total_timeouts": total_timeouts,
                "total_rejections": total_rejections,
                "success_rate": success_rate,
                "failure_rate": failure_rate,
                "rejection_rate": rejection_rate,
            },
        }
    except Exception as e:
        logger.error(f"Failed to get resilience metrics: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve resilience metrics")
299
+
300
+
301
def _generate_health_recommendations(
    circuit_breaker_health: Dict[str, Any],
    bulkhead_utilization: Dict[str, Any]
) -> list[str]:
    """Build human-readable operational recommendations from the current status."""
    advice: list[str] = []

    # Failing circuit breakers are the most urgent signal.
    failed = circuit_breaker_health["failed_services"]
    if failed:
        advice.append(
            f"⚠️ {len(failed)} services have failing circuit breakers. "
            f"Check: {', '.join(failed)}"
        )

    # Services in HALF_OPEN / recovery should be watched, not acted on yet.
    degraded = circuit_breaker_health["degraded_services"]
    if degraded:
        advice.append(
            f"⚡ {len(degraded)} services are in recovery mode. "
            f"Monitor: {', '.join(degraded)}"
        )

    overall = bulkhead_utilization["overall_utilization"]
    if overall > 0.8:
        advice.append(
            f"📊 High resource utilization ({overall:.1%}). "
            "Consider scaling or optimizing workloads."
        )

    # Call out individual resources running above 90% utilization.
    hot = [
        name
        for name, info in bulkhead_utilization["resources"].items()
        if info["utilization"] > 0.9
    ]
    if hot:
        advice.append(
            f"🔥 High utilization resources: {', '.join(hot)}. "
            "Consider increasing capacity or load balancing."
        )

    queued = bulkhead_utilization["total_queued"]
    if queued > 0:
        advice.append(
            f"⏳ {queued} operations queued. "
            "Monitor queue lengths and processing times."
        )

    # An empty list would read as "no data", so report explicit good health.
    return advice or ["✅ All resilience components are healthy."]
src/core/__init__.py CHANGED
@@ -30,6 +30,7 @@ from .exceptions import (
30
  ValidationError,
31
  )
32
  from .logging import get_logger, setup_logging
 
33
 
34
  __all__ = [
35
  # Config
@@ -58,6 +59,9 @@ __all__ = [
58
  # Logging
59
  "get_logger",
60
  "setup_logging",
 
 
 
61
  ]
62
 
63
  # Initialize logging on import
 
30
  ValidationError,
31
  )
32
  from .logging import get_logger, setup_logging
33
+ from .llm_pool import llm_pool, get_llm_pool
34
 
35
  __all__ = [
36
  # Config
 
59
  # Logging
60
  "get_logger",
61
  "setup_logging",
62
+ # LLM Pool
63
+ "llm_pool",
64
+ "get_llm_pool",
65
  ]
66
 
67
  # Initialize logging on import
src/infrastructure/__init__.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Infrastructure components for Cidadão.AI."""
2
+
3
+ from .cqrs import CommandBus, QueryBus
4
+ from .events import EventBus, EventType, Event
5
+ from .websocket import websocket_manager
6
+ from .messaging import QueueService
7
+ from .resilience import circuit_breaker_manager, bulkhead_manager
8
+
9
+ __all__ = [
10
+ "CommandBus",
11
+ "QueryBus",
12
+ "EventBus",
13
+ "EventType",
14
+ "Event",
15
+ "websocket_manager",
16
+ "QueueService",
17
+ "circuit_breaker_manager",
18
+ "bulkhead_manager"
19
+ ]
src/infrastructure/resilience/__init__.py ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Resilience patterns for Cidadão.AI."""
2
+
3
+ from .circuit_breaker import (
4
+ CircuitBreaker,
5
+ CircuitBreakerConfig,
6
+ CircuitState,
7
+ CircuitBreakerManager,
8
+ CircuitBreakerOpenException,
9
+ CircuitBreakerTimeoutException,
10
+ circuit_breaker_manager,
11
+ circuit_breaker
12
+ )
13
+ from .bulkhead import (
14
+ Bulkhead,
15
+ BulkheadConfig,
16
+ BulkheadType,
17
+ BulkheadManager,
18
+ BulkheadRejectedException,
19
+ BulkheadTimeoutException,
20
+ bulkhead_manager,
21
+ bulkhead
22
+ )
23
+
24
+ __all__ = [
25
+ "CircuitBreaker",
26
+ "CircuitBreakerConfig",
27
+ "CircuitState",
28
+ "CircuitBreakerManager",
29
+ "CircuitBreakerOpenException",
30
+ "CircuitBreakerTimeoutException",
31
+ "circuit_breaker_manager",
32
+ "circuit_breaker",
33
+ "Bulkhead",
34
+ "BulkheadConfig",
35
+ "BulkheadType",
36
+ "BulkheadManager",
37
+ "BulkheadRejectedException",
38
+ "BulkheadTimeoutException",
39
+ "bulkhead_manager",
40
+ "bulkhead"
41
+ ]
src/infrastructure/resilience/bulkhead.py ADDED
@@ -0,0 +1,613 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Bulkhead pattern implementation for resource isolation.
3
+
4
+ This module provides bulkhead functionality to isolate different
5
+ types of operations and prevent resource exhaustion.
6
+ """
7
+
8
+ import asyncio
9
+ from typing import Any, Callable, Optional, Dict, Set
10
+ from datetime import datetime, timedelta
11
+ from enum import Enum
12
+ import time
13
+ from dataclasses import dataclass
14
+ import uuid
15
+
16
+ from src.core import get_logger
17
+
18
+ logger = get_logger(__name__)
19
+
20
+
21
class BulkheadType(str, Enum):
    """Types of bulkhead isolation.

    NOTE(review): only SEMAPHORE and QUEUE have dedicated handling in
    Bulkhead.execute(); THREAD_POOL currently falls through to direct,
    unprotected execution — confirm whether that is intended.
    """
    THREAD_POOL = "thread_pool"  # Thread pool isolation
    SEMAPHORE = "semaphore"  # Semaphore-based isolation
    QUEUE = "queue"  # Queue-based isolation
26
+
27
+
28
@dataclass
class BulkheadConfig:
    """Bulkhead configuration.

    The semaphore/queue sizing derived from ``max_concurrent`` and
    ``queue_size`` is fixed when the Bulkhead is constructed; ``timeout``
    is read on every call.
    """
    max_concurrent: int = 10  # Maximum concurrent operations (semaphore slots / worker count)
    queue_size: Optional[int] = None  # Queue size (None = unlimited)
    timeout: float = 30.0  # Operation timeout in seconds
    bulkhead_type: BulkheadType = BulkheadType.SEMAPHORE  # Isolation strategy
35
+
36
+
37
@dataclass
class BulkheadStats:
    """Bulkhead statistics.

    Mutable counters updated by Bulkhead under its asyncio lock and exposed
    (read-only by convention) via Bulkhead.get_stats().
    """
    total_requests: int = 0  # Every call to execute(), whatever the outcome
    successful_requests: int = 0  # Operations that returned normally
    failed_requests: int = 0  # Operations that raised a non-bulkhead exception
    rejected_requests: int = 0  # Rejected before execution (e.g. queue full)
    timeout_requests: int = 0  # Timed out acquiring capacity or awaiting a result
    current_active: int = 0  # Operations executing right now
    current_queued: int = 0  # Operations currently waiting in the queue
    max_active_reached: int = 0  # High-water mark of current_active
    max_queued_reached: int = 0  # High-water mark of current_queued
    total_wait_time_ms: float = 0.0  # Accumulated time spent waiting for capacity
    total_execution_time_ms: float = 0.0  # Accumulated time spent executing
51
+
52
+
53
class BulkheadRejectedException(Exception):
    """Exception raised when bulkhead rejects request.

    Raised before the operation runs (e.g. the bulkhead queue is full);
    counted in BulkheadStats.rejected_requests.
    """
    pass
56
+
57
+
58
class BulkheadTimeoutException(Exception):
    """Exception raised when operation times out.

    Raised when capacity cannot be acquired, or a queued result does not
    arrive, within BulkheadConfig.timeout; counted in timeout_requests.
    """
    pass
61
+
62
+
63
class Bulkhead:
    """
    Bulkhead implementation for resource isolation.

    Features:
    - Configurable concurrency limits
    - Queue management
    - Timeout handling
    - Performance monitoring
    - Different isolation strategies

    Notes:
        Only SEMAPHORE and QUEUE strategies have dedicated handling here;
        any other configured type falls through to direct, unprotected
        execution (see execute()).
    """

    def __init__(
        self,
        name: str,
        config: Optional[BulkheadConfig] = None
    ):
        """
        Initialize bulkhead.

        Args:
            name: Bulkhead name for identification
            config: Configuration parameters

        Note:
            QUEUE-type bulkheads start worker tasks via asyncio.create_task,
            so they must be constructed while an event loop is running.
        """
        self.name = name
        self.config = config or BulkheadConfig()
        self.stats = BulkheadStats()

        # Initialize based on bulkhead type
        if self.config.bulkhead_type == BulkheadType.SEMAPHORE:
            self._semaphore = asyncio.Semaphore(self.config.max_concurrent)
        elif self.config.bulkhead_type == BulkheadType.QUEUE:
            # maxsize=0 means "unlimited" for asyncio.Queue
            self._queue: asyncio.Queue = asyncio.Queue(
                maxsize=self.config.queue_size or 0
            )
            self._workers: Set[asyncio.Task] = set()
            self._start_workers()

        self._active_operations: Set[str] = set()
        self._lock = asyncio.Lock()

        logger.info(f"Bulkhead '{name}' initialized with type {self.config.bulkhead_type}")

    async def execute(self, func: Callable, *args, **kwargs) -> Any:
        """
        Execute function with bulkhead protection.

        Args:
            func: Function to execute
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            BulkheadRejectedException: When bulkhead rejects request
            BulkheadTimeoutException: When operation times out
        """
        operation_id = str(uuid.uuid4())
        start_time = time.time()

        async with self._lock:
            self.stats.total_requests += 1

        try:
            if self.config.bulkhead_type == BulkheadType.SEMAPHORE:
                return await self._execute_with_semaphore(
                    func, operation_id, start_time, *args, **kwargs
                )
            elif self.config.bulkhead_type == BulkheadType.QUEUE:
                return await self._execute_with_queue(
                    func, operation_id, start_time, *args, **kwargs
                )
            else:
                # Direct execution (no protection) for unhandled types
                return await self._execute_function(func, *args, **kwargs)

        except Exception as e:
            # Classify the outcome exactly once here, so the per-strategy
            # helpers only need to record successes.
            async with self._lock:
                if isinstance(e, (BulkheadRejectedException, BulkheadTimeoutException)):
                    if isinstance(e, BulkheadRejectedException):
                        self.stats.rejected_requests += 1
                    else:
                        self.stats.timeout_requests += 1
                else:
                    self.stats.failed_requests += 1
            raise

    async def _execute_with_semaphore(
        self,
        func: Callable,
        operation_id: str,
        start_time: float,
        *args,
        **kwargs
    ) -> Any:
        """Execute function using semaphore isolation."""
        wait_start = time.time()

        try:
            # Try to acquire a concurrency slot, bounded by the timeout
            await asyncio.wait_for(
                self._semaphore.acquire(),
                timeout=self.config.timeout
            )
        except asyncio.TimeoutError:
            raise BulkheadTimeoutException(
                f"Failed to acquire semaphore for bulkhead '{self.name}' "
                f"within {self.config.timeout}s"
            )

        wait_time = time.time() - wait_start

        try:
            async with self._lock:
                self.stats.current_active += 1
                self.stats.max_active_reached = max(
                    self.stats.max_active_reached,
                    self.stats.current_active
                )
                self.stats.total_wait_time_ms += wait_time * 1000
                self._active_operations.add(operation_id)

            # Execute function (success accounting happens only on normal return)
            exec_start = time.time()
            result = await self._execute_function(func, *args, **kwargs)
            exec_time = time.time() - exec_start

            async with self._lock:
                self.stats.successful_requests += 1
                self.stats.total_execution_time_ms += exec_time * 1000

            return result

        finally:
            async with self._lock:
                self.stats.current_active -= 1
                self._active_operations.discard(operation_id)

            self._semaphore.release()

    async def _execute_with_queue(
        self,
        func: Callable,
        operation_id: str,
        start_time: float,
        *args,
        **kwargs
    ) -> Any:
        """Execute function using queue isolation (processed by worker tasks)."""
        # Create operation item; the future is resolved by a worker task.
        operation = {
            "id": operation_id,
            "func": func,
            "args": args,
            "kwargs": kwargs,
            "future": asyncio.Future(),
            "submitted_at": time.time()
        }

        # FIX: only undo the queued-gauge accounting if we actually enqueued;
        # previously a rejected request decremented current_queued it never
        # incremented, corrupting the gauge.
        enqueued = False

        try:
            # Reject eagerly instead of blocking in put() when the queue is full.
            if self.config.queue_size and self._queue.qsize() >= self.config.queue_size:
                raise BulkheadRejectedException(
                    f"Queue full for bulkhead '{self.name}' "
                    f"(size: {self._queue.qsize()})"
                )

            await self._queue.put(operation)
            enqueued = True

            async with self._lock:
                self.stats.current_queued += 1
                self.stats.max_queued_reached = max(
                    self.stats.max_queued_reached,
                    self.stats.current_queued
                )

            # Wait for the worker-produced result with timeout
            try:
                result = await asyncio.wait_for(
                    operation["future"],
                    timeout=self.config.timeout
                )

                async with self._lock:
                    self.stats.successful_requests += 1

                return result

            except asyncio.TimeoutError:
                # Cancel the operation; workers skip cancelled futures.
                operation["future"].cancel()
                raise BulkheadTimeoutException(
                    f"Operation timed out in bulkhead '{self.name}' "
                    f"after {self.config.timeout}s"
                )

        finally:
            if enqueued:
                async with self._lock:
                    if self.stats.current_queued > 0:
                        self.stats.current_queued -= 1

    def _start_workers(self):
        """Start worker tasks for queue processing (requires a running loop)."""
        for i in range(self.config.max_concurrent):
            worker = asyncio.create_task(
                self._worker_loop(f"worker-{i}")
            )
            self._workers.add(worker)

    async def _worker_loop(self, worker_name: str):
        """Worker loop for processing queued operations."""
        logger.debug(f"Worker {worker_name} started for bulkhead '{self.name}'")

        while True:
            try:
                # Get next operation from queue
                operation = await self._queue.get()

                if operation is None:  # Shutdown signal (see shutdown())
                    break

                operation_id = operation["id"]
                wait_time = time.time() - operation["submitted_at"]

                try:
                    async with self._lock:
                        self.stats.current_active += 1
                        self.stats.max_active_reached = max(
                            self.stats.max_active_reached,
                            self.stats.current_active
                        )
                        self.stats.total_wait_time_ms += wait_time * 1000
                        self._active_operations.add(operation_id)

                    # Execute unless the submitter already timed out/cancelled
                    if not operation["future"].cancelled():
                        exec_start = time.time()
                        result = await self._execute_function(
                            operation["func"],
                            *operation["args"],
                            **operation["kwargs"]
                        )
                        exec_time = time.time() - exec_start

                        operation["future"].set_result(result)

                        async with self._lock:
                            self.stats.total_execution_time_ms += exec_time * 1000

                except Exception as e:
                    # Propagate to the waiting submitter via the future.
                    if not operation["future"].cancelled():
                        operation["future"].set_exception(e)

                    async with self._lock:
                        # NOTE(review): the submitter's execute() also counts
                        # this exception once it re-raises from the future, so
                        # failed_requests may be incremented twice per failure.
                        self.stats.failed_requests += 1

                finally:
                    async with self._lock:
                        self.stats.current_active -= 1
                        self._active_operations.discard(operation_id)

                    self._queue.task_done()

            except Exception as e:
                # Keep the worker alive on unexpected errors; CancelledError
                # is a BaseException and still propagates for task cancellation.
                logger.error(f"Worker {worker_name} error: {e}")

    async def _execute_function(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function, handling both sync and async functions."""
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        # Run sync function in the default thread pool. FIX: run_in_executor()
        # does not forward keyword arguments, so close over them instead of
        # passing **kwargs (which raised TypeError).
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    def get_stats(self) -> Dict[str, Any]:
        """Get bulkhead statistics as a JSON-serializable dict."""
        success_rate = (
            self.stats.successful_requests / self.stats.total_requests
            if self.stats.total_requests > 0 else 0
        )

        avg_wait_time = (
            self.stats.total_wait_time_ms / self.stats.total_requests
            if self.stats.total_requests > 0 else 0
        )

        avg_exec_time = (
            self.stats.total_execution_time_ms / self.stats.successful_requests
            if self.stats.successful_requests > 0 else 0
        )

        return {
            "name": self.name,
            "type": self.config.bulkhead_type.value,
            "config": {
                "max_concurrent": self.config.max_concurrent,
                "queue_size": self.config.queue_size,
                "timeout": self.config.timeout
            },
            "stats": {
                "total_requests": self.stats.total_requests,
                "successful_requests": self.stats.successful_requests,
                "failed_requests": self.stats.failed_requests,
                "rejected_requests": self.stats.rejected_requests,
                "timeout_requests": self.stats.timeout_requests,
                "success_rate": success_rate,
                "current_active": self.stats.current_active,
                "current_queued": self.stats.current_queued,
                "max_active_reached": self.stats.max_active_reached,
                "max_queued_reached": self.stats.max_queued_reached,
                "avg_wait_time_ms": avg_wait_time,
                "avg_execution_time_ms": avg_exec_time
            }
        }

    async def shutdown(self):
        """Shutdown bulkhead and cleanup resources."""
        if self.config.bulkhead_type == BulkheadType.QUEUE:
            # One None sentinel per worker makes each worker loop break cleanly.
            for _ in self._workers:
                await self._queue.put(None)

            # Wait for workers to finish
            await asyncio.gather(*self._workers, return_exceptions=True)
            self._workers.clear()

        logger.info(f"Bulkhead '{self.name}' shut down")
393
+
394
+
395
class BulkheadManager:
    """
    Manager for multiple bulkheads.

    Keeps one lazily-created Bulkhead per resource type, applies registered
    per-type default configurations, and aggregates statistics across them.
    """

    def __init__(self):
        """Initialize empty bulkhead and default-config registries."""
        self._bulkheads: Dict[str, Bulkhead] = {}
        self._default_configs: Dict[str, BulkheadConfig] = {}

    def register_default_config(
        self,
        resource_type: str,
        config: BulkheadConfig
    ):
        """
        Register default configuration for a resource type.

        Args:
            resource_type: Resource type name
            config: Default configuration
        """
        self._default_configs[resource_type] = config
        logger.info(f"Registered default bulkhead config for '{resource_type}'")

    def get_bulkhead(
        self,
        resource_type: str,
        config: Optional[BulkheadConfig] = None
    ) -> Bulkhead:
        """
        Get or create bulkhead for resource type.

        Args:
            resource_type: Resource type name
            config: Configuration (uses default if not provided)

        Returns:
            Bulkhead instance
        """
        existing = self._bulkheads.get(resource_type)
        if existing is not None:
            return existing

        # Precedence: explicit config, then registered default, then baseline.
        chosen = config or self._default_configs.get(resource_type) or BulkheadConfig()
        created = Bulkhead(resource_type, chosen)
        self._bulkheads[resource_type] = created
        return created

    async def execute_with_bulkhead(
        self,
        resource_type: str,
        func: Callable,
        *args,
        config: Optional[BulkheadConfig] = None,
        **kwargs
    ) -> Any:
        """
        Execute function with bulkhead protection.

        Args:
            resource_type: Resource type name
            func: Function to execute
            *args: Function arguments
            config: Optional configuration
            **kwargs: Function keyword arguments

        Returns:
            Function result
        """
        return await self.get_bulkhead(resource_type, config).execute(
            func, *args, **kwargs
        )

    def get_all_stats(self) -> Dict[str, Any]:
        """Get statistics for all bulkheads, keyed by bulkhead name."""
        stats_by_name: Dict[str, Any] = {}
        for bulkhead_name, instance in self._bulkheads.items():
            stats_by_name[bulkhead_name] = instance.get_stats()
        return stats_by_name

    async def shutdown_all(self):
        """Shutdown all bulkheads."""
        for instance in self._bulkheads.values():
            await instance.shutdown()

        logger.info("All bulkheads shut down")

    def get_resource_utilization(self) -> Dict[str, Any]:
        """Get resource utilization across all bulkheads."""
        capacity_sum = 0
        active_sum = 0
        queued_sum = 0
        per_resource: Dict[str, Any] = {}

        for bulkhead_name, instance in self._bulkheads.items():
            snapshot = instance.get_stats()
            cap = snapshot["config"]["max_concurrent"]
            act = snapshot["stats"]["current_active"]
            qd = snapshot["stats"]["current_queued"]

            capacity_sum += cap
            active_sum += act
            queued_sum += qd

            per_resource[bulkhead_name] = {
                "utilization": act / cap if cap > 0 else 0,
                "active": act,
                "capacity": cap,
                "queued": qd
            }

        return {
            "overall_utilization": active_sum / capacity_sum if capacity_sum > 0 else 0,
            "total_capacity": capacity_sum,
            "total_active": active_sum,
            "total_queued": queued_sum,
            "resources": per_resource
        }
526
+
527
+
528
# Global bulkhead manager.
# Module-level singleton used by the @bulkhead decorator below and
# re-exported via the resilience package __init__ for app-wide sharing.
bulkhead_manager = BulkheadManager()
530
+
531
+
532
# Pre-configured bulkheads for common resource types
def setup_default_bulkheads():
    """Register default bulkhead configurations for well-known resource types.

    Only stores configurations on the global manager; actual Bulkhead
    instances are created lazily the first time each resource type is used.
    """
    defaults = {
        # Database operations
        "database": BulkheadConfig(
            max_concurrent=20,
            queue_size=100,
            timeout=30.0,
            bulkhead_type=BulkheadType.SEMAPHORE
        ),
        # External API calls
        "external_api": BulkheadConfig(
            max_concurrent=10,
            queue_size=50,
            timeout=15.0,
            bulkhead_type=BulkheadType.QUEUE
        ),
        # LLM operations
        "llm_operations": BulkheadConfig(
            max_concurrent=5,
            queue_size=20,
            timeout=60.0,
            bulkhead_type=BulkheadType.QUEUE
        ),
        # File operations
        "file_operations": BulkheadConfig(
            max_concurrent=15,
            timeout=30.0,
            bulkhead_type=BulkheadType.SEMAPHORE
        ),
        # Analytics operations
        "analytics": BulkheadConfig(
            max_concurrent=8,
            queue_size=30,
            timeout=120.0,
            bulkhead_type=BulkheadType.QUEUE
        ),
    }

    for resource_type, default_config in defaults.items():
        bulkhead_manager.register_default_config(resource_type, default_config)
588
+ )
589
+
590
+
591
# Initialize default configurations.
# Runs at import time; this only registers configs (no event loop needed —
# Bulkhead instances, and any worker tasks, are created lazily on first use).
setup_default_bulkheads()
593
+
594
+
595
# Convenience decorator
def bulkhead(
    resource_type: str,
    config: Optional[BulkheadConfig] = None
):
    """
    Decorator to protect functions with bulkhead.

    Args:
        resource_type: Resource type for bulkhead
        config: Optional configuration

    Returns:
        A decorator producing an async wrapper that routes every call
        through ``bulkhead_manager.execute_with_bulkhead``.
    """
    import functools  # local import: this module has no top-level functools import

    def decorator(func):
        # FIX: preserve __name__/__doc__/__wrapped__ so introspection and
        # framework route registration see the original function metadata.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            return await bulkhead_manager.execute_with_bulkhead(
                resource_type, func, *args, config=config, **kwargs
            )
        return wrapper
    return decorator
src/infrastructure/resilience/circuit_breaker.py ADDED
@@ -0,0 +1,516 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Circuit breaker pattern implementation for external services.
3
+
4
+ This module provides circuit breaker functionality to prevent cascading
5
+ failures and improve system resilience.
6
+ """
7
+
8
+ import asyncio
9
+ from typing import Any, Callable, Optional, Dict, Union
10
+ from datetime import datetime, timedelta
11
+ from enum import Enum
12
+ import time
13
+ from dataclasses import dataclass, field
14
+
15
+ from src.core import get_logger
16
+
17
+ logger = get_logger(__name__)
18
+
19
+
20
+ class CircuitState(str, Enum):
21
+ """Circuit breaker states."""
22
+ CLOSED = "closed" # Normal operation
23
+ OPEN = "open" # Circuit is open, rejecting requests
24
+ HALF_OPEN = "half_open" # Testing if service is recovered
25
+
26
+
27
+ @dataclass
28
+ class CircuitBreakerConfig:
29
+ """Circuit breaker configuration."""
30
+ failure_threshold: int = 5 # Failures before opening
31
+ recovery_timeout: float = 60.0 # Seconds before trying half-open
32
+ success_threshold: int = 3 # Successes to close from half-open
33
+ timeout: float = 30.0 # Request timeout
34
+ expected_exception: type = Exception # Exception type to count as failure
35
+
36
+
37
+ @dataclass
38
+ class CircuitBreakerStats:
39
+ """Circuit breaker statistics."""
40
+ total_requests: int = 0
41
+ successful_requests: int = 0
42
+ failed_requests: int = 0
43
+ rejected_requests: int = 0
44
+ state_changes: int = 0
45
+ last_failure_time: Optional[datetime] = None
46
+ last_success_time: Optional[datetime] = None
47
+ current_consecutive_failures: int = 0
48
+ current_consecutive_successes: int = 0
49
+
50
+
51
+ class CircuitBreakerOpenException(Exception):
52
+ """Exception raised when circuit breaker is open."""
53
+ pass
54
+
55
+
56
+ class CircuitBreakerTimeoutException(Exception):
57
+ """Exception raised when request times out."""
58
+ pass
59
+
60
+
61
+ class CircuitBreaker:
62
+ """
63
+ Circuit breaker implementation for resilient external service calls.
64
+
65
+ Features:
66
+ - Automatic failure detection
67
+ - Configurable thresholds
68
+ - Recovery mechanism
69
+ - Statistics and monitoring
70
+ - Async/await support
71
+ """
72
+
73
+ def __init__(
74
+ self,
75
+ name: str,
76
+ config: Optional[CircuitBreakerConfig] = None
77
+ ):
78
+ """
79
+ Initialize circuit breaker.
80
+
81
+ Args:
82
+ name: Circuit breaker name for identification
83
+ config: Configuration parameters
84
+ """
85
+ self.name = name
86
+ self.config = config or CircuitBreakerConfig()
87
+ self.state = CircuitState.CLOSED
88
+ self.stats = CircuitBreakerStats()
89
+ self._lock = asyncio.Lock()
90
+ self._last_failure_time: Optional[float] = None
91
+
92
+ logger.info(f"Circuit breaker '{name}' initialized")
93
+
94
+ async def call(self, func: Callable, *args, **kwargs) -> Any:
95
+ """
96
+ Execute function with circuit breaker protection.
97
+
98
+ Args:
99
+ func: Function to execute
100
+ *args: Function arguments
101
+ **kwargs: Function keyword arguments
102
+
103
+ Returns:
104
+ Function result
105
+
106
+ Raises:
107
+ CircuitBreakerOpenException: When circuit is open
108
+ CircuitBreakerTimeoutException: When request times out
109
+ """
110
+ async with self._lock:
111
+ self.stats.total_requests += 1
112
+
113
+ # Check if circuit should be opened
114
+ await self._check_state()
115
+
116
+ if self.state == CircuitState.OPEN:
117
+ self.stats.rejected_requests += 1
118
+ raise CircuitBreakerOpenException(
119
+ f"Circuit breaker '{self.name}' is open"
120
+ )
121
+
122
+ # Execute the function
123
+ start_time = time.time()
124
+
125
+ try:
126
+ # Execute with timeout
127
+ result = await asyncio.wait_for(
128
+ self._execute_async(func, *args, **kwargs),
129
+ timeout=self.config.timeout
130
+ )
131
+
132
+ # Record success
133
+ await self._record_success()
134
+
135
+ execution_time = time.time() - start_time
136
+ logger.debug(
137
+ f"Circuit breaker '{self.name}' - Success "
138
+ f"(time: {execution_time:.3f}s)"
139
+ )
140
+
141
+ return result
142
+
143
+ except asyncio.TimeoutError:
144
+ await self._record_failure()
145
+ execution_time = time.time() - start_time
146
+
147
+ logger.warning(
148
+ f"Circuit breaker '{self.name}' - Timeout "
149
+ f"(time: {execution_time:.3f}s)"
150
+ )
151
+
152
+ raise CircuitBreakerTimeoutException(
153
+ f"Request to '{self.name}' timed out after {self.config.timeout}s"
154
+ )
155
+
156
+ except self.config.expected_exception as e:
157
+ await self._record_failure()
158
+ execution_time = time.time() - start_time
159
+
160
+ logger.warning(
161
+ f"Circuit breaker '{self.name}' - Failure: {e} "
162
+ f"(time: {execution_time:.3f}s)"
163
+ )
164
+
165
+ raise
166
+
167
+ async def _execute_async(self, func: Callable, *args, **kwargs) -> Any:
168
+ """Execute function, handling both sync and async functions."""
169
+ if asyncio.iscoroutinefunction(func):
170
+ return await func(*args, **kwargs)
171
+ else:
172
+ # Run sync function in thread pool
173
+ loop = asyncio.get_event_loop()
174
+ return await loop.run_in_executor(None, func, *args, **kwargs)
175
+
176
+ async def _check_state(self):
177
+ """Check and update circuit breaker state."""
178
+ current_time = time.time()
179
+
180
+ if self.state == CircuitState.OPEN:
181
+ # Check if we should try half-open
182
+ if (self._last_failure_time and
183
+ current_time - self._last_failure_time >= self.config.recovery_timeout):
184
+
185
+ self.state = CircuitState.HALF_OPEN
186
+ self.stats.state_changes += 1
187
+
188
+ logger.info(
189
+ f"Circuit breaker '{self.name}' transitioned to HALF_OPEN"
190
+ )
191
+
192
+ elif self.state == CircuitState.HALF_OPEN:
193
+ # Half-open state is handled in success/failure recording
194
+ pass
195
+
196
+ async def _record_success(self):
197
+ """Record successful execution."""
198
+ async with self._lock:
199
+ self.stats.successful_requests += 1
200
+ self.stats.current_consecutive_failures = 0
201
+ self.stats.current_consecutive_successes += 1
202
+ self.stats.last_success_time = datetime.utcnow()
203
+
204
+ if self.state == CircuitState.HALF_OPEN:
205
+ if (self.stats.current_consecutive_successes >=
206
+ self.config.success_threshold):
207
+
208
+ # Transition to closed
209
+ self.state = CircuitState.CLOSED
210
+ self.stats.state_changes += 1
211
+ self.stats.current_consecutive_successes = 0
212
+
213
+ logger.info(
214
+ f"Circuit breaker '{self.name}' transitioned to CLOSED"
215
+ )
216
+
217
+ async def _record_failure(self):
218
+ """Record failed execution."""
219
+ async with self._lock:
220
+ self.stats.failed_requests += 1
221
+ self.stats.current_consecutive_successes = 0
222
+ self.stats.current_consecutive_failures += 1
223
+ self.stats.last_failure_time = datetime.utcnow()
224
+ self._last_failure_time = time.time()
225
+
226
+ # Check if we should open the circuit
227
+ if (self.state in [CircuitState.CLOSED, CircuitState.HALF_OPEN] and
228
+ self.stats.current_consecutive_failures >= self.config.failure_threshold):
229
+
230
+ self.state = CircuitState.OPEN
231
+ self.stats.state_changes += 1
232
+
233
+ logger.warning(
234
+ f"Circuit breaker '{self.name}' opened after "
235
+ f"{self.stats.current_consecutive_failures} consecutive failures"
236
+ )
237
+
238
+ def get_stats(self) -> Dict[str, Any]:
239
+ """Get circuit breaker statistics."""
240
+ success_rate = (
241
+ self.stats.successful_requests / self.stats.total_requests
242
+ if self.stats.total_requests > 0 else 0
243
+ )
244
+
245
+ return {
246
+ "name": self.name,
247
+ "state": self.state.value,
248
+ "config": {
249
+ "failure_threshold": self.config.failure_threshold,
250
+ "recovery_timeout": self.config.recovery_timeout,
251
+ "success_threshold": self.config.success_threshold,
252
+ "timeout": self.config.timeout
253
+ },
254
+ "stats": {
255
+ "total_requests": self.stats.total_requests,
256
+ "successful_requests": self.stats.successful_requests,
257
+ "failed_requests": self.stats.failed_requests,
258
+ "rejected_requests": self.stats.rejected_requests,
259
+ "success_rate": success_rate,
260
+ "state_changes": self.stats.state_changes,
261
+ "current_consecutive_failures": self.stats.current_consecutive_failures,
262
+ "current_consecutive_successes": self.stats.current_consecutive_successes,
263
+ "last_failure_time": (
264
+ self.stats.last_failure_time.isoformat()
265
+ if self.stats.last_failure_time else None
266
+ ),
267
+ "last_success_time": (
268
+ self.stats.last_success_time.isoformat()
269
+ if self.stats.last_success_time else None
270
+ )
271
+ }
272
+ }
273
+
274
+ async def reset(self):
275
+ """Reset circuit breaker to closed state."""
276
+ async with self._lock:
277
+ self.state = CircuitState.CLOSED
278
+ self.stats.current_consecutive_failures = 0
279
+ self.stats.current_consecutive_successes = 0
280
+ self._last_failure_time = None
281
+
282
+ logger.info(f"Circuit breaker '{self.name}' manually reset")
283
+
284
+ async def force_open(self):
285
+ """Force circuit breaker to open state."""
286
+ async with self._lock:
287
+ self.state = CircuitState.OPEN
288
+ self._last_failure_time = time.time()
289
+
290
+ logger.warning(f"Circuit breaker '{self.name}' manually opened")
291
+
292
+
293
class CircuitBreakerManager:
    """
    Manager for multiple circuit breakers.

    Provides centralized management and monitoring of circuit breakers.
    Breakers are created lazily per service name, seeded with an
    explicit config, a pre-registered default, or library defaults.
    """

    def __init__(self):
        """Initialize circuit breaker manager with empty registries."""
        # service name -> live breaker instance
        self._breakers: Dict[str, CircuitBreaker] = {}
        # service name -> config used when a breaker is first created
        self._default_configs: Dict[str, CircuitBreakerConfig] = {}

    def register_default_config(
        self,
        service_name: str,
        config: CircuitBreakerConfig
    ):
        """
        Register default configuration for a service.

        Args:
            service_name: Service name
            config: Default configuration
        """
        self._default_configs[service_name] = config
        logger.info(f"Registered default config for service '{service_name}'")

    def get_circuit_breaker(
        self,
        service_name: str,
        config: Optional[CircuitBreakerConfig] = None
    ) -> CircuitBreaker:
        """
        Get or create circuit breaker for service.

        Args:
            service_name: Service name
            config: Configuration (uses registered default if not
                provided; ignored once a breaker already exists)

        Returns:
            Circuit breaker instance
        """
        breaker = self._breakers.get(service_name)
        if breaker is None:
            # Precedence: explicit config > registered default > library defaults
            effective_config = (
                config
                or self._default_configs.get(service_name)
                or CircuitBreakerConfig()
            )
            breaker = CircuitBreaker(service_name, effective_config)
            self._breakers[service_name] = breaker
        return breaker

    async def call_service(
        self,
        service_name: str,
        func: Callable,
        *args,
        config: Optional[CircuitBreakerConfig] = None,
        **kwargs
    ) -> Any:
        """
        Call service through circuit breaker.

        Args:
            service_name: Service name
            func: Function to call
            *args: Function arguments
            config: Optional configuration
            **kwargs: Function keyword arguments

        Returns:
            Function result
        """
        return await self.get_circuit_breaker(service_name, config).call(
            func, *args, **kwargs
        )

    def get_all_stats(self) -> Dict[str, Any]:
        """Get statistics for all circuit breakers, keyed by service name."""
        stats: Dict[str, Any] = {}
        for name, breaker in self._breakers.items():
            stats[name] = breaker.get_stats()
        return stats

    async def reset_all(self):
        """Reset all circuit breakers to the CLOSED state."""
        for breaker in self._breakers.values():
            await breaker.reset()

        logger.info("All circuit breakers reset")

    def get_health_status(self) -> Dict[str, Any]:
        """Get aggregate health status of all registered services."""
        healthy: list = []
        degraded: list = []
        failed: list = []

        for name, breaker in self._breakers.items():
            if breaker.state == CircuitState.CLOSED:
                healthy.append(name)
            elif breaker.state == CircuitState.HALF_OPEN:
                degraded.append(name)
            else:  # OPEN
                failed.append(name)

        total = len(self._breakers)

        # Any open breaker degrades the system; with nothing healthy
        # left at all the situation is critical.
        if failed:
            overall_health = "critical" if not healthy else "degraded"
        elif degraded:
            overall_health = "degraded"
        else:
            overall_health = "healthy"

        return {
            "overall_health": overall_health,
            "total_services": total,
            "healthy_services": healthy,
            "degraded_services": degraded,
            "failed_services": failed,
            "health_score": len(healthy) / total if total > 0 else 1.0
        }
422
+
423
+
424
# Global circuit breaker manager
# Module-level singleton: all callers share this instance so breaker
# state is consistent across the process.
circuit_breaker_manager = CircuitBreakerManager()
426
+
427
+
428
+ # Pre-configured circuit breakers for common services
429
def setup_default_circuit_breakers():
    """Setup default circuit breaker configurations.

    Registers per-service defaults on the global manager; tighter
    thresholds for fast local dependencies (database, redis), looser
    ones for slow external services (LLMs).
    """
    default_configs = {
        # Portal da Transparência API
        "transparency_api": CircuitBreakerConfig(
            failure_threshold=3,
            recovery_timeout=30.0,
            success_threshold=2,
            timeout=15.0,
        ),
        # LLM Services (Groq, etc)
        "llm_service": CircuitBreakerConfig(
            failure_threshold=5,
            recovery_timeout=60.0,
            success_threshold=3,
            timeout=30.0,
        ),
        # Database connections
        "database": CircuitBreakerConfig(
            failure_threshold=2,
            recovery_timeout=10.0,
            success_threshold=1,
            timeout=5.0,
        ),
        # Redis connections
        "redis": CircuitBreakerConfig(
            failure_threshold=3,
            recovery_timeout=20.0,
            success_threshold=2,
            timeout=3.0,
        ),
    }

    # dict literals preserve insertion order, so registration order is
    # identical to registering each service in turn.
    for service_name, config in default_configs.items():
        circuit_breaker_manager.register_default_config(service_name, config)
475
+
476
+
477
# Initialize default configurations
# Runs at import time so every consumer of circuit_breaker_manager
# sees the same per-service defaults.
setup_default_circuit_breakers()
479
+
480
+
481
+ # Convenience decorators
482
def circuit_breaker(
    service_name: str,
    config: Optional[CircuitBreakerConfig] = None
):
    """
    Decorator to protect async functions with a circuit breaker.

    Calls to the decorated coroutine are routed through the shared
    ``circuit_breaker_manager`` under ``service_name``.

    Args:
        service_name: Service name for circuit breaker
        config: Optional configuration (used only the first time a
            breaker is created for this service)

    Returns:
        A decorator producing an async wrapper around the function.
    """
    # Local import: this module's import header sits outside this block.
    import functools

    def decorator(func):
        # functools.wraps preserves __name__/__doc__/signature metadata,
        # which introspection-based tooling (logging, docs, tests) relies on.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            return await circuit_breaker_manager.call_service(
                service_name, func, *args, config=config, **kwargs
            )
        return wrapper
    return decorator
500
+
501
+
502
+ # Example usage functions
503
async def protected_api_call(url: str) -> dict:
    """Example of API call protected by circuit breaker.

    Fetches *url* through the "external_api" breaker and returns the
    decoded JSON body.
    """
    import httpx

    async def _fetch() -> dict:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            resp.raise_for_status()
            return resp.json()

    return await circuit_breaker_manager.call_service("external_api", _fetch)
src/llm/providers.py CHANGED
@@ -16,9 +16,10 @@ from enum import Enum
16
  import httpx
17
  from pydantic import BaseModel, Field as PydanticField
18
 
19
- from src.core import get_logger, settings
20
  from src.core.exceptions import LLMError, LLMRateLimitError
21
  from src.services.maritaca_client import MaritacaClient, MaritacaModel
 
22
 
23
 
24
  class LLMProvider(str, Enum):
@@ -81,14 +82,19 @@ class BaseLLMProvider(ABC):
81
  self.timeout = timeout
82
  self.max_retries = max_retries
83
  self.logger = get_logger(__name__)
 
84
 
85
- self.client = httpx.AsyncClient(
86
- timeout=httpx.Timeout(timeout),
87
- limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
88
- )
89
 
90
  async def __aenter__(self):
91
  """Async context manager entry."""
 
 
 
 
 
 
92
  return self
93
 
94
  async def __aexit__(self, exc_type, exc_val, exc_tb):
@@ -97,7 +103,8 @@ class BaseLLMProvider(ABC):
97
 
98
  async def close(self):
99
  """Close HTTP client."""
100
- await self.client.aclose()
 
101
 
102
  @abstractmethod
103
  async def complete(self, request: LLMRequest) -> LLMResponse:
@@ -134,6 +141,30 @@ class BaseLLMProvider(ABC):
134
  stream: bool = False
135
  ) -> Union[Dict[str, Any], AsyncGenerator[Dict[str, Any], None]]:
136
  """Make HTTP request with retry logic."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
  url = f"{self.base_url}{endpoint}"
138
  headers = self._get_headers()
139
 
@@ -165,7 +196,7 @@ class BaseLLMProvider(ABC):
165
  else:
166
  response = await self.client.post(
167
  url,
168
- json=data,
169
  headers=headers,
170
  )
171
 
@@ -178,7 +209,7 @@ class BaseLLMProvider(ABC):
178
  response_time=response_time,
179
  )
180
 
181
- return response.json()
182
  else:
183
  await self._handle_error_response(response, attempt)
184
 
@@ -277,7 +308,7 @@ class BaseLLMProvider(ABC):
277
  if data == "[DONE]":
278
  break
279
  try:
280
- yield eval(data) # Parse JSON chunk
281
  except:
282
  continue
283
 
@@ -294,6 +325,7 @@ class GroqProvider(BaseLLMProvider):
294
  timeout=60,
295
  max_retries=3,
296
  )
 
297
 
298
  async def complete(self, request: LLMRequest) -> LLMResponse:
299
  """Complete text generation using Groq."""
 
16
  import httpx
17
  from pydantic import BaseModel, Field as PydanticField
18
 
19
+ from src.core import get_logger, settings, get_llm_pool
20
  from src.core.exceptions import LLMError, LLMRateLimitError
21
  from src.services.maritaca_client import MaritacaClient, MaritacaModel
22
+ from src.core.json_utils import dumps, loads
23
 
24
 
25
  class LLMProvider(str, Enum):
 
82
  self.timeout = timeout
83
  self.max_retries = max_retries
84
  self.logger = get_logger(__name__)
85
+ self._use_pool = True # Flag to use connection pool
86
 
87
+ # Legacy client for backward compatibility
88
+ self.client = None
 
 
89
 
90
  async def __aenter__(self):
91
  """Async context manager entry."""
92
+ # Initialize legacy client if not using pool
93
+ if not self._use_pool and not self.client:
94
+ self.client = httpx.AsyncClient(
95
+ timeout=httpx.Timeout(self.timeout),
96
+ limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
97
+ )
98
  return self
99
 
100
  async def __aexit__(self, exc_type, exc_val, exc_tb):
 
103
 
104
  async def close(self):
105
  """Close HTTP client."""
106
+ if self.client:
107
+ await self.client.aclose()
108
 
109
  @abstractmethod
110
  async def complete(self, request: LLMRequest) -> LLMResponse:
 
141
  stream: bool = False
142
  ) -> Union[Dict[str, Any], AsyncGenerator[Dict[str, Any], None]]:
143
  """Make HTTP request with retry logic."""
144
+ # Use connection pool if available
145
+ if self._use_pool and hasattr(self, '_provider_name'):
146
+ try:
147
+ pool = await get_llm_pool()
148
+ if stream:
149
+ # For streaming, fall back to regular client for now
150
+ self.logger.debug("Streaming not yet supported with pool, using regular client")
151
+ else:
152
+ result = await pool.post(
153
+ self._provider_name,
154
+ endpoint,
155
+ data
156
+ )
157
+ return result
158
+ except Exception as e:
159
+ self.logger.warning(f"Pool request failed, falling back to regular client: {e}")
160
+
161
+ # Original implementation for fallback or streaming
162
+ if not self.client:
163
+ self.client = httpx.AsyncClient(
164
+ timeout=httpx.Timeout(self.timeout),
165
+ limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
166
+ )
167
+
168
  url = f"{self.base_url}{endpoint}"
169
  headers = self._get_headers()
170
 
 
196
  else:
197
  response = await self.client.post(
198
  url,
199
+ content=dumps_bytes(data),
200
  headers=headers,
201
  )
202
 
 
209
  response_time=response_time,
210
  )
211
 
212
+ return loads(response.content)
213
  else:
214
  await self._handle_error_response(response, attempt)
215
 
 
308
  if data == "[DONE]":
309
  break
310
  try:
311
+ yield loads(data) # Parse JSON chunk safely with orjson
312
  except:
313
  continue
314
 
 
325
  timeout=60,
326
  max_retries=3,
327
  )
328
+ self._provider_name = "groq"
329
 
330
  async def complete(self, request: LLMRequest) -> LLMResponse:
331
  """Complete text generation using Groq."""