# Author: anderson-ufrj
# Commit: feat: implement resilience patterns and monitoring endpoints
# Revision: 35d7096
# (repository page residue: raw | history | blame | 12.4 kB)
"""
Resilience monitoring endpoints.
This module provides endpoints for monitoring circuit breakers,
bulkheads, and overall system resilience.
"""
from typing import Dict, Any
from fastapi import APIRouter, HTTPException, Depends
from src.core import get_logger
from src.api.auth import get_current_user
from src.infrastructure.resilience import circuit_breaker_manager, bulkhead_manager
# Logger obtained from the project's get_logger helper, keyed by this module's name.
logger = get_logger(__name__)
# Router for every endpoint in this module; all routes are served under
# the /api/v1/resilience prefix and tagged "Resilience".
router = APIRouter(prefix="/api/v1/resilience", tags=["Resilience"])
@router.get("/circuit-breakers", response_model=Dict[str, Any])
async def get_circuit_breaker_stats(
    current_user=Depends(get_current_user)
):
    """
    Report statistics for every circuit breaker known to the manager.

    The response carries three sections:
    - "circuit_breakers": the per-breaker statistics from the manager
    - "health_status": the manager's health status report
    - "summary": breaker count plus the number of healthy, degraded
      and failed services

    Raises:
        HTTPException: status 500 when any step of the lookup fails.
    """
    try:
        breaker_stats = circuit_breaker_manager.get_all_stats()
        health = circuit_breaker_manager.get_health_status()

        # Counts only; the full service lists stay in "health_status".
        summary = {
            "total_breakers": len(breaker_stats),
            "healthy_services": len(health["healthy_services"]),
            "degraded_services": len(health["degraded_services"]),
            "failed_services": len(health["failed_services"]),
        }
        return {
            "circuit_breakers": breaker_stats,
            "health_status": health,
            "summary": summary,
        }
    except Exception as exc:
        logger.error(f"Failed to get circuit breaker stats: {exc}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve circuit breaker statistics",
        )
@router.get("/circuit-breakers/{service_name}", response_model=Dict[str, Any])
async def get_circuit_breaker_stats_by_service(
    service_name: str,
    current_user=Depends(get_current_user)
):
    """
    Report statistics for the circuit breaker of a single service.

    Args:
        service_name: Name of the service whose breaker is inspected

    Returns:
        The service name together with that breaker's statistics

    Raises:
        HTTPException: status 500 when the lookup fails.
    """
    try:
        # The manager lookup is expected to get or create the breaker,
        # so a stats call always has an object to work on.
        breaker_stats = circuit_breaker_manager.get_circuit_breaker(service_name).get_stats()
        return {
            "service_name": service_name,
            "circuit_breaker": breaker_stats,
        }
    except Exception as exc:
        logger.error(f"Failed to get circuit breaker stats for {service_name}: {exc}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve circuit breaker statistics for {service_name}",
        )
@router.post("/circuit-breakers/{service_name}/reset", response_model=Dict[str, Any])
async def reset_circuit_breaker(
    service_name: str,
    current_user=Depends(get_current_user)
):
    """
    Reset the circuit breaker of one service.

    Args:
        service_name: Name of the service whose breaker is reset

    Returns:
        A confirmation message, the service name and the breaker state
        read back after the reset

    Raises:
        HTTPException: status 500 when the reset (or the audit log line
        that follows it) fails.
    """
    try:
        target = circuit_breaker_manager.get_circuit_breaker(service_name)
        await target.reset()

        # Record who triggered the reset before building the response.
        logger.info(f"Circuit breaker for {service_name} reset by user {current_user['sub']}")

        confirmation = {
            "message": f"Circuit breaker for {service_name} reset successfully",
            "service_name": service_name,
            "new_state": target.state.value,
        }
        return confirmation
    except Exception as exc:
        logger.error(f"Failed to reset circuit breaker for {service_name}: {exc}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to reset circuit breaker for {service_name}",
        )
@router.post("/circuit-breakers/reset-all", response_model=Dict[str, Any])
async def reset_all_circuit_breakers(
    current_user=Depends(get_current_user)
):
    """
    Reset every circuit breaker through the manager.

    Returns:
        A confirmation message and the "sub" value of the user who
        requested the reset

    Raises:
        HTTPException: status 500 when the reset fails.
    """
    try:
        await circuit_breaker_manager.reset_all()

        # Looked up after the reset, as the audit line and the response both need it.
        requested_by = current_user["sub"]
        logger.warning(f"All circuit breakers reset by user {requested_by}")

        return {
            "message": "All circuit breakers reset successfully",
            "reset_by": requested_by,
        }
    except Exception as exc:
        logger.error(f"Failed to reset all circuit breakers: {exc}")
        raise HTTPException(
            status_code=500,
            detail="Failed to reset all circuit breakers",
        )
