anderson-ufrj
feat: implement comprehensive monitoring and observability stack
c97e35f
"""
SLA/SLO monitoring system for tracking service level objectives.
This module provides comprehensive SLA/SLO tracking with automated
alerting and reporting capabilities.
"""
import asyncio
import time
from typing import Dict, Any, List, Optional, Callable, Union
from datetime import datetime, timedelta
from enum import Enum
from dataclasses import dataclass, field
from collections import deque, defaultdict
import statistics
from src.core import get_logger
from src.infrastructure.observability import BusinessMetrics, get_structured_logger
logger = get_structured_logger(__name__, component="slo_monitor")
class SLOType(str, Enum):
"""Types of SLO metrics."""
AVAILABILITY = "availability"
LATENCY = "latency"
ERROR_RATE = "error_rate"
THROUGHPUT = "throughput"
CUSTOM = "custom"
class AlertSeverity(str, Enum):
"""Alert severity levels."""
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
class TimeWindow(str, Enum):
"""Time windows for SLO calculations."""
MINUTE_1 = "1m"
MINUTE_5 = "5m"
MINUTE_15 = "15m"
HOUR_1 = "1h"
HOUR_4 = "4h"
HOUR_24 = "24h"
DAY_7 = "7d"
DAY_30 = "30d"
@dataclass
class SLOTarget:
"""SLO target definition."""
name: str
slo_type: SLOType
target_value: float # Target threshold (e.g., 99.9 for 99.9% availability)
time_window: TimeWindow
description: str
warning_threshold: float = 95.0 # % of target that triggers warning
critical_threshold: float = 90.0 # % of target that triggers critical alert
def get_warning_value(self) -> float:
"""Get warning threshold value."""
return self.target_value * (self.warning_threshold / 100)
def get_critical_value(self) -> float:
"""Get critical threshold value."""
return self.target_value * (self.critical_threshold / 100)
@dataclass
class SLOMetric:
"""Single SLO metric measurement."""
timestamp: datetime
value: float
success: bool = True
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SLOViolation:
"""SLO violation record."""
slo_name: str
violation_time: datetime
actual_value: float
target_value: float
severity: AlertSeverity
duration_minutes: float
details: Dict[str, Any] = field(default_factory=dict)
class SLOCalculator:
"""Calculator for different types of SLO metrics."""
@staticmethod
def calculate_availability(metrics: List[SLOMetric]) -> float:
"""Calculate availability percentage."""
if not metrics:
return 0.0
successful = sum(1 for m in metrics if m.success)
total = len(metrics)
return (successful / total) * 100
@staticmethod
def calculate_latency_percentile(metrics: List[SLOMetric], percentile: float = 95.0) -> float:
"""Calculate latency percentile."""
if not metrics:
return 0.0
values = [m.value for m in metrics]
return statistics.quantiles(values, n=100)[int(percentile) - 1] if len(values) > 1 else values[0]
@staticmethod
def calculate_error_rate(metrics: List[SLOMetric]) -> float:
"""Calculate error rate percentage."""
if not metrics:
return 0.0
errors = sum(1 for m in metrics if not m.success)
total = len(metrics)
return (errors / total) * 100
@staticmethod
def calculate_throughput(metrics: List[SLOMetric], time_window_minutes: float) -> float:
"""Calculate throughput (requests per minute)."""
if not metrics or time_window_minutes <= 0:
return 0.0
return len(metrics) / time_window_minutes
@staticmethod
def calculate_custom(metrics: List[SLOMetric], calculation_func: Callable) -> float:
"""Calculate custom metric using provided function."""
return calculation_func(metrics)
class SLOMonitor:
"""
Monitor for tracking SLA/SLO compliance.
Features:
- Real-time SLO tracking
- Automated violation detection
- Alert generation
- Historical trend analysis
- Error budget tracking
"""
def __init__(self, max_history_size: int = 10000):
"""
Initialize SLO monitor.
Args:
max_history_size: Maximum number of metrics to keep in memory
"""
self.max_history_size = max_history_size
# SLO definitions
self.slo_targets: Dict[str, SLOTarget] = {}
# Metric storage
self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history_size))
# Violation tracking
self.violations: Dict[str, List[SLOViolation]] = defaultdict(list)
self.current_violations: Dict[str, SLOViolation] = {}
# Alert callbacks
self.alert_callbacks: List[Callable] = []
# Calculator
self.calculator = SLOCalculator()
# Statistics
self.stats = {
"total_measurements": 0,
"total_violations": 0,
"alerts_sent": 0
}
self.logger = get_structured_logger(__name__, component="slo_monitor")
# Initialize default SLOs
self._initialize_default_slos()
def _initialize_default_slos(self):
"""Initialize default SLO targets for Cidadão.AI."""
# API Availability
self.register_slo(SLOTarget(
name="api_availability",
slo_type=SLOType.AVAILABILITY,
target_value=99.9, # 99.9% uptime
time_window=TimeWindow.HOUR_24,
description="API endpoint availability over 24 hours",
warning_threshold=98.0,
critical_threshold=95.0
))
# API Response Time
self.register_slo(SLOTarget(
name="api_latency_p95",
slo_type=SLOType.LATENCY,
target_value=2000.0, # 2 seconds P95
time_window=TimeWindow.HOUR_1,
description="95th percentile API response time under 2 seconds",
warning_threshold=90.0,
critical_threshold=80.0
))
# Investigation Success Rate
self.register_slo(SLOTarget(
name="investigation_success_rate",
slo_type=SLOType.AVAILABILITY,
target_value=95.0, # 95% success rate
time_window=TimeWindow.HOUR_4,
description="Investigation completion success rate",
warning_threshold=92.0,
critical_threshold=88.0
))
# Agent Task Error Rate
self.register_slo(SLOTarget(
name="agent_error_rate",
slo_type=SLOType.ERROR_RATE,
target_value=1.0, # < 1% error rate
time_window=TimeWindow.HOUR_1,
description="Agent task error rate under 1%",
warning_threshold=80.0,
critical_threshold=60.0
))
# Database Query Performance
self.register_slo(SLOTarget(
name="database_latency_p90",
slo_type=SLOType.LATENCY,
target_value=500.0, # 500ms P90
time_window=TimeWindow.MINUTE_15,
description="90th percentile database query time under 500ms",
warning_threshold=85.0,
critical_threshold=70.0
))
# Anomaly Detection Accuracy
self.register_slo(SLOTarget(
name="anomaly_detection_accuracy",
slo_type=SLOType.CUSTOM,
target_value=90.0, # 90% accuracy
time_window=TimeWindow.HOUR_24,
description="Anomaly detection accuracy rate",
warning_threshold=95.0,
critical_threshold=90.0
))
def register_slo(self, slo_target: SLOTarget):
"""Register an SLO target for monitoring."""
self.slo_targets[slo_target.name] = slo_target
self.logger.info(
f"Registered SLO: {slo_target.name}",
operation="register_slo",
slo_type=slo_target.slo_type.value,
target_value=slo_target.target_value,
time_window=slo_target.time_window.value
)
def register_alert_callback(self, callback: Callable[[SLOViolation], None]):
"""Register callback for SLO violation alerts."""
self.alert_callbacks.append(callback)
def record_metric(
self,
slo_name: str,
value: float,
success: bool = True,
metadata: Optional[Dict[str, Any]] = None
):
"""
Record a metric for SLO monitoring.
Args:
slo_name: Name of the SLO
value: Metric value
success: Whether the operation was successful
metadata: Additional metadata
"""
if slo_name not in self.slo_targets:
self.logger.warning(f"Unknown SLO: {slo_name}")
return
metric = SLOMetric(
timestamp=datetime.utcnow(),
value=value,
success=success,
metadata=metadata or {}
)
self.metrics[slo_name].append(metric)
self.stats["total_measurements"] += 1
# Check for violations
asyncio.create_task(self._check_slo_compliance(slo_name))
self.logger.debug(
f"Recorded metric for SLO: {slo_name}",
operation="record_metric",
value=value,
success=success
)
async def _check_slo_compliance(self, slo_name: str):
"""Check SLO compliance and trigger alerts if needed."""
if slo_name not in self.slo_targets:
return
slo_target = self.slo_targets[slo_name]
current_value = await self.calculate_current_slo(slo_name)
if current_value is None:
return
# Determine violation severity
violation_severity = None
if current_value < slo_target.get_critical_value():
violation_severity = AlertSeverity.CRITICAL
elif current_value < slo_target.get_warning_value():
violation_severity = AlertSeverity.WARNING
# Handle violation
if violation_severity:
await self._handle_violation(slo_name, current_value, violation_severity)
else:
# No violation, clear any existing violation
await self._clear_violation(slo_name)
async def _handle_violation(
self,
slo_name: str,
actual_value: float,
severity: AlertSeverity
):
"""Handle SLO violation."""
slo_target = self.slo_targets[slo_name]
now = datetime.utcnow()
# Check if this is a new violation or continuation
if slo_name in self.current_violations:
# Update existing violation
violation = self.current_violations[slo_name]
violation.duration_minutes = (now - violation.violation_time).total_seconds() / 60
violation.actual_value = actual_value
violation.severity = severity
else:
# New violation
violation = SLOViolation(
slo_name=slo_name,
violation_time=now,
actual_value=actual_value,
target_value=slo_target.target_value,
severity=severity,
duration_minutes=0.0,
details={
"slo_type": slo_target.slo_type.value,
"time_window": slo_target.time_window.value,
"description": slo_target.description
}
)
self.current_violations[slo_name] = violation
self.violations[slo_name].append(violation)
self.stats["total_violations"] += 1
# Send alerts
await self._send_alert(violation)
self.logger.warning(
f"SLO violation: {slo_name}",
operation="slo_violation",
actual_value=actual_value,
target_value=slo_target.target_value,
severity=severity.value,
duration_minutes=violation.duration_minutes
)
async def _clear_violation(self, slo_name: str):
"""Clear SLO violation when back in compliance."""
if slo_name in self.current_violations:
violation = self.current_violations.pop(slo_name)
self.logger.info(
f"SLO violation cleared: {slo_name}",
operation="slo_violation_cleared",
total_duration_minutes=violation.duration_minutes
)
async def _send_alert(self, violation: SLOViolation):
"""Send alert for SLO violation."""
for callback in self.alert_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
await callback(violation)
else:
callback(violation)
self.stats["alerts_sent"] += 1
except Exception as e:
self.logger.error(
f"Failed to send alert for {violation.slo_name}",
operation="send_alert",
error=e
)
async def calculate_current_slo(self, slo_name: str) -> Optional[float]:
"""Calculate current SLO value."""
if slo_name not in self.slo_targets:
return None
slo_target = self.slo_targets[slo_name]
metrics = self._get_metrics_for_window(slo_name, slo_target.time_window)
if not metrics:
return None
# Calculate based on SLO type
if slo_target.slo_type == SLOType.AVAILABILITY:
return self.calculator.calculate_availability(metrics)
elif slo_target.slo_type == SLOType.LATENCY:
# For latency SLOs, we want percentage under threshold
under_threshold = sum(1 for m in metrics if m.value <= slo_target.target_value)
return (under_threshold / len(metrics)) * 100
elif slo_target.slo_type == SLOType.ERROR_RATE:
error_rate = self.calculator.calculate_error_rate(metrics)
# For error rate SLOs, we want percentage compliance (low error rate)
return max(0, 100 - error_rate)
elif slo_target.slo_type == SLOType.THROUGHPUT:
time_window_minutes = self._get_time_window_minutes(slo_target.time_window)
throughput = self.calculator.calculate_throughput(metrics, time_window_minutes)
# Calculate percentage of target throughput achieved
return min(100, (throughput / slo_target.target_value) * 100)
elif slo_target.slo_type == SLOType.CUSTOM:
# For custom SLOs, use the metric values directly
if metrics:
return statistics.mean(m.value for m in metrics)
return None
def _get_metrics_for_window(self, slo_name: str, time_window: TimeWindow) -> List[SLOMetric]:
"""Get metrics for the specified time window."""
if slo_name not in self.metrics:
return []
now = datetime.utcnow()
window_minutes = self._get_time_window_minutes(time_window)
cutoff_time = now - timedelta(minutes=window_minutes)
return [
metric for metric in self.metrics[slo_name]
if metric.timestamp > cutoff_time
]
def _get_time_window_minutes(self, time_window: TimeWindow) -> float:
"""Convert time window to minutes."""
window_map = {
TimeWindow.MINUTE_1: 1,
TimeWindow.MINUTE_5: 5,
TimeWindow.MINUTE_15: 15,
TimeWindow.HOUR_1: 60,
TimeWindow.HOUR_4: 240,
TimeWindow.HOUR_24: 1440,
TimeWindow.DAY_7: 10080,
TimeWindow.DAY_30: 43200
}
return window_map.get(time_window, 60)
def get_slo_status(self, slo_name: str) -> Dict[str, Any]:
"""Get current status of an SLO."""
if slo_name not in self.slo_targets:
return {"error": f"Unknown SLO: {slo_name}"}
slo_target = self.slo_targets[slo_name]
current_value = asyncio.run(self.calculate_current_slo(slo_name))
# Calculate error budget
if current_value is not None:
error_budget_consumed = max(0, (slo_target.target_value - current_value) / slo_target.target_value * 100)
else:
error_budget_consumed = 0
# Get violation history
recent_violations = [
{
"timestamp": v.violation_time.isoformat(),
"severity": v.severity.value,
"duration_minutes": v.duration_minutes,
"actual_value": v.actual_value
}
for v in self.violations[slo_name][-10:] # Last 10 violations
]
return {
"slo_name": slo_name,
"target": slo_target.target_value,
"current_value": current_value,
"compliance_percentage": (current_value / slo_target.target_value * 100) if current_value else 0,
"error_budget_consumed_percentage": error_budget_consumed,
"status": "compliant" if slo_name not in self.current_violations else "violated",
"time_window": slo_target.time_window.value,
"description": slo_target.description,
"current_violation": (
{
"severity": self.current_violations[slo_name].severity.value,
"duration_minutes": self.current_violations[slo_name].duration_minutes,
"started_at": self.current_violations[slo_name].violation_time.isoformat()
}
if slo_name in self.current_violations else None
),
"recent_violations": recent_violations,
"total_metrics": len(self.metrics[slo_name])
}
def get_all_slo_status(self) -> Dict[str, Any]:
"""Get status of all SLOs."""
slo_statuses = {}
for slo_name in self.slo_targets.keys():
slo_statuses[slo_name] = self.get_slo_status(slo_name)
# Calculate overall compliance
compliant_slos = sum(
1 for status in slo_statuses.values()
if status.get("status") == "compliant"
)
total_slos = len(slo_statuses)
overall_compliance = (compliant_slos / total_slos * 100) if total_slos > 0 else 100
return {
"timestamp": datetime.utcnow().isoformat(),
"overall_compliance_percentage": overall_compliance,
"total_slos": total_slos,
"compliant_slos": compliant_slos,
"violated_slos": total_slos - compliant_slos,
"statistics": self.stats,
"slo_details": slo_statuses
}
def get_error_budget_report(self) -> Dict[str, Any]:
"""Get error budget consumption report."""
report = {
"timestamp": datetime.utcnow().isoformat(),
"error_budgets": {}
}
for slo_name, slo_target in self.slo_targets.items():
current_value = asyncio.run(self.calculate_current_slo(slo_name))
if current_value is not None:
# Calculate error budget
budget_remaining = max(0, current_value - slo_target.target_value)
budget_consumed = max(0, slo_target.target_value - current_value)
budget_total = 100 - slo_target.target_value # Assuming percentage-based
consumption_percentage = (budget_consumed / budget_total * 100) if budget_total > 0 else 0
report["error_budgets"][slo_name] = {
"target": slo_target.target_value,
"current_value": current_value,
"budget_consumed": budget_consumed,
"budget_remaining": budget_remaining,
"consumption_percentage": consumption_percentage,
"status": (
"healthy" if consumption_percentage < 50
else "warning" if consumption_percentage < 80
else "critical"
),
"time_window": slo_target.time_window.value
}
return report
# Global SLO monitor
slo_monitor = SLOMonitor()
# Convenience functions for common metrics
def record_api_request(endpoint: str, response_time_ms: float, success: bool):
"""Record API request metrics for SLO monitoring."""
slo_monitor.record_metric("api_availability", 1.0, success, {"endpoint": endpoint})
slo_monitor.record_metric("api_latency_p95", response_time_ms, success, {"endpoint": endpoint})
def record_investigation_result(investigation_id: str, success: bool, duration_ms: float):
"""Record investigation result for SLO monitoring."""
slo_monitor.record_metric("investigation_success_rate", 1.0, success, {
"investigation_id": investigation_id,
"duration_ms": duration_ms
})
def record_agent_task(agent_name: str, task_type: str, success: bool, duration_ms: float):
"""Record agent task for SLO monitoring."""
slo_monitor.record_metric("agent_error_rate", 1.0, success, {
"agent_name": agent_name,
"task_type": task_type,
"duration_ms": duration_ms
})
def record_database_query(query_type: str, duration_ms: float, success: bool):
"""Record database query for SLO monitoring."""
slo_monitor.record_metric("database_latency_p90", duration_ms, success, {
"query_type": query_type
})
def record_anomaly_detection(true_positive: bool, false_positive: bool):
"""Record anomaly detection accuracy."""
if true_positive:
accuracy = 100.0
elif false_positive:
accuracy = 0.0
else:
accuracy = 50.0 # Unknown case
slo_monitor.record_metric("anomaly_detection_accuracy", accuracy, True, {
"true_positive": true_positive,
"false_positive": false_positive
})
# Default alert callback
async def default_alert_callback(violation: SLOViolation):
"""Default alert callback that logs violations."""
logger.warning(
f"SLO VIOLATION ALERT: {violation.slo_name}",
operation="slo_alert",
severity=violation.severity.value,
actual_value=violation.actual_value,
target_value=violation.target_value,
duration_minutes=violation.duration_minutes
)
# Record alert metric
BusinessMetrics.record_investigation_created() # Using as example metric
# Register default alert callback
slo_monitor.register_alert_callback(default_alert_callback)