|
|
""" |
|
|
SLA/SLO monitoring system for tracking service level objectives. |
|
|
|
|
|
This module provides comprehensive SLA/SLO tracking with automated |
|
|
alerting and reporting capabilities. |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import time |
|
|
from typing import Dict, Any, List, Optional, Callable, Union |
|
|
from datetime import datetime, timedelta |
|
|
from enum import Enum |
|
|
from dataclasses import dataclass, field |
|
|
from collections import deque, defaultdict |
|
|
import statistics |
|
|
|
|
|
from src.core import get_logger |
|
|
from src.infrastructure.observability import BusinessMetrics, get_structured_logger |
|
|
|
|
|
# Module-level structured logger, tagged with the "slo_monitor" component.
logger = get_structured_logger(__name__, component="slo_monitor")
|
|
|
|
|
|
|
|
class SLOType(str, Enum):
    """Kinds of service level objective the monitor knows how to evaluate.

    Members are also ``str`` instances, so they compare equal to their
    plain string value.
    """

    # Share of measurements flagged as successful.
    AVAILABILITY = "availability"
    # Response/processing time measurements.
    LATENCY = "latency"
    # Share of measurements flagged as failed.
    ERROR_RATE = "error_rate"
    # Number of measurements per unit of time.
    THROUGHPUT = "throughput"
    # Anything else; the recorded values are interpreted by the monitor.
    CUSTOM = "custom"
|
|
|
|
|
|
|
|
class AlertSeverity(str, Enum):
    """Severity attached to an SLO violation alert.

    Members are also ``str`` instances, so they compare equal to their
    plain string value.
    """

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
|
|
|
|
|
|
|
|
class TimeWindow(str, Enum):
    """Rolling windows over which an SLO can be evaluated.

    The value of each member is its short textual label (``"5m"``,
    ``"24h"``, ``"30d"`` ...).
    """

    # Minute-scale windows.
    MINUTE_1 = "1m"
    MINUTE_5 = "5m"
    MINUTE_15 = "15m"

    # Hour-scale windows.
    HOUR_1 = "1h"
    HOUR_4 = "4h"
    HOUR_24 = "24h"

    # Day-scale windows.
    DAY_7 = "7d"
    DAY_30 = "30d"
|
|
|
|
|
|
|
|
@dataclass
class SLOTarget:
    """Definition of a single service level objective.

    ``warning_threshold`` and ``critical_threshold`` are percentages of
    ``target_value``; the two ``get_*_value`` methods convert them into
    absolute values in the same unit as ``target_value``.
    """

    name: str
    slo_type: SLOType
    target_value: float
    time_window: TimeWindow
    description: str
    warning_threshold: float = 95.0
    critical_threshold: float = 90.0

    def _percent_of_target(self, percent: float) -> float:
        """Return ``percent`` percent of ``target_value``."""
        return self.target_value * (percent / 100)

    def get_warning_value(self) -> float:
        """Return the warning level: ``warning_threshold`` % of the target."""
        return self._percent_of_target(self.warning_threshold)

    def get_critical_value(self) -> float:
        """Return the critical level: ``critical_threshold`` % of the target."""
        return self._percent_of_target(self.critical_threshold)
|
|
|
|
|
|
|
|
@dataclass
class SLOMetric:
    """One recorded measurement belonging to an SLO."""

    # When the measurement was taken.
    timestamp: datetime
    # Measured value; its meaning depends on the SLO it was recorded for.
    value: float
    # Whether the measured operation succeeded.
    success: bool = True
    # Free-form extra context; every instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
|
|
@dataclass
class SLOViolation:
    """Record describing one period during which an SLO was out of compliance."""

    # Name of the SLO that was violated.
    slo_name: str
    # When the violation was first recorded.
    violation_time: datetime
    # Measured value associated with the violation.
    actual_value: float
    # Target value of the violated SLO.
    target_value: float
    # How serious the violation is.
    severity: AlertSeverity
    # Length of the violation so far, in minutes.
    duration_minutes: float
    # Free-form extra context; every instance gets its own dict.
    details: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
|
|
class SLOCalculator:
    """Stateless helpers that reduce a list of SLO measurements to one number.

    Every method only reads the ``success`` and/or ``value`` attributes of
    the measurements it is given.
    """

    @staticmethod
    def calculate_availability(metrics: List[SLOMetric]) -> float:
        """Return the percentage (0-100) of measurements flagged successful.

        Returns ``0.0`` when ``metrics`` is empty.
        """
        if not metrics:
            return 0.0

        successful = sum(1 for m in metrics if m.success)
        return (successful / len(metrics)) * 100

    @staticmethod
    def calculate_latency_percentile(metrics: List[SLOMetric], percentile: float = 95.0) -> float:
        """Return the requested percentile of the measurements' values.

        Args:
            metrics: Measurements whose ``value`` is used.
            percentile: Percentile to compute. The fractional part is
                ignored. Values ``<= 0`` yield the minimum and values
                ``>= 100`` the maximum.

        Returns:
            ``0.0`` for no measurements, the only value for a single
            measurement, otherwise the matching cut point of
            ``statistics.quantiles(values, n=100)``.
        """
        if not metrics:
            return 0.0

        values = [m.value for m in metrics]
        if len(values) == 1:
            # quantiles() needs at least two data points.
            return values[0]

        # quantiles(n=100) only yields the 99 cut points p1..p99. Indexing it
        # with an unchecked ``int(percentile) - 1`` raised IndexError for 100
        # and wrapped around to the *last* cut point for anything below 1.
        if percentile <= 0:
            return min(values)
        if percentile >= 100:
            return max(values)

        cut_points = statistics.quantiles(values, n=100)
        index = min(max(int(percentile), 1), len(cut_points)) - 1
        return cut_points[index]

    @staticmethod
    def calculate_error_rate(metrics: List[SLOMetric]) -> float:
        """Return the percentage (0-100) of measurements flagged unsuccessful.

        Returns ``0.0`` when ``metrics`` is empty.
        """
        if not metrics:
            return 0.0

        errors = sum(1 for m in metrics if not m.success)
        return (errors / len(metrics)) * 100

    @staticmethod
    def calculate_throughput(metrics: List[SLOMetric], time_window_minutes: float) -> float:
        """Return the number of measurements per minute of the window.

        Returns ``0.0`` when there are no measurements or the window length
        is not positive.
        """
        if not metrics or time_window_minutes <= 0:
            return 0.0

        return len(metrics) / time_window_minutes

    @staticmethod
    def calculate_custom(metrics: List[SLOMetric], calculation_func: Callable) -> float:
        """Return ``calculation_func(metrics)`` unchanged.

        Any exception raised by ``calculation_func`` propagates to the caller.
        """
        return calculation_func(metrics)
|
|
|
|
|
|
|
|
class SLOMonitor:
    """
    Monitor for tracking SLA/SLO compliance.

    Features:
    - Real-time SLO tracking
    - Automated violation detection
    - Alert generation
    - Historical trend analysis
    - Error budget tracking

    Measurements are kept in memory only, in one bounded deque per SLO.
    """

    def __init__(self, max_history_size: int = 10000):
        """
        Initialize SLO monitor.

        Args:
            max_history_size: Maximum number of metrics to keep in memory
                for each SLO (older measurements are dropped first).
        """
        self.max_history_size = max_history_size

        # Registered objectives, keyed by SLO name.
        self.slo_targets: Dict[str, SLOTarget] = {}

        # Per-SLO measurement history, bounded to max_history_size entries.
        self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history_size))

        # Every violation ever opened, plus the ones that are still open.
        self.violations: Dict[str, List[SLOViolation]] = defaultdict(list)
        self.current_violations: Dict[str, SLOViolation] = {}

        # Callables (sync or async) invoked with each violation.
        self.alert_callbacks: List[Callable] = []

        self.calculator = SLOCalculator()

        self.stats = {
            "total_measurements": 0,
            "total_violations": 0,
            "alerts_sent": 0
        }

        self.logger = get_structured_logger(__name__, component="slo_monitor")

        # Strong references to in-flight compliance-check tasks; the event
        # loop itself only keeps weak references to tasks.
        self._pending_checks: set = set()

        self._initialize_default_slos()

    def _initialize_default_slos(self):
        """Initialize default SLO targets for Cidadão.AI."""
        self.register_slo(SLOTarget(
            name="api_availability",
            slo_type=SLOType.AVAILABILITY,
            target_value=99.9,
            time_window=TimeWindow.HOUR_24,
            description="API endpoint availability over 24 hours",
            warning_threshold=98.0,
            critical_threshold=95.0
        ))

        self.register_slo(SLOTarget(
            name="api_latency_p95",
            slo_type=SLOType.LATENCY,
            target_value=2000.0,
            time_window=TimeWindow.HOUR_1,
            description="95th percentile API response time under 2 seconds",
            warning_threshold=90.0,
            critical_threshold=80.0
        ))

        self.register_slo(SLOTarget(
            name="investigation_success_rate",
            slo_type=SLOType.AVAILABILITY,
            target_value=95.0,
            time_window=TimeWindow.HOUR_4,
            description="Investigation completion success rate",
            warning_threshold=92.0,
            critical_threshold=88.0
        ))

        self.register_slo(SLOTarget(
            name="agent_error_rate",
            slo_type=SLOType.ERROR_RATE,
            target_value=1.0,
            time_window=TimeWindow.HOUR_1,
            description="Agent task error rate under 1%",
            warning_threshold=80.0,
            critical_threshold=60.0
        ))

        self.register_slo(SLOTarget(
            name="database_latency_p90",
            slo_type=SLOType.LATENCY,
            target_value=500.0,
            time_window=TimeWindow.MINUTE_15,
            description="90th percentile database query time under 500ms",
            warning_threshold=85.0,
            critical_threshold=70.0
        ))

        self.register_slo(SLOTarget(
            name="anomaly_detection_accuracy",
            slo_type=SLOType.CUSTOM,
            target_value=90.0,
            time_window=TimeWindow.HOUR_24,
            description="Anomaly detection accuracy rate",
            warning_threshold=95.0,
            critical_threshold=90.0
        ))

    def register_slo(self, slo_target: SLOTarget):
        """Register an SLO target for monitoring.

        A target registered under an existing name replaces the previous one.
        """
        self.slo_targets[slo_target.name] = slo_target

        self.logger.info(
            f"Registered SLO: {slo_target.name}",
            operation="register_slo",
            slo_type=slo_target.slo_type.value,
            target_value=slo_target.target_value,
            time_window=slo_target.time_window.value
        )

    def register_alert_callback(self, callback: Callable[[SLOViolation], None]):
        """Register callback for SLO violation alerts.

        The callback may be a plain function or a coroutine function; it is
        called with the ``SLOViolation`` being reported.
        """
        self.alert_callbacks.append(callback)

    def record_metric(
        self,
        slo_name: str,
        value: float,
        success: bool = True,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Record a metric for SLO monitoring.

        Measurements for names that were never registered are logged and
        dropped. After storing the measurement a compliance check for the
        SLO is scheduled (or run immediately when no event loop is running).

        Args:
            slo_name: Name of the SLO
            value: Metric value
            success: Whether the operation was successful
            metadata: Additional metadata
        """
        if slo_name not in self.slo_targets:
            self.logger.warning(f"Unknown SLO: {slo_name}")
            return

        metric = SLOMetric(
            timestamp=datetime.utcnow(),
            value=value,
            success=success,
            metadata=metadata or {}
        )

        self.metrics[slo_name].append(metric)
        self.stats["total_measurements"] += 1

        self._schedule_compliance_check(slo_name)

        self.logger.debug(
            f"Recorded metric for SLO: {slo_name}",
            operation="record_metric",
            value=value,
            success=success
        )

    def _schedule_compliance_check(self, slo_name: str):
        """Run the compliance check for ``slo_name`` from sync or async code.

        Inside a running event loop the check is scheduled as a task. Without
        one (plain synchronous callers) ``asyncio.create_task`` would raise
        ``RuntimeError``, so the check is run to completion on a private,
        short-lived loop instead.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is None:
            private_loop = asyncio.new_event_loop()
            try:
                private_loop.run_until_complete(self._check_slo_compliance(slo_name))
            finally:
                private_loop.close()
            return

        task = loop.create_task(self._check_slo_compliance(slo_name))
        # Keep the task alive until it finishes, then forget it.
        self._pending_checks.add(task)
        task.add_done_callback(self._pending_checks.discard)

    def _violation_levels(self, slo_target: SLOTarget):
        """Return ``(warning_level, critical_level)`` comparable to the computed SLO value.

        For LATENCY, ERROR_RATE and THROUGHPUT targets the computed value is
        a 0-100 percentage rather than a quantity in ``target_value`` units,
        so the threshold percentages are used as-is. Scaling them by
        ``target_value`` (e.g. 2000 ms * 80% = 1600) made latency SLOs
        permanently critical and error-rate SLOs practically unable to fire.
        AVAILABILITY and CUSTOM values share the unit of ``target_value`` and
        keep the scaled thresholds.
        """
        if slo_target.slo_type in (SLOType.LATENCY, SLOType.ERROR_RATE, SLOType.THROUGHPUT):
            return slo_target.warning_threshold, slo_target.critical_threshold
        return slo_target.get_warning_value(), slo_target.get_critical_value()

    async def _check_slo_compliance(self, slo_name: str):
        """Check SLO compliance and trigger alerts if needed.

        Opens/updates a violation when the current value is below the
        warning or critical level, and clears any open violation otherwise.
        Does nothing when the SLO is unknown or has no measurements in its
        time window.
        """
        if slo_name not in self.slo_targets:
            return

        slo_target = self.slo_targets[slo_name]
        current_value = await self.calculate_current_slo(slo_name)

        if current_value is None:
            return

        warning_level, critical_level = self._violation_levels(slo_target)

        violation_severity = None
        if current_value < critical_level:
            violation_severity = AlertSeverity.CRITICAL
        elif current_value < warning_level:
            violation_severity = AlertSeverity.WARNING

        if violation_severity:
            await self._handle_violation(slo_name, current_value, violation_severity)
        else:
            await self._clear_violation(slo_name)

    async def _handle_violation(
        self,
        slo_name: str,
        actual_value: float,
        severity: AlertSeverity
    ):
        """Handle SLO violation.

        Updates the open violation for ``slo_name`` or opens a new one, then
        sends an alert. An alert is sent on every call, i.e. on every check
        that finds the SLO still violated.
        """
        slo_target = self.slo_targets[slo_name]
        now = datetime.utcnow()

        if slo_name in self.current_violations:
            # Ongoing violation: refresh duration, value and severity in place.
            violation = self.current_violations[slo_name]
            violation.duration_minutes = (now - violation.violation_time).total_seconds() / 60
            violation.actual_value = actual_value
            violation.severity = severity
        else:
            # New violation: record it as open and add it to the history.
            violation = SLOViolation(
                slo_name=slo_name,
                violation_time=now,
                actual_value=actual_value,
                target_value=slo_target.target_value,
                severity=severity,
                duration_minutes=0.0,
                details={
                    "slo_type": slo_target.slo_type.value,
                    "time_window": slo_target.time_window.value,
                    "description": slo_target.description
                }
            )

            self.current_violations[slo_name] = violation
            self.violations[slo_name].append(violation)
            self.stats["total_violations"] += 1

        await self._send_alert(violation)

        self.logger.warning(
            f"SLO violation: {slo_name}",
            operation="slo_violation",
            actual_value=actual_value,
            target_value=slo_target.target_value,
            severity=severity.value,
            duration_minutes=violation.duration_minutes
        )

    async def _clear_violation(self, slo_name: str):
        """Clear SLO violation when back in compliance.

        The violation stays in the ``violations`` history; only the "open"
        marker is removed.
        """
        if slo_name in self.current_violations:
            violation = self.current_violations.pop(slo_name)

            self.logger.info(
                f"SLO violation cleared: {slo_name}",
                operation="slo_violation_cleared",
                total_duration_minutes=violation.duration_minutes
            )

    async def _send_alert(self, violation: SLOViolation):
        """Send alert for SLO violation.

        Every registered callback is invoked (awaited when it is a coroutine
        function). A failing callback is logged and does not stop the
        remaining callbacks; ``alerts_sent`` counts successful calls only.
        """
        for callback in self.alert_callbacks:
            try:
                if asyncio.iscoroutinefunction(callback):
                    await callback(violation)
                else:
                    callback(violation)

                self.stats["alerts_sent"] += 1

            except Exception as e:
                self.logger.error(
                    f"Failed to send alert for {violation.slo_name}",
                    operation="send_alert",
                    error=e
                )

    async def calculate_current_slo(self, slo_name: str) -> Optional[float]:
        """Calculate current SLO value.

        Returns ``None`` for an unknown SLO or when there are no
        measurements inside the SLO's time window.
        """
        return self._compute_current_slo(slo_name)

    def _compute_current_slo(self, slo_name: str) -> Optional[float]:
        """Synchronous implementation behind ``calculate_current_slo``.

        The calculation never awaits anything, so synchronous reporting
        methods call this directly instead of spinning up an event loop.

        Result by SLO type:
        - AVAILABILITY: percentage of successful measurements.
        - LATENCY: percentage of measurements with ``value <= target_value``.
        - ERROR_RATE: ``100 - error rate percentage`` (never below 0).
        - THROUGHPUT: measured rate as a percentage of the target, capped at 100.
        - CUSTOM: arithmetic mean of the recorded values.
        """
        if slo_name not in self.slo_targets:
            return None

        slo_target = self.slo_targets[slo_name]
        metrics = self._get_metrics_for_window(slo_name, slo_target.time_window)

        if not metrics:
            return None

        if slo_target.slo_type == SLOType.AVAILABILITY:
            return self.calculator.calculate_availability(metrics)

        elif slo_target.slo_type == SLOType.LATENCY:
            under_threshold = sum(1 for m in metrics if m.value <= slo_target.target_value)
            return (under_threshold / len(metrics)) * 100

        elif slo_target.slo_type == SLOType.ERROR_RATE:
            error_rate = self.calculator.calculate_error_rate(metrics)
            return max(0, 100 - error_rate)

        elif slo_target.slo_type == SLOType.THROUGHPUT:
            if slo_target.target_value <= 0:
                # Any throughput satisfies a non-positive target; this also
                # avoids dividing by zero below.
                return 100
            time_window_minutes = self._get_time_window_minutes(slo_target.time_window)
            throughput = self.calculator.calculate_throughput(metrics, time_window_minutes)
            return min(100, (throughput / slo_target.target_value) * 100)

        elif slo_target.slo_type == SLOType.CUSTOM:
            return statistics.mean(m.value for m in metrics)

        return None

    def _get_metrics_for_window(self, slo_name: str, time_window: TimeWindow) -> List[SLOMetric]:
        """Get metrics for the specified time window.

        Returns the stored measurements for ``slo_name`` whose timestamp is
        strictly newer than ``now - window`` (naive UTC), oldest first.
        """
        if slo_name not in self.metrics:
            return []

        now = datetime.utcnow()
        window_minutes = self._get_time_window_minutes(time_window)
        cutoff_time = now - timedelta(minutes=window_minutes)

        return [
            metric for metric in self.metrics[slo_name]
            if metric.timestamp > cutoff_time
        ]

    def _get_time_window_minutes(self, time_window: TimeWindow) -> float:
        """Convert time window to minutes (60 for an unrecognised window)."""
        window_map = {
            TimeWindow.MINUTE_1: 1,
            TimeWindow.MINUTE_5: 5,
            TimeWindow.MINUTE_15: 15,
            TimeWindow.HOUR_1: 60,
            TimeWindow.HOUR_4: 240,
            TimeWindow.HOUR_24: 1440,
            TimeWindow.DAY_7: 10080,
            TimeWindow.DAY_30: 43200
        }
        return window_map.get(time_window, 60)

    def get_slo_status(self, slo_name: str) -> Dict[str, Any]:
        """Get current status of an SLO.

        Safe to call from synchronous code and from inside a running event
        loop. Returns ``{"error": ...}`` for an unknown SLO.
        """
        if slo_name not in self.slo_targets:
            return {"error": f"Unknown SLO: {slo_name}"}

        slo_target = self.slo_targets[slo_name]
        # asyncio.run() here raised RuntimeError whenever the caller was
        # already inside an event loop; the calculation itself is synchronous.
        current_value = self._compute_current_slo(slo_name)
        target = slo_target.target_value

        # NOTE(review): both ratios below relate the computed value to
        # target_value. For LATENCY/ERROR_RATE/THROUGHPUT the computed value
        # is a 0-100 percentage while target_value is in other units, so the
        # figures look unit-mismatched for those types - confirm intent.
        if current_value is not None and target:
            error_budget_consumed = max(0, (target - current_value) / target * 100)
        else:
            # No data yet, or a zero target that cannot be used as a divisor.
            error_budget_consumed = 0

        compliance_percentage = (current_value / target * 100) if current_value and target else 0

        # .get() keeps this read from inserting keys into the defaultdicts.
        recent_violations = [
            {
                "timestamp": v.violation_time.isoformat(),
                "severity": v.severity.value,
                "duration_minutes": v.duration_minutes,
                "actual_value": v.actual_value
            }
            for v in self.violations.get(slo_name, [])[-10:]
        ]

        open_violation = self.current_violations.get(slo_name)

        return {
            "slo_name": slo_name,
            "target": target,
            "current_value": current_value,
            "compliance_percentage": compliance_percentage,
            "error_budget_consumed_percentage": error_budget_consumed,
            "status": "compliant" if open_violation is None else "violated",
            "time_window": slo_target.time_window.value,
            "description": slo_target.description,
            "current_violation": (
                {
                    "severity": open_violation.severity.value,
                    "duration_minutes": open_violation.duration_minutes,
                    "started_at": open_violation.violation_time.isoformat()
                }
                if open_violation is not None else None
            ),
            "recent_violations": recent_violations,
            "total_metrics": len(self.metrics.get(slo_name, ()))
        }

    def get_all_slo_status(self) -> Dict[str, Any]:
        """Get status of all SLOs.

        Returns a summary (counts, overall compliance percentage, monitor
        statistics) plus the per-SLO status dictionaries.
        """
        slo_statuses = {
            slo_name: self.get_slo_status(slo_name)
            for slo_name in self.slo_targets
        }

        compliant_slos = sum(
            1 for status in slo_statuses.values()
            if status.get("status") == "compliant"
        )

        total_slos = len(slo_statuses)
        overall_compliance = (compliant_slos / total_slos * 100) if total_slos > 0 else 100

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "overall_compliance_percentage": overall_compliance,
            "total_slos": total_slos,
            "compliant_slos": compliant_slos,
            "violated_slos": total_slos - compliant_slos,
            "statistics": self.stats,
            "slo_details": slo_statuses
        }

    def get_error_budget_report(self) -> Dict[str, Any]:
        """Get error budget consumption report.

        SLOs without measurements in their window are omitted. Safe to call
        from synchronous code and from inside a running event loop.
        """
        report = {
            "timestamp": datetime.utcnow().isoformat(),
            "error_budgets": {}
        }

        for slo_name, slo_target in self.slo_targets.items():
            # Synchronous helper instead of asyncio.run(), which cannot be
            # used while an event loop is already running.
            current_value = self._compute_current_slo(slo_name)

            if current_value is None:
                continue

            # NOTE(review): the budget is modelled as (100 - target_value),
            # which only makes sense for percentage targets; for targets in
            # other units (e.g. latency in ms) budget_total is negative and
            # consumption is reported as 0 - confirm intended semantics.
            budget_remaining = max(0, current_value - slo_target.target_value)
            budget_consumed = max(0, slo_target.target_value - current_value)
            budget_total = 100 - slo_target.target_value

            consumption_percentage = (budget_consumed / budget_total * 100) if budget_total > 0 else 0

            report["error_budgets"][slo_name] = {
                "target": slo_target.target_value,
                "current_value": current_value,
                "budget_consumed": budget_consumed,
                "budget_remaining": budget_remaining,
                "consumption_percentage": consumption_percentage,
                "status": (
                    "healthy" if consumption_percentage < 50
                    else "warning" if consumption_percentage < 80
                    else "critical"
                ),
                "time_window": slo_target.time_window.value
            }

        return report
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level monitor instance used by the record_* helper functions below.
slo_monitor = SLOMonitor()
|
|
|
|
|
|
|
|
|
|
|
def record_api_request(endpoint: str, response_time_ms: float, success: bool):
    """Feed one API request into the availability and latency SLOs.

    Args:
        endpoint: Endpoint that served the request; stored as metadata.
        response_time_ms: Response time, recorded as the latency value.
        success: Whether the request succeeded.
    """
    measurements = (
        ("api_availability", 1.0),
        ("api_latency_p95", response_time_ms),
    )
    for slo_name, measured_value in measurements:
        # A fresh metadata dict per measurement, so the two are independent.
        slo_monitor.record_metric(slo_name, measured_value, success, {"endpoint": endpoint})
|
|
|
|
|
|
|
|
def record_investigation_result(investigation_id: str, success: bool, duration_ms: float):
    """Feed one investigation outcome into the ``investigation_success_rate`` SLO.

    Args:
        investigation_id: Identifier of the investigation; stored as metadata.
        success: Whether the investigation completed successfully.
        duration_ms: How long it took; stored as metadata only.
    """
    context = {
        "investigation_id": investigation_id,
        "duration_ms": duration_ms
    }
    slo_monitor.record_metric("investigation_success_rate", 1.0, success, context)
|
|
|
|
|
|
|
|
def record_agent_task(agent_name: str, task_type: str, success: bool, duration_ms: float):
    """Feed one agent task outcome into the ``agent_error_rate`` SLO.

    Args:
        agent_name: Agent that ran the task; stored as metadata.
        task_type: Kind of task; stored as metadata.
        success: Whether the task succeeded.
        duration_ms: How long it took; stored as metadata only.
    """
    context = {
        "agent_name": agent_name,
        "task_type": task_type,
        "duration_ms": duration_ms
    }
    slo_monitor.record_metric("agent_error_rate", 1.0, success, context)
|
|
|
|
|
|
|
|
def record_database_query(query_type: str, duration_ms: float, success: bool):
    """Feed one database query into the ``database_latency_p90`` SLO.

    Args:
        query_type: Kind of query; stored as metadata.
        duration_ms: Query duration, recorded as the latency value.
        success: Whether the query succeeded.
    """
    context = {"query_type": query_type}
    slo_monitor.record_metric("database_latency_p90", duration_ms, success, context)
|
|
|
|
|
|
|
|
def record_anomaly_detection(true_positive: bool, false_positive: bool):
    """Feed one anomaly detection outcome into the ``anomaly_detection_accuracy`` SLO.

    The outcome is scored as 100.0 for a true positive, 0.0 for a false
    positive and 50.0 when neither flag is set. ``true_positive`` takes
    precedence when both flags are set. The measurement is always recorded
    as successful.
    """
    accuracy = 100.0 if true_positive else (0.0 if false_positive else 50.0)

    outcome = {
        "true_positive": true_positive,
        "false_positive": false_positive
    }
    slo_monitor.record_metric("anomaly_detection_accuracy", accuracy, True, outcome)
|
|
|
|
|
|
|
|
|
|
|
async def default_alert_callback(violation: SLOViolation):
    """Default alert callback: log the violation as a structured warning.

    Args:
        violation: The violation being reported.
    """
    message = f"SLO VIOLATION ALERT: {violation.slo_name}"
    fields = {
        "operation": "slo_alert",
        "severity": violation.severity.value,
        "actual_value": violation.actual_value,
        "target_value": violation.target_value,
        "duration_minutes": violation.duration_minutes,
    }
    logger.warning(message, **fields)

    # NOTE(review): this bumps the "investigation created" business metric on
    # every SLO alert, which looks unrelated to alerting - confirm it is
    # intentional before relying on that metric.
    BusinessMetrics.record_investigation_created()
|
|
|
|
|
|
|
|
|
|
|
# Install the logging callback on the shared monitor at import time.
slo_monitor.register_alert_callback(default_alert_callback)