@router.get("/bulkheads", response_model=Dict[str, Any])
async def get_bulkhead_stats(
    current_user=Depends(get_current_user)
):
    """
    Report statistics for every bulkhead known to the manager.

    The response carries three sections:
    - "bulkheads": the per-bulkhead statistics from the manager
    - "resource_utilization": the manager's utilization report
    - "summary": bulkhead count plus overall utilization, capacity,
      active and queued totals copied from the utilization report

    Raises:
        HTTPException: status 500 when any step of the lookup fails.
    """
    try:
        bulkhead_stats = bulkhead_manager.get_all_stats()
        usage = bulkhead_manager.get_resource_utilization()

        summary = {"total_bulkheads": len(bulkhead_stats)}
        # Copy the headline figures in the order the response exposes them.
        for field in ("overall_utilization", "total_capacity", "total_active", "total_queued"):
            summary[field] = usage[field]

        return {
            "bulkheads": bulkhead_stats,
            "resource_utilization": usage,
            "summary": summary,
        }
    except Exception as exc:
        logger.error(f"Failed to get bulkhead stats: {exc}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve bulkhead statistics",
        )
@router.get("/bulkheads/{resource_type}", response_model=Dict[str, Any])
async def get_bulkhead_stats_by_resource(
    resource_type: str,
    current_user=Depends(get_current_user)
):
    """
    Report statistics for the bulkhead of a single resource type.

    Args:
        resource_type: Type of resource whose bulkhead is inspected

    Returns:
        The resource type together with that bulkhead's statistics

    Raises:
        HTTPException: status 500 when the lookup fails.
    """
    try:
        # The manager lookup is expected to get or create the bulkhead,
        # so a stats call always has an object to work on.
        bulkhead_stats = bulkhead_manager.get_bulkhead(resource_type).get_stats()
        return {
            "resource_type": resource_type,
            "bulkhead": bulkhead_stats,
        }
    except Exception as exc:
        logger.error(f"Failed to get bulkhead stats for {resource_type}: {exc}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to retrieve bulkhead statistics for {resource_type}",
        )
# Overall bulkhead utilization above which the system counts as degraded.
_BULKHEAD_DEGRADED_UTILIZATION = 0.9
# Overall bulkhead utilization above which the system counts as critical.
_BULKHEAD_CRITICAL_UTILIZATION = 0.95


def _resolve_overall_health(circuit_breaker_level: str, utilization: float) -> str:
    """
    Combine circuit breaker health and bulkhead utilization into one level.

    The worse of the two signals wins. The critical threshold is tested
    before the degraded one: any utilization above the critical threshold
    is also above the degraded threshold, so testing them the other way
    round would never let utilization alone report "critical".

    Args:
        circuit_breaker_level: "overall_health" value reported by the
            circuit breaker manager
        utilization: overall bulkhead utilization as a fraction

    Returns:
        "critical", "degraded" or "healthy"
    """
    if circuit_breaker_level == "critical" or utilization > _BULKHEAD_CRITICAL_UTILIZATION:
        return "critical"
    if circuit_breaker_level == "degraded" or utilization > _BULKHEAD_DEGRADED_UTILIZATION:
        return "degraded"
    return "healthy"


@router.get("/health", response_model=Dict[str, Any])
async def get_resilience_health():
    """
    Get overall resilience health status.

    This endpoint declares no authentication dependency, unlike the other
    endpoints in this module.

    Returns:
        Comprehensive health status of all resilience components:
        the combined "overall_health" level, a circuit breaker section,
        a bulkhead section and a list of textual recommendations

    Raises:
        HTTPException: status 500 when the status cannot be assembled.
    """
    try:
        circuit_breaker_health = circuit_breaker_manager.get_health_status()
        bulkhead_utilization = bulkhead_manager.get_resource_utilization()

        # Worst of circuit breaker health and bulkhead pressure.
        overall_health = _resolve_overall_health(
            circuit_breaker_health["overall_health"],
            bulkhead_utilization["overall_utilization"],
        )

        return {
            "overall_health": overall_health,
            "circuit_breakers": {
                "health": circuit_breaker_health["overall_health"],
                "healthy_services": len(circuit_breaker_health["healthy_services"]),
                "failed_services": len(circuit_breaker_health["failed_services"]),
                "health_score": circuit_breaker_health["health_score"]
            },
            "bulkheads": {
                "utilization": bulkhead_utilization["overall_utilization"],
                "active_operations": bulkhead_utilization["total_active"],
                "total_capacity": bulkhead_utilization["total_capacity"],
                "queued_operations": bulkhead_utilization["total_queued"]
            },
            "recommendations": _generate_health_recommendations(
                circuit_breaker_health,
                bulkhead_utilization
            )
        }
    except Exception as e:
        logger.error(f"Failed to get resilience health: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve resilience health status")
@router.get("/metrics", response_model=Dict[str, Any])
async def get_resilience_metrics(
    current_user=Depends(get_current_user)
):
    """
    Report raw and aggregated resilience metrics.

    Request, failure and rejection counters are summed over all circuit
    breakers and all bulkheads; timeout counters are summed over bulkheads
    only. Rates are derived from those totals, and with zero requests the
    success rate is reported as 1.0 and the other rates as 0.

    Returns:
        The per-breaker statistics, the per-bulkhead statistics and an
        "aggregate_metrics" section with totals and rates

    Raises:
        HTTPException: status 500 when the metrics cannot be assembled.
    """
    try:
        breaker_stats = circuit_breaker_manager.get_all_stats()
        bulkhead_stats = bulkhead_manager.get_all_stats()

        request_count = 0
        failure_count = 0
        timeout_count = 0
        rejection_count = 0

        # Circuit breakers contribute requests, failures and rejections.
        for entry in breaker_stats.values():
            counters = entry["stats"]
            request_count += counters["total_requests"]
            failure_count += counters["failed_requests"]
            rejection_count += counters["rejected_requests"]

        # Bulkheads contribute the same counters plus timeouts.
        for entry in bulkhead_stats.values():
            counters = entry["stats"]
            request_count += counters["total_requests"]
            failure_count += counters["failed_requests"]
            timeout_count += counters["timeout_requests"]
            rejection_count += counters["rejected_requests"]

        if request_count > 0:
            success_rate = (request_count - failure_count) / request_count
            failure_rate = failure_count / request_count
            rejection_rate = rejection_count / request_count
        else:
            # No traffic yet: report a perfect success rate and zero rates.
            success_rate, failure_rate, rejection_rate = 1.0, 0, 0

        aggregate = {
            "total_requests": request_count,
            "total_failures": failure_count,
            "total_timeouts": timeout_count,
            "total_rejections": rejection_count,
            "success_rate": success_rate,
            "failure_rate": failure_rate,
            "rejection_rate": rejection_rate,
        }
        return {
            "circuit_breakers": breaker_stats,
            "bulkheads": bulkhead_stats,
            "aggregate_metrics": aggregate,
        }
    except Exception as exc:
        logger.error(f"Failed to get resilience metrics: {exc}")
        raise HTTPException(
            status_code=500,
            detail="Failed to retrieve resilience metrics",
        )
def _generate_health_recommendations(
circuit_breaker_health: Dict[str, Any],
bulkhead_utilization: Dict[str, Any]
) -> list[str]:
"""Generate health recommendations based on current status."""
recommendations = []
# Circuit breaker recommendations
if circuit_breaker_health["failed_services"]:
recommendations.append(
f"⚠️ {len(circuit_breaker_health['failed_services'])} services have failing circuit breakers. "
f"Check: {', '.join(circuit_breaker_health['failed_services'])}"
)
if circuit_breaker_health["degraded_services"]:
recommendations.append(
f"⚡ {len(circuit_breaker_health['degraded_services'])} services are in recovery mode. "
f"Monitor: {', '.join(circuit_breaker_health['degraded_services'])}"
)
# Bulkhead recommendations
if bulkhead_utilization["overall_utilization"] > 0.8:
recommendations.append(
f"📊 High resource utilization ({bulkhead_utilization['overall_utilization']:.1%}). "
"Consider scaling or optimizing workloads."
)
high_util_resources = [
name for name, resource in bulkhead_utilization["resources"].items()
if resource["utilization"] > 0.9
]
if high_util_resources:
recommendations.append(
f"🔥 High utilization resources: {', '.join(high_util_resources)}. "
"Consider increasing capacity or load balancing."
)
if bulkhead_utilization["total_queued"] > 0:
recommendations.append(
f"⏳ {bulkhead_utilization['total_queued']} operations queued. "
"Monitor queue lengths and processing times."
)
if not recommendations:
recommendations.append("✅ All resilience components are healthy.")
return recommendations