anderson-ufrj committed
Commit f8a1f65 · Parent: d392f49

feat(metrics): implement comprehensive Prometheus metrics across all endpoints


- Add MetricsMiddleware for automatic HTTP request tracking
- Track request duration, status codes, and error rates
- Integrate metrics into investigation endpoints with business metrics
- Update BaseAgent to use centralized metrics manager
- Add metrics decorators (@track_time, @count_calls) for easy integration (see the usage sketch below)
- Track agent task execution with duration and status
- Monitor investigation lifecycle (created, completed, failed)
- Record anomaly detection metrics with confidence scores
- Setup HTTP-specific metrics (requests, errors, slow requests)
- Enable /api/v1/observability/metrics endpoint for Prometheus scraping
- Track active investigations gauge for real-time monitoring
- Replace legacy monitoring with centralized metrics manager
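
The decorators listed above wrap a coroutine with a call counter and a duration histogram. A minimal, hypothetical usage sketch: the decorator names and signatures are taken from the investigations.py diff in this commit, while the endpoint and metric names here are illustrative only.

    from src.infrastructure.observability.metrics import track_time, count_calls

    @count_calls("cidadao_ai_example_requests_total", labels={"operation": "example"})  # count invocations
    @track_time("cidadao_ai_example_duration_seconds")  # record wall-clock duration per call
    async def example_endpoint():
        # hypothetical handler body; whatever is awaited here is what gets timed
        ...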

src/agents/deodoro.py CHANGED
@@ -17,7 +17,7 @@ from pydantic import BaseModel, Field as PydanticField
 
 from src.core import AgentStatus, get_logger
 from src.core.exceptions import AgentError, AgentExecutionError
-from src.core.monitoring import AGENT_TASK_COUNT, AGENT_TASK_DURATION
+from src.infrastructure.observability.metrics import metrics_manager, BusinessMetrics
 import time
 
 
@@ -188,23 +188,29 @@ class BaseAgent(ABC):
                     retry=retries,
                 )
 
-                # Process the message with timing
-                with AGENT_TASK_DURATION.labels(
-                    agent_type=self.name,
-                    task_type=action
-                ).time():
-                    response = await self.process(message, context)
+                # Process the message
+                response = await self.process(message, context)
 
                 # Calculate processing time
                 processing_time = (datetime.utcnow() - start_time).total_seconds() * 1000
                 response.processing_time_ms = processing_time
 
-                # Record successful execution
-                AGENT_TASK_COUNT.labels(
-                    agent_type=self.name,
-                    task_type=action,
-                    status="completed"
-                ).inc()
+                # Record metrics using centralized metrics manager
+                metrics_manager.increment_counter(
+                    "cidadao_ai_agent_tasks_total",
+                    labels={
+                        "agent_name": self.name,
+                        "task_type": action,
+                        "status": "completed"
+                    }
+                )
+
+                BusinessMetrics.record_agent_task(
+                    agent_name=self.name,
+                    task_type=action,
+                    duration_seconds=processing_time / 1000.0,
+                    status="success"
+                )
 
                 # Update status
                 self.status = AgentStatus.COMPLETED
@@ -233,11 +239,14 @@ class BaseAgent(ABC):
                 )
 
                 # Record retry attempt
-                AGENT_TASK_COUNT.labels(
-                    agent_type=self.name,
-                    task_type=action,
-                    status="retry"
-                ).inc()
+                metrics_manager.increment_counter(
+                    "cidadao_ai_agent_tasks_total",
+                    labels={
+                        "agent_name": self.name,
+                        "task_type": action,
+                        "status": "retry"
+                    }
+                )
 
                 retries += 1
                 if retries <= self.max_retries:
@@ -245,11 +254,21 @@ class BaseAgent(ABC):
                     await self._wait(2 ** retries)
 
         # All retries exhausted - record failure
-        AGENT_TASK_COUNT.labels(
-            agent_type=self.name,
-            task_type=action,
-            status="failed"
-        ).inc()
+        metrics_manager.increment_counter(
+            "cidadao_ai_agent_tasks_total",
+            labels={
+                "agent_name": self.name,
+                "task_type": action,
+                "status": "failed"
+            }
+        )
+
+        BusinessMetrics.record_agent_task(
+            agent_name=self.name,
+            task_type=action,
+            duration_seconds=(datetime.utcnow() - start_time).total_seconds(),
+            status="failed"
+        )
 
         self.status = AgentStatus.ERROR
 
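
The old code incremented module-level prometheus_client objects (AGENT_TASK_COUNT, AGENT_TASK_DURATION) directly; the new code goes through a metrics_manager facade addressed by metric name and label dict. The facade's implementation is not part of this commit, so the following is only a rough sketch of the kind of wrapper these calls imply, built on prometheus_client for illustration; the class name and internals are assumptions, not the project's code.

    from prometheus_client import Counter

    class SketchMetricsManager:
        """Illustrative stand-in: creates counters lazily and increments them by name + labels."""

        def __init__(self):
            self._counters = {}

        def increment_counter(self, name, labels=None, value=1.0):
            labels = dict(labels or {})
            if name not in self._counters:
                # the first call defines the label set for this counter
                self._counters[name] = Counter(name, name, list(labels.keys()))
            counter = self._counters[name]
            if labels:
                counter = counter.labels(**labels)
            counter.inc(value)

    manager = SketchMetricsManager()
    manager.increment_counter(
        "example_agent_tasks_total",  # hypothetical metric name
        labels={"agent_name": "example_agent", "task_type": "investigate", "status": "completed"},
    )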
 
src/api/app.py CHANGED
@@ -26,6 +26,7 @@ from src.api.middleware.authentication import AuthenticationMiddleware
 from src.api.middleware.logging_middleware import LoggingMiddleware
 from src.api.middleware.security import SecurityMiddleware
 from src.api.middleware.compression import CompressionMiddleware
+from src.api.middleware.metrics_middleware import MetricsMiddleware, setup_http_metrics
 from src.infrastructure.observability import (
     CorrelationMiddleware,
     tracing_manager,
@@ -63,6 +64,9 @@ async def lifespan(app: FastAPI):
         build_info={"deployment": "hf-fastapi"}
     )
 
+    # Setup HTTP metrics
+    setup_http_metrics()
+
     # Initialize global resources here
     # - Database connections
     # - Background tasks
@@ -168,6 +172,9 @@ app.add_middleware(
 # Add observability middleware
 app.add_middleware(CorrelationMiddleware, generate_request_id=True)
 
+# Add metrics middleware for automatic HTTP metrics
+app.add_middleware(MetricsMiddleware)
+
 # Add compression middleware
 from src.api.middleware.compression import add_compression_middleware
 add_compression_middleware(
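
For local verification, the two new hooks can be exercised in a stripped-down app. A minimal sketch mirroring the wiring above, assuming the project package is importable; note that the /api/v1/observability/metrics scrape endpoint itself is served by the project's observability routes, which this toy app does not mount.

    from fastapi import FastAPI
    from src.api.middleware.metrics_middleware import MetricsMiddleware, setup_http_metrics

    app = FastAPI()
    setup_http_metrics()                   # register the HTTP counters/gauges once at startup
    app.add_middleware(MetricsMiddleware)  # every request now records duration, status and error metrics

    @app.get("/ping")
    async def ping():
        return {"ok": True}

Run it with uvicorn, issue a few requests, and the counters accumulate in the shared metrics_manager registry.
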
src/api/middleware/metrics_middleware.py ADDED
@@ -0,0 +1,238 @@
+"""
+Prometheus metrics middleware for automatic HTTP request tracking.
+
+This middleware automatically records metrics for all HTTP requests,
+including duration, status codes, and error rates.
+"""
+
+import time
+from typing import Callable
+
+from fastapi import Request, Response
+from starlette.middleware.base import BaseHTTPMiddleware
+
+from src.core import get_logger
+from src.infrastructure.observability.metrics import metrics_manager, BusinessMetrics
+
+logger = get_logger(__name__)
+
+
+class MetricsMiddleware(BaseHTTPMiddleware):
+    """Middleware for automatic Prometheus metrics collection."""
+
+    def __init__(self, app):
+        """Initialize metrics middleware."""
+        super().__init__(app)
+        self.logger = get_logger(__name__)
+
+    async def dispatch(self, request: Request, call_next: Callable) -> Response:
+        """Process request with automatic metrics collection."""
+        start_time = time.time()
+
+        # Skip metrics endpoint to avoid recursion
+        if request.url.path == "/api/v1/observability/metrics":
+            return await call_next(request)
+
+        # Extract path template (FastAPI route) for grouping
+        path_template = self._get_path_template(request)
+        method = request.method
+
+        try:
+            # Process request
+            response = await call_next(request)
+
+            # Calculate duration
+            duration = time.time() - start_time
+
+            # Record metrics
+            self._record_request_metrics(
+                method=method,
+                path=path_template,
+                status_code=response.status_code,
+                duration=duration
+            )
+
+            return response
+
+        except Exception as exc:
+            # Calculate duration even for errors
+            duration = time.time() - start_time
+
+            # Record error metrics
+            self._record_request_metrics(
+                method=method,
+                path=path_template,
+                status_code=500,  # Default error status
+                duration=duration,
+                error=True
+            )
+
+            # Re-raise the exception
+            raise exc
+
+    def _get_path_template(self, request: Request) -> str:
+        """
+        Get the path template from FastAPI route.
+
+        This extracts the route pattern (e.g., /users/{user_id})
+        instead of the actual path (e.g., /users/123) to avoid
+        high cardinality in metrics.
+        """
+        # Try to get route from request scope
+        if hasattr(request, "scope") and "route" in request.scope:
+            route = request.scope["route"]
+            if hasattr(route, "path"):
+                return route.path
+
+        # Fallback to actual path, but try to generalize it
+        path = request.url.path
+
+        # Common patterns to generalize
+        # Replace UUIDs
+        import re
+        path = re.sub(
+            r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',
+            '{uuid}',
+            path,
+            flags=re.IGNORECASE
+        )
+
+        # Replace numeric IDs
+        path = re.sub(r'/\d+', '/{id}', path)
+
+        # Limit cardinality for unknown paths
+        if path.count('/') > 5:
+            path = '/unknown/deep/path'
+
+        return path
+
+    def _record_request_metrics(
+        self,
+        method: str,
+        path: str,
+        status_code: int,
+        duration: float,
+        error: bool = False
+    ):
+        """Record HTTP request metrics."""
+        # HTTP request duration histogram
+        metrics_manager.observe_histogram(
+            "cidadao_ai_request_duration_seconds",
+            duration,
+            labels={
+                "method": method.upper(),
+                "endpoint": path,
+                "status_code": str(status_code)
+            }
+        )
+
+        # HTTP request counter
+        metrics_manager.increment_counter(
+            "cidadao_ai_http_requests_total",
+            labels={
+                "method": method.upper(),
+                "endpoint": path,
+                "status_code": str(status_code),
+                "status": "error" if error or status_code >= 400 else "success"
+            }
+        )
+
+        # Error rate tracking
+        if error or status_code >= 400:
+            metrics_manager.increment_counter(
+                "cidadao_ai_http_errors_total",
+                labels={
+                    "method": method.upper(),
+                    "endpoint": path,
+                    "status_code": str(status_code),
+                    "error_type": self._get_error_type(status_code)
+                }
+            )
+
+        # Track slow requests
+        if duration > 5.0:  # Requests taking more than 5 seconds
+            metrics_manager.increment_counter(
+                "cidadao_ai_slow_requests_total",
+                labels={
+                    "method": method.upper(),
+                    "endpoint": path,
+                    "duration_bucket": self._get_duration_bucket(duration)
+                }
+            )
+
+        # Update concurrent requests gauge (simplified - in production use proper tracking)
+        active_requests = getattr(self, '_active_requests', 0)
+        metrics_manager.set_gauge(
+            "cidadao_ai_http_requests_in_progress",
+            active_requests,
+            labels={"method": method.upper()}
+        )
+
+    def _get_error_type(self, status_code: int) -> str:
+        """Categorize error types based on status code."""
+        if 400 <= status_code < 500:
+            return "client_error"
+        elif 500 <= status_code < 600:
+            return "server_error"
+        else:
+            return "unknown_error"
+
+    def _get_duration_bucket(self, duration: float) -> str:
+        """Categorize request duration into buckets."""
+        if duration < 1:
+            return "0-1s"
+        elif duration < 5:
+            return "1-5s"
+        elif duration < 10:
+            return "5-10s"
+        elif duration < 30:
+            return "10-30s"
+        else:
+            return "30s+"
+
+
+def setup_http_metrics():
+    """Setup HTTP-specific metrics if not already registered."""
+    # HTTP requests total counter
+    try:
+        from src.infrastructure.observability.metrics import MetricConfig, MetricType
+
+        metrics_manager.register_metric(
+            MetricConfig(
+                name="cidadao_ai_http_requests_total",
+                description="Total HTTP requests received",
+                labels=["method", "endpoint", "status_code", "status"]
+            ),
+            MetricType.COUNTER
+        )
+
+        metrics_manager.register_metric(
+            MetricConfig(
+                name="cidadao_ai_http_errors_total",
+                description="Total HTTP errors",
+                labels=["method", "endpoint", "status_code", "error_type"]
+            ),
+            MetricType.COUNTER
+        )
+
+        metrics_manager.register_metric(
+            MetricConfig(
+                name="cidadao_ai_slow_requests_total",
+                description="Total slow HTTP requests",
+                labels=["method", "endpoint", "duration_bucket"]
+            ),
+            MetricType.COUNTER
+        )
+
+        metrics_manager.register_metric(
+            MetricConfig(
+                name="cidadao_ai_http_requests_in_progress",
+                description="HTTP requests currently being processed",
+                labels=["method"]
+            ),
+            MetricType.GAUGE
+        )
+
+        logger.info("HTTP metrics initialized")
+    except Exception as e:
+        logger.warning(f"Some HTTP metrics may already be registered: {e}")
src/api/routes/investigations.py CHANGED
@@ -19,6 +19,7 @@ from src.core import get_logger
 from src.agents import InvestigatorAgent, AgentContext
 from src.api.middleware.authentication import get_current_user
 from src.tools import TransparencyAPIFilter
+from src.infrastructure.observability.metrics import track_time, count_calls, BusinessMetrics
 
 
 logger = get_logger(__name__)
@@ -105,6 +106,8 @@ _active_investigations: Dict[str, Dict[str, Any]] = {}
 
 
 @router.post("/start", response_model=Dict[str, str])
+@count_calls("cidadao_ai_investigation_requests_total", labels={"operation": "start"})
+@track_time("cidadao_ai_investigation_start_duration_seconds")
 async def start_investigation(
     request: InvestigationRequest,
     background_tasks: BackgroundTasks,
@@ -150,6 +153,13 @@ async def start_investigation(
         user_id=current_user.get("user_id"),
     )
 
+    # Track business metrics
+    BusinessMetrics.record_investigation_created(
+        priority="medium",
+        user_type="authenticated"
+    )
+    BusinessMetrics.update_active_investigations(len(_active_investigations))
+
     return {
         "investigation_id": investigation_id,
         "status": "started",
@@ -394,6 +404,7 @@ async def _run_investigation(investigation_id: str, request: InvestigationRequest
     This function runs the actual anomaly detection using InvestigatorAgent.
     """
     investigation = _active_investigations[investigation_id]
+    start_time = datetime.utcnow()
 
     try:
         # Update status
@@ -462,6 +473,9 @@ async def _run_investigation(investigation_id: str, request: InvestigationRequest
         investigation["progress"] = 1.0
         investigation["current_phase"] = "completed"
 
+        # Calculate duration
+        duration = (datetime.utcnow() - start_time).total_seconds()
+
         logger.info(
             "investigation_completed",
             investigation_id=investigation_id,
@@ -469,6 +483,23 @@ async def _run_investigation(investigation_id: str, request: InvestigationRequest
             records_analyzed=investigation["records_processed"],
         )
 
+        # Track business metrics
+        BusinessMetrics.record_investigation_completed(
+            investigation_type=request.data_source,
+            duration_seconds=duration,
+            priority="medium"
+        )
+        BusinessMetrics.update_active_investigations(len(_active_investigations) - 1)
+
+        # Track anomalies found
+        for result in results:
+            BusinessMetrics.record_anomaly_detected(
+                anomaly_type=result.anomaly_type,
+                severity=result.severity,
+                data_source=request.data_source,
+                confidence_score=result.confidence
+            )
+
     except Exception as e:
         logger.error(
             "investigation_failed",