|
|
""" |
|
|
Audit routes for Cidadão.AI API |
|
|
Security audit logging and monitoring endpoints |
|
|
""" |
|
|
|
|
|
from datetime import datetime, timedelta |
|
|
from typing import Optional, List |
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Response |
|
|
from fastapi.responses import StreamingResponse |
|
|
from pydantic import BaseModel |
|
|
import io |
|
|
|
|
|
from src.core.audit import ( |
|
|
audit_logger, |
|
|
AuditFilter, |
|
|
AuditEvent, |
|
|
AuditEventType, |
|
|
AuditSeverity, |
|
|
AuditStatistics |
|
|
) |
|
|
from src.api.auth import get_current_user, require_admin, User |
|
|
|
|
|
router = APIRouter(prefix="/audit", tags=["audit"]) |
|
|
|
|
|
|
|
|
class AuditEventResponse(BaseModel): |
|
|
"""Audit event response model.""" |
|
|
|
|
|
id: str |
|
|
timestamp: datetime |
|
|
event_type: str |
|
|
severity: str |
|
|
message: str |
|
|
user_id: Optional[str] = None |
|
|
user_email: Optional[str] = None |
|
|
user_role: Optional[str] = None |
|
|
resource_type: Optional[str] = None |
|
|
resource_id: Optional[str] = None |
|
|
resource_name: Optional[str] = None |
|
|
success: bool |
|
|
error_code: Optional[str] = None |
|
|
error_message: Optional[str] = None |
|
|
details: dict = {} |
|
|
context: Optional[dict] = None |
|
|
|
|
|
|
|
|
class AuditQueryRequest(BaseModel): |
|
|
"""Audit query request model.""" |
|
|
|
|
|
start_date: Optional[datetime] = None |
|
|
end_date: Optional[datetime] = None |
|
|
event_types: Optional[List[AuditEventType]] = None |
|
|
severity_levels: Optional[List[AuditSeverity]] = None |
|
|
user_id: Optional[str] = None |
|
|
user_email: Optional[str] = None |
|
|
resource_type: Optional[str] = None |
|
|
resource_id: Optional[str] = None |
|
|
success_only: Optional[bool] = None |
|
|
ip_address: Optional[str] = None |
|
|
limit: int = 100 |
|
|
offset: int = 0 |
|
|
|
|
|
|
|
|
@router.get("/events", response_model=List[AuditEventResponse]) |
|
|
async def get_audit_events( |
|
|
start_date: Optional[datetime] = Query(None, description="Start date filter"), |
|
|
end_date: Optional[datetime] = Query(None, description="End date filter"), |
|
|
event_type: Optional[AuditEventType] = Query(None, description="Event type filter"), |
|
|
severity: Optional[AuditSeverity] = Query(None, description="Severity filter"), |
|
|
user_email: Optional[str] = Query(None, description="User email filter"), |
|
|
resource_type: Optional[str] = Query(None, description="Resource type filter"), |
|
|
success_only: Optional[bool] = Query(None, description="Success only filter"), |
|
|
limit: int = Query(100, le=1000, description="Result limit"), |
|
|
offset: int = Query(0, ge=0, description="Result offset"), |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Get audit events (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
|
|
|
filter_options = AuditFilter( |
|
|
start_date=start_date, |
|
|
end_date=end_date, |
|
|
event_types=[event_type] if event_type else None, |
|
|
severity_levels=[severity] if severity else None, |
|
|
user_email=user_email, |
|
|
resource_type=resource_type, |
|
|
success_only=success_only, |
|
|
limit=limit, |
|
|
offset=offset |
|
|
) |
|
|
|
|
|
|
|
|
events = await audit_logger.query_events(filter_options) |
|
|
|
|
|
|
|
|
response_events = [] |
|
|
for event in events: |
|
|
response_events.append(AuditEventResponse( |
|
|
id=event.id, |
|
|
timestamp=event.timestamp, |
|
|
event_type=event.event_type.value, |
|
|
severity=event.severity.value, |
|
|
message=event.message, |
|
|
user_id=event.user_id, |
|
|
user_email=event.user_email, |
|
|
user_role=event.user_role, |
|
|
resource_type=event.resource_type, |
|
|
resource_id=event.resource_id, |
|
|
resource_name=event.resource_name, |
|
|
success=event.success, |
|
|
error_code=event.error_code, |
|
|
error_message=event.error_message, |
|
|
details=event.details, |
|
|
context=event.context.model_dump() if event.context else None |
|
|
)) |
|
|
|
|
|
return response_events |
|
|
|
|
|
|
|
|
@router.post("/events/query", response_model=List[AuditEventResponse]) |
|
|
async def query_audit_events( |
|
|
query_request: AuditQueryRequest, |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Query audit events with advanced filters (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
|
|
|
filter_options = AuditFilter(**query_request.model_dump()) |
|
|
|
|
|
|
|
|
events = await audit_logger.query_events(filter_options) |
|
|
|
|
|
|
|
|
response_events = [] |
|
|
for event in events: |
|
|
response_events.append(AuditEventResponse( |
|
|
id=event.id, |
|
|
timestamp=event.timestamp, |
|
|
event_type=event.event_type.value, |
|
|
severity=event.severity.value, |
|
|
message=event.message, |
|
|
user_id=event.user_id, |
|
|
user_email=event.user_email, |
|
|
user_role=event.user_role, |
|
|
resource_type=event.resource_type, |
|
|
resource_id=event.resource_id, |
|
|
resource_name=event.resource_name, |
|
|
success=event.success, |
|
|
error_code=event.error_code, |
|
|
error_message=event.error_message, |
|
|
details=event.details, |
|
|
context=event.context.model_dump() if event.context else None |
|
|
)) |
|
|
|
|
|
return response_events |
|
|
|
|
|
|
|
|
@router.get("/statistics", response_model=AuditStatistics) |
|
|
async def get_audit_statistics( |
|
|
start_date: Optional[datetime] = Query(None, description="Start date filter"), |
|
|
end_date: Optional[datetime] = Query(None, description="End date filter"), |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Get audit statistics (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
statistics = await audit_logger.get_statistics( |
|
|
start_date=start_date, |
|
|
end_date=end_date |
|
|
) |
|
|
|
|
|
return statistics |
|
|
|
|
|
|
|
|
@router.get("/export") |
|
|
async def export_audit_events( |
|
|
format: str = Query("json", regex="^(json|csv)$", description="Export format"), |
|
|
start_date: Optional[datetime] = Query(None, description="Start date filter"), |
|
|
end_date: Optional[datetime] = Query(None, description="End date filter"), |
|
|
event_type: Optional[AuditEventType] = Query(None, description="Event type filter"), |
|
|
severity: Optional[AuditSeverity] = Query(None, description="Severity filter"), |
|
|
user_email: Optional[str] = Query(None, description="User email filter"), |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Export audit events (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
|
|
|
filter_options = AuditFilter( |
|
|
start_date=start_date, |
|
|
end_date=end_date, |
|
|
event_types=[event_type] if event_type else None, |
|
|
severity_levels=[severity] if severity else None, |
|
|
user_email=user_email, |
|
|
limit=10000 |
|
|
) |
|
|
|
|
|
|
|
|
exported_data = await audit_logger.export_events(filter_options, format) |
|
|
|
|
|
|
|
|
if format == "json": |
|
|
media_type = "application/json" |
|
|
filename = f"audit_events_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" |
|
|
else: |
|
|
media_type = "text/csv" |
|
|
filename = f"audit_events_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" |
|
|
|
|
|
|
|
|
return StreamingResponse( |
|
|
io.StringIO(exported_data), |
|
|
media_type=media_type, |
|
|
headers={"Content-Disposition": f"attachment; filename={filename}"} |
|
|
) |
|
|
|
|
|
|
|
|
@router.get("/events/{event_id}", response_model=AuditEventResponse) |
|
|
async def get_audit_event( |
|
|
event_id: str, |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Get specific audit event (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
|
|
|
event = None |
|
|
for e in audit_logger.events: |
|
|
if e.id == event_id: |
|
|
event = e |
|
|
break |
|
|
|
|
|
if not event: |
|
|
raise HTTPException(status_code=404, detail="Audit event not found") |
|
|
|
|
|
return AuditEventResponse( |
|
|
id=event.id, |
|
|
timestamp=event.timestamp, |
|
|
event_type=event.event_type.value, |
|
|
severity=event.severity.value, |
|
|
message=event.message, |
|
|
user_id=event.user_id, |
|
|
user_email=event.user_email, |
|
|
user_role=event.user_role, |
|
|
resource_type=event.resource_type, |
|
|
resource_id=event.resource_id, |
|
|
resource_name=event.resource_name, |
|
|
success=event.success, |
|
|
error_code=event.error_code, |
|
|
error_message=event.error_message, |
|
|
details=event.details, |
|
|
context=event.context.model_dump() if event.context else None |
|
|
) |
|
|
|
|
|
|
|
|
@router.get("/integrity") |
|
|
async def verify_audit_integrity( |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Verify audit log integrity (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
integrity_report = await audit_logger.verify_integrity() |
|
|
|
|
|
return integrity_report |
|
|
|
|
|
|
|
|
@router.get("/event-types") |
|
|
async def get_audit_event_types( |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Get available audit event types (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
event_types = [ |
|
|
{ |
|
|
"value": event_type.value, |
|
|
"name": event_type.name, |
|
|
"description": event_type.value.replace(".", " ").replace("_", " ").title() |
|
|
} |
|
|
for event_type in AuditEventType |
|
|
] |
|
|
|
|
|
return {"event_types": event_types} |
|
|
|
|
|
|
|
|
@router.get("/severity-levels") |
|
|
async def get_audit_severity_levels( |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Get available audit severity levels (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
severity_levels = [ |
|
|
{ |
|
|
"value": severity.value, |
|
|
"name": severity.name, |
|
|
"description": severity.value.title() |
|
|
} |
|
|
for severity in AuditSeverity |
|
|
] |
|
|
|
|
|
return {"severity_levels": severity_levels} |
|
|
|
|
|
|
|
|
@router.get("/dashboard") |
|
|
async def get_audit_dashboard( |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Get audit dashboard data (admin only).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
|
|
|
now = datetime.utcnow() |
|
|
last_24h = now - timedelta(hours=24) |
|
|
last_7d = now - timedelta(days=7) |
|
|
last_30d = now - timedelta(days=30) |
|
|
|
|
|
stats_24h = await audit_logger.get_statistics(start_date=last_24h) |
|
|
stats_7d = await audit_logger.get_statistics(start_date=last_7d) |
|
|
stats_30d = await audit_logger.get_statistics(start_date=last_30d) |
|
|
|
|
|
|
|
|
high_severity_filter = AuditFilter( |
|
|
severity_levels=[AuditSeverity.HIGH, AuditSeverity.CRITICAL], |
|
|
start_date=last_24h, |
|
|
limit=10 |
|
|
) |
|
|
recent_alerts = await audit_logger.query_events(high_severity_filter) |
|
|
|
|
|
|
|
|
failed_events_filter = AuditFilter( |
|
|
success_only=False, |
|
|
start_date=last_24h, |
|
|
limit=10 |
|
|
) |
|
|
recent_failures = await audit_logger.query_events(failed_events_filter) |
|
|
|
|
|
return { |
|
|
"statistics": { |
|
|
"last_24h": stats_24h, |
|
|
"last_7d": stats_7d, |
|
|
"last_30d": stats_30d |
|
|
}, |
|
|
"recent_alerts": [ |
|
|
{ |
|
|
"id": event.id, |
|
|
"timestamp": event.timestamp, |
|
|
"event_type": event.event_type.value, |
|
|
"severity": event.severity.value, |
|
|
"message": event.message, |
|
|
"user_email": event.user_email |
|
|
} |
|
|
for event in recent_alerts |
|
|
], |
|
|
"recent_failures": [ |
|
|
{ |
|
|
"id": event.id, |
|
|
"timestamp": event.timestamp, |
|
|
"event_type": event.event_type.value, |
|
|
"message": event.message, |
|
|
"error_message": event.error_message, |
|
|
"user_email": event.user_email |
|
|
} |
|
|
for event in recent_failures |
|
|
] |
|
|
} |
|
|
|
|
|
|
|
|
@router.post("/test-event") |
|
|
async def create_test_audit_event( |
|
|
current_user: User = Depends(get_current_user) |
|
|
): |
|
|
"""Create a test audit event (admin only, for testing purposes).""" |
|
|
|
|
|
require_admin(current_user) |
|
|
|
|
|
from src.core.audit import AuditContext |
|
|
|
|
|
|
|
|
test_context = AuditContext( |
|
|
ip_address="127.0.0.1", |
|
|
user_agent="Test Agent", |
|
|
host="localhost" |
|
|
) |
|
|
|
|
|
|
|
|
event = await audit_logger.log_event( |
|
|
event_type=AuditEventType.ADMIN_ACTION, |
|
|
message="Test audit event created by administrator", |
|
|
severity=AuditSeverity.LOW, |
|
|
user_id=current_user.id, |
|
|
user_email=current_user.email, |
|
|
user_role=current_user.role, |
|
|
resource_type="audit", |
|
|
resource_id="test", |
|
|
details={"test": True, "created_by": current_user.email}, |
|
|
context=test_context |
|
|
) |
|
|
|
|
|
return { |
|
|
"message": "Test audit event created successfully", |
|
|
"event_id": event.id |
|
|
} |