"""
Chaos engineering endpoints for testing system resilience.

This module provides controlled failure injection for testing
the robustness of the Cidadão.AI system.
"""

import asyncio
import random
import time
from typing import Dict, Any, Optional
from datetime import datetime, timedelta

from fastapi import APIRouter, HTTPException, Depends, Query, BackgroundTasks
from pydantic import BaseModel

from src.core import get_logger
from src.api.auth import get_current_user
from src.infrastructure.observability import get_structured_logger

logger = get_structured_logger(__name__, component="chaos_engineering")

router = APIRouter(prefix="/api/v1/chaos", tags=["Chaos Engineering"])


class ChaosExperimentRequest(BaseModel):
    """Request model for chaos experiments."""
    experiment_type: str
    duration_seconds: int = 30
    intensity: float = 0.5
    target_component: Optional[str] = None
    description: str = ""


class LatencyInjectionRequest(BaseModel):
    """Request model for latency injection."""
    min_delay_ms: int = 100
    max_delay_ms: int = 2000
    probability: float = 0.1
    duration_seconds: int = 60


class ErrorInjectionRequest(BaseModel):
    """Request model for error injection."""
    error_rate: float = 0.05
    error_type: str = "HTTP_500"
    duration_seconds: int = 30


chaos_config = {
    "latency_injection": {
        "enabled": False,
        "min_delay": 0,
        "max_delay": 0,
        "probability": 0.0,
        "expires_at": None
    },
    "error_injection": {
        "enabled": False,
        "error_rate": 0.0,
        "error_type": "HTTP_500",
        "expires_at": None
    },
    "memory_pressure": {
        "enabled": False,
        "intensity": 0.0,
        "expires_at": None
    },
    "cpu_pressure": {
        "enabled": False,
        "intensity": 0.0,
        "expires_at": None
    }
}

chaos_stats = {
    "experiments_run": 0,
    "latency_injections": 0,
    "errors_injected": 0,
    "total_chaos_time_seconds": 0
}
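
# Note: both dictionaries above live in process memory, so active experiments only
# affect the worker process that received the request, and all state is lost on restart.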


@router.get("/status", response_model=Dict[str, Any])
async def get_chaos_status(current_user = Depends(get_current_user)):
    """
    Get current chaos engineering status.

    Returns:
        Current chaos configuration and statistics
    """
    # Disable any experiments whose time window has already expired
    now = datetime.utcnow()
    for experiment_type, config in chaos_config.items():
        if config.get("expires_at") and now > config["expires_at"]:
            config["enabled"] = False
            config["expires_at"] = None

    return {
        "timestamp": now.isoformat(),
        "active_experiments": {
            name: config for name, config in chaos_config.items()
            if config.get("enabled", False)
        },
        "configuration": chaos_config,
        "statistics": chaos_stats
    }


@router.post("/inject/latency", response_model=Dict[str, Any])
async def inject_latency(
    request: LatencyInjectionRequest,
    current_user = Depends(get_current_user)
):
    """
    Inject artificial latency into API responses.

    Args:
        request: Latency injection configuration

    Returns:
        Confirmation of latency injection activation
    """
    try:
        # Guard against an inverted delay range, which would otherwise make
        # random.randint() fail later in apply_chaos_latency()
        if request.min_delay_ms > request.max_delay_ms:
            raise HTTPException(status_code=400, detail="min_delay_ms must not exceed max_delay_ms")

        expires_at = datetime.utcnow() + timedelta(seconds=request.duration_seconds)

        chaos_config["latency_injection"].update({
            "enabled": True,
            "min_delay": request.min_delay_ms,
            "max_delay": request.max_delay_ms,
            "probability": request.probability,
            "expires_at": expires_at
        })

        chaos_stats["experiments_run"] += 1
        chaos_stats["total_chaos_time_seconds"] += request.duration_seconds

        logger.warning(
            "Latency injection experiment started",
            operation="chaos_experiment",
            experiment_type="latency_injection",
            min_delay_ms=request.min_delay_ms,
            max_delay_ms=request.max_delay_ms,
            probability=request.probability,
            duration_seconds=request.duration_seconds
        )

        return {
            "message": "Latency injection activated",
            "experiment_type": "latency_injection",
            "configuration": {
                "min_delay_ms": request.min_delay_ms,
                "max_delay_ms": request.max_delay_ms,
                "probability": request.probability,
                "duration_seconds": request.duration_seconds
            },
            "expires_at": expires_at.isoformat()
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to inject latency: {e}")
        raise HTTPException(status_code=500, detail="Failed to inject latency")
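
# Example call for the endpoint above (illustrative values, not from a real run):
#   POST /api/v1/chaos/inject/latency
#   {"min_delay_ms": 200, "max_delay_ms": 1500, "probability": 0.2, "duration_seconds": 120}
# While active, roughly 20% of requests passing through the chaos hooks are delayed
# by 200-1500 ms for two minutes.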


@router.post("/inject/errors", response_model=Dict[str, Any])
async def inject_errors(
    request: ErrorInjectionRequest,
    current_user = Depends(get_current_user)
):
    """
    Inject artificial errors into API responses.

    Args:
        request: Error injection configuration

    Returns:
        Confirmation of error injection activation
    """
    try:
        expires_at = datetime.utcnow() + timedelta(seconds=request.duration_seconds)

        chaos_config["error_injection"].update({
            "enabled": True,
            "error_rate": request.error_rate,
            "error_type": request.error_type,
            "expires_at": expires_at
        })

        chaos_stats["experiments_run"] += 1
        chaos_stats["total_chaos_time_seconds"] += request.duration_seconds

        logger.warning(
            "Error injection experiment started",
            operation="chaos_experiment",
            experiment_type="error_injection",
            error_rate=request.error_rate,
            error_type=request.error_type,
            duration_seconds=request.duration_seconds
        )

        return {
            "message": "Error injection activated",
            "experiment_type": "error_injection",
            "configuration": {
                "error_rate": request.error_rate,
                "error_type": request.error_type,
                "duration_seconds": request.duration_seconds
            },
            "expires_at": expires_at.isoformat()
        }

    except Exception as e:
        logger.error(f"Failed to inject errors: {e}")
        raise HTTPException(status_code=500, detail="Failed to inject errors")
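
# error_type values recognized by apply_chaos_errors() further below:
#   "HTTP_500" -> 500 Internal Server Error
#   "HTTP_503" -> 503 Service Unavailable
#   "TIMEOUT"  -> 408 Request Timeout
# Any other value falls back to a generic 500 that echoes the value in the detail.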


@router.post("/experiments/memory-pressure", response_model=Dict[str, Any])
async def create_memory_pressure(
    intensity: float = Query(default=0.3, ge=0.1, le=0.8, description="Memory pressure intensity"),
    duration_seconds: int = Query(default=60, ge=10, le=300, description="Duration in seconds"),
    current_user = Depends(get_current_user)
):
    """
    Create memory pressure to test system behavior under resource constraints.

    Args:
        intensity: Memory pressure intensity (0.1 to 0.8)
        duration_seconds: Duration of the experiment

    Returns:
        Memory pressure experiment status
    """
    try:
        expires_at = datetime.utcnow() + timedelta(seconds=duration_seconds)

        chaos_config["memory_pressure"].update({
            "enabled": True,
            "intensity": intensity,
            "expires_at": expires_at
        })

        # Run the allocation loop in the background so the request returns immediately
        asyncio.create_task(_create_memory_pressure(intensity, duration_seconds))

        chaos_stats["experiments_run"] += 1
        chaos_stats["total_chaos_time_seconds"] += duration_seconds

        logger.warning(
            "Memory pressure experiment started",
            operation="chaos_experiment",
            experiment_type="memory_pressure",
            intensity=intensity,
            duration_seconds=duration_seconds
        )

        return {
            "message": "Memory pressure experiment activated",
            "experiment_type": "memory_pressure",
            "intensity": intensity,
            "duration_seconds": duration_seconds,
            "expires_at": expires_at.isoformat()
        }

    except Exception as e:
        logger.error(f"Failed to create memory pressure: {e}")
        raise HTTPException(status_code=500, detail="Failed to create memory pressure")
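
# Sizing note: _create_memory_pressure (defined below) allocates roughly
# 10 * intensity MB per second and keeps at most 20 chunks alive, so with the
# default intensity of 0.3 the extra memory held peaks around 60 MB.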


@router.post("/experiments/cpu-pressure", response_model=Dict[str, Any])
async def create_cpu_pressure(
    intensity: float = Query(default=0.3, ge=0.1, le=0.8, description="CPU pressure intensity"),
    duration_seconds: int = Query(default=60, ge=10, le=300, description="Duration in seconds"),
    current_user = Depends(get_current_user)
):
    """
    Create CPU pressure to test system behavior under high CPU load.

    Args:
        intensity: CPU pressure intensity (0.1 to 0.8)
        duration_seconds: Duration of the experiment

    Returns:
        CPU pressure experiment status
    """
    try:
        expires_at = datetime.utcnow() + timedelta(seconds=duration_seconds)

        chaos_config["cpu_pressure"].update({
            "enabled": True,
            "intensity": intensity,
            "expires_at": expires_at
        })

        # Run the busy-loop in the background so the request returns immediately
        asyncio.create_task(_create_cpu_pressure(intensity, duration_seconds))

        chaos_stats["experiments_run"] += 1
        chaos_stats["total_chaos_time_seconds"] += duration_seconds

        logger.warning(
            "CPU pressure experiment started",
            operation="chaos_experiment",
            experiment_type="cpu_pressure",
            intensity=intensity,
            duration_seconds=duration_seconds
        )

        return {
            "message": "CPU pressure experiment activated",
            "experiment_type": "cpu_pressure",
            "intensity": intensity,
            "duration_seconds": duration_seconds,
            "expires_at": expires_at.isoformat()
        }

    except Exception as e:
        logger.error(f"Failed to create CPU pressure: {e}")
        raise HTTPException(status_code=500, detail="Failed to create CPU pressure")


@router.post("/stop/{experiment_type}", response_model=Dict[str, Any])
async def stop_experiment(
    experiment_type: str,
    current_user = Depends(get_current_user)
):
    """
    Stop a running chaos experiment.

    Args:
        experiment_type: Type of experiment to stop

    Returns:
        Confirmation of experiment termination
    """
    try:
        if experiment_type not in chaos_config:
            raise HTTPException(status_code=404, detail=f"Experiment type '{experiment_type}' not found")

        if not chaos_config[experiment_type].get("enabled", False):
            raise HTTPException(status_code=400, detail=f"Experiment '{experiment_type}' is not running")

        chaos_config[experiment_type]["enabled"] = False
        chaos_config[experiment_type]["expires_at"] = None

        logger.info(
            f"Chaos experiment stopped: {experiment_type}",
            operation="chaos_experiment_stopped",
            experiment_type=experiment_type
        )

        return {
            "message": f"Experiment '{experiment_type}' stopped",
            "experiment_type": experiment_type,
            "stopped_at": datetime.utcnow().isoformat()
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to stop experiment {experiment_type}: {e}")
        raise HTTPException(status_code=500, detail="Failed to stop experiment")


@router.post("/stop-all", response_model=Dict[str, Any])
async def stop_all_experiments(current_user = Depends(get_current_user)):
    """
    Stop all running chaos experiments.

    Returns:
        Summary of stopped experiments
    """
    try:
        stopped_experiments = []

        for experiment_type, config in chaos_config.items():
            if config.get("enabled", False):
                config["enabled"] = False
                config["expires_at"] = None
                stopped_experiments.append(experiment_type)

        logger.info(
            "All chaos experiments stopped",
            operation="chaos_all_experiments_stopped",
            stopped_count=len(stopped_experiments)
        )

        return {
            "message": f"Stopped {len(stopped_experiments)} experiments",
            "stopped_experiments": stopped_experiments,
            "stopped_at": datetime.utcnow().isoformat()
        }

    except Exception as e:
        logger.error(f"Failed to stop all experiments: {e}")
        raise HTTPException(status_code=500, detail="Failed to stop all experiments")


async def apply_chaos_latency():
    """Apply latency chaos if enabled."""
    config = chaos_config["latency_injection"]

    if not config.get("enabled", False):
        return

    # Auto-disable once the experiment window has passed
    if config.get("expires_at") and datetime.utcnow() > config["expires_at"]:
        config["enabled"] = False
        return

    # Delay only the configured fraction of requests
    if random.random() < config.get("probability", 0.0):
        delay_ms = random.randint(config.get("min_delay", 0), config.get("max_delay", 0))
        await asyncio.sleep(delay_ms / 1000.0)
        chaos_stats["latency_injections"] += 1


def apply_chaos_errors():
    """Apply error chaos if enabled."""
    config = chaos_config["error_injection"]

    if not config.get("enabled", False):
        return None

    # Auto-disable once the experiment window has passed
    if config.get("expires_at") and datetime.utcnow() > config["expires_at"]:
        config["enabled"] = False
        return None

    # Fail only the configured fraction of requests
    if random.random() < config.get("error_rate", 0.0):
        chaos_stats["errors_injected"] += 1
        error_type = config.get("error_type", "HTTP_500")

        if error_type == "HTTP_500":
            raise HTTPException(status_code=500, detail="Chaos: Internal Server Error")
        elif error_type == "HTTP_503":
            raise HTTPException(status_code=503, detail="Chaos: Service Unavailable")
        elif error_type == "TIMEOUT":
            raise HTTPException(status_code=408, detail="Chaos: Request Timeout")
        else:
            raise HTTPException(status_code=500, detail=f"Chaos: {error_type}")

    return None
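
# The two hooks above only take effect where request-handling code calls them, for
# example from an HTTP middleware. A minimal illustrative sketch (the `app` instance
# is assumed to exist elsewhere; it is not defined in this module):
#
#   @app.middleware("http")
#   async def chaos_middleware(request, call_next):
#       await apply_chaos_latency()   # may sleep before the handler runs
#       apply_chaos_errors()          # may raise an HTTPException
#       return await call_next(request)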


async def _create_memory_pressure(intensity: float, duration_seconds: int):
    """Create memory pressure for the specified duration."""
    try:
        # Allocate roughly (10 * intensity) MB per second, keeping a bounded window
        # of chunks alive so memory use grows without climbing indefinitely
        memory_chunks = []
        chunk_size = int(1024 * 1024 * intensity * 10)

        start_time = time.time()
        while time.time() - start_time < duration_seconds:
            # Hold on to a new chunk each second
            chunk = bytearray(chunk_size)
            memory_chunks.append(chunk)

            await asyncio.sleep(1)

            # Cap the number of live chunks to keep the experiment bounded
            if len(memory_chunks) > 20:
                memory_chunks.pop(0)

        # Release everything and mark the experiment as finished
        memory_chunks.clear()
        chaos_config["memory_pressure"]["enabled"] = False

    except Exception as e:
        logger.error(f"Memory pressure experiment failed: {e}")
        chaos_config["memory_pressure"]["enabled"] = False


async def _create_cpu_pressure(intensity: float, duration_seconds: int):
    """Create CPU pressure for the specified duration."""
    try:
        start_time = time.time()

        while time.time() - start_time < duration_seconds:
            # Duty cycle over a ~100 ms window: busy-work for intensity * 100 ms,
            # then yield to the event loop for the remainder
            work_duration = intensity * 0.1
            sleep_duration = (1 - intensity) * 0.1

            # Busy loop that burns CPU on the event-loop thread
            work_start = time.time()
            while time.time() - work_start < work_duration:
                sum(i * i for i in range(1000))

            await asyncio.sleep(sleep_duration)

        chaos_config["cpu_pressure"]["enabled"] = False

    except Exception as e:
        logger.error(f"CPU pressure experiment failed: {e}")
        chaos_config["cpu_pressure"]["enabled"] = False