"""
Enterprise monitoring and observability system.

OpenTelemetry, Prometheus, distributed tracing, and advanced health checks.
"""

import asyncio
import time
import logging
import threading
from typing import Dict, List, Optional, Any, Callable, Union
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
from functools import wraps
from src.core import json_utils
import psutil
import traceback
from enum import Enum

try:
    from opentelemetry import trace, metrics
    from opentelemetry.exporter.jaeger.thrift import JaegerExporter
    from opentelemetry.exporter.prometheus import PrometheusMetricReader
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
    from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
    from opentelemetry.instrumentation.redis import RedisInstrumentor
    from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

    OPENTELEMETRY_AVAILABLE = True
except ImportError:
    # OpenTelemetry is optional: fall back to no-op mocks so the module still imports.
    OPENTELEMETRY_AVAILABLE = False
    from src.core.monitoring_minimal import MockTracer as trace

    class MockInstrumentor:
        @staticmethod
        def instrument(*args, **kwargs):
            pass

    FastAPIInstrumentor = HTTPXClientInstrumentor = RedisInstrumentor = SQLAlchemyInstrumentor = MockInstrumentor

from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry, generate_latest
from pydantic import BaseModel, Field
import structlog

logger = structlog.get_logger(__name__)


class HealthStatus(Enum):
    """Component health status."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


class MetricType(Enum):
    """Metric types."""

    COUNTER = "counter"
    HISTOGRAM = "histogram"
    GAUGE = "gauge"
    SUMMARY = "summary"


class MonitoringConfig(BaseModel):
    """Monitoring system configuration."""

    # Service identity
    service_name: str = "cidadao-ai"
    service_version: str = "1.0.0"
    environment: str = "production"

    # Distributed tracing (Jaeger)
    jaeger_endpoint: str = "http://localhost:14268/api/traces"
    enable_tracing: bool = True
    trace_sample_rate: float = 1.0

    # Prometheus metrics
    prometheus_port: int = 8000
    enable_metrics: bool = True
    metrics_path: str = "/metrics"

    # Health checks
    health_check_interval: int = 30
    health_check_timeout: int = 5
    enable_deep_health_checks: bool = True

    # Performance thresholds
    slow_query_threshold_ms: float = 1000.0
    high_memory_threshold_mb: float = 1024.0
    high_cpu_threshold_percent: float = 80.0

    # Alerting
    enable_alerting: bool = True
    alert_webhook_url: Optional[str] = None


class PerformanceMetrics(BaseModel):
    """System performance metrics."""

    # System resources
    cpu_usage_percent: float
    memory_usage_mb: float
    memory_usage_percent: float
    disk_usage_percent: float

    # Application
    active_investigations: int
    total_requests: int
    failed_requests: int
    average_response_time_ms: float

    # Machine learning
    ml_inference_time_ms: float
    anomalies_detected: int
    detection_accuracy: float

    # Data layer
    db_connections_active: int
    db_query_time_ms: float
    cache_hit_rate: float

    timestamp: datetime = Field(default_factory=datetime.utcnow)


class AlertSeverity(Enum):
    """Alert severity levels."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


class Alert(BaseModel):
    """Alert model."""

    id: str
    title: str
    description: str
    severity: AlertSeverity
    component: str
    metric_name: str
    metric_value: float
    threshold: float
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    resolved: bool = False
    resolution_time: Optional[datetime] = None


class HealthCheck(BaseModel):
    """Health check result."""

    component: str
    status: HealthStatus
    details: Dict[str, Any] = Field(default_factory=dict)
    latency_ms: Optional[float] = None
    last_check: datetime = Field(default_factory=datetime.utcnow)
    error_message: Optional[str] = None


class ObservabilityManager:
    """Advanced observability and monitoring manager."""

    def __init__(self, config: MonitoringConfig):
        self.config = config
        self.tracer = None
        self.meter = None
        self.registry = CollectorRegistry()

        # Health checks
        self.health_checks: Dict[str, HealthCheck] = {}
        self.health_check_functions: Dict[str, Callable] = {}

        # Metrics
        self.metrics: Dict[str, Any] = {}
        self.performance_history: List[PerformanceMetrics] = []

        # Alerting
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_history: List[Alert] = []

        # Request/inference timing samples
        self.request_times: List[float] = []
        self.ml_inference_times: List[float] = []

        self._monitoring_task = None
        self._initialized = False
        # Used by get_health_summary() to report uptime.
        self._start_time = time.time()

    async def initialize(self) -> bool:
        """Initialize the monitoring system."""

        try:
            logger.info("Initializing observability system...")

            await self._setup_tracing()
            await self._setup_metrics()
            await self._setup_health_checks()
            await self._start_monitoring_loop()

            self._initialized = True
            logger.info("✅ Observability system initialized")

            return True

        except Exception as e:
            logger.error(f"❌ Monitoring initialization failed: {e}")
            return False

    async def _setup_tracing(self):
        """Configure distributed tracing."""

        # Skip when disabled or when the OpenTelemetry SDK is unavailable
        # (the mock fallbacks above do not provide the SDK types used here).
        if not self.config.enable_tracing or not OPENTELEMETRY_AVAILABLE:
            return

        resource = Resource.create({
            "service.name": self.config.service_name,
            "service.version": self.config.service_version,
            "deployment.environment": self.config.environment
        })

        trace.set_tracer_provider(TracerProvider(resource=resource))

        jaeger_exporter = JaegerExporter(
            endpoint=self.config.jaeger_endpoint
        )

        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)

        self.tracer = trace.get_tracer(__name__)

        # Auto-instrument common frameworks and clients.
        FastAPIInstrumentor.instrument()
        HTTPXClientInstrumentor.instrument()
        RedisInstrumentor.instrument()
        SQLAlchemyInstrumentor.instrument()

        logger.info("✅ Distributed tracing configured")

    async def _setup_metrics(self):
        """Configure Prometheus metrics."""

        if not self.config.enable_metrics:
            return

        self.metrics = {
            # HTTP
            "http_requests_total": Counter(
                "http_requests_total",
                "Total HTTP requests",
                ["method", "endpoint", "status"],
                registry=self.registry
            ),
            "http_request_duration": Histogram(
                "http_request_duration_seconds",
                "HTTP request duration",
                ["method", "endpoint"],
                registry=self.registry
            ),

            # Machine learning
            "ml_inference_duration": Histogram(
                "ml_inference_duration_seconds",
                "ML inference duration",
                ["model", "task"],
                registry=self.registry
            ),
            "anomalies_detected_total": Counter(
                "anomalies_detected_total",
                "Total anomalies detected",
                ["severity"],
                registry=self.registry
            ),

            # System resources
            "cpu_usage_percent": Gauge(
                "cpu_usage_percent",
                "CPU usage percentage",
                registry=self.registry
            ),
            "memory_usage_bytes": Gauge(
                "memory_usage_bytes",
                "Memory usage in bytes",
                registry=self.registry
            ),

            # Investigations
            "active_investigations": Gauge(
                "active_investigations",
                "Number of active investigations",
                registry=self.registry
            ),
            "investigation_duration": Histogram(
                "investigation_duration_seconds",
                "Investigation duration",
                ["status"],
                registry=self.registry
            ),

            # Data layer
            "db_connections_active": Gauge(
                "db_connections_active",
                "Active database connections",
                registry=self.registry
            ),
            "cache_hit_rate": Gauge(
                "cache_hit_rate",
                "Cache hit rate",
                ["cache_type"],
                registry=self.registry
            )
        }

        logger.info("✅ Prometheus metrics configured")

    async def _setup_health_checks(self):
        """Configure health checks."""

        self.register_health_check("system", self._check_system_health)
        self.register_health_check("database", self._check_database_health)
        self.register_health_check("redis", self._check_redis_health)
        self.register_health_check("ml_models", self._check_ml_models_health)

        logger.info("✅ Health checks configured")

    async def _start_monitoring_loop(self):
        """Start the continuous monitoring loop."""

        async def monitoring_loop():
            while True:
                try:
                    await self._collect_performance_metrics()
                    await self._run_health_checks()
                    await self._check_alerts()
                    await asyncio.sleep(self.config.health_check_interval)
                except Exception as e:
                    logger.error(f"❌ Error in monitoring loop: {e}")
                    await asyncio.sleep(5)

        self._monitoring_task = asyncio.create_task(monitoring_loop())
        logger.info("✅ Monitoring loop started")

    def register_health_check(self, name: str, check_function: Callable):
        """Register a health check function."""
        self.health_check_functions[name] = check_function
        logger.info(f"✅ Health check '{name}' registered")

    async def _run_health_checks(self):
        """Run all registered health checks."""

        for name, check_function in self.health_check_functions.items():
            try:
                start_time = time.time()
                result = await check_function()
                latency = (time.time() - start_time) * 1000

                if isinstance(result, dict):
                    status = result.get("status", HealthStatus.UNKNOWN)
                    details = result.get("details", {})
                    error_message = result.get("error")
                else:
                    status = HealthStatus.HEALTHY if result else HealthStatus.UNHEALTHY
                    details = {}
                    error_message = None

                self.health_checks[name] = HealthCheck(
                    component=name,
                    status=status,
                    details=details,
                    latency_ms=round(latency, 2),
                    error_message=error_message
                )

            except Exception as e:
                self.health_checks[name] = HealthCheck(
                    component=name,
                    status=HealthStatus.UNHEALTHY,
                    error_message=str(e),
                    latency_ms=None
                )

    async def _check_system_health(self) -> Dict[str, Any]:
        """System-level health check."""

        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')

            # Update Prometheus gauges when they are configured.
            if "cpu_usage_percent" in self.metrics:
                self.metrics["cpu_usage_percent"].set(cpu_percent)

            if "memory_usage_bytes" in self.metrics:
                self.metrics["memory_usage_bytes"].set(memory.used)

            status = HealthStatus.HEALTHY
            if cpu_percent > self.config.high_cpu_threshold_percent:
                status = HealthStatus.DEGRADED
            if memory.percent > 90:
                status = HealthStatus.UNHEALTHY

            return {
                "status": status,
                "details": {
                    "cpu_percent": cpu_percent,
                    "memory_percent": memory.percent,
                    "disk_percent": disk.percent,
                    "load_average": psutil.getloadavg() if hasattr(psutil, 'getloadavg') else None
                }
            }

        except Exception as e:
            return {
                "status": HealthStatus.UNHEALTHY,
                "error": str(e)
            }

    async def _check_database_health(self) -> Dict[str, Any]:
        """Database health check."""

        try:
            from .database import get_database_manager

            db = await get_database_manager()
            health_status = await db.get_health_status()

            pg_healthy = health_status["postgresql"]["status"] == "healthy"
            redis_healthy = health_status["redis"]["status"] == "healthy"

            if pg_healthy and redis_healthy:
                status = HealthStatus.HEALTHY
            elif pg_healthy or redis_healthy:
                status = HealthStatus.DEGRADED
            else:
                status = HealthStatus.UNHEALTHY

            return {
                "status": status,
                "details": health_status
            }

        except Exception as e:
            return {
                "status": HealthStatus.UNHEALTHY,
                "error": str(e)
            }

    async def _check_redis_health(self) -> Dict[str, Any]:
        """Redis-specific health check."""

        try:
            from .database import get_database_manager

            db = await get_database_manager()
            start_time = time.time()
            await db.redis_cluster.ping()
            latency = (time.time() - start_time) * 1000

            status = HealthStatus.HEALTHY if latency < 100 else HealthStatus.DEGRADED

            return {
                "status": status,
                "details": {
                    "latency_ms": round(latency, 2),
                    "connection_pool": "active"
                }
            }

        except Exception as e:
            return {
                "status": HealthStatus.UNHEALTHY,
                "error": str(e)
            }

    async def _check_ml_models_health(self) -> Dict[str, Any]:
        """ML models health check."""

        try:
            from ..ml.hf_integration import get_cidadao_manager

            manager = get_cidadao_manager()
            model_info = manager.get_model_info()

            if model_info.get("status") == "loaded":
                status = HealthStatus.HEALTHY
            else:
                status = HealthStatus.UNHEALTHY

            return {
                "status": status,
                "details": model_info
            }

        except Exception as e:
            return {
                "status": HealthStatus.UNHEALTHY,
                "error": str(e)
            }

    async def _collect_performance_metrics(self):
        """Collect performance metrics."""

        try:
            cpu_percent = psutil.cpu_percent()
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')

            # Rolling averages over the most recent samples.
            recent_requests = self.request_times[-100:]
            recent_inferences = self.ml_inference_times[-50:]
            avg_response_time = sum(recent_requests) / len(recent_requests) if recent_requests else 0
            avg_ml_time = sum(recent_inferences) / len(recent_inferences) if recent_inferences else 0

            metrics = PerformanceMetrics(
                cpu_usage_percent=cpu_percent,
                memory_usage_mb=memory.used / (1024 * 1024),
                memory_usage_percent=memory.percent,
                disk_usage_percent=disk.percent,
                active_investigations=len(getattr(self, '_active_investigations', [])),
                total_requests=len(self.request_times),
                failed_requests=0,
                average_response_time_ms=avg_response_time * 1000,
                ml_inference_time_ms=avg_ml_time * 1000,
                anomalies_detected=0,
                detection_accuracy=0.0,
                db_connections_active=0,
                db_query_time_ms=0.0,
                cache_hit_rate=0.0
            )

            self.performance_history.append(metrics)

            # Keep only the most recent 1000 snapshots.
            if len(self.performance_history) > 1000:
                self.performance_history = self.performance_history[-1000:]

        except Exception as e:
            logger.error(f"❌ Failed to collect metrics: {e}")

    async def _check_alerts(self):
        """Check alert conditions."""

        if not self.performance_history:
            return

        latest_metrics = self.performance_history[-1]

        # High CPU
        if latest_metrics.cpu_usage_percent > self.config.high_cpu_threshold_percent:
            await self._trigger_alert(
                "high_cpu",
                "High CPU Usage",
                f"CPU usage is {latest_metrics.cpu_usage_percent:.1f}%",
                AlertSeverity.WARNING,
                "system",
                "cpu_usage_percent",
                latest_metrics.cpu_usage_percent,
                self.config.high_cpu_threshold_percent
            )

        # High memory
        if latest_metrics.memory_usage_percent > 85:
            await self._trigger_alert(
                "high_memory",
                "High Memory Usage",
                f"Memory usage is {latest_metrics.memory_usage_percent:.1f}%",
                AlertSeverity.ERROR,
                "system",
                "memory_usage_percent",
                latest_metrics.memory_usage_percent,
                85.0
            )

        # Slow responses
        if latest_metrics.average_response_time_ms > self.config.slow_query_threshold_ms:
            await self._trigger_alert(
                "slow_response",
                "Slow Response Time",
                f"Average response time is {latest_metrics.average_response_time_ms:.1f}ms",
                AlertSeverity.WARNING,
                "api",
                "average_response_time_ms",
                latest_metrics.average_response_time_ms,
                self.config.slow_query_threshold_ms
            )

    async def _trigger_alert(self, alert_id: str, title: str, description: str,
                             severity: AlertSeverity, component: str,
                             metric_name: str, metric_value: float, threshold: float):
        """Trigger an alert."""

        # Avoid duplicate alerts while one is already active.
        if alert_id in self.active_alerts:
            return

        alert = Alert(
            id=alert_id,
            title=title,
            description=description,
            severity=severity,
            component=component,
            metric_name=metric_name,
            metric_value=metric_value,
            threshold=threshold
        )

        self.active_alerts[alert_id] = alert
        self.alert_history.append(alert)

        logger.warning(f"🚨 ALERT: {title} - {description}")

        if self.config.alert_webhook_url:
            await self._send_alert_webhook(alert)

    async def _send_alert_webhook(self, alert: Alert):
        """Send an alert via webhook."""

        try:
            import httpx

            payload = {
                "alert_id": alert.id,
                "title": alert.title,
                "description": alert.description,
                "severity": alert.severity.value,
                "component": alert.component,
                "timestamp": alert.timestamp.isoformat(),
                "metric": {
                    "name": alert.metric_name,
                    "value": alert.metric_value,
                    "threshold": alert.threshold
                }
            }

            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.config.alert_webhook_url,
                    json=payload,
                    timeout=10.0
                )

            if response.status_code == 200:
                logger.info(f"✅ Alert {alert.id} sent via webhook")
            else:
                logger.error(f"❌ Failed to send alert via webhook: {response.status_code}")

        except Exception as e:
            logger.error(f"❌ Error sending webhook: {e}")

    @asynccontextmanager
    async def trace_span(self, name: str, attributes: Optional[Dict[str, Any]] = None):
        """Context manager that creates a tracing span."""

        if not self.tracer:
            yield None
            return

        with self.tracer.start_as_current_span(name) as span:
            if attributes:
                for key, value in attributes.items():
                    span.set_attribute(key, value)
            yield span

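    # Usage example for trace_span (illustrative; the span name and attributes
    # below are made-up values, not part of this module):
    #
    #     async with monitoring.trace_span("investigation.run", {"source": "api"}) as span:
    #         ...  # traced work here
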
    def track_request_time(self, duration_seconds: float):
        """Track request duration."""
        self.request_times.append(duration_seconds)

        # Keep only the most recent 1000 samples.
        if len(self.request_times) > 1000:
            self.request_times = self.request_times[-1000:]

    def track_ml_inference_time(self, duration_seconds: float, model: str = "cidadao-gpt"):
        """Track ML inference duration."""
        self.ml_inference_times.append(duration_seconds)

        if "ml_inference_duration" in self.metrics:
            self.metrics["ml_inference_duration"].labels(
                model=model,
                task="inference"
            ).observe(duration_seconds)

        # Keep only the most recent 500 samples.
        if len(self.ml_inference_times) > 500:
            self.ml_inference_times = self.ml_inference_times[-500:]

    def increment_anomaly_count(self, severity: str = "medium"):
        """Increment the anomaly counter."""
        if "anomalies_detected_total" in self.metrics:
            self.metrics["anomalies_detected_total"].labels(severity=severity).inc()

    async def get_health_summary(self) -> Dict[str, Any]:
        """Get a system health summary."""

        overall_status = HealthStatus.HEALTHY

        for component, health in self.health_checks.items():
            if health.status == HealthStatus.UNHEALTHY:
                overall_status = HealthStatus.UNHEALTHY
                break
            elif health.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
                overall_status = HealthStatus.DEGRADED

        return {
            "overall_status": overall_status.value,
            "components": {name: health.dict() for name, health in self.health_checks.items()},
            "active_alerts": len(self.active_alerts),
            "last_check": datetime.utcnow().isoformat(),
            "uptime_seconds": time.time() - getattr(self, '_start_time', time.time())
        }

    async def get_metrics_summary(self) -> Dict[str, Any]:
        """Get a metrics summary."""

        if not self.performance_history:
            return {"error": "No metrics available"}

        latest = self.performance_history[-1]

        return {
            "timestamp": latest.timestamp.isoformat(),
            "system": {
                "cpu_usage_percent": latest.cpu_usage_percent,
                "memory_usage_mb": latest.memory_usage_mb,
                "memory_usage_percent": latest.memory_usage_percent,
                "disk_usage_percent": latest.disk_usage_percent
            },
            "application": {
                "active_investigations": latest.active_investigations,
                "total_requests": latest.total_requests,
                "average_response_time_ms": latest.average_response_time_ms,
                "ml_inference_time_ms": latest.ml_inference_time_ms
            },
            "alerts": {
                "active_count": len(self.active_alerts),
                "total_count": len(self.alert_history)
            }
        }

    def get_prometheus_metrics(self) -> str:
        """Get metrics in Prometheus exposition format."""
        # generate_latest() returns bytes; decode to match the declared return type.
        return generate_latest(self.registry).decode("utf-8")

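    # How the exposition text is typically served over HTTP (illustrative only;
    # the FastAPI app object, route, and Response import are assumptions, not
    # defined in this module):
    #
    #     from fastapi import Response
    #     from prometheus_client import CONTENT_TYPE_LATEST
    #
    #     @app.get("/metrics")
    #     async def metrics_endpoint():
    #         manager = await get_monitoring_manager()
    #         return Response(manager.get_prometheus_metrics(), media_type=CONTENT_TYPE_LATEST)
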
    async def cleanup(self):
        """Clean up resources."""

        try:
            if self._monitoring_task:
                self._monitoring_task.cancel()
                try:
                    await self._monitoring_task
                except asyncio.CancelledError:
                    pass

            logger.info("✅ Monitoring system cleanup complete")

        except Exception as e:
            logger.error(f"❌ Cleanup error: {e}")


# Global singleton instance
_monitoring_manager: Optional[ObservabilityManager] = None


async def get_monitoring_manager() -> ObservabilityManager:
    """Get the singleton monitoring manager instance."""

    global _monitoring_manager

    if _monitoring_manager is None or not _monitoring_manager._initialized:
        config = MonitoringConfig()
        _monitoring_manager = ObservabilityManager(config)
        await _monitoring_manager.initialize()

    return _monitoring_manager


def trace_async(span_name: Optional[str] = None, attributes: Optional[Dict[str, Any]] = None):
    """Decorator for automatic tracing of async functions."""

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            monitoring = await get_monitoring_manager()
            name = span_name or f"{func.__module__}.{func.__name__}"

            async with monitoring.trace_span(name, attributes) as span:
                try:
                    start_time = time.time()
                    result = await func(*args, **kwargs)
                    duration = time.time() - start_time

                    if span:
                        span.set_attribute("duration_seconds", duration)
                        span.set_attribute("success", True)

                    return result

                except Exception as e:
                    if span:
                        span.set_attribute("error", True)
                        span.set_attribute("error_message", str(e))
                    raise

        return wrapper
    return decorator


async def cleanup_monitoring():
    """Global cleanup of the monitoring system."""

    global _monitoring_manager

    if _monitoring_manager:
        await _monitoring_manager.cleanup()
        _monitoring_manager = None


if __name__ == "__main__":
    import asyncio

    async def test_monitoring_system():
        """End-to-end test of the monitoring system."""

        print("🧪 Testing monitoring system...")

        monitoring = await get_monitoring_manager()

        # Record some sample measurements.
        monitoring.track_request_time(0.15)
        monitoring.track_ml_inference_time(0.5)
        monitoring.increment_anomaly_count("high")

        # Give the monitoring loop a moment to run.
        await asyncio.sleep(2)

        health = await monitoring.get_health_summary()
        print(f"✅ Health summary: {health['overall_status']}")

        metrics = await monitoring.get_metrics_summary()
        print(f"✅ Metrics summary: {metrics.get('system', {}).get('cpu_usage_percent', 'N/A')}% CPU")

        @trace_async("test_function")
        async def test_traced_function():
            await asyncio.sleep(0.1)
            return "success"

        result = await test_traced_function()
        print(f"✅ Traced function result: {result}")

        await cleanup_monitoring()
        print("✅ Test complete!")

    asyncio.run(test_monitoring_system